Skip to content

Commit

Permalink
chore: add mistakenly removed async in-mem tasks (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
sborrazas committed Jun 30, 2022
1 parent bff7d0f commit b607abb
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 65 deletions.
9 changes: 4 additions & 5 deletions lib/ae_mdw/db/contract.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule AeMdw.Db.Contract do
alias AeMdw.Log
alias AeMdw.Node
alias AeMdw.Node.Db
alias AeMdw.Sync.AsyncTasks
alias AeMdw.Validate

require Ex2ms
Expand Down Expand Up @@ -376,12 +375,12 @@ defmodule AeMdw.Db.Contract do
with false <- State.exists?(state, Model.AexnContract, {:aex9, contract_pk}),
{:ok, extensions} <- AexnContracts.call_extensions(:aex9, contract_pk),
{:ok, aex9_meta_info} <- AexnContracts.call_meta_info(:aex9, contract_pk) do
AsyncTasks.Producer.enqueue(:derive_aex9_presence, [contract_pk, kbi, mbi, txi])
aexn_creation_write(state, :aex9, aex9_meta_info, contract_pk, txi, extensions)
state
|> State.enqueue(:derive_aex9_presence, [contract_pk, kbi, mbi, txi])
|> aexn_creation_write(:aex9, aex9_meta_info, contract_pk, txi, extensions)
else
true ->
AsyncTasks.Producer.enqueue(:update_aex9_state, [contract_pk], [block_index, txi])
state
State.enqueue(state, :update_aex9_state, [contract_pk], [block_index, txi])

:error ->
state
Expand Down
7 changes: 3 additions & 4 deletions lib/ae_mdw/db/mutations/aexn_create_contract_mutation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule AeMdw.Db.AexnCreateContractMutation do
alias AeMdw.Db.Contract, as: DBContract
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Sync.AsyncTasks
alias AeMdw.Txs

@derive AeMdw.Db.Mutation
Expand Down Expand Up @@ -75,9 +74,9 @@ defmodule AeMdw.Db.AexnCreateContractMutation do
)

if aexn_type == :aex9 do
AsyncTasks.Producer.enqueue(:derive_aex9_presence, [contract_pk, kbi, mbi, create_txi])
State.enqueue(state, :derive_aex9_presence, [contract_pk, kbi, mbi, create_txi])
else
state
end

state
end
end
10 changes: 6 additions & 4 deletions lib/ae_mdw/db/mutations/contract_call_mutation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule AeMdw.Db.ContractCallMutation do
alias AeMdw.Db.Contract, as: DBContract
alias AeMdw.Db.Origin
alias AeMdw.Db.State
alias AeMdw.Sync.AsyncTasks
alias AeMdw.Txs

@derive AeMdw.Db.Mutation
Expand Down Expand Up @@ -60,9 +59,12 @@ defmodule AeMdw.Db.ContractCallMutation do
state
) do
# update balance on any aex9 call
if AexnContracts.is_aex9?(contract_pk) do
AsyncTasks.Producer.enqueue(:update_aex9_state, [contract_pk], [block_index, txi])
end
state =
if AexnContracts.is_aex9?(contract_pk) do
State.enqueue(state, :update_aex9_state, [contract_pk], [block_index, txi])
else
state
end

create_txi =
case State.cache_get(state, :ct_create_sync_cache, contract_pk) do
Expand Down
53 changes: 48 additions & 5 deletions lib/ae_mdw/db/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ defmodule AeMdw.Db.State do
alias AeMdw.Db.Store
alias AeMdw.Db.TxnDbStore
alias AeMdw.Db.Util, as: DbUtil
alias AeMdw.Sync.AsyncTasks.Consumer
alias AeMdw.Sync.AsyncTasks.Producer

defstruct [:store, :stats, :cache]
defstruct [:store, :stats, :cache, :jobs]

@typep key() :: Database.key()
@typep record() :: Database.record()
Expand All @@ -23,25 +25,29 @@ defmodule AeMdw.Db.State do
@typep stat_name() :: atom()
@typep cache_name() :: atom()
@typep height() :: Blocks.height()
@typep job_type() :: Consumer.job_type()

@opaque t() :: %__MODULE__{
store: Store.t(),
stats: %{stat_name() => non_neg_integer()},
cache: %{cache_name() => map()}
cache: %{cache_name() => map()},
jobs: %{{job_type(), list()} => list()}
}

@state_pm_key :global_state
@async_task_mem_timeout 1_000

@spec new(Store.t()) :: t()
def new(store \\ DbStore.new()),
do: %__MODULE__{store: store, stats: %{}, cache: %{}}
do: %__MODULE__{store: store, stats: %{}, cache: %{}, jobs: %{}}

@spec height(t()) :: height()
def height(state), do: DbUtil.synced_height(state)

