Skip to content

Commit

Permalink
Add a queue in DB
Browse files Browse the repository at this point in the history
Add a queue in DB to prevent writing the same transaction many times when there is mutiple write request on the same file
  • Loading branch information
Neylix committed Jul 13, 2022
1 parent 433f53c commit 59c4dd3
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 3 deletions.
2 changes: 1 addition & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Config

config :logger, level: System.get_env("ARCHETHIC_LOGGER_LEVEL", "debug") |> String.to_atom()
config :logger, level: System.get_env("ARCHETHIC_LOGGER_LEVEL", "error") |> String.to_atom()

config :archethic,
:mut_dir,
Expand Down
23 changes: 22 additions & 1 deletion lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Archethic.DB.EmbeddedImpl do
alias __MODULE__.ChainWriter
alias __MODULE__.P2PView
alias __MODULE__.StatsInfo
alias __MODULE__.Queue

alias Archethic.TransactionChain.Transaction

Expand Down Expand Up @@ -51,6 +52,10 @@ defmodule Archethic.DB.EmbeddedImpl do
first_tx = List.first(sorted_chain)
genesis_address = Transaction.previous_address(first_tx)

Queue.push(genesis_address, fn -> do_write_transaction_chain(genesis_address, chain) end)
end

defp do_write_transaction_chain(genesis_address, sorted_chain) do
Enum.each(sorted_chain, fn tx ->
unless ChainIndex.transaction_exists?(tx.address, db_path()) do
ChainWriter.append_transaction(genesis_address, tx)
Expand All @@ -70,20 +75,36 @@ defmodule Archethic.DB.EmbeddedImpl do

case ChainIndex.get_tx_entry(previous_address, db_path()) do
{:ok, %{genesis_address: genesis_address}} ->
ChainWriter.append_transaction(genesis_address, tx)
Queue.push(genesis_address, fn -> do_write_transaction(genesis_address, tx) end)

{:error, :not_exists} ->
ChainWriter.append_transaction(previous_address, tx)
end
end
end

defp do_write_transaction(genesis_address, tx) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
{:error, :transaction_already_exists}
else
ChainWriter.append_transaction(genesis_address, tx)
end
end

@doc """
Write a transaction if a specific genesis address
"""
@spec write_transaction_at(Transaction.t(), binary()) :: :ok
def write_transaction_at(tx = %Transaction{}, genesis_address)
when is_binary(genesis_address) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
{:error, :transaction_already_exists}
else
Queue.push(genesis_address, fn -> do_write_transaction_at(genesis_address, tx) end)
end
end

defp do_write_transaction_at(genesis_address, tx) do
if ChainIndex.transaction_exists?(tx.address, db_path()) do
{:error, :transaction_already_exists}
else
Expand Down
54 changes: 54 additions & 0 deletions lib/archethic/db/embedded_impl/queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Archethic.DB.EmbeddedImpl.Queue do
@moduledoc false

use GenServer

def start_link(args \\ [], opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end

def init(_args) do
{:ok, Map.new()}
end

@doc """
Add a process in the queue for a specific genesis_address
"""
@spec push(binary(), fun()) :: :ok
def push(genesis_address, func) do
GenServer.call(__MODULE__, {:push, genesis_address, func})
end

def handle_call({:push, genesis_address, func}, from, state) do
new_state =
Map.update(
state,
genesis_address,
:queue.in({from, func}, :queue.new()),
&:queue.in({from, func}, &1)
)

{:noreply, new_state, {:continue, genesis_address}}
end

def handle_continue(genesis_address, state) do
case Map.get(state, genesis_address) do
nil ->
{:noreply, state}

queue ->
new_state =
if :queue.is_empty(queue) do
Map.delete(state, genesis_address)
else
Map.update!(state, genesis_address, fn _ ->
{{:value, {from, func}}, queue} = :queue.out(queue)
GenServer.reply(from, func.())
queue
end)
end

{:noreply, new_state, {:continue, genesis_address}}
end
end
end
4 changes: 3 additions & 1 deletion lib/archethic/db/embedded_impl/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do
alias Archethic.DB.EmbeddedImpl.ChainIndex
alias Archethic.DB.EmbeddedImpl.P2PView
alias Archethic.DB.EmbeddedImpl.StatsInfo
alias Archethic.DB.EmbeddedImpl.Queue

require Logger

Expand All @@ -25,7 +26,8 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do
{ChainWriter, path: path},
{BootstrapInfo, path: path},
{P2PView, path: path},
{StatsInfo, path: path}
{StatsInfo, path: path},
Queue
]

Supervisor.init(children, strategy: :rest_for_one)
Expand Down

0 comments on commit 59c4dd3

Please sign in to comment.