Skip to content

Commit

Permalink
Gh4157 control when mempool sync starts (#4160)
Browse files Browse the repository at this point in the history
* Add mempool:sync_start option

* Add release note
  • Loading branch information
uwiger committed Jul 7, 2023
1 parent f002167 commit 4843612
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 46 deletions.
20 changes: 17 additions & 3 deletions apps/aecore/src/aec_chain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
micro_blocks := [aec_blocks:micro_block()],
dir := backward | forward }.

%% Preferred activity type is dirty for read functions.
%% If called from within an activity, the activity type is inherited.
activity(F) when is_function(F, 0) ->
aec_db:ensure_activity(async_dirty, F).

%%%===================================================================
%%% Accounts
%%%===================================================================
Expand Down Expand Up @@ -497,6 +502,9 @@ top_header_hash_and_state() ->

-spec top_key_block() -> 'error' | {ok, aec_blocks:block()}.
top_key_block() ->
activity(fun top_key_block_/0).

top_key_block_() ->
case aec_db:get_top_block_hash() of
Hash when is_binary(Hash) ->
{ok, Block} = get_block(Hash),
Expand All @@ -510,19 +518,25 @@ top_key_block() ->

-spec top_key_block_hash() -> 'undefined' | binary().
top_key_block_hash() ->
activity(fun top_key_block_hash_/0).

top_key_block_hash_() ->
case aec_db:get_top_block_hash() of
Hash when is_binary(Hash) ->
{ok, Block} = get_block(Hash),
case aec_blocks:type(Block) of
Header = aec_db:get_header(Hash),
case aec_headers:type(Header) of
key -> Hash;
micro -> aec_blocks:prev_key_hash(Block)
micro -> aec_headers:prev_key_hash(Header)
end;
undefined ->
undefined
end.

-spec top_block_with_state() -> 'undefined' | {aec_blocks:block(), aec_trees:trees()}.
top_block_with_state() ->
activity(fun top_block_with_state_/0).

top_block_with_state_() ->
case top_block_hash() of
undefined -> undefined;
Hash -> {aec_db:get_block(Hash), aec_db:get_block_state(Hash)}
Expand Down
165 changes: 122 additions & 43 deletions apps/aecore/src/aec_peer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ handle_cast({send_block, SerBlock}, State) ->
handle_cast({set_sync_height, none}, State) ->
{noreply, maps:remove(sync_height, State)};
handle_cast({set_sync_height, Height}, State) when is_integer(Height) ->
{noreply, State#{ sync_height => Height }};
State1 = State#{ sync_height => Height },
maybe_tx_pool_sync_(State1),
{noreply, State1};
handle_cast({expand_micro_block, MicroBlockFragment}, State) ->
{noreply, expand_micro_block(State, MicroBlockFragment)};
handle_cast(stop, State) ->
Expand Down Expand Up @@ -724,8 +726,9 @@ handle_first_ping(S, RemotePingObj) ->
handle_ping_msg(S, RemotePingObj) ->
#{ genesis_hash := LGHash,
best_hash := LTHash,
top_header := TopHeader,
sync_allowed := LSyncAllowed,
difficulty := LDiff } = local_ping_obj(S),
difficulty := LDiff } = local_ping_obj(S, _Extended = true),
#{ address := SourceAddr } = S,
PeerId = peer_id(S),
DecodedPingObj = decode_remote_ping(RemotePingObj),
Expand All @@ -736,33 +739,83 @@ handle_ping_msg(S, RemotePingObj) ->
aec_peer_analytics:log_temporary_peer_status(S, RGHash0, RTHash0, RDiff0);
{error, _} -> ok
end,
case DecodedPingObj of
{ok, true, RGHash, RTHash, RDiff, RPeers}
when RGHash == LGHash, LSyncAllowed =:= true ->
case {{LTHash, LDiff}, {RTHash, RDiff}} of
{{T, _}, {T, _}} ->
epoch_sync:debug("Same top blocks", []),
aec_sync:get_generation(PeerId, T),
aec_events:publish(chain_sync, {chain_sync_done, PeerId}),
ok;
{{_, DL}, {_, DR}} when DL > DR ->
epoch_sync:debug("Our difficulty is higher", []),
aec_events:publish(chain_sync, {chain_sync_done, PeerId}),
ok;
{{_, _}, {_, DR}} ->
aec_sync:start_sync(PeerId, RTHash, DR)
end,
ok = aec_peers:add_peers(SourceAddr, RPeers),
aec_tx_pool_sync:connect(PeerId, self()),
ok;
{ok, _SyncAllowed, RGHash, _RTHash, _RDiff, RPeers}
when RGHash == LGHash ->
epoch_sync:debug("Temporary connection, not synchronizing", []),
ok = aec_peers:add_peers(SourceAddr, RPeers);
{ok, _SyncAllowed, _RGHash, _RTHash, _RDiff, _RPeers} ->
{error, wrong_genesis_hash};
{error, Reason} ->
{error, Reason}
{Res, MpSync} =
case DecodedPingObj of
{ok, true, RGHash, RTHash, RDiff, RPeers}
when RGHash == LGHash, LSyncAllowed =:= true ->
{R1, MPSync1} =
case {{LTHash, LDiff}, {RTHash, RDiff}} of
{{T, _}, {T, _}} ->
epoch_sync:debug("Same top blocks", []),
aec_sync:get_generation(PeerId, T),
aec_events:publish(chain_sync, {chain_sync_done, PeerId}),
{ok, true};
{{_, DL}, {_, DR}} when DL > DR ->
epoch_sync:debug("Our difficulty is higher", []),
aec_events:publish(chain_sync, {chain_sync_done, PeerId}),
{ok, true};
{{_, _}, {_, DR}} ->
epoch_sync:debug("Starting sync to ~p", [PeerId]),
aec_sync:start_sync(PeerId, RTHash, DR),
{ok, perhaps}
end,
ok = aec_peers:add_peers(SourceAddr, RPeers),
{R1, MPSync1};
{ok, _SyncAllowed, RGHash, _RTHash, _RDiff, RPeers}
when RGHash == LGHash ->
epoch_sync:debug("Temporary connection, not synchronizing", []),
ok = aec_peers:add_peers(SourceAddr, RPeers),
{ok, false};
{ok, _SyncAllowed, _RGHash, _RTHash, _RDiff, _RPeers} ->
epoch_sync:debug("Wrong genesis", []),
{{error, wrong_genesis_hash}, false};
{error, Reason} ->
epoch_sync:debug("Error: ~p", [Reason]),
{{error, Reason}, false}
end,
maybe_tx_pool_sync(MpSync, TopHeader, S),
Res.