@spec commit(t(), [Mutation.t()]) :: t()
def commit(%__MODULE__{store: prev_store} = state, mutations) do
new_state =
%__MODULE__{jobs: jobs} =
TxnDbStore.transaction(fn store ->
state2 = %__MODULE__{state | store: store}

Expand All @@ -51,6 +57,8 @@ defmodule AeMdw.Db.State do
|> Enum.reduce(state2, &Mutation.execute/2)
end)

queue_jobs(jobs)

:persistent_term.erase(@state_pm_key)

%__MODULE__{new_state | store: prev_store}
Expand All @@ -62,14 +70,17 @@ defmodule AeMdw.Db.State do
@spec commit_mem(t(), [Mutation.t()]) :: t()
def commit_mem(state, mutations) do
state2 =
%__MODULE__{jobs: jobs} =
mutations
|> List.flatten()
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state, &Mutation.execute/2)

:persistent_term.put(@state_pm_key, state2)
state3 = exec_jobs(state2, jobs)

:persistent_term.put(@state_pm_key, state3)

state2
state3
end

@spec mem_state() :: t()
Expand Down Expand Up @@ -159,4 +170,36 @@ defmodule AeMdw.Db.State do
:error -> :not_found
end
end

@spec enqueue(t(), job_type(), list(), list()) :: t()
def enqueue(%__MODULE__{jobs: jobs} = state, job_type, dedup_args, extra_args \\ []),
do: %__MODULE__{state | jobs: Map.put(jobs, {job_type, dedup_args}, extra_args)}

defp exec_jobs(state, jobs) do
jobs
|> Task.async_stream(
fn {{job_type, dedup_args}, extra_args} ->
Consumer.mutations(job_type, dedup_args ++ extra_args)
end,
ordered: false,
timeout: @async_task_mem_timeout,
on_timeout: :kill_task
)
|> Enum.flat_map(fn
{:ok, mutations} -> mutations
# Ignoring long tasks for in-memory computations
{:exit, :timeout} -> []
end)
|> List.flatten()
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state, &Mutation.execute/2)
end

defp queue_jobs(jobs) do
Enum.each(jobs, fn {{job_type, dedup_args}, extra_args} ->
Producer.enqueue(job_type, dedup_args, extra_args)
end)

Producer.commit_enqueued()
end
end
9 changes: 9 additions & 0 deletions lib/ae_mdw/sync/async_tasks/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
use GenServer

alias AeMdw.Db.Model
alias AeMdw.Db.Mutation
alias AeMdw.Log

alias AeMdw.Sync.AsyncTasks.Producer
Expand All @@ -26,6 +27,8 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
update_aex9_state: UpdateAex9State
}

@type job_type() :: Model.async_task_type()

# @max_retries 2
# @backoff_msecs 10_000

Expand Down Expand Up @@ -121,6 +124,12 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
{task, timer_ref}
end

@spec mutations(Model.async_task_type(), Model.async_task_args()) :: [Mutation.t()]
def mutations(task_type, args) do
mod = @type_mod[task_type]
apply(mod, :mutations, [args])
end

#
# Private functions
#
Expand Down
38 changes: 23 additions & 15 deletions lib/ae_mdw/sync/async_tasks/derive_aex9_presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ defmodule AeMdw.Sync.AsyncTasks.DeriveAex9Presence do

alias AeMdw.Node.Db, as: DBN

alias AeMdw.Database
alias AeMdw.Db.DeriveAex9PresenceMutation
alias AeMdw.Db.Model
alias AeMdw.Db.Mutation
alias AeMdw.Db.WriteMutation
alias AeMdw.Db.State
alias AeMdw.Log

Expand All @@ -19,33 +20,40 @@ defmodule AeMdw.Sync.AsyncTasks.DeriveAex9Presence do
@microsecs 1_000_000

@spec process(args :: list()) :: :ok
def process([contract_pk, kbi, mbi, create_txi]) do
def process([contract_pk, _kbi, _mbi, _create_txi] = args) do
Log.info("[:derive_aex9_presence] #{inspect(contract_pk)} ...")

{time_delta, {balances, _last_block_tuple}} =
:timer.tc(fn ->
next_kb_hash = DBN.get_key_block_hash(kbi + 1)
next_hash = DBN.get_next_hash(next_kb_hash, mbi)

DBN.aex9_balances(contract_pk, {nil, kbi, next_hash})
end)
{time_delta, mutations} = :timer.tc(fn -> mutations(args) end)

Log.info("[:derive_aex9_presence] #{inspect(contract_pk)} after #{time_delta / @microsecs}s")

State.commit(State.new(), mutations)

:ok
end

@spec mutations(args :: list()) :: [Mutation.t()]
def mutations([contract_pk, kbi, mbi, create_txi]) do
next_kb_hash = DBN.get_key_block_hash(kbi + 1)
next_hash = DBN.get_next_hash(next_kb_hash, mbi)

