Skip to content

Commit

Permalink
feat: synchronize async tasks write (#818)
Browse files Browse the repository at this point in the history
- Runs aex9 tasks asynchronously
- Purges aex9 in-memory and persisted records belonging to invalidated blocks
- Synchronizes the writing of async task result with the main sync database transaction so that if the task finishes later it writes directly to the database.
- Avoid rerunning the tasks for on-db sync (use previous in-memory task result to write to database)
  • Loading branch information
jyeshe committed Aug 5, 2022
1 parent 278e5ee commit 407576e
Show file tree
Hide file tree
Showing 30 changed files with 1,125 additions and 670 deletions.
1 change: 1 addition & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
alias AeMdw.Db.Model
alias AeMdw.Db.Name
alias AeMdw.Db.Util
alias AeMdw.Db.State

alias AeMdw.Contract
alias AeMdw.Database
Expand Down
6 changes: 3 additions & 3 deletions lib/ae_mdw/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ defmodule AeMdw.Application do
AeMdw.Sync.AsyncTasks.Stats.init()
AeMdw.Sync.AsyncTasks.Store.init()

AeMdw.Db.AsyncStore.init()
AeMdw.Db.Aex9BalancesCache.init()

AeMdw.Db.RocksDbCF.init_tables()
end

Expand All @@ -248,9 +251,6 @@ defmodule AeMdw.Application do
end

def start_phase(:start_sync, _start_type, []) do
AeMdw.Db.AsyncStore.init()
AeMdw.Db.Aex9BalancesCache.init()

if Application.fetch_env!(:ae_mdw, :sync) do
Watcher.start_sync()
end
Expand Down
14 changes: 8 additions & 6 deletions lib/ae_mdw/db/aex9_balances_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ defmodule AeMdw.Db.Aex9BalancesCache do
end

@spec put(NodeDb.pubkey(), Blocks.block_index(), Blocks.block_hash(), balances()) :: :ok
def put(contract_pk, height, block_hash, balances) do
EtsCache.put(@table, {contract_pk, height, block_hash}, balances)
def put(contract_pk, block_index, block_hash, balances) do
EtsCache.put(@table, {contract_pk, block_index, block_hash}, balances)
:ok
end

@spec purge(NodeDb.pubkey(), Blocks.block_index()) :: :ok
@spec purge(NodeDb.pubkey(), Blocks.block_index()) :: balances()
def purge(contract_pk, block_index) do
with {^contract_pk, ^block_index, hash} <-
EtsCache.next(@table, {contract_pk, block_index, <<>>}) do
EtsCache.next(@table, {contract_pk, block_index, <<>>}),
{balances, _time} <- EtsCache.get(@table, {contract_pk, block_index, hash}) do
EtsCache.del(@table, {contract_pk, block_index, hash})
balances
else
_other_or_nil -> %{}
end

:ok
end
end
14 changes: 12 additions & 2 deletions lib/ae_mdw/db/async_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule AeMdw.Db.AsyncStore do
"""

alias AeMdw.Database
alias AeMdw.Db.Mutation
alias AeMdw.EtsCache

@derive AeMdw.Db.Store
Expand All @@ -31,6 +32,15 @@ defmodule AeMdw.Db.AsyncStore do
%__MODULE__{tid: @table_id}
end

@spec mutations(t()) :: [Mutation.t()]
def mutations(%__MODULE__{tid: tid}) do
tid
|> EtsCache.all()
|> Enum.map(fn {{table, _key}, record, _time} ->
AeMdw.Db.WriteMutation.new(table, record)
end)
end

@spec put(t(), table(), record()) :: t()
def put(%__MODULE__{tid: tid} = store, table, record) do
EtsCache.put(tid, {table, elem(record, 1)}, record)
Expand Down Expand Up @@ -81,8 +91,8 @@ defmodule AeMdw.Db.AsyncStore do
end
end

@spec clear_tables(t()) :: :ok
def clear_tables(%__MODULE__{tid: tid}) do
@spec clear(t()) :: :ok
def clear(%__MODULE__{tid: tid}) do
EtsCache.clear(tid)
:ok
end
Expand Down
14 changes: 10 additions & 4 deletions lib/ae_mdw/db/contract.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ defmodule AeMdw.Db.Contract do
@spec aex9_write_presence(state(), pubkey(), integer(), pubkey()) :: state()
def aex9_write_presence(state, contract_pk, txi, account_pk) do
m_acc_presence = Model.aex9_account_presence(index: {account_pk, contract_pk}, txi: txi)
m_idx_presence = Model.idx_aex9_account_presence(index: {txi, account_pk, contract_pk})

state
|> State.put(Model.Aex9AccountPresence, m_acc_presence)
|> State.put(Model.IdxAex9AccountPresence, m_idx_presence)
State.put(state, Model.Aex9AccountPresence, m_acc_presence)
end

@spec aex9_delete_presence(state(), pubkey(), pubkey()) :: state()
def aex9_delete_presence(state, account_pk, contract_pk) do
if State.exists?(state, Model.Aex9AccountPresence, {account_pk, contract_pk}) do
State.delete(state, Model.Aex9AccountPresence, {account_pk, contract_pk})
else
state
end
end

@spec aex9_transfer_balance(state(), pubkey(), pubkey(), pubkey(), non_neg_integer()) :: state()
Expand Down
23 changes: 23 additions & 0 deletions lib/ae_mdw/db/mutations/async_store_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule AeMdw.Db.AsyncStoreMutation do
@moduledoc """
Writes all AsyncStore records to a state.
"""

alias AeMdw.Db.State
alias AeMdw.Sync.AsyncStoreServer

@derive AeMdw.Db.Mutation
defstruct []

@opaque t() :: %__MODULE__{}

@spec new() :: t()
def new() do
%__MODULE__{}
end

@spec execute(t(), State.t()) :: State.t()
def execute(%__MODULE__{}, state) do
AsyncStoreServer.write_async_store(state)
end
end
51 changes: 51 additions & 0 deletions lib/ae_mdw/db/mutations/purge_aex9_state_mutation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule AeMdw.Db.PurgeAex9StateMutation do
@moduledoc """
Deletes aex9 balances and presence for invalidated balances.
"""

alias AeMdw.Db.Contract
alias AeMdw.Db.Model
alias AeMdw.Db.State
alias AeMdw.Node.Db

require Model

@derive AeMdw.Db.Mutation
defstruct [:contract_pk, :accounts_pks]

@opaque t() :: %__MODULE__{
contract_pk: Db.pubkey(),
accounts_pks: [Db.pubkey()]
}

@spec new(Db.pubkey(), [Db.pubkey()]) :: t()
def new(contract_pk, accounts_pks) do
%__MODULE__{
contract_pk: contract_pk,
accounts_pks: accounts_pks
}
end

@spec execute(t(), State.t()) :: State.t()
def execute(
%__MODULE__{
contract_pk: contract_pk,
accounts_pks: accounts_pks
},
state
) do
Enum.reduce(accounts_pks, state, fn account_pk, state ->
state
|> Contract.aex9_delete_presence(account_pk, contract_pk)
|> safe_delete_balance(contract_pk, account_pk)
end)
end

defp safe_delete_balance(state, contract_pk, account_pk) do
if State.exists?(state, Model.Aex9Balance, {contract_pk, account_pk}) do
State.delete(state, Model.Aex9Balance, {contract_pk, account_pk})
else
state
end
end
end
59 changes: 20 additions & 39 deletions lib/ae_mdw/db/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ defmodule AeMdw.Db.State do

alias AeMdw.Blocks
alias AeMdw.Database
alias AeMdw.Db.Mutation
alias AeMdw.Db.AsyncStore
alias AeMdw.Db.AsyncStoreMutation
alias AeMdw.Db.DbStore
alias AeMdw.Db.MemStore
alias AeMdw.Db.Mutation
alias AeMdw.Db.Store
alias AeMdw.Db.TxnDbStore
alias AeMdw.Db.Util, as: DbUtil
Expand All @@ -26,7 +26,7 @@ defmodule AeMdw.Db.State do
@typep stat_name() :: atom()
@typep cache_name() :: atom()
@typep height() :: Blocks.height()
@typep job_type() :: Consumer.job_type()
@typep job_type() :: Consumer.task_type()

@opaque t() :: %__MODULE__{
store: Store.t(),
Expand All @@ -36,7 +36,6 @@ defmodule AeMdw.Db.State do
}

@state_pm_key :global_state
@async_task_mem_timeout 1_000

@spec new(Store.t()) :: t()
def new(store \\ DbStore.new()),
Expand All @@ -52,23 +51,29 @@ defmodule AeMdw.Db.State do
TxnDbStore.transaction(fn store ->
state2 = %__MODULE__{state | store: store}

mutations
[mutations, AsyncStoreMutation.new()]
|> List.flatten()
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state2, &Mutation.execute/2)
end)

enqueue_jobs(jobs, only_new: true)

if clear_mem? do
:persistent_term.erase(@state_pm_key)
end

queue_jobs(jobs)

%__MODULE__{new_state | store: prev_store}
%__MODULE__{new_state | store: prev_store, jobs: %{}}
end

@spec commit_db(t(), [Mutation.t()], boolean()) :: t()
def commit_db(state, mutations, clear_mem? \\ true), do: commit(state, mutations, clear_mem?)
def commit_db(state, mutations, clear_mem? \\ true) do
state2 = commit(state, mutations, clear_mem?)

Producer.save_enqueued()

state2
end

@spec commit_mem(t(), [Mutation.t()]) :: t()
def commit_mem(state, mutations) do
Expand All @@ -79,11 +84,11 @@ defmodule AeMdw.Db.State do
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state, &Mutation.execute/2)

exec_jobs(jobs)
enqueue_jobs(jobs, only_new: false)

:persistent_term.put(@state_pm_key, state2)

state2
%__MODULE__{state2 | jobs: %{}}
end

@spec mem_state() :: t()
Expand Down Expand Up @@ -178,37 +183,13 @@ defmodule AeMdw.Db.State do
def clear_cache(state), do: %__MODULE__{state | cache: %{}}

@spec enqueue(t(), job_type(), list(), list()) :: t()
def enqueue(%__MODULE__{jobs: jobs} = state, job_type, dedup_args, extra_args \\ []) do
%__MODULE__{state | jobs: Map.put(jobs, {job_type, dedup_args}, extra_args)}
end

defp exec_jobs(jobs) do
state = new(AsyncStore.instance())

jobs
|> Task.async_stream(
fn {{job_type, dedup_args}, extra_args} ->
Consumer.mutations(job_type, dedup_args ++ extra_args)
end,
ordered: false,
timeout: @async_task_mem_timeout,
on_timeout: :kill_task
)
|> Enum.flat_map(fn
{:ok, mutations} -> mutations
# Ignoring long tasks for in-memory computations
{:exit, :timeout} -> []
end)
|> List.flatten()
|> Enum.reject(&is_nil/1)
|> Enum.reduce(state, &Mutation.execute/2)
def enqueue(%__MODULE__{jobs: jobs} = state, task_type, dedup_args, extra_args \\ []) do
%__MODULE__{state | jobs: Map.put(jobs, {task_type, dedup_args}, extra_args)}
end

defp queue_jobs(jobs) do
defp enqueue_jobs(jobs, opts) do
Enum.each(jobs, fn {{job_type, dedup_args}, extra_args} ->
Producer.enqueue(job_type, dedup_args, extra_args)
Producer.enqueue(job_type, dedup_args, extra_args, opts)
end)

Producer.commit_enqueued()
end
end
10 changes: 8 additions & 2 deletions lib/ae_mdw/ets_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ defmodule AeMdw.EtsCache do
end

@spec put(table(), key(), val()) :: true
def put(table, key, val),
do: :ets.insert(table, {key, val, time()})
def put(table, key, val), do: :ets.insert(table, {key, val, time()})

@spec all(table()) :: [tuple()]
def all(table), do: :ets.tab2list(table)

@spec get(table(), key()) :: val() | nil
def get(table, key) do
Expand All @@ -42,6 +44,10 @@ defmodule AeMdw.EtsCache do
def del(table, key),
do: :ets.delete(table, key)

@spec member(table(), key()) :: boolean()
def member(table, key),
do: :ets.member(table, key)

@spec next(table(), key()) :: key() | nil
def next(table, key) do
case :ets.next(table, key) do
Expand Down
Loading

0 comments on commit 407576e

Please sign in to comment.