Skip to content

Commit

Permalink
fix: rerun failed task and fix processing state (#848)
Browse files Browse the repository at this point in the history
* fix: rerun failed task and fix processing state

* fix: demand only when doesn't rerun
  • Loading branch information
jyeshe committed Aug 18, 2022
1 parent 5868472 commit 8afcb9f
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 9 deletions.
18 changes: 13 additions & 5 deletions lib/ae_mdw/sync/async_tasks/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
end

@doc """
Just acknowledge (ignore) the DOWN event.
Rerun failed task or put it back to queue.
"""
def handle_info({:DOWN, _ref, :process, _pid, _reason}, %State{} = state) do
schedule_demand()

{:noreply, state}
def handle_info(
{:DOWN, ref, :process, _pid, _reason},
%State{task: task, m_task: m_task} = state
) do
if task != nil and task.ref == ref do
new_task = run_supervised(m_task)
{:noreply, %State{task: new_task, m_task: m_task}}
else
schedule_demand()
Producer.notify_error(m_task)
{:noreply, state}
end
end

#
Expand Down
12 changes: 11 additions & 1 deletion lib/ae_mdw/sync/async_tasks/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do
@impl GenServer
def init(:ok) do
Store.reset()
{:ok, %{dequeue_buffer: []}}
{:ok, :no_state}
end

@spec enqueue(atom(), list(), list(), only_new: boolean()) :: :ok
Expand Down Expand Up @@ -54,4 +54,14 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do

:ok
end

@spec notify_error(Model.async_task_index()) :: :ok
def notify_error(task_index) do
Store.set_unprocessed(task_index)

Store.count_unprocessed()
|> Stats.update_buffer_len()

:ok
end
end
8 changes: 7 additions & 1 deletion lib/ae_mdw/sync/async_tasks/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,16 @@ defmodule AeMdw.Sync.AsyncTasks.Store do
end
end

@spec set_unprocessed(Model.async_task_index()) :: :ok
def set_unprocessed(task_index) do
:ets.delete(@processing_tab, task_index)
:ok
end

@spec set_done(Model.async_task_index(), Model.async_task_args()) :: :ok
def set_done({_ts, task_type} = task_index, args) do
Database.dirty_delete(Model.AsyncTask, task_index)
:ets.delete_object(@processing_tab, task_index)
:ets.delete(@processing_tab, task_index)
:ets.delete(@pending_tab, {task_type, args})

:ok
Expand Down
68 changes: 68 additions & 0 deletions test/ae_mdw/db/sync/async_tasks/consumer_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule AeMdw.Sync.AsyncTasks.ConsumerTest do
use ExUnit.Case

alias AeMdw.AsyncTaskTestUtil
alias AeMdw.Db.Aex9BalancesCache
alias AeMdw.Db.Model
alias AeMdw.Sync.AsyncTasks

import Mock

require Model

test "enqueue and dequeue with failed task" do
contract_pk = :crypto.strong_rand_bytes(32)
{kbi, mbi} = block_index = {543_210, 10}
kb_hash = :crypto.strong_rand_bytes(32)
next_mb_hash = :crypto.strong_rand_bytes(32)

Aex9BalancesCache.put(contract_pk, block_index, next_mb_hash, %{
{:address, :crypto.strong_rand_bytes(32)} => <<>>
})

with_mocks [
{AeMdw.Node.Db, [],
[
get_key_block_hash: fn height ->
assert height == kbi + 1
Process.sleep(1_000)
kb_hash
end,
get_next_hash: fn ^kb_hash, ^mbi -> next_mb_hash end
]}
] do
AsyncTasks.Supervisor.start_link([])

AsyncTasks.Producer.enqueue(
:update_aex9_state,
[contract_pk],
[
block_index,
Enum.random(1_000_000..99_000_000)
],
only_new: true
)

consumer_pid = AsyncTaskTestUtil.wakeup_consumer()

task_pid1 =
Enum.reduce_while(1..100, nil, fn _i, _acc ->
Process.sleep(10)

case :sys.get_state(consumer_pid) do
%{task: %Task{pid: task_pid}} -> {:halt, task_pid}
_no_task -> {:cont, nil}
end
end)

Process.exit(task_pid1, :kill)

assert Enum.any?(1..100, fn _i ->
case :sys.get_state(consumer_pid) do
%{task: %Task{pid: task_pid2}} -> task_pid2 != task_pid1
_no_task -> false
end
end)
end
end
end
14 changes: 12 additions & 2 deletions test/ae_mdw/db/sync/async_tasks/producer_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do

alias AeMdw.AsyncTaskTestUtil
alias AeMdw.Db.Aex9BalancesCache
alias AeMdw.Db.Model
alias AeMdw.Sync.AsyncTasks

import Mock

require Model

test "enqueue and dequeue" do
contract_pk = :crypto.strong_rand_bytes(32)
{kbi, mbi} = block_index = {543_210, 10}
Expand All @@ -29,12 +32,14 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do
] do
AsyncTasks.Supervisor.start_link([])

call_txi = Enum.random(1_000_000..99_000_000)

AsyncTasks.Producer.enqueue(
:update_aex9_state,
[contract_pk],
[
block_index,
Enum.random(1_000_000..99_000_000)
call_txi
],
only_new: true
)
Expand All @@ -43,7 +48,12 @@ defmodule AeMdw.Sync.AsyncTasks.ProducerConsumerTest do

assert Enum.any?(1..20, fn _i ->
Process.sleep(50)
%{dequeue_buffer: []} == :sys.get_state(AsyncTasks.Producer)

nil ==
AsyncTaskTestUtil.list_pending()
|> Enum.find(fn Model.async_task(args: args, extra_args: extra_args) ->
args == [contract_pk] and extra_args == [block_index, call_txi]
end)
end)
end
end
Expand Down
1 change: 1 addition & 0 deletions test/ae_mdw/sync/async_tasks/store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ defmodule AeMdw.Sync.AsyncTasks.StoreTest do

refute Enum.find(tasks, &(&1 == m_task1))
refute Database.exists?(Model.AsyncTask, task_index1)
refute :ets.member(:async_tasks_processing, task_index1)
assert Enum.find(tasks, &(&1 == m_task2))
end
end
Expand Down
17 changes: 17 additions & 0 deletions test/support/ae_mdw/async_task_test_util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ defmodule AeMdw.AsyncTaskTestUtil do
end)
end

@spec wakeup_consumer() :: pid()
def wakeup_consumer do
AsyncTasks.Supervisor.start_link([])
Process.sleep(100)

{_id, consumer_pid, _type, _mod} =
AsyncTasks.Supervisor
|> Supervisor.which_children()
|> Enum.find(fn {id, _pid, _type, _mod} ->
is_binary(id) and String.starts_with?(id, "Elixir.AeMdw.Sync.AsyncTasks.Consumer")
end)

Process.send(consumer_pid, :demand, [:noconnect])

consumer_pid
end

@spec list_pending() :: [Model.async_task_record()]
def list_pending do
:async_tasks_pending
Expand Down

0 comments on commit 8afcb9f

Please sign in to comment.