Skip to content

Commit

Permalink
Sync stats improved (#4278)
Browse files Browse the repository at this point in the history
* Introduce aec_sync_stats module

* Add sync-status endpoint

* More realistic precision for 'speed'

* Don't log incomplete stats
  • Loading branch information
hanssv committed Mar 4, 2024
1 parent d16e657 commit dc125a0
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 32 deletions.
3 changes: 2 additions & 1 deletion apps/aeapi/src/aeapi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ top_key_block() ->
%% @doc Get the current sync status, percent complete and height reached.
-spec sync_progress() -> {boolean(), float(), aec_blocks:height()}.
sync_progress() ->
aec_sync:sync_progress().
{Syncing, Progress, Top, _} = aec_sync:sync_progress(),
{Syncing, Progress, Top}.

connected_peers() ->
aec_peers:connected_peers().
Expand Down
2 changes: 1 addition & 1 deletion apps/aecli/src/aecli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ show_status(#aecli{} = J, _Item) ->
{ok, TopKeyBlock} = aec_chain:top_key_block(),
GenesisBlockHash = aec_consensus:get_genesis_hash(),
Difficulty = aec_blocks:difficulty(TopKeyBlock),
{Syncing, SyncProgress, _} = aec_sync:sync_progress(),
{Syncing, SyncProgress, _, _} = aec_sync:sync_progress(),
NodeVersion = aeu_info:get_version(),
NodeRevision = aeu_info:get_revision(),
PeerCount = aec_peers:count(peers),
Expand Down
1 change: 1 addition & 0 deletions apps/aecore/src/aec_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ init([]) ->
, worker(aec_tx_gossip_cache)
, worker(aec_peers)
, worker(aec_sync)
, worker(aec_sync_stats)
, peer_listener_spec() % incoming connections
],
{ok, {SupFlags, ChildSpecs}}.
Expand Down
4 changes: 2 additions & 2 deletions apps/aecore/src/aec_db_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ siblings_at_height(H) ->
%% once the chain is synced, there's no way to "unsync"
handle_info({_, chain_sync, #{info := {chain_sync_done, _}}}, St) ->
case aec_sync:sync_progress() of
{false, _, _} ->
{false, _, _, _} ->
{noreply, St#st{synced = true}};
_ ->
{noreply, St}
Expand Down Expand Up @@ -384,7 +384,7 @@ handle_cast({scanning_failed, _ErrHeight}, St) ->

handle_cast(#scanner_done{tree = Name, height = Height, block_hash = Hash},
#st{scanners = Scanners} = St) ->
Scanners1 =
Scanners1 =
[X || #scanner{height = He, block_hash = Ha, tree = T} = X <- Scanners,
{He, Ha, T} =/= {Height, Hash, Name}],
St1 = update_current_scan(Hash, Scanners1, St#st{scanners = Scanners1}),
Expand Down
79 changes: 53 additions & 26 deletions apps/aecore/src/aec_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ gossip_txs(GossipTxs) ->
sync_in_progress(PeerId) ->
opt_call({sync_in_progress, PeerId}, false).

-spec sync_progress() -> {boolean(), float(), aec_blocks:height()}.
-spec sync_progress() -> {boolean(), float(), aec_blocks:height(), undefined | chain_id()}.
sync_progress() ->
opt_call(sync_progress, {false, 100.0, 0}).
opt_call(sync_progress, {false, 100.0, 0, undefined}).

-spec is_syncing() -> boolean().
is_syncing() ->
Expand Down Expand Up @@ -358,7 +358,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end,
{noreply, do_terminate_worker(Pid, State, Reason)};
handle_info(update_sync_progress_metric, State) ->
{_, SyncProgress, _} = sync_progress(State),
{_, SyncProgress, _, _} = sync_progress(State),
aec_metrics:try_update([ae,epoch,aecore,sync,progress], SyncProgress),
log_sync_status(State),

Expand All @@ -378,10 +378,12 @@ sync_task_for_chain(Chain, S = #state{ sync_tasks = STs }) ->
case match_tasks(Chain, STs, []) of
no_match ->
ST = init_sync_task(Chain),
new_sync_stats(ST),
{{new, Chain, ST#sync_task.id}, set_sync_task(ST, S)};
{match, ST = #sync_task{ id = STId, chain = C2 }} ->
NewChain = merge_chains(Chain#chain{ id = STId }, C2),
ST1 = ST#sync_task{ chain = NewChain },
new_sync_stats(ST1),
{{existing, NewChain, STId}, set_sync_task(ST1, S)};
Res = {inconclusive, _, _} ->
{Res, S}
Expand Down Expand Up @@ -414,8 +416,9 @@ handle_last_result(ST, {get_generation, Height, Hash, PeerId, {ok, Block}}) ->
ST#sync_task{ pool = Pool1 };
handle_last_result(ST, {post_blocks, ok}) ->
ST#sync_task{ adding = [] };
handle_last_result(ST, {post_blocks, {ok, NewTop}}) ->
handle_last_result(ST, {post_blocks, {ok, NewTop, Stats}}) ->
inform_workers(ST, {new_top, NewTop}),
post_stats(ST, Stats),
ST#sync_task{ adding = [] };
handle_last_result(ST, {post_blocks, {rejected, BlockFromPeerId, Height}}) ->
mark_workers_as_suspect(ST#sync_task.workers),
Expand Down Expand Up @@ -529,6 +532,7 @@ maybe_end_sync_task(State, ST) ->
false ->
epoch_sync:info("Removing/ending sync task ~p target was ~p",
[ST#sync_task.id, pp_chain_block(Target)]),
aec_sync_stats:sync_done(ST#sync_task.id),
State1 = delete_sync_task(ST, State),
maybe_update_top_target(State1);
true ->
Expand Down Expand Up @@ -950,23 +954,27 @@ get_header_by_height(PeerId, Height, RemoteTop) ->
end
end.

new_sync_stats(#sync_task{ id = STId, chain = Chain }) ->
#chain{ blocks = [#chain_block{ hash = Hash, height = Height } | _] } = Chain,
aec_sync_stats:new_sync(STId, {Height, Hash}).

post_stats(#sync_task{ id = STId }, Stats) ->
aec_sync_stats:post_stats(STId, Stats).

post_blocks([]) -> ok;
post_blocks([#pool_item{ height = StartHeight } | _] = Blocks) ->
post_blocks(StartHeight, StartHeight, Blocks, empty_stats()).

post_blocks(To, To, [], Stats) ->
epoch_sync:info("Synced block ~p (~s)", [To, pp_stats(Stats)]),
{ok, To};
post_blocks(From, To, [], Stats) ->
epoch_sync:info("Synced blocks ~p - ~p (~s)", [From, To, pp_stats(Stats)]),
{ok, To};
log_sync_stats(From, To, Stats),
{ok, To, lists:reverse(maps:get(gens, Stats, []))};
post_blocks(From, _To, [#pool_item{ height = Height, got = {_PeerId, local} } | Blocks],
Stats = #{gs := Gs}) ->
post_blocks(From, Height, Blocks, Stats#{gs := Gs + 1});
post_blocks(From, To, [#pool_item{ height = Height, got = {PeerId, Block} } | Blocks], Stats) ->
case add_generation(Block, Stats) of
{ok, Stats1} ->
post_blocks(From, Height, Blocks, Stats1);
case add_generation(Block, empty_gen_stats(Height)) of
{ok, GenStats} ->
post_blocks(From, Height, Blocks, add_gen_stats(Stats, GenStats));
{error, Reason} ->
epoch_sync:info("Failed to add synced block ~p: ~p", [Height, Reason]),
[ epoch_sync:info("Synced blocks ~p - ~p (~s)", [From, To - 1, pp_stats(Stats)]) || To > From ],
Expand All @@ -981,10 +989,25 @@ post_blocks(From, To, [#pool_item{ height = Height, got = {PeerId, Block} } | Bl
end.

empty_stats() ->
#{t0 => os:timestamp(), gs => 0, mbs => 0, txs => 0}.
#{t0 => erlang:system_time(microsecond), gs => 0, mbs => 0, txs => 0, gens => []}.

empty_gen_stats(Height) ->
ES = empty_stats(),
ES#{height => Height}.

add_gen_stats(S = #{gs := Gs, mbs := MBs, txs := Txs, gens := Gens},
GS = #{mbs := GMBs, txs := GTxs}) ->
T1 = erlang:system_time(microsecond),
S#{gs := Gs + 1, mbs := MBs + GMBs, txs := Txs + GTxs,
gens := [GS#{t1 => T1} | Gens]}.

log_sync_stats(To, To, Stats) ->
epoch_sync:info("Synced block ~p (~s)", [To, pp_stats(Stats)]);
log_sync_stats(From, To, Stats) ->
epoch_sync:info("Synced blocks ~p - ~p (~s)", [From, To, pp_stats(Stats)]).

pp_stats(#{t0 := T0, gs := Gs, mbs := MBs, txs := Txs}) ->
T = timer:now_diff(os:timestamp(), T0),
T = erlang:system_time(microsecond) - T0,
Avg = if Gs == 0 -> 0.0; true -> T / (1000 * Gs) end,
io_lib:format("~p generations (~.2f ms/gen), ~p mblock(s), ~p tx(s)",
[Gs, Avg, MBs, Txs]).
Expand All @@ -996,11 +1019,11 @@ pp_stats(#{t0 := T0, gs := Gs, mbs := MBs, txs := Txs}) ->
%% each synchronous call.
%% Map contains key dir, saying in which direction we sync
add_generation(#{dir := forward, key_block := _KB, micro_blocks := MBs},
Stats = #{mbs := Bs, gs := Gs}) ->
add_blocks(MBs, Stats#{mbs := Bs + length(MBs), gs := Gs + 1});
Stats = #{mbs := NMBs}) ->
add_blocks(MBs, Stats#{mbs := NMBs + length(MBs)});
add_generation(#{dir := backward, key_block := KB, micro_blocks := MBs},
Stats = #{mbs := Bs, gs := Gs}) ->
add_blocks(MBs ++ [KB], Stats#{mbs := Bs + length(MBs), gs := Gs + 1}).
Stats = #{mbs := NMBs}) ->
add_blocks(MBs ++ [KB], Stats#{mbs := NMBs + length(MBs)}).

add_blocks([], Stats) ->
{ok, Stats};
Expand Down Expand Up @@ -1203,20 +1226,24 @@ max_gossip() ->
is_syncing(#state{sync_tasks = SyncTasks}) ->
[1 || #sync_task{suspect = false} <- SyncTasks] =/= [].

-spec sync_progress(#state{}) -> {boolean(), float(), aec_blocks:height()}.
-spec sync_progress(#state{}) -> {boolean(), float(), aec_blocks:height(), undefined | chain_id()}.
sync_progress(#state{sync_tasks = SyncTasks, top_target = TopTarget} = State) ->
case is_syncing(State) of
false -> {false, 100.0, TopTarget};
false -> {false, 100.0, TopTarget, undefined};
true ->
TargetHeight =
{TargetHeight, SyncTaskId} =
lists:foldl(
fun(#sync_task{suspect = true}, Acc) ->
Acc;
(SyncTask, MaxHeight) ->
(SyncTask, {MaxHeight, STId}) ->
#chain{blocks = Chain} = SyncTask#sync_task.chain,
[#chain_block{height = Height} | _] = Chain,
max(Height, MaxHeight)
end, 0, SyncTasks),
if Height > MaxHeight ->
{Height, SyncTask#sync_task.id};
true ->
{MaxHeight, STId}
end
end, {0, undefined}, SyncTasks),
TopHeight = aec_headers:height(aec_chain:dirty_top_header()),
SyncProgress0 = round(10000000 * TopHeight / TargetHeight) / 100000,
%% It is possible to have TopHeight already higher than Height in sync task,
Expand All @@ -1228,7 +1255,7 @@ sync_progress(#state{sync_tasks = SyncTasks, top_target = TopTarget} = State) ->
true -> 99.9;
false -> SyncProgress0
end,
{true, SyncProgress, TargetHeight}
{true, SyncProgress, TargetHeight, SyncTaskId}
end.

peer_get_header_by_hash(PeerId, RemoteHash) ->
Expand Down Expand Up @@ -1316,7 +1343,7 @@ validate_block(Block) ->

log_sync_status(#state{is_syncing = false}) -> ok;
log_sync_status(#state{sync_tasks = STs} = S) ->
{_, SyncProgress, _} = sync_progress(S),
{_, SyncProgress, _, _} = sync_progress(S),
epoch_sync:info("Sync progress: ~.4f%", [SyncProgress]),
[log_sync_task(ST) || ST <- STs].

Expand Down

0 comments on commit dc125a0

Please sign in to comment.