Skip to content

Commit

Permalink
fix: increase long tasks throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe authored and thepiwo committed Aug 11, 2022
1 parent 26b4bd4 commit f93d72b
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 181 deletions.
74 changes: 19 additions & 55 deletions lib/ae_mdw/sync/async_tasks/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,21 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
require Model
require Logger

import AeMdw.Util, only: [ok!: 1]

@base_sleep_msecs 100
@yield_timeout_msecs 100
@task_timeout_msecs 40_000
@wait_msecs 1_000

@type_mod %{
update_aex9_state: UpdateAex9State
}

@type task_type() :: Model.async_task_type()

# @max_retries 2
# @backoff_msecs 10_000

defmodule State do
@moduledoc """
GenServer state
"""
@type t :: %__MODULE__{}

defstruct task: nil, m_task: nil, timer_ref: nil
defstruct task: nil, m_task: nil
end

@spec start_link(any()) :: GenServer.on_start()
Expand All @@ -45,7 +38,7 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do

@impl GenServer
def init(:ok) do
schedule_demand()
schedule_demand(@wait_msecs)
{:ok, %State{}}
end

Expand All @@ -57,36 +50,12 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
{:noreply, demand()}
end

@doc """
Handle async task timeout.
"""
def handle_info(:timeout, %State{task: task, m_task: m_task}) do
case Task.yield(task, @yield_timeout_msecs) do
nil ->
Task.shutdown(task, :brutal_kill)
Producer.notify_timeout(m_task)

_not_running ->
:noop
end

schedule_demand()

{:noreply, %State{}}
end

@doc """
When the task finishes, demonitor and demands next task.
"""
def handle_info({ref, ok_res}, %State{task: current_task, timer_ref: timer_ref}) do
def handle_info({ref, _ok_res}, _state) do
Process.demonitor(ref, [:flush])

if ok_res != :ok do
Log.warn("Async task returned #{ok_res}, task=#{inspect(current_task)}")
end

if nil != timer_ref, do: :timer.cancel(timer_ref)

schedule_demand()

