Skip to content

Commit

Permalink
chore: keep cache after switching from db to mem commit (#1458)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Jul 12, 2023
1 parent 3f53f9a commit 9e41cd8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 30 deletions.
17 changes: 6 additions & 11 deletions lib/ae_mdw/sync/mutations_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,28 @@ defmodule AeMdw.Sync.MutationsCache do
"""

alias AeMdw.Db.Mutation
alias AeMdw.EtsCache

@hashes_table :sync_hashes
@expiration_minutes 120

@typep hash :: AeMdw.Blocks.block_hash()

@spec init() :: :ok
def init do
:ets.new(@hashes_table, [:named_table, :set, :public])
EtsCache.new(@hashes_table, @expiration_minutes)
end

@spec get_mbs_mutations(hash()) :: {[Mutation.t()], AeMdw.Txs.txi()} | nil
def get_mbs_mutations(mb_hash) do
case :ets.lookup(@hashes_table, mb_hash) do
[{^mb_hash, mutations_txi}] -> mutations_txi
[] -> nil
with {mutations_txi, _time} <- EtsCache.get(@hashes_table, mb_hash) do
mutations_txi
end
end

@spec put_mbs_mutations(hash(), {[Mutation.t()], AeMdw.Txs.txi()}) :: :ok
def put_mbs_mutations(mb_hash, mutations_txi) do
:ets.insert(@hashes_table, {mb_hash, mutations_txi})
:ok
end

@spec clear() :: :ok
def clear do
:ets.delete_all_objects(@hashes_table)
EtsCache.put(@hashes_table, mb_hash, mutations_txi)
:ok
end
end
58 changes: 39 additions & 19 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ defmodule AeMdw.Sync.Server do
alias AeMdw.Db.Sync.Block
alias AeMdw.Log
alias AeMdw.Sync.AsyncTasks.WealthRankAccounts
alias AeMdw.Sync.MutationsCache
alias AeMdwWeb.Websocket.Broadcaster
alias AeMdwWeb.Websocket.BroadcasterCache

Expand Down Expand Up @@ -250,8 +249,6 @@ defmodule AeMdw.Sync.Server do
{exec_time, new_state} =
:timer.tc(fn -> exec_db_mutations(gens_mutations, db_state, clear_mem?) end)

MutationsCache.clear()

gens_per_min = (to_height + 1 - from_height) * 60_000_000 / exec_time
Status.set_gens_per_min(gens_per_min)

Expand All @@ -271,7 +268,7 @@ defmodule AeMdw.Sync.Server do
end

gens_mutations = Block.blocks_mutations(from_height, from_mbi, from_txi, last_hash)
_new_state = exec_mem_mutations(gens_mutations, mem_state)
_new_state = exec_mem_mutations(mem_state, gens_mutations, from_height)

last_hash
end)
Expand All @@ -282,8 +279,7 @@ defmodule AeMdw.Sync.Server do
|> Enum.reduce(initial_state, fn {height, blocks_mutations}, state ->
{ts, new_state} = :timer.tc(fn -> exec_db_height(state, blocks_mutations, clear_mem?) end)

mutations = Enum.map(blocks_mutations, &elem(&1, 2))
:ok = profile_sync("sync_db", height, ts, mutations)
:ok = profile_sync("sync_db", height, ts, blocks_mutations)

new_state
end)
Expand Down Expand Up @@ -327,16 +323,41 @@ defmodule AeMdw.Sync.Server do
{block_index, mb_hash}
end

defp exec_mem_mutations(gens_mutations, initial_state) do
new_state =
Enum.reduce(gens_mutations, initial_state, fn {height, gen_mutations}, state ->
{ts, {commited_state, mutations}} =
:timer.tc(fn -> exec_mem_height(state, gen_mutations) end)
defp exec_mem_mutations(empty_state, gens_mutations, from_height) do
# start syncing from memory, then anticipates commit to avoid committing only after 10 gens
if from_height == AeMdw.Db.Util.synced_height(State.mem_state()) do
Enum.reduce(gens_mutations, empty_state, fn {height, gen_mutations}, state ->
{ts, commited_state} = :timer.tc(fn -> exec_mem_height(state, gen_mutations) end)

:ok = profile_sync("sync_mem", height, ts, mutations)
:ok = profile_sync("sync_mem", height, ts, gen_mutations)

commited_state
end)
else
exec_all_mem_mutations(empty_state, gens_mutations)
end
end

defp exec_all_mem_mutations(state, gens_mutations) do
blocks_mutations =
Enum.flat_map(gens_mutations, fn {_height, blocks_mutations} -> blocks_mutations end)

all_mutations =
Enum.flat_map(blocks_mutations, fn {_block_index, _block, block_mutations} ->
block_mutations
end)

{{height, _mbi}, _block, _mutations} = List.last(blocks_mutations)
Log.info("[sync_mem] exec until height=#{height}")

{ts, new_state} =
:timer.tc(fn ->
state
|> maybe_enqueue_accounts_balance(blocks_mutations)
|> State.commit_mem(all_mutations)
end)

:ok = profile_sync("sync_mem", height, ts, blocks_mutations)

broadcast_blocks(gens_mutations)

Expand All @@ -347,12 +368,9 @@ defmodule AeMdw.Sync.Server do
blocks_mutations =
Enum.map(gen_mutations, fn {_block_index, _block, mutations} -> mutations end)

new_state =
state
|> maybe_enqueue_accounts_balance(gen_mutations)
|> State.commit_mem(blocks_mutations)

{new_state, blocks_mutations}
state
|> maybe_enqueue_accounts_balance(gen_mutations)
|> State.commit_mem(blocks_mutations)
end

defp spawn_task(fun) do
Expand Down Expand Up @@ -394,7 +412,9 @@ defmodule AeMdw.Sync.Server do
end
end

defp profile_sync(sync_type, height, exec_ts, mutations) do
defp profile_sync(sync_type, height, exec_ts, blocks_mutations) do
mutations = Enum.map(blocks_mutations, &elem(&1, 2))

with [{_key, dryrun_ts}] <- :ets.lookup(:sync_profiling, {:dryrun, height}),
[{_key, txs_ts}] <- :ets.lookup(:sync_profiling, {:txs, height}) do
true = :ets.delete(:sync_profiling, {:dryrun, height - 1})
Expand Down

0 comments on commit 9e41cd8

Please sign in to comment.