Skip to content


Small sync improvements (#4311)
Browse files Browse the repository at this point in the history
* Small improvements to sync efficiency

- always start fetching the _next_ generation
- pre-fetch the next pool of generations

* Add a (small) random delay in aec_peers to help tests

* Improvements to aec_peers

Less info-logging, and also update the known_sockets in the case where
there is an incoming connection.
  • Loading branch information
hanssv committed Mar 25, 2024
1 parent 5a4ba4b commit 14a82a1
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 33 deletions.
29 changes: 18 additions & 11 deletions apps/aecore/src/aec_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

%=== MACROS ====================================================================

-define(RAND_CONN_DELAY, 50).
-define(MAX_CONNECTION_INTERVAL, 30 * 1000). % 30 seconds

Expand Down Expand Up @@ -842,7 +843,7 @@ tcp_probe(PeerPoolName, State0) ->
schedule_tcp_probe(?MAX_TCP_PROBE_INTERVAL, PeerPoolName, State2);
{wait, Delay, State2} ->
epoch_sync:debug("No peers available for TCP probe in ~p, "
"waiting ~b ms", [PeerPoolName, Delay]),
"waiting ~b ms", [PeerPoolName, Delay]),
schedule_tcp_probe(Delay, PeerPoolName, State2);
{selected, Peer, State2} ->
schedule_tcp_probe(undefined, PeerPoolName,
Expand Down Expand Up @@ -911,12 +912,16 @@ next_connect_delay(State) ->
#state{ last_connect_time = LastTime, outbound = Outbound } = State,
ExpDelay = floor(math:pow(2, Outbound - 1)) * 1000,
BoundDelay = min(ExpDelay, ?MAX_CONNECTION_INTERVAL),
max(?MIN_CONNECTION_INTERVAL, BoundDelay - (timestamp() - LastTime));
max(min_connection_interval(), BoundDelay - (timestamp() - LastTime));
%% If this is a monitoring node then aggressively connect to peers
true ->

min_connection_interval() ->

%% Gives the node some time to receive peers and establish outbound connections
%% before it starts TCP probes of verified and unverified peers.
-spec initial_tcp_probe_delay() -> non_neg_integer().
Expand Down Expand Up @@ -1370,9 +1375,12 @@ on_add_peer(Peer, State0) ->
{_, {ok, Peer2}} ->
{{ok, _}, {ok, Peer2}} ->
{error, {ok, Peer2}} ->
epoch_sync:debug("Peer ~p - known socket not set - setting = ~p", [aec_peer:ppp(PeerId), PeerSocket]),
add_known_socket(Peer2, State);
{{ok, OtherPeerPubkey}, error} ->
epoch_sync:debug("Peer ~p with address ~p - ignoring peer pubkey changed "
"from ~s to ~s by ~s", [aec_peer:ppp(PeerId),
Expand Down Expand Up @@ -1526,20 +1534,19 @@ pool_log_changes(Id, Old, New) ->
%% Tries to add given peer to the pool.
-spec pool_update(aec_peer:peer(), state())
-> {ignored | verified | unverified, state()}.
pool_update(Peer, State) ->
#state{ pool = Pool
, known_sockets = KnownSockets} = State,
pool_update(Peer, State = #state{ pool = Pool }) ->
PeerId = aec_peer:id(Peer),
Now = timestamp(),
{OldPoolName, _} = aec_peers_pool:peer_state(Pool, PeerId),
{NewPoolName, Pool2} = aec_peers_pool:update(Pool, Now, Peer),
pool_log_changes(PeerId, OldPoolName, NewPoolName),

{NewPoolName, add_known_socket(Peer, State#state{ pool = Pool2 })}.

add_known_socket(Peer, State = #state{ known_sockets = KnownSockets }) ->
PeerId = aec_peer:id(Peer),
Socket = aec_peer:socket(Peer),
{NewPoolName, State#state{ pool = Pool2
, known_sockets = gb_trees:enter(Socket,
State#state{ known_sockets = gb_trees:enter(Socket, PeerId, KnownSockets) }.

-spec pool_random_select(state())
-> {selected, aec_peer:peer(), state()}
Expand Down
82 changes: 60 additions & 22 deletions apps/aecore/src/aec_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ else_do(Else) ->
%% When an additional Ping arrives for which we agree upon genesis, we have
%% the following possibilities:
%% 1. It has worst top hash than our node, do not include in sync
%% 2. It has better top hash than our node
%% 1. It has worse/lower top hash than our node, do not include in sync
%% 2. It has better/higher top hash than our node
%% We binary search for a block that we agree upon (could be genesis)
%% We add new node to sync pool to sync agreed block up to top of new
%% 1. If we are already synchronizing, we ignore it,
Expand All @@ -169,6 +169,7 @@ else_do(Else) ->
, dir := backward
, attempts = 0 :: non_neg_integer()
-record(worker, { peer_id :: aec_peer:id()
, pid :: pid()
Expand All @@ -177,6 +178,7 @@ else_do(Else) ->
suspect = false :: boolean(),
chain :: #chain{},
pool = [] :: [#pool_item{}],
next_pool = none :: none | fetching | [#pool_item{}],
agreed :: undefined | #chain_block{},
adding = [] :: [#pool_item{}],
pending = none :: none | {[#pool_item{}], [[#pool_item{}]]},
Expand Down Expand Up @@ -404,11 +406,23 @@ handle_last_result(ST = #sync_task{ agreed = undefined }, {agreed_height, Agreed
ST#sync_task{ agreed = Agreed };
handle_last_result(ST, {agreed_height, _Agreed}) ->
handle_last_result(ST = #sync_task{ pool = [] }, {hash_pool, HashPool}) ->
handle_last_result(ST = #sync_task{ pool = [] }, {hash_pool, _FirstHeight, HashPool}) ->
%% We're already waiting for the fill_pool - handle it right away
#pool_item{ height = Height, hash = Hash, got = false } = lists:last(HashPool),
ST#sync_task{ pool = HashPool, agreed = #chain_block{ height = Height, hash = Hash } };
handle_last_result(ST, {hash_pool, _HashPool}) ->
epoch_sync:debug("We got next pool - but had to wait for it", []),
ST#sync_task{ pool = HashPool,
next_pool = none,
agreed = #chain_block{ height = Height, hash = Hash }
handle_last_result(ST, {hash_pool, FirstHeight, HashPool}) ->
case FirstHeight > ST#sync_task.agreed#chain_block.height of
true ->
%% We're working on the previous pool, save new filled pool
ST#sync_task{ next_pool = HashPool };
false ->
%% We're already working on this pool, work wasted.
handle_last_result(ST, {get_generation, Height, Hash, PeerId, {ok, Block}}) ->
Pool = ST#sync_task.pool,
NewItem = #pool_item{ height = Height, hash = Hash, got = {PeerId, Block} },
Expand Down Expand Up @@ -502,29 +516,53 @@ get_next_work_item(ST = #sync_task{ pool = [#pool_item{ got = {_, _} } | _] = Po
get_next_work_item(ST = #sync_task{ adding = [], pending = Pending }) when Pending /= none ->
{ToAdd, NewPending} = get_pending(Pending),
{{post_blocks, ToAdd}, ST#sync_task{ adding = ToAdd, pending = NewPending }};
get_next_work_item(ST = #sync_task{ pool = [#pool_item{ got = false } | _] }) ->
get_next_work_item(ST = #sync_task{ pool = [#pool_item{ got = false } | _]}) ->
Pool = ST#sync_task.pool,
PickFrom = [ P || P = #pool_item{ got = false } <- Pool ],
Random = rand:uniform(length(PickFrom)),
#pool_item{ height = PickH
, hash = PickHash
, got = false} = lists:nth(Random, PickFrom),
epoch_sync:debug("Get block at height ~p", [PickH]),
{{get_generation, PickH, PickHash}, ST};
get_next_work_item(ST = #sync_task{ pool = [], agreed = #chain_block{}, pending = Pending }) ->

MissingGens = lists:sort(fun pool_item_sort/2, [ P || P = #pool_item{ got = false } <- Pool ]),
PI = #pool_item{ height = PickH, hash = PickHash, attempts = As } = hd(MissingGens),

case ST#sync_task.next_pool of
none when length(ST#sync_task.workers) > 2 ->
%% Avoid the main choke-point - waiting for next pool fill - by starting early
_ ->
epoch_sync:debug("Get block at height ~p", [PickH]),
Pool1 = lists:keyreplace(PickH, #pool_item.height, Pool, PI#pool_item{attempts = As + 1}),
{{get_generation, PickH, PickHash}, ST#sync_task{ pool = Pool1 }}
get_next_work_item(ST = #sync_task{ pool = [], agreed = #chain_block{},
next_pool = Next, pending = Pending }) ->
%% Pool is empty
case Pending of
{_, Ps} when length(Ps) > 15 ->
{_, Ps} when length(Ps) > ?MAX_FORWARD_CHUNKS ->
%% We are (too) far ahead - take a break
{take_a_break, ST};
_ when is_list(Next) ->
%% We already have the next pool filled, start working
#pool_item{ height = Height, hash = Hash, got = false } = lists:last(Next),
ST1 = ST#sync_task{ pool = Next,
next_pool = none,
agreed = #chain_block{ height = Height, hash = Hash }
_ ->
#chain_block{ hash = LastHash, height = H } = ST#sync_task.agreed,
Chain = ST#sync_task.chain,
TargetHash = next_known_hash(Chain#chain.blocks, H + ?MAX_HEADERS_PER_CHUNK),
{{fill_pool, LastHash, TargetHash}, ST}
%% We need to fill the pool first - go fetch the info
get_next_work_item(ST) ->
epoch_sync:info("Nothing to do: ~1000p", [pp_sync_task(ST)]),
{take_a_break, ST}.

do_fill_pool(ST) ->
#chain_block{ hash = LastHash, height = H } = ST#sync_task.agreed,
Chain = ST#sync_task.chain,
TargetHash = next_known_hash(Chain#chain.blocks, H + ?MAX_HEADERS_PER_CHUNK),
{{fill_pool, LastHash, TargetHash}, ST#sync_task{ next_pool = fetching }}.

pool_item_sort(#pool_item{height = H1, attempts = A1}, #pool_item{height = H2, attempts = A2}) ->
{A1, H1} =< {A2, H2}.

maybe_end_sync_task(State, ST) ->
case ST#sync_task.chain of
#chain{ peers = [], blocks = [Target | _] } ->
Expand Down Expand Up @@ -1062,7 +1100,7 @@ fill_pool(PeerId, StartHash, TargetHash, ST) ->
, hash = Hash
, got = false
} || {Height, Hash} <- Hashes ],
do_work_on_sync_task(PeerId, ST, {hash_pool, HashPool});
do_work_on_sync_task(PeerId, ST, {hash_pool, FirstHeight, HashPool});
{error, {LastGoodHeight, FirstBadHeight}} ->
"Abort sync with ~p (bad successor height ~p after ~p)",
Expand Down Expand Up @@ -1142,7 +1180,7 @@ do_fetch_generation_ext(Hash, PeerId) ->
{error, hash_mismatch}
{error, _} = Error ->
epoch_sync:debug("failed to fetch block from ~p; Hash = ~p; Error = ~p",
epoch_sync:info("failed to fetch generation from ~p; Hash = ~p; Error = ~p",
[ppp(PeerId), pp(Hash), Error]),
Expand Down

0 comments on commit 14a82a1

Please sign in to comment.