{balances, _last_block_tuple} = DBN.aex9_balances(contract_pk, {nil, kbi, next_hash})

if map_size(balances) == 0 do
m_empty_balance = Model.aex9_balance(index: {contract_pk, <<>>})
Database.dirty_write(Model.Aex9Balance, m_empty_balance)

[
WriteMutation.new(Model.Aex9Balance, m_empty_balance)
]
else
balances =
Enum.map(balances, fn {{:address, account_pk}, amount} ->
{account_pk, amount}
end)

mutation = DeriveAex9PresenceMutation.new(contract_pk, {kbi, mbi}, create_txi, balances)

State.commit(State.new(), [mutation])
[
DeriveAex9PresenceMutation.new(contract_pk, {kbi, mbi}, create_txi, balances)
]
end

:ok
end
end
32 changes: 19 additions & 13 deletions lib/ae_mdw/sync/async_tasks/update_aex9_presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateAex9State do
alias AeMdw.Node.Db, as: DBN

alias AeMdw.Db.Model
alias AeMdw.Db.Mutation
alias AeMdw.Db.State
alias AeMdw.Db.UpdateAex9PresenceMutation
alias AeMdw.Log
Expand All @@ -17,30 +18,35 @@ defmodule AeMdw.Sync.AsyncTasks.UpdateAex9State do
@microsecs 1_000_000

@spec process(args :: list()) :: :ok
def process([contract_pk, {kbi, mbi} = block_index, call_txi]) do
def process([contract_pk, _block_index, _call_txi] = args) do
Log.info("[update_aex9_state] #{inspect(enc_ct(contract_pk))} ...")

{time_delta, {balances, _height_hash}} =
:timer.tc(fn ->
next_kb_hash = DBN.get_key_block_hash(kbi + 1)
next_hash = DBN.get_next_hash(next_kb_hash, mbi)
type = if next_hash == next_kb_hash, do: :key, else: :micro

DBN.aex9_balances(contract_pk, {type, kbi, next_hash})
end)
{time_delta, mutations} = :timer.tc(fn -> mutations(args) end)

Log.info(
"[update_aex9_state] #{inspect(enc_ct(contract_pk))} after #{time_delta / @microsecs}s"
)

balances = Enum.map(balances, fn {{:address, account_pk}, amount} -> {account_pk, amount} end)

mutation = UpdateAex9PresenceMutation.new(contract_pk, block_index, call_txi, balances)
State.commit(State.new(), [mutation])
State.commit(State.new(), mutations)

:ok
end

@spec mutations(args :: list()) :: [Mutation.t()]
def mutations([contract_pk, {kbi, mbi} = block_index, call_txi]) do
next_kb_hash = DBN.get_key_block_hash(kbi + 1)
next_hash = DBN.get_next_hash(next_kb_hash, mbi)
type = if next_hash == next_kb_hash, do: :key, else: :micro

{balances, _height_hash} = DBN.aex9_balances(contract_pk, {type, kbi, next_hash})

balances = Enum.map(balances, fn {{:address, account_pk}, amount} -> {account_pk, amount} end)

[
UpdateAex9PresenceMutation.new(contract_pk, block_index, call_txi, balances)
]
end

defp enc_ct(<<pk::binary-32>>), do: :aeser_api_encoder.encode(:contract_pubkey, pk)
defp enc_ct(invalid_pk), do: invalid_pk
end
3 changes: 0 additions & 3 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ defmodule AeMdw.Sync.Server do
alias AeMdw.Db.Status
alias AeMdw.Db.Sync.Block
alias AeMdw.Log
alias AeMdw.Sync.AsyncTasks.Producer
alias AeMdwWeb.Websocket.Broadcaster

require Logger
Expand Down Expand Up @@ -299,8 +298,6 @@ defmodule AeMdw.Sync.Server do

new_state = State.commit_db(state, chunk_mutations)

Producer.commit_enqueued()

Enum.each(blocks_mutations_chunk, fn {{_height, mbi} = _block_index, block,
_block_mutations} ->
broadcast_block(block, mbi == -1)
Expand Down
6 changes: 2 additions & 4 deletions test/ae_mdw/db/contract_create_mutation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ defmodule AeMdw.Db.ContractCreateMutationTest do
assert 2 == State.get_stat(state2, :contracts_created, 0)
assert {:ok, create_txi2} == State.cache_get(state2, :ct_create_sync_cache, contract_pk)

assert %{enqueue_buffer: enqueue_buffer} = :sys.get_state(AsyncTasks.Producer)

assert Enum.any?(
enqueue_buffer,
&(&1 == {:update_aex9_state, [remote_pk], [block_index, create_txi2]})
state2.jobs,
&(&1 == {{:update_aex9_state, [remote_pk]}, [block_index, create_txi2]})
)
end
end
Expand Down
Loading

0 comments on commit b607abb

Please sign in to comment.