Skip to content

Commit

Permalink
feat: sync up to latest micro-block (#726)
Browse files Browse the repository at this point in the history
  • Loading branch information
sborrazas committed Jun 29, 2022
1 parent af95379 commit bff7d0f
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 53 deletions.
4 changes: 2 additions & 2 deletions lib/ae_mdw/blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ defmodule AeMdw.Blocks do
end

defp fetch_gen_blocks(state, gen, _last_gen) do
@table
|> Collection.stream(:backward, nil, {gen, <<>>})
state
|> Collection.stream(@table, :backward, nil, {gen, <<>>})
|> Stream.take_while(&match?({^gen, _mb_index}, &1))
|> Enum.map(fn key -> State.fetch!(state, @table, key) end)
|> Enum.reverse()
Expand Down
3 changes: 1 addition & 2 deletions lib/ae_mdw/db/status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ defmodule AeMdw.Db.Status do
mdw_height: mdw_height,
mdw_tx_index: mdw_tx_index,
mdw_async_tasks: async_tasks_counters,
# MDW is always 1 generation behind
mdw_synced: node_height == mdw_height + 1,
mdw_synced: node_height == mdw_height,
mdw_syncing: mdw_syncing?,
mdw_gens_per_minute: round(gens_per_minute * 100) / 100
}
Expand Down
42 changes: 20 additions & 22 deletions lib/ae_mdw/db/sync/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule AeMdw.Db.Sync.Block do
alias AeMdw.Db.Mutation
alias AeMdw.Log
alias AeMdw.Node, as: AE
alias AeMdw.Node.Db
alias AeMdw.Txs

require Model
Expand All @@ -42,28 +43,20 @@ defmodule AeMdw.Db.Sync.Block do

################################################################################

@spec blocks_mutations(Blocks.height(), Blocks.mbi(), Txs.txi(), Blocks.height()) ::
@spec blocks_mutations(
Blocks.height(),
Blocks.mbi(),
Txs.txi(),
Blocks.height() | Blocks.block_hash()
) ::
{[height_mutations()], Txs.txi()}
def blocks_mutations(from_height, from_mbi, from_txi, to_height) do
{:ok, header} = :aec_chain.get_key_header_by_height(to_height + 1)
{:ok, initial_hash} = :aec_headers.hash_header(header)

{heights_hashes, _prev_hash} =
Enum.map_reduce((to_height + 1)..from_height, initial_hash, fn height, hash ->
{:ok, kh} = :aec_chain.get_header(hash)
^height = :aec_headers.height(kh)
:key = :aec_headers.type(kh)

{{height, hash}, :aec_headers.prev_key_hash(kh)}
end)

heights_hashes = Enum.reverse(heights_hashes)

heights_hashes
|> Enum.zip(Enum.drop(heights_hashes, 1))
|> Enum.flat_map_reduce(from_txi, fn {{height, kb_hash}, {_next_height, next_kb_hash}}, txi ->
{key_block, micro_blocks} = AE.Db.get_blocks(kb_hash, next_kb_hash)
def blocks_mutations(from_height, from_mbi, from_txi, to_height_or_hash) do
from_height
|> Db.get_blocks_per_height(to_height_or_hash)
|> Enum.flat_map_reduce(from_txi, fn {key_block, micro_blocks, next_kb_hash}, txi ->
height = :aec_blocks.height(key_block)
kb_header = :aec_blocks.to_key_header(key_block)
{:ok, kb_hash} = :aec_headers.hash_header(kb_header)

if rem(height, @log_freq) == 0, do: Log.info("creating mutations for block at #{height}")

Expand All @@ -83,7 +76,12 @@ defmodule AeMdw.Db.Sync.Block do
{{{height, mbi}, micro_block, mutations}, txi}
end)

next_kb_model = Model.block(index: {height + 1, -1}, hash: next_kb_hash, tx_index: txi)
next_kb_mutation =
if next_kb_hash do
key_block = Model.block(index: {height + 1, -1}, hash: next_kb_hash, tx_index: txi)

KeyBlockMutation.new(key_block)
end

kb0_mutation =
if height == 0 do
Expand All @@ -102,7 +100,7 @@ defmodule AeMdw.Db.Sync.Block do
NamesExpirationMutation.new(height),
OraclesExpirationMutation.new(height),
StatsMutation.new(height, from_height != height or from_mbi != 0),
KeyBlockMutation.new(next_kb_model)
next_kb_mutation
]

blocks_mutations = micro_blocks_gens ++ [{{height, -1}, key_block, gen_mutations}]
Expand Down
49 changes: 49 additions & 0 deletions lib/ae_mdw/node/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,61 @@ defmodule AeMdw.Node.Db do
@type pubkey() :: <<_::256>>
@type hash_type() :: nil | :key | :micro
@type height_hash() :: {hash_type(), pos_integer(), binary()}
@opaque key_block() :: tuple()
@opaque micro_block() :: tuple()