maybe_tx_pool_sync(Guess, LocalHeader, S) ->
case Guess of
true -> do_tx_pool_sync(S);
perhaps -> maybe_tx_pool_sync_(LocalHeader, S);
false -> ignore
end.

do_tx_pool_sync(S) ->
PeerId = peer_id(S),
lager:debug("Trying to start mempool sync to ~p", [aec_peer:ppp(PeerId)]),
aec_tx_pool_sync:connect(PeerId, self()).

maybe_tx_pool_sync_(S) ->
LocalHeader = aec_chain:dirty_top_header(),
maybe_tx_pool_sync_(LocalHeader, S).

maybe_tx_pool_sync_(LocalHeader, S) ->
TopTarget = aec_sync:get_top_target(),
case TopTarget of
0 ->
lager:debug("no top target - not starting mempool sync.", []),
ignore;
_ ->
LocalHeight = aec_headers:height(LocalHeader),
{ok, WhenTxPoolSync} = aeu_env:find_config(
[<<"mempool">>, <<"sync_start">>],
[user_config, schema_default]),
AtHeight = if WhenTxPoolSync < 0 ->
max(0, TopTarget + WhenTxPoolSync);
true ->
WhenTxPoolSync
end,
lager:debug("AtHeight = ~p; LocalHeight = ~p; TopTarget = ~p",
[AtHeight, LocalHeight, TopTarget]),
case AtHeight =< LocalHeight of
true ->
do_tx_pool_sync(S);
false ->
ignore
end
end.

