Skip to content

Commit

Permalink
chore: log mem and db sync profiling (#1449)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Jul 7, 2023
1 parent ca01622 commit bde081d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 39 deletions.
2 changes: 2 additions & 0 deletions lib/ae_mdw/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ defmodule AeMdw.Application do
BroadcasterCache.init()
MutationsCache.init()

_table = :ets.new(:sync_profiling, [:named_table, :set, :public])

AeMdw.Sync.AsyncTasks.Stats.init()
AeMdw.Sync.AsyncTasks.Store.init()
AeMdw.Sync.AsyncTasks.WealthRank.init()
Expand Down
37 changes: 21 additions & 16 deletions lib/ae_mdw/db/sync/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,32 @@ defmodule AeMdw.Db.Sync.Block do
height = :aec_blocks.height(mblock)
mb_time = :aec_blocks.time_in_msecs(mblock)
mb_txs = :aec_blocks.txs(mblock)
events = AeMdw.Contract.get_grouped_events(mblock)
{ts, events} = :timer.tc(fn -> AeMdw.Contract.get_grouped_events(mblock) end)
_sum = :ets.update_counter(:sync_profiling, {:dryrun, height}, ts, {{:dryrun, height}, 0})
mb_model = Model.block(index: {height, mbi}, tx_index: first_txi, hash: mb_hash)
block_mutation = WriteMutation.new(Model.Block, mb_model)

mutations =
mb_txs
|> Enum.with_index(first_txi)
|> Enum.reduce([block_mutation], fn {signed_tx, txi}, mutations ->
transaction_mutations =
Transaction.transaction_mutations(
signed_tx,
txi,
{height, mbi},
mb_hash,
mb_time,
events
)

mutations ++ transaction_mutations
{ts, mutations} =
:timer.tc(fn ->
mb_txs
|> Enum.with_index(first_txi)
|> Enum.reduce([block_mutation], fn {signed_tx, txi}, mutations ->
transaction_mutations =
Transaction.transaction_mutations(
signed_tx,
txi,
{height, mbi},
mb_hash,
mb_time,
events
)

mutations ++ transaction_mutations
end)
end)

_sum = :ets.update_counter(:sync_profiling, {:txs, height}, ts, {{:txs, height}, 0})

{mutations, first_txi + length(mb_txs)}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ae_mdw/sync/async_tasks/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ defmodule AeMdw.Sync.AsyncTasks.Consumer do
end
)

Log.info("[#{inspect(task.ref)}] #{inspect(m_task)}")
Log.info("[task_run] #{inspect(Model.async_task(m_task, :index))}")

task
end
Expand Down
83 changes: 62 additions & 21 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -277,25 +277,30 @@ defmodule AeMdw.Sync.Server do
end)
end

defp exec_db_mutations(gens_mutations, state, clear_mem?) do
blocks_mutations =
Enum.flat_map(gens_mutations, fn {_height, blocks_mutations} -> blocks_mutations end)
defp exec_db_mutations(gens_mutations, initial_state, clear_mem?) do
gens_mutations
|> Enum.reduce(initial_state, fn {height, blocks_mutations}, state ->
{ts, new_state} = :timer.tc(fn -> exec_db_height(state, blocks_mutations, clear_mem?) end)

enqueue_state = maybe_enqueue_accounts_balance(state, blocks_mutations)
mutations = Enum.map(blocks_mutations, &elem(&1, 2))
:ok = profile_sync("sync_db", height, ts, mutations)

new_state =
Enum.reduce(blocks_mutations, enqueue_state, fn {_bi, _block, block_mutations}, state ->
State.commit_db(state, block_mutations, clear_mem?)
end)
new_state
end)
|> tap(fn _state -> broadcast_blocks(gens_mutations) end)
end

broadcast_blocks(gens_mutations)
defp exec_db_height(state, blocks_mutations, clear_mem?) do
enqueue_state = maybe_enqueue_accounts_balance(state, blocks_mutations)

new_state
Enum.reduce(blocks_mutations, enqueue_state, fn {_bi, _block, block_mutations}, state ->
State.commit_db(state, block_mutations, clear_mem?)
end)
end