@spec get_blocks(Blocks.block_hash(), Blocks.block_hash()) :: tuple()
def get_blocks(kb_hash, next_kb_hash) do
{:aec_db.get_block(kb_hash), get_micro_blocks(next_kb_hash)}
end

@spec get_blocks_per_height(Blocks.height(), Blocks.block_hash() | Blocks.height()) :: [
{Blocks.height(), [micro_block()], Blocks.block_hash()}
]
def get_blocks_per_height(from_height, block_hash) when is_binary(block_hash),
do: get_blocks_per_height(from_height, block_hash, nil)

def get_blocks_per_height(from_height, to_height) when is_integer(to_height) do
{:ok, header} = :aec_chain.get_key_header_by_height(to_height + 1)

last_mb_hash = :aec_headers.prev_hash(header)
{:ok, last_kb_hash} = :aec_headers.hash_header(header)

get_blocks_per_height(from_height, last_mb_hash, last_kb_hash)
end

defp get_blocks_per_height(from_height, last_mb_hash, last_kb_hash) do
{last_mb_hash, last_kb_hash}
|> Stream.unfold(fn {last_mb_hash, last_kb_hash} ->
{key_block, micro_blocks} = get_kb_mbs(last_mb_hash)

prev_hash = :aec_blocks.prev_hash(key_block)
key_header = :aec_blocks.to_header(key_block)
{:ok, key_hash} = :aec_headers.hash_header(key_header)

{{key_block, micro_blocks, last_kb_hash}, {prev_hash, key_hash}}
end)
|> Enum.take_while(fn {key_block, _micro_blocks, _last_kb_hash} ->
:aec_blocks.height(key_block) >= from_height
end)
|> Enum.reverse()
end

defp get_kb_mbs(last_mb_hash) do
last_mb_hash
|> Stream.unfold(fn block_hash ->
block = :aec_db.get_block(block_hash)

{block, :aec_blocks.prev_hash(block)}
end)
|> Enum.reduce_while([], fn block, micro_blocks ->
case :aec_blocks.type(block) do
:micro -> {:cont, [block | micro_blocks]}
:key -> {:halt, {block, micro_blocks}}
end
end)
end

@spec get_micro_blocks(Blocks.block_hash()) :: list()
def get_micro_blocks(next_kb_hash) do
next_kb_hash
Expand Down
49 changes: 26 additions & 23 deletions lib/ae_mdw/sync/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ defmodule AeMdw.Sync.Server do

require Logger

defstruct [:chain_height, :mem_height, :db_state, :restarts, :gens_per_min]
defstruct [:chain_height, :chain_hash, :mem_hash, :db_state, :restarts, :gens_per_min]

@typep height() :: Blocks.height()
@typep hash() :: Blocks.block_hash()
@typep state() ::
:initialized
| :idle
Expand All @@ -53,16 +54,17 @@ defmodule AeMdw.Sync.Server do
| {:syncing_mem, reference()}
@typep state_data() :: %__MODULE__{
db_state: State.t(),
mem_height: height(),
chain_hash: hash(),
mem_hash: height(),
restarts: non_neg_integer()
}
@typep cast_event() :: {:new_height, height()} | :restart_sync
@typep cast_event() :: {:new_height, height(), hash()} | :restart_sync
@typep internal_event() :: :check_sync
@typep call_event() :: :syncing?
@typep reason() :: term()
@typep info_event() ::
{reference(), State.t()}
| {reference(), height()}
| {reference(), height(), hash()}
| {:DOWN, reference(), :process, pid(), reason()}

@max_restarts 5
Expand All @@ -77,8 +79,8 @@ defmodule AeMdw.Sync.Server do
@spec start_link(GenServer.options()) :: :gen_statem.start_ret()
def start_link(_opts), do: GenStateMachine.start_link(__MODULE__, [], name: __MODULE__)

@spec new_height(height()) :: :ok
def new_height(height), do: GenStateMachine.cast(__MODULE__, {:new_height, height})
@spec new_height(height(), hash()) :: :ok
def new_height(height, hash), do: GenStateMachine.cast(__MODULE__, {:new_height, height, hash})

@spec syncing?() :: boolean()
def syncing?, do: GenStateMachine.call(__MODULE__, :syncing?)
Expand All @@ -94,7 +96,7 @@ defmodule AeMdw.Sync.Server do
state_data = %__MODULE__{
chain_height: nil,
db_state: db_state,
mem_height: State.height(db_state),
mem_hash: nil,
restarts: 0,
gens_per_min: 0
}
Expand All @@ -111,16 +113,18 @@ defmodule AeMdw.Sync.Server do
:gen_statem.event_handler_result(state())
@spec handle_event(:internal, internal_event(), state(), state_data()) ::
:gen_statem.event_handler_result(state())
def handle_event(:cast, {:new_height, chain_height}, :initialized, state_data) do
def handle_event(:cast, {:new_height, chain_height, chain_hash}, :initialized, state_data) do
actions = [{:next_event, :internal, :check_sync}]

