Skip to content

Commit

Permalink
fix: avoid erasing mem state when State.commit/2 (#801)
Browse files Browse the repository at this point in the history
* fix: avoid erasing mem state when State.commit/2

* fix: optionally erase mem changes when syncing

Plus, batching all in-mem changes into a single transaction so that
they are executted atomically.
  • Loading branch information
sborrazas committed Jul 25, 2022
1 parent cae1dc3 commit f059238
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
14 changes: 8 additions & 6 deletions lib/ae_mdw/db/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ defmodule AeMdw.Db.State do
@spec height(t()) :: height()
def height(state), do: DbUtil.synced_height(state)

@spec commit(t(), [Mutation.t()]) :: t()
def commit(%__MODULE__{store: prev_store} = state, mutations) do
@spec commit(t(), [Mutation.t()], boolean()) :: t()
def commit(%__MODULE__{store: prev_store} = state, mutations, clear_mem? \\ false) do
new_state =
%__MODULE__{jobs: jobs} =
TxnDbStore.transaction(fn store ->
Expand All @@ -58,15 +58,17 @@ defmodule AeMdw.Db.State do
|> Enum.reduce(state2, &Mutation.execute/2)
end)

queue_jobs(jobs)
if clear_mem? do
:persistent_term.erase(@state_pm_key)
end

:persistent_term.erase(@state_pm_key)
queue_jobs(jobs)

%__MODULE__{new_state | store: prev_store}
end

@spec commit_db(t(), [Mutation.t()]) :: t()
def commit_db(state, mutations), do: commit(state, mutations)
@spec commit_db(t(), [Mutation.t()], boolean()) :: t()
def commit_db(state, mutations, clear_mem? \\ true), do: commit(state, mutations, clear_mem?)

@spec commit_mem(t(), [Mutation.t()]) :: t()
def commit_mem(state, mutations) do
Expand Down
26 changes: 13 additions & 13 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ defmodule AeMdw.Sync.Server do
db_height < max_db_height ->
from_height = db_height + 1
to_height = min(from_height + @max_sync_gens - 1, max_db_height)
ref = spawn_db_sync(db_state, from_height, to_height)
clear_mem? = to_height != max_db_height

ref = spawn_db_sync(db_state, from_height, to_height, clear_mem?)

{:next_state, {:syncing_db, ref}, state_data}

Expand Down Expand Up @@ -238,7 +240,7 @@ defmodule AeMdw.Sync.Server do
super(event_type, event_content, state, data)
end

defp spawn_db_sync(db_state, from_height, to_height) do
defp spawn_db_sync(db_state, from_height, to_height, clear_mem?) do
from_txi = Block.next_txi(db_state)

from_mbi =
Expand All @@ -253,7 +255,8 @@ defmodule AeMdw.Sync.Server do
Block.blocks_mutations(from_height, from_mbi, from_txi, to_height)
end)

{exec_time, new_state} = :timer.tc(fn -> exec_db_mutations(gens_mutations, db_state) end)
{exec_time, new_state} =
:timer.tc(fn -> exec_db_mutations(gens_mutations, db_state, clear_mem?) end)

gens_per_min = (to_height + 1 - from_height) * 60_000_000 / (mutations_time + exec_time)
Status.set_gens_per_min(gens_per_min)
Expand Down Expand Up @@ -287,11 +290,11 @@ defmodule AeMdw.Sync.Server do
end)
end

defp exec_db_mutations(gens_mutations, state) do
defp exec_db_mutations(gens_mutations, state, clear_mem?) do
gens_mutations
|> Enum.flat_map(fn {_height, blocks_mutations} -> blocks_mutations end)
|> Enum.reduce(state, fn {{_height, mbi}, block, block_mutations}, state ->
new_state = State.commit_db(state, block_mutations)
new_state = State.commit_db(state, block_mutations, clear_mem?)

broadcast_block(block, mbi == -1)

Expand All @@ -300,15 +303,12 @@ defmodule AeMdw.Sync.Server do
end

defp exec_mem_mutations(gens_mutations, state) do
gens_mutations
|> Enum.flat_map(fn {_height, blocks_mutations} -> blocks_mutations end)
|> Enum.reduce(state, fn {{_height, mbi}, block, block_mutations}, state ->
new_state = State.commit_mem(state, block_mutations)
mutations =
gens_mutations
|> Enum.flat_map(fn {_height, blocks_mutations} -> blocks_mutations end)
|> Enum.flat_map(fn {_block_height, _block, block_mutations} -> block_mutations end)

broadcast_block(block, mbi == -1)

new_state
end)
State.commit_mem(state, mutations)
end

defp spawn_task(fun) do
Expand Down

0 comments on commit f059238

Please sign in to comment.