defp maybe_enqueue_accounts_balance(state, block_mutations) do
defp maybe_enqueue_accounts_balance(state, gen_mutations) do
accounts_set =
Enum.reduce(block_mutations, MapSet.new(), fn
Enum.reduce(gen_mutations, MapSet.new(), fn
{{_height, -1}, _block, _mutations}, set ->
set

Expand All @@ -304,16 +309,16 @@ defmodule AeMdw.Sync.Server do
end)

if MapSet.size(accounts_set) > 0 do
{block_index, mb_hash} = last_microblock(block_mutations)
{block_index, mb_hash} = last_microblock(gen_mutations)
State.enqueue(state, :store_acc_balance, [mb_hash, block_index], [accounts_set])
else
state
end
end

defp last_microblock(block_mutations) do
defp last_microblock(blocks_mutations) do
{block_index, mblock, _mutations} =
block_mutations
blocks_mutations
|> Enum.filter(fn {{_height, mbi}, _block, _mutations} -> mbi != -1 end)
|> Enum.max_by(fn {block_index, _block, _mutations} -> block_index end)

Expand All @@ -324,20 +329,32 @@ defmodule AeMdw.Sync.Server do

defp exec_mem_mutations(gens_mutations, initial_state) do
new_state =
Enum.reduce(gens_mutations, initial_state, fn {_height, gen_mutations}, state ->
blocks_mutations =
Enum.map(gen_mutations, fn {_block_index, _block, mutations} -> mutations end)
Enum.reduce(gens_mutations, initial_state, fn {height, gen_mutations}, state ->
{ts, {commited_state, mutations}} =
:timer.tc(fn -> exec_mem_height(state, gen_mutations) end)

state
|> maybe_enqueue_accounts_balance(gen_mutations)
|> State.commit_mem(blocks_mutations)
:ok = profile_sync("sync_mem", height, ts, mutations)

commited_state
end)

broadcast_blocks(gens_mutations)

new_state
end

defp exec_mem_height(state, gen_mutations) do
blocks_mutations =
Enum.map(gen_mutations, fn {_block_index, _block, mutations} -> mutations end)

new_state =
state
|> maybe_enqueue_accounts_balance(gen_mutations)
|> State.commit_mem(blocks_mutations)

{new_state, blocks_mutations}
end

defp spawn_task(fun) do
%Task{ref: ref} = Task.Supervisor.async_nolink(@task_supervisor, fun)

Expand Down Expand Up @@ -376,4 +393,28 @@ defmodule AeMdw.Sync.Server do
count
end
end

defp profile_sync(sync_type, height, exec_ts, mutations) do
with [{_key, dryrun_ts}] <- :ets.lookup(:sync_profiling, {:dryrun, height}),
[{_key, txs_ts}] <- :ets.lookup(:sync_profiling, {:txs, height}) do
true = :ets.delete(:sync_profiling, {:dryrun, height - 1})
true = :ets.delete(:sync_profiling, {:txs, height - 1})

mutations_map =
mutations
|> List.flatten()
|> Enum.frequencies_by(fn
nil -> nil
%mod{} -> mod
end)

mutations_count = length(mutations)

Log.info(
"[#{sync_type}] height=#{height}, exec=#{div(exec_ts, 1_000)}ms, txs=#{div(txs_ts, 1_000)}ms, dryrun=#{div(dryrun_ts, 1_000)}ms, mutations={#{mutations_count}, #{inspect(mutations_map)}}"
)
end

:ok
end
end
2 changes: 1 addition & 1 deletion test/ae_mdw/contracts_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule AeMdw.ContractsTest do
test "lists logs sorted by call txi and log index" do
{height, _mbi} = block_index = {100_000, 11}
contract_pk = :crypto.strong_rand_bytes(32)
<<evt_hash_bigger_int::256>> = evt_hash0 = aexn_event_hash(:transfer)
<<evt_hash_bigger_int::256>> = aexn_event_hash(:transfer)
evt_hash1 = <<evt_hash_bigger_int - 1::256>>
extra_logs = [{contract_pk, [evt_hash1, <<3::256>>, <<4::256>>, <<1::256>>], <<>>}]
call_rec = call_rec("transfer", contract_pk, height, contract_pk, extra_logs)
Expand Down

0 comments on commit bde081d

Please sign in to comment.