{:next_state, :idle, %__MODULE__{state_data | chain_height: chain_height}, actions}
{:next_state, :idle,
%__MODULE__{state_data | chain_height: chain_height, chain_hash: chain_hash}, actions}
end

def handle_event(:cast, {:new_height, chain_height}, _state, state_data) do
def handle_event(:cast, {:new_height, chain_height, chain_hash}, _state, state_data) do
actions = [{:next_event, :internal, :check_sync}]

{:keep_state, %__MODULE__{state_data | chain_height: chain_height}, actions}
{:keep_state, %__MODULE__{state_data | chain_height: chain_height, chain_hash: chain_hash},
actions}
end

def handle_event(
Expand All @@ -129,8 +133,9 @@ defmodule AeMdw.Sync.Server do
:idle,
%__MODULE__{
chain_height: chain_height,
chain_hash: chain_hash,
db_state: db_state,
mem_height: mem_height
mem_hash: mem_hash
} = state_data
) do
max_db_height = chain_height - @mem_gens
Expand All @@ -144,10 +149,8 @@ defmodule AeMdw.Sync.Server do

{:next_state, {:syncing_db, ref}, state_data}

db_height >= max_db_height and mem_height != chain_height - 1 ->
from_height = db_height
to_height = chain_height - 1
ref = spawn_mem_sync(from_height, to_height)
db_height >= max_db_height and mem_hash != chain_hash ->
ref = spawn_mem_sync(db_height, chain_hash)

{:next_state, {:syncing_mem, ref}, state_data}

Expand All @@ -163,7 +166,7 @@ defmodule AeMdw.Sync.Server do

new_state_data = %__MODULE__{
state_data
| mem_height: State.height(new_db_state),
| mem_hash: State.height(new_db_state),
db_state: new_db_state
}

Expand All @@ -172,10 +175,10 @@ defmodule AeMdw.Sync.Server do
{:next_state, :idle, new_state_data, actions}
end

def handle_event(:info, {ref, mem_height}, {:syncing_mem, ref}, state_data) do
def handle_event(:info, {ref, mem_hash}, {:syncing_mem, ref}, state_data) do
actions = [{:next_event, :internal, :check_sync}]

new_state_data = %__MODULE__{state_data | mem_height: mem_height}
new_state_data = %__MODULE__{state_data | mem_hash: mem_hash}

{:next_state, :idle, new_state_data, actions}
end
Expand Down Expand Up @@ -260,7 +263,7 @@ defmodule AeMdw.Sync.Server do
end)
end

defp spawn_mem_sync(from_height, to_height) do
defp spawn_mem_sync(from_height, last_hash) do
spawn_task(fn ->
mem_state = State.new_mem_state()
from_txi = Block.next_txi(mem_state)
Expand All @@ -273,15 +276,15 @@ defmodule AeMdw.Sync.Server do

{mutations_time, {gens_mutations, _next_txi}} =
:timer.tc(fn ->
Block.blocks_mutations(from_height, from_mbi, from_txi, to_height)
Block.blocks_mutations(from_height, from_mbi, from_txi, last_hash)
end)

{exec_time, _new_state} = :timer.tc(fn -> exec_mem_mutations(gens_mutations, mem_state) end)

gens_per_min = (to_height + 1 - from_height) * 60_000_000 / (mutations_time + exec_time)
gens_per_min = length(gens_mutations) * 60_000_000 / (mutations_time + exec_time)
Status.set_gens_per_min(gens_per_min)

to_height
last_hash
end)
end

Expand Down
18 changes: 14 additions & 4 deletions lib/ae_mdw/sync/watcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,33 @@ defmodule AeMdw.Sync.Watcher do
:aec_events.subscribe(:chain)
:aec_events.subscribe(:top_changed)

Server.new_height(chain_height())
Server.new_height(chain_height(), chain_hash())

{:noreply, state}
end

@impl true
@spec handle_info(term(), t()) :: {:noreply, t()}
def handle_info({:gproc_ps_event, :top_changed, %{info: %{block_type: :key}}}, state) do
Server.new_height(chain_height())
Server.new_height(chain_height(), chain_hash())

{:noreply, state}
end

def handle_info({:gproc_ps_event, :top_changed, %{info: %{block_type: :micro}}}, state),
do: {:noreply, state}
def handle_info({:gproc_ps_event, :top_changed, %{info: %{block_type: :micro}}}, state) do
Server.new_height(chain_height(), chain_hash())

{:noreply, state}
end

def handle_info(_msg, state), do: {:noreply, state}

defp chain_height, do: :aec_headers.height(:aec_chain.top_header())

defp chain_hash do
{:ok, block_hash} =
:aec_chain.top_block() |> :aec_blocks.to_header() |> :aec_headers.hash_header()

block_hash
end
end

0 comments on commit bff7d0f

Please sign in to comment.