From 59c4dd3562bf1e71aa5fe7bc445e553a8db83f89 Mon Sep 17 00:00:00 2001 From: neylix Date: Thu, 14 Jul 2022 00:55:07 +0200 Subject: [PATCH] Add a queue in DB Add a queue in DB to prevent writing the same transaction many times when there is mutiple write request on the same file --- config/dev.exs | 2 +- lib/archethic/db/embedded_impl.ex | 23 ++++++++- lib/archethic/db/embedded_impl/queue.ex | 54 ++++++++++++++++++++ lib/archethic/db/embedded_impl/supervisor.ex | 4 +- 4 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 lib/archethic/db/embedded_impl/queue.ex diff --git a/config/dev.exs b/config/dev.exs index 4f2333e18c..7448bf76b3 100755 --- a/config/dev.exs +++ b/config/dev.exs @@ -1,6 +1,6 @@ import Config -config :logger, level: System.get_env("ARCHETHIC_LOGGER_LEVEL", "debug") |> String.to_atom() +config :logger, level: System.get_env("ARCHETHIC_LOGGER_LEVEL", "error") |> String.to_atom() config :archethic, :mut_dir, diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index 626311a80e..0927fcc61d 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -12,6 +12,7 @@ defmodule Archethic.DB.EmbeddedImpl do alias __MODULE__.ChainWriter alias __MODULE__.P2PView alias __MODULE__.StatsInfo + alias __MODULE__.Queue alias Archethic.TransactionChain.Transaction @@ -51,6 +52,10 @@ defmodule Archethic.DB.EmbeddedImpl do first_tx = List.first(sorted_chain) genesis_address = Transaction.previous_address(first_tx) + Queue.push(genesis_address, fn -> do_write_transaction_chain(genesis_address, chain) end) + end + + defp do_write_transaction_chain(genesis_address, sorted_chain) do Enum.each(sorted_chain, fn tx -> unless ChainIndex.transaction_exists?(tx.address, db_path()) do ChainWriter.append_transaction(genesis_address, tx) @@ -70,7 +75,7 @@ defmodule Archethic.DB.EmbeddedImpl do case ChainIndex.get_tx_entry(previous_address, db_path()) do {:ok, %{genesis_address: genesis_address}} -> - ChainWriter.append_transaction(genesis_address, tx) + Queue.push(genesis_address, fn -> do_write_transaction(genesis_address, tx) end) {:error, :not_exists} -> ChainWriter.append_transaction(previous_address, tx) @@ -78,12 +83,28 @@ defmodule Archethic.DB.EmbeddedImpl do end end + defp do_write_transaction(genesis_address, tx) do + if ChainIndex.transaction_exists?(tx.address, db_path()) do + {:error, :transaction_already_exists} + else + ChainWriter.append_transaction(genesis_address, tx) + end + end + @doc """ Write a transaction if a specific genesis address """ @spec write_transaction_at(Transaction.t(), binary()) :: :ok def write_transaction_at(tx = %Transaction{}, genesis_address) when is_binary(genesis_address) do + if ChainIndex.transaction_exists?(tx.address, db_path()) do + {:error, :transaction_already_exists} + else + Queue.push(genesis_address, fn -> do_write_transaction_at(genesis_address, tx) end) + end + end + + defp do_write_transaction_at(genesis_address, tx) do if ChainIndex.transaction_exists?(tx.address, db_path()) do {:error, :transaction_already_exists} else diff --git a/lib/archethic/db/embedded_impl/queue.ex b/lib/archethic/db/embedded_impl/queue.ex new file mode 100644 index 0000000000..e4721b8dbd --- /dev/null +++ b/lib/archethic/db/embedded_impl/queue.ex @@ -0,0 +1,54 @@ +defmodule Archethic.DB.EmbeddedImpl.Queue do + @moduledoc false + + use GenServer + + def start_link(args \\ [], opts \\ [name: __MODULE__]) do + GenServer.start_link(__MODULE__, args, opts) + end + + def init(_args) do + {:ok, Map.new()} + end + + @doc """ + Add a process in the queue for a specific genesis_address + """ + @spec push(binary(), fun()) :: :ok + def push(genesis_address, func) do + GenServer.call(__MODULE__, {:push, genesis_address, func}) + end + + def handle_call({:push, genesis_address, func}, from, state) do + new_state = + Map.update( + state, + genesis_address, + :queue.in({from, func}, :queue.new()), + &:queue.in({from, func}, &1) + ) + + {:noreply, new_state, {:continue, genesis_address}} + end + + def handle_continue(genesis_address, state) do + case Map.get(state, genesis_address) do + nil -> + {:noreply, state} + + queue -> + new_state = + if :queue.is_empty(queue) do + Map.delete(state, genesis_address) + else + Map.update!(state, genesis_address, fn _ -> + {{:value, {from, func}}, queue} = :queue.out(queue) + GenServer.reply(from, func.()) + queue + end) + end + + {:noreply, new_state, {:continue, genesis_address}} + end + end +end diff --git a/lib/archethic/db/embedded_impl/supervisor.ex b/lib/archethic/db/embedded_impl/supervisor.ex index 8838b4beac..67b4afd5f5 100644 --- a/lib/archethic/db/embedded_impl/supervisor.ex +++ b/lib/archethic/db/embedded_impl/supervisor.ex @@ -8,6 +8,7 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do alias Archethic.DB.EmbeddedImpl.ChainIndex alias Archethic.DB.EmbeddedImpl.P2PView alias Archethic.DB.EmbeddedImpl.StatsInfo + alias Archethic.DB.EmbeddedImpl.Queue require Logger @@ -25,7 +26,8 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do {ChainWriter, path: path}, {BootstrapInfo, path: path}, {P2PView, path: path}, - {StatsInfo, path: path} + {StatsInfo, path: path}, + Queue ] Supervisor.init(children, strategy: :rest_for_one)