decode_remote_ping(#{ genesis_hash := GHash,
Expand Down Expand Up @@ -803,20 +856,46 @@ ping_obj_rsp(S, RemotePingObj) ->
ping_obj(LocalPingObj#{ share => Share },
[PeerId | [aec_peer:id(P) || P <- TheirPeers]]).

local_ping_obj(#{ kind := ConnKind, ext_sync_port := Port }) ->
{GHash, TopHash, {ok, Difficulty}} =
?dirty(fun() ->
{aec_chain:genesis_hash(),
aec_chain:top_key_block_hash(),
aec_chain:difficulty_at_top_block()}
end),
#{ genesis_hash => GHash,
best_hash => TopHash,
difficulty => Difficulty,
sync_allowed => ConnKind =/= temporary,
share => gossiped_peers_count(),
peers => [],
port => Port }.
local_ping_obj(S) ->
local_ping_obj(S, false).

local_ping_obj(#{ kind := ConnKind, ext_sync_port := Port }, Extended) ->
Obj = aec_db:ensure_activity(
async_dirty,
fun() ->
local_ping_obj_(Extended)
end),
Obj#{ sync_allowed => ConnKind =/= temporary
, share => gossiped_peers_count()
, peers => []
, port => Port }.

local_ping_obj_(Extended) ->
GHash = aec_chain:genesis_hash(),
TopHash = aec_chain:top_block_hash(),
Header = aec_db:get_header(TopHash),
Height = aec_headers:height(Header),
TopKeyHash = key_block_hash_of_hash(TopHash, Header),
{ok, Difficulty} = aec_chain:difficulty_at_hash(TopKeyHash),
maybe_extend(Extended, Header,
#{ genesis_hash => GHash
, best_hash => TopKeyHash
, height => Height
, difficulty => Difficulty }).

maybe_extend(true, Header, Obj) ->
Obj#{top_header => Header};
maybe_extend(false, _, Obj) ->
Obj.

key_block_hash_of_hash(Hash, Header) ->
{ok, Header} = aec_chain:get_header(Hash),
case aec_headers:type(Header) of
key ->
Hash;
micro ->
aec_headers:prev_key_hash(Header)
end.

%% -- Get Header by Hash -----------------------------------------------------

Expand Down
7 changes: 7 additions & 0 deletions apps/aecore/src/aec_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

-export([ start_sync/3
, get_generation/2
, get_top_target/0
, set_last_generation_in_sync/0
, ask_all_for_node_info/0
, ask_all_for_node_info/1]).
Expand Down Expand Up @@ -60,6 +61,9 @@ start_sync(PeerId, RemoteHash, RemoteDifficulty) ->
get_generation(PeerId, Hash) ->
gen_server:cast(?MODULE, {get_generation, PeerId, Hash}).

get_top_target() ->
gen_server:call(?MODULE, get_top_target).

set_last_generation_in_sync() ->
gen_server:cast(?MODULE, set_last_generation_in_sync).

Expand Down Expand Up @@ -232,6 +236,8 @@ handle_call({worker_for_peer, PeerId}, _, State) ->
{reply, get_worker_for_peer(State, PeerId), State};
handle_call({sync_in_progress, PeerId}, _, State) ->
{reply, peer_in_sync(State, PeerId), State};
handle_call(get_top_target, _, State) ->
{reply, State#state.top_target, State};
handle_call({known_chain, Chain0 = #chain{ id = CId0 }, NewChainInfo}, _From, State0) ->
{Chain, State} =
case NewChainInfo of
Expand Down Expand Up @@ -560,6 +566,7 @@ valid_sync_tasks(#state{sync_tasks = STs}) ->
[ST || #sync_task{suspect = false} = ST <- STs].

update_top_target(TopTarget, State) ->
lager:debug("TopTarget = ~p, State = ~p", [TopTarget, lager:pr(State, ?MODULE)]),
State#state{ top_target = TopTarget }.

do_update_sync_task(State, STId, Update) ->
Expand Down
5 changes: 5 additions & 0 deletions apps/aeutils/priv/aeternity_config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@
"type" : "integer",
"default" : 1800000
},
"sync_start" : {
"description" : "When to start syncing the mempool. A negative number indicates distance from the remote top; a positive number denotes absolute local height",
"type" : "integer",
"default" : -100
},
"nonce_offset" : {
"description" : "Maximum nonce offset accepted",
"type" : "integer",
Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes/next/GH4157-control-mempool-sync-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
* A config option, `mempool:sync_start` (integer) has been introduced to control when syncing of the mempool
starts during chain sync. A positive number denotes the height at which to begin syncing; a negative number
denotes how far from the network top height (best guess by the sync logic) to start. Default is `-500`, i.e.
start syncing the mempool 500 blocks from the top. A mempool sync is always triggered when chain sync is done,
and if a negative value greater than the top height is given, sync starts from `0`, i.e. from the beginning.

0 comments on commit 4843612

Please sign in to comment.