{:noreply, %State{}}
Expand All @@ -96,28 +65,27 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
Just acknowledge (ignore) the DOWN event.
"""
def handle_info({:DOWN, _ref, :process, _pid, _reason}, %State{} = state) do
schedule_demand()

{:noreply, state}
end

#
# Used by consumers only
#
@spec run_supervised(Model.async_task_record(), boolean()) ::
{Task.t(), :timer.tref() | nil}
def run_supervised(m_task, is_long? \\ false) do
@spec run_supervised(Model.async_task_record()) :: Task.t()
def run_supervised(m_task) do
task =
Task.Supervisor.async_nolink(
TaskSupervisor,
fn ->
:ok = process(m_task, is_long?)
:ok = process(m_task)
end
)

Log.info("[#{inspect(task.ref)}] #{inspect(m_task)}")

timer_ref = if not is_long?, do: ok!(:timer.send_after(@task_timeout_msecs, :timeout))

{task, timer_ref}
task
end

#
Expand All @@ -128,33 +96,29 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
m_task = Producer.dequeue()

if nil != m_task do
{task, timer_ref} = run_supervised(m_task)
task = run_supervised(m_task)

%State{
task: task,
m_task: m_task,
timer_ref: timer_ref
m_task: m_task
}
else
schedule_demand()
schedule_demand(@wait_msecs)
%State{}
end
end

@spec process(Model.async_task_record(), boolean()) :: :ok
defp process(
Model.async_task(index: {_ts, type} = index, args: args, extra_args: extra_args),
is_long?
) do
@spec process(Model.async_task_record()) :: :ok
defp process(Model.async_task(index: {_ts, type} = index, args: args, extra_args: extra_args)) do
mod = @type_mod[type]
done_fn = fn -> Producer.notify_consumed(index, args, is_long?) end
done_fn = fn -> Producer.notify_consumed(index, args) end
apply(mod, :process, [args ++ extra_args, done_fn])
:ok
end

@spec schedule_demand() :: :ok
defp schedule_demand() do
Process.send_after(self(), :demand, @base_sleep_msecs)
@spec schedule_demand(non_neg_integer()) :: :ok
defp schedule_demand(sleep \\ 0) do
Process.send_after(self(), :demand, sleep)
:ok
end
end
100 changes: 0 additions & 100 deletions lib/ae_mdw/sync/async_tasks/long_task_consumer.ex

This file was deleted.

19 changes: 3 additions & 16 deletions lib/ae_mdw/sync/async_tasks/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do
use GenServer

alias AeMdw.Db.Model
alias AeMdw.Log
alias AeMdw.Sync.AsyncTasks.LongTaskConsumer
alias AeMdw.Sync.AsyncTasks.Store
alias AeMdw.Sync.AsyncTasks.Stats

Expand Down Expand Up @@ -44,21 +42,10 @@ defmodule AeMdw.Sync.AsyncTasks.Producer do
GenServer.call(__MODULE__, :dequeue, @long_timeout_ms)
end

@spec notify_consumed(Model.async_task_index(), Model.async_task_args(), boolean()) :: :ok
def notify_consumed(task_index, task_args, is_long?) do
@spec notify_consumed(Model.async_task_index(), Model.async_task_args()) :: :ok
def notify_consumed(task_index, task_args) do
Store.set_done(task_index, task_args)
Stats.update_consumed(is_long?)

if is_long?, do: Log.info("Long task finished: #{inspect(task_index)}")

:ok
end

@spec notify_timeout(Model.async_task_record()) :: :ok
def notify_timeout(m_task) do
Log.warn("Long task enqueued: #{inspect(m_task)}")
LongTaskConsumer.enqueue(m_task)
Stats.inc_long_tasks_count()
Stats.update_consumed(false)

:ok
end
Expand Down
11 changes: 5 additions & 6 deletions lib/ae_mdw/sync/async_tasks/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule AeMdw.Sync.AsyncTasks.Supervisor do
use Supervisor

alias AeMdw.Sync.AsyncTasks.Consumer
alias AeMdw.Sync.AsyncTasks.LongTaskConsumer
alias AeMdw.Sync.AsyncTasks.Producer
alias AeMdw.Sync.AsyncTasks.TaskSupervisor

Expand All @@ -16,11 +15,11 @@ defmodule AeMdw.Sync.AsyncTasks.Supervisor do

@impl Supervisor
def init(:ok) do
children = [
{Task.Supervisor, name: TaskSupervisor},
Producer,
LongTaskConsumer | consumers()
]
children =
[
{Task.Supervisor, name: TaskSupervisor},
Producer
] ++ consumers()

Supervisor.init(children, strategy: :one_for_all)
end
Expand Down
2 changes: 1 addition & 1 deletion test/ae_mdw/db/state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ defmodule AeMdw.Db.StateTest do
balances = %{{:address, account_pk} => amount}

ct_pks =
Enum.map(1..100, fn _i ->
Enum.map(1..150, fn _i ->
ct_pk = :crypto.strong_rand_bytes(32)
Aex9BalancesCache.put(ct_pk, block_index, @next_hash, balances)
ct_pk
Expand Down
6 changes: 3 additions & 3 deletions test/ae_mdw/sync/async_tasks/update_aex9_state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateAex9StateTest do
amount1 = @amount1
amount2 = @amount2

UpdateAex9State.process([@contract_pk1, block_index, call_txi], fn -> :ok end)
assert :ok = UpdateAex9State.process([@contract_pk1, block_index, call_txi], fn -> :ok end)

state = State.new(AsyncStore.instance())

Expand All @@ -94,7 +94,7 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateAex9StateTest do
block_index = {@kbi, @mbi}
call_txi = 12_345_680

UpdateAex9State.process([@contract_pk2, block_index, call_txi], fn -> :ok end)
assert :ok = UpdateAex9State.process([@contract_pk2, block_index, call_txi], fn -> :ok end)

state = State.new(AsyncStore.instance())

Expand All @@ -119,7 +119,7 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateAex9StateTest do
{:address, account_pk2} => amount2
})

UpdateAex9State.process([@contract_pk3, {@kbi, @mbi}, call_txi], fn -> :ok end)
assert :ok = UpdateAex9State.process([@contract_pk3, {@kbi, @mbi}, call_txi], fn -> :ok end)
state = State.new(AsyncStore.instance())

Enum.any?(1..10, fn _i ->
Expand Down

0 comments on commit f93d72b

Please sign in to comment.