From 8afcb9f5711c3a7856655b017afbb4e69f66251c Mon Sep 17 00:00:00 2001 From: Rogerio Pontual <44991200+jyeshe@users.noreply.github.com> Date: Thu, 18 Aug 2022 11:18:54 +0100 Subject: [PATCH] fix: rerun failed task and fix processing state (#848) * fix: rerun failed task and fix processing state * fix: demand only when doesn't rerun --- lib/ae_mdw/sync/async_tasks/consumer.ex | 18 +++-- lib/ae_mdw/sync/async_tasks/producer.ex | 12 +++- lib/ae_mdw/sync/async_tasks/store.ex | 8 ++- .../db/sync/async_tasks/consumer_test.exs | 68 +++++++++++++++++++ .../async_tasks/producer_consumer_test.exs | 14 +++- test/ae_mdw/sync/async_tasks/store_test.exs | 1 + test/support/ae_mdw/async_task_test_util.ex | 17 +++++ 7 files changed, 129 insertions(+), 9 deletions(-) create mode 100644 test/ae_mdw/db/sync/async_tasks/consumer_test.exs diff --git a/lib/ae_mdw/sync/async_tasks/consumer.ex b/lib/ae_mdw/sync/async_tasks/consumer.ex index c72566104..3dc03d787 100644 --- a/lib/ae_mdw/sync/async_tasks/consumer.ex +++ b/lib/ae_mdw/sync/async_tasks/consumer.ex @@ -62,12 +62,20 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do end @doc """ - Just acknowledge (ignore) the DOWN event. + Rerun failed task or put it back to queue. """ - def handle_info({:DOWN, _ref, :process, _pid, _reason}, %State{} = state) do - schedule_demand() - - {:noreply, state} + def handle_info( + {:DOWN, ref, :process, _pid, _reason}, + %State{task: task, m_task: m_task} = state + ) do + if task != nil and task.ref == ref do + new_task = run_supervised(m_task) + {:noreply, %State{task: new_task, m_task: m_task}} + else + schedule_demand() + Producer.notify_error(m_task) + {:noreply, state} + end end # diff --git a/lib/ae_mdw/sync/async_tasks/producer.ex b/lib/ae_mdw/sync/async_tasks/producer.ex index d15644684..f9c1df598 100644 --- a/lib/ae_mdw/sync/async_tasks/producer.ex +++ b/lib/ae_mdw/sync/async_tasks/producer.ex @@ -19,7 +19,7 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do @impl GenServer def init(:ok) do Store.reset() - {:ok, %{dequeue_buffer: []}} + {:ok, :no_state} end @spec enqueue(atom(), list(), list(), only_new: boolean()) :: :ok @@ -54,4 +54,14 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do :ok end + + @spec notify_error(Model.async_task_index()) :: :ok + def notify_error(task_index) do + Store.set_unprocessed(task_index) + + Store.count_unprocessed() + |> Stats.update_buffer_len() + + :ok + end end diff --git a/lib/ae_mdw/sync/async_tasks/store.ex b/lib/ae_mdw/sync/async_tasks/store.ex index c50aa3010..7f0edd9e6 100644 --- a/lib/ae_mdw/sync/async_tasks/store.ex +++ b/lib/ae_mdw/sync/async_tasks/store.ex @@ -99,10 +99,16 @@ defmodule AeMdw.Sync.AsyncTasks.Store do end end + @spec set_unprocessed(Model.async_task_index()) :: :ok + def set_unprocessed(task_index) do + :ets.delete(@processing_tab, task_index) + :ok + end + @spec set_done(Model.async_task_index(), Model.async_task_args()) :: :ok def set_done({_ts, task_type} = task_index, args) do Database.dirty_delete(Model.AsyncTask, task_index) - :ets.delete_object(@processing_tab, task_index) + :ets.delete(@processing_tab, task_index) :ets.delete(@pending_tab, {task_type, args}) :ok diff --git a/test/ae_mdw/db/sync/async_tasks/consumer_test.exs b/test/ae_mdw/db/sync/async_tasks/consumer_test.exs new file mode 100644 index 000000000..18e43967d --- /dev/null +++ b/test/ae_mdw/db/sync/async_tasks/consumer_test.exs @@ -0,0 +1,68 @@ +defmodule AeMdw.Sync.AsyncTasks.ConsumerTest do + use ExUnit.Case + + alias AeMdw.AsyncTaskTestUtil + alias AeMdw.Db.Aex9BalancesCache + alias AeMdw.Db.Model + alias AeMdw.Sync.AsyncTasks + + import Mock + + require Model + + test "enqueue and dequeue with failed task" do + contract_pk = :crypto.strong_rand_bytes(32) + {kbi, mbi} = block_index = {543_210, 10} + kb_hash = :crypto.strong_rand_bytes(32) + next_mb_hash = :crypto.strong_rand_bytes(32) + + Aex9BalancesCache.put(contract_pk, block_index, next_mb_hash, %{ + {:address, :crypto.strong_rand_bytes(32)} => <<>> + }) + + with_mocks [ + {AeMdw.Node.Db, [], + [ + get_key_block_hash: fn height -> + assert height == kbi + 1 + Process.sleep(1_000) + kb_hash + end, + get_next_hash: fn ^kb_hash, ^mbi -> next_mb_hash end + ]} + ] do + AsyncTasks.Supervisor.start_link([]) + + AsyncTasks.Producer.enqueue( + :update_aex9_state, + [contract_pk], + [ + block_index, + Enum.random(1_000_000..99_000_000) + ], + only_new: true + ) + + consumer_pid = AsyncTaskTestUtil.wakeup_consumer() + + task_pid1 = + Enum.reduce_while(1..100, nil, fn _i, _acc -> + Process.sleep(10) + + case :sys.get_state(consumer_pid) do + %{task: %Task{pid: task_pid}} -> {:halt, task_pid} + _no_task -> {:cont, nil} + end + end) + + Process.exit(task_pid1, :kill) + + assert Enum.any?(1..100, fn _i -> + case :sys.get_state(consumer_pid) do + %{task: %Task{pid: task_pid2}} -> task_pid2 != task_pid1 + _no_task -> false + end + end) + end + end +end diff --git a/test/ae_mdw/db/sync/async_tasks/producer_consumer_test.exs b/test/ae_mdw/db/sync/async_tasks/producer_consumer_test.exs index 7a34c4b82..c45fd1666 100644 --- a/test/ae_mdw/db/sync/async_tasks/producer_consumer_test.exs +++ b/test/ae_mdw/db/sync/async_tasks/producer_consumer_test.exs @@ -3,10 +3,13 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do alias AeMdw.AsyncTaskTestUtil alias AeMdw.Db.Aex9BalancesCache + alias AeMdw.Db.Model alias AeMdw.Sync.AsyncTasks import Mock + require Model + test "enqueue and dequeue" do contract_pk = :crypto.strong_rand_bytes(32) {kbi, mbi} = block_index = {543_210, 10} @@ -29,12 +32,14 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do ] do AsyncTasks.Supervisor.start_link([]) + call_txi = Enum.random(1_000_000..99_000_000) + AsyncTasks.Producer.enqueue( :update_aex9_state, [contract_pk], [ block_index, - Enum.random(1_000_000..99_000_000) + call_txi ], only_new: true ) @@ -43,7 +48,12 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do assert Enum.any?(1..20, fn _i -> Process.sleep(50) - %{dequeue_buffer: []} == :sys.get_state(AsyncTasks.Producer) + + nil == + AsyncTaskTestUtil.list_pending() + |> Enum.find(fn Model.async_task(args: args, extra_args: extra_args) -> + args == [contract_pk] and extra_args == [block_index, call_txi] + end) end) end end diff --git a/test/ae_mdw/sync/async_tasks/store_test.exs b/test/ae_mdw/sync/async_tasks/store_test.exs index d27bbbc93..d227995c9 100644 --- a/test/ae_mdw/sync/async_tasks/store_test.exs +++ b/test/ae_mdw/sync/async_tasks/store_test.exs @@ -182,6 +182,7 @@ defmodule AeMdw.Sync.AsyncTasks.StoreTest do refute Enum.find(tasks, &(&1 == m_task1)) refute Database.exists?(Model.AsyncTask, task_index1) + refute :ets.member(:async_tasks_processing, task_index1) assert Enum.find(tasks, &(&1 == m_task2)) end end diff --git a/test/support/ae_mdw/async_task_test_util.ex b/test/support/ae_mdw/async_task_test_util.ex index 9782e7653..042920fc6 100644 --- a/test/support/ae_mdw/async_task_test_util.ex +++ b/test/support/ae_mdw/async_task_test_util.ex @@ -23,6 +23,23 @@ defmodule AeMdw.AsyncTaskTestUtil do end) end + @spec wakeup_consumer() :: pid() + def wakeup_consumer do + AsyncTasks.Supervisor.start_link([]) + Process.sleep(100) + + {_id, consumer_pid, _type, _mod} = + AsyncTasks.Supervisor + |> Supervisor.which_children() + |> Enum.find(fn {id, _pid, _type, _mod} -> + is_binary(id) and String.starts_with?(id, "Elixir.AeMdw.Sync.AsyncTasks.Consumer") + end) + + Process.send(consumer_pid, :demand, [:noconnect]) + + consumer_pid + end + @spec list_pending() :: [Model.async_task_record()] def list_pending do :async_tasks_pending