Skip to content

Commit

Permalink
GC triggers at configurable depth below top (#4127)
Browse files Browse the repository at this point in the history
* GC triggers at configurable depth below top

* Add release notes

* Restart GC scanners when in doubt
  • Loading branch information
uwiger committed Apr 21, 2023
1 parent 584b47d commit ccb9061
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 43 deletions.
20 changes: 18 additions & 2 deletions apps/aecore/src/aec_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@
, make_primary_state_tab/2
, secondary_state_tab/1
, write_last_gc_switch/1
, read_last_gc_switch/0 ]).
, read_last_gc_switch/0
, write_last_gc_scan/1
, read_last_gc_scan/0 ]).

%% MP trees backend
-export([ tree_table_name/1
Expand Down Expand Up @@ -963,6 +965,20 @@ read_last_gc_switch() ->
lager:debug("<-- last GC Height: ~p", [R]),
R.

write_last_gc_scan(Height) ->
lager:debug("Last complete GC scan: ~p", [Height]),
?t(write(#aec_chain_state{key = last_gc_scan, value = Height})).

read_last_gc_scan() ->
R = ?t(case read(aec_chain_state, last_gc_scan) of
[] ->
0;
[#aec_chain_state{value = Height}] ->
Height
end),
lager:debug("<-- last GC Height: ~p", [R]),
R.

-spec make_primary_state_tab(tree_name(), table_name()) -> ok.
make_primary_state_tab(Tree, P) ->
lager:debug("New primary for ~p: ~p", [Tree, P]),
Expand Down Expand Up @@ -991,7 +1007,7 @@ cache_primary_state_tabs() ->
?t(write(#aec_chain_state{key = Key, value = T})),
cache_primary_state_tab(Tree, T);
P ->
cache_primary_state_tab(T, P)
cache_primary_state_tab(Tree, P)
end,
ok
end, ok, gced_tables()).
Expand Down
111 changes: 78 additions & 33 deletions apps/aecore/src/aec_db_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@
{enabled :: boolean(), % do we garbage collect?
history :: non_neg_integer(), % how many block state back from top to keep
min_height :: undefined | non_neg_integer(), % if hash_not_found error, try GC from this height
depth :: non_neg_integer(), % how far below the top to perform scans (0: fork resistance height)
during_sync :: boolean(), % run GC from the beginning
synced :: boolean(), % we only run GC if chain is synced
last_switch :: non_neg_integer(), % height at last switch
last_switch = 0 :: non_neg_integer(), % height at last switch
last_complete_scan = 0 :: non_neg_integer(), % height of last complete scan
trees = [] :: [tree_name()], % State trees being GC:ed
scanners = [] :: [#scanner{}] % ongoing scans
}).
Expand Down Expand Up @@ -150,7 +152,7 @@ info(Keys) when is_list(Keys) ->
end.

info_keys() ->
[enabled, history, last_gc, active_sweeps, during_sync, trees].
[enabled, history, depth, last_gc, last_scan, active_sweeps, during_sync, trees].

-ifdef(TEST).
install_test_env() ->
Expand All @@ -167,27 +169,45 @@ init(#{ <<"enabled">> := Enabled
, <<"during_sync">> := DuringSync
, <<"history">> := History
, <<"minimum_height">> := MinHeight
, <<"depth">> := Depth
} = Cfg) when is_integer(History), History > 0 ->
lager:debug("Cfg = ~p", [Cfg]),
cache_enabled_status(Enabled),
LastSwitch = case Enabled of
true ->
aec_events:subscribe(chain_sync),
aec_db:read_last_gc_switch();
false ->
0
end,
{LastSwitch, LastScan} =
case Enabled of
true ->
aec_events:subscribe(chain_sync),
aec_db:ensure_activity(
async_dirty,
fun() ->
{aec_db:read_last_gc_switch(),
aec_db:read_last_gc_scan()}
end);
false ->
{0, 0}
end,
lager:debug("LastSwitch = ~p", [LastSwitch]),
Scanners = maybe_restart_scanners(LastSwitch, LastScan, Trees),
%% TODO: Make min_height configurable
Data = #st{enabled = Enabled,
during_sync = DuringSync,
trees = Trees,
history = History,
min_height = MinHeight,
depth = Depth,
last_switch = LastSwitch,
last_complete_scan = LastScan,
scanners = Scanners,
synced = false},
{ok, Data}.

maybe_restart_scanners(LastSwitch, LastScan, Trees) ->
if LastSwitch > 0, LastScan < LastSwitch ->
[start_scanner(T, LastSwitch) || T <- Trees];
true ->
[]
end.

%% once the chain is synced, there's no way to "unsync"
handle_info({_, chain_sync, #{info := {chain_sync_done, _}}}, St) ->
case aec_sync:sync_progress() of
Expand All @@ -204,34 +224,45 @@ handle_info({'DOWN', MRef, process, Pid, Reason}, #st{scanners = Scanners} = St)
#scanner{tree = Tree, height = Height} = S ->
lager:error("GC Scanner process for ~p died: ~p", [Tree, Reason]),
signal_scanning_failed(Height),
{noreply, St#st{scanners = Scanners -- [S]}}
Scanners1 = Scanners -- [S],
NewSt = if Reason =/= normal ->
NewS = start_scanner(Tree, Height),
St#st{scanners = [NewS | Scanners1]};
true ->
St#st{scanners = Scanners1}
end,
{noreply, NewSt}
end;

handle_info(_, St) ->
{noreply, St}.

handle_call({maybe_garbage_collect, TopHeight, Header}, _From,
#st{enabled = true, synced = Synced, during_sync = DuringSync,
history = History, min_height = MinHeight,
trees = Trees, scanners = [], last_switch = Last} = St)
when (Synced orelse DuringSync), TopHeight >= MinHeight, TopHeight > Last + History ->
lager:debug("WILL collect. St = ~p", [lager:pr(St, ?MODULE)]),
%% Double-check that the GC hasn't been requested on a microblock.
%% This would be a bug, since aec_conductor should only ask for keyblocks.
case aec_headers:type(Header) of
key ->
?PROTECT(perform_switch(Trees, TopHeight),
fun(Scanners1) ->
{reply, ok, St#st{ scanners = Scanners1
, last_switch = TopHeight}}
end,
signal_switching_failed_and_reply(St, nop));
micro ->
lager:warning("GC called on microblock - ignoring", []),
history = History, min_height = MinHeight, depth = Depth,
trees = Trees, scanners = [], last_switch = Last} = St) ->
SafeHeight = safe_height(TopHeight, Depth),
if (Synced orelse DuringSync), SafeHeight >= MinHeight, SafeHeight >= Last + History ->
lager:debug("WILL collect at height ~p (Top = ~p). St = ~p",
[SafeHeight, TopHeight, lager:pr(St, ?MODULE)]),
%% Double-check that the GC hasn't been requested on a microblock.
%% This would be a bug, since aec_conductor should only ask for keyblocks.
case aec_headers:type(Header) of
key ->
?PROTECT(perform_switch(Trees, SafeHeight),
fun(Scanners1) ->
{reply, ok, St#st{ scanners = Scanners1
, last_switch = SafeHeight}}
end,
signal_switching_failed_and_reply(St, nop));
micro ->
lager:warning("GC called on microblock - ignoring", []),
{reply, nop, St}
end;
true ->
{reply, nop, St}
end;
handle_call({maybe_garbage_collect, _, _}, _From, St) ->
lager:debug("Won't collect. St = ~p", [lager:pr(St, ?MODULE)]),
{reply, nop, St};
handle_call({info, Keys}, _, St) ->
{reply, info_(Keys, St), St}.
Expand All @@ -240,15 +271,16 @@ handle_call({info, Keys}, _, St) ->
handle_cast({scanning_failed, ErrHeight}, St) ->
{noreply, St#st{min_height = ErrHeight}};

handle_cast({scan_complete, Name}, #st{scanners = Scanners} = St) ->
handle_cast({scan_complete, Name, Height}, #st{scanners = Scanners} = St) ->
Scanners1 = lists:keydelete(Name, #scanner.tree, Scanners),
case Scanners1 of
[] ->
aec_events:publish(gc, scans_complete);
aec_db:write_last_gc_scan(Height),
aec_events:publish(gc, scans_complete),
{noreply, St#st{last_complete_scan = Height}};
_ ->
ok
end,
{noreply, St#st{scanners = Scanners1}};
{noreply, St#st{scanners = Scanners1}}
end;

handle_cast(_, St) ->
{noreply, St}.
Expand All @@ -262,6 +294,14 @@ code_change(_FromVsn, St, _Extra) ->
%%% Internal functions
%%%===================================================================

safe_height(TopHeight, 0) ->
case aec_resilience:fork_resistance_active() of
no -> TopHeight;
{yes, FRHeight} -> TopHeight - FRHeight - 1
end;
safe_height(TopHeight, Depth) when Depth > 0 ->
TopHeight - Depth.

info_(Keys, St) ->
maps:from_list(
[{K, info_item(K, St)} || K <- Keys]).
Expand All @@ -270,13 +310,17 @@ info_item(history, #st{history = H}) ->
H;
info_item(last_gc, #st{last_switch = L}) ->
L;
info_item(last_scan, #st{last_complete_scan = L}) ->
L;
info_item(active_sweeps, #st{scanners = Scanners}) ->
[#{tree => T, pid => P, height => H} ||
#scanner{tree = T, height = H, pid = P} <- Scanners];
info_item(during_sync, #st{during_sync = Flag}) ->
Flag;
info_item(trees, #st{trees = Trees}) ->
Trees;
info_item(depth, #st{depth = D}) ->
D;
info_item(enabled, #st{enabled = Bool}) ->
Bool.

Expand Down Expand Up @@ -320,7 +364,7 @@ scan_tree(Name, Height, Parent) ->
T1 = erlang:system_time(millisecond),
lager:info("GC fullscan done for ~p, Height = ~p, Count = ~p, Time = ~p ms",
[Name, Height, Count, T1 - T0]),
gen_server:cast(Parent, {scan_complete, Name}),
gen_server:cast(Parent, {scan_complete, Name, Height}),
ok.

collect_reachable_hashes_fullscan(Tree, Height) ->
Expand Down Expand Up @@ -435,6 +479,7 @@ config() ->
Key <- [<<"enabled">>,
<<"history">>,
<<"minimum_height">>,
<<"depth">>,
<<"during_sync">>]]),
M#{<<"trees">> => Trees}.

Expand Down
19 changes: 12 additions & 7 deletions apps/aecore/test/aec_db_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-define(NUM_GCED_NODES, 20).
-define(DUMMY_HASH, <<0:256>>).
-define(GC_HISTORY, 50).
-define(FORK_RESISTANCE, 5).

all() ->
[{group, all_nodes}].
Expand All @@ -40,7 +41,8 @@ init_per_suite(Config0) ->
<<"hard_forks">> => Forks,
<<"garbage_collection">> => #{<<"enabled">> => true,
<<"history">> => ?GC_HISTORY}},
<<"sync">> => #{<<"single_outbound_per_group">> => false},
<<"sync">> => #{<<"single_outbound_per_group">> => false,
<<"sync_allowed_height_from_top">> => ?FORK_RESISTANCE},
<<"mempool">> => #{<<"tx_ttl">> => 100},
<<"mining">> => #{<<"micro_block_cycle">> => 100,
<<"expected_mine_rate">> => 100}},
Expand Down Expand Up @@ -129,14 +131,14 @@ main_test(_Config) ->
[N1, N2] = [aecore_suite_utils:node_name(X) || X <- [dev1, dev2]],
H1 = aec_headers:height(rpc:call(N1, aec_chain, top_header, [])),
ct:log("Height = ~p", [H1]),
true = H1 < ?GC_HISTORY,
true = H1 < ?GC_HISTORY + ?FORK_RESISTANCE,
Primary1 = primary(N1),
ct:log("Primary1 = ~p", [Primary1]),

true = has_key(N1, ?DUMMY_HASH, Primary1),

%% Mining of another keyblock starts second GC phase
mine_until_height(N1, N2, ?GC_HISTORY + 2),
mine_until_height(N1, N2, ?GC_HISTORY + ?FORK_RESISTANCE + 2),

Primary2 = primary(N1),
ct:log("Primary2 = ~p", [Primary2]),
Expand Down Expand Up @@ -337,16 +339,18 @@ calls_test(_Config) ->
%% mine beyond the GC
ct:log("Mining beyond the GC point"),
ct:log("GC server state: ~p", [rpc:call(N1, sys, get_state, [aec_db_gc])]),
{ok, Mined1} = aecore_suite_utils:mine_key_blocks(N1, ?GC_HISTORY + 1),
{ok, Mined1} = aecore_suite_utils:mine_key_blocks(N1, ?GC_HISTORY + ?FORK_RESISTANCE + 1),
ct:log("Blocks mined: ~p", [length(Mined1)]),
_GcSwitch1 = await_gc_switch(),
GcSwitch1 = await_gc_switch(),
0 = GcSwitch1 rem ?GC_HISTORY,
await_scans_complete(),
H1 = aec_headers:height(rpc:call(N1, aec_chain, top_header, [])),
ct:log("Current height is ~p", [H1]),
ct:log("Last GC switch: ~p", [rpc:call(N1, aec_db_gc, info, [[last_gc]])]),
{ok, Mined2} = aecore_suite_utils:mine_key_blocks(N1, ?GC_HISTORY),
ct:log("Mined ~p keyblocks", [length(Mined2)]),
await_gc_switch(),
GcSwitch2 = await_gc_switch(),
0 = GcSwitch2 rem ?GC_HISTORY,
ct:log("Second GC switch (and clearing tables)", []),
{ok, 410, _} = aehttp_integration_SUITE:get_contract_call_object(ContractCreateTxHash),
{ok, 410, _} = aehttp_integration_SUITE:get_contract_call_object(ContractCallTxHash),
Expand Down Expand Up @@ -385,7 +389,8 @@ await_scans_complete() ->
await_gc_switch() ->
receive
{gproc_ps_event, gc, #{info := {gc_switch, AtHeight}}} ->
ct:log("Got GC switch notification for height ~p", [AtHeight])
ct:log("Got GC switch notification for height ~p", [AtHeight]),
AtHeight
after 10000 ->
error({timeout, waiting_for_gc_switch})
end.
8 changes: 7 additions & 1 deletion apps/aeutils/priv/aeternity_config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1623,11 +1623,17 @@
"default" : 50000
},
"history" : {
"description" : "How many blocks (back from the top) should be scanned for reachable hashes",
"description" : "How many generations (at least) should pass between garbage collections",
"type" : "integer",
"minimum" : 50,
"default" : 15000
},
"depth" : {
"description" : "How far below the top to perform collection scans (default: 0 = fork resistance depth)",
"type" : "integer",
"minimum" : 0,
"default" : 0
},
"trees" : {
"description": "Which state trees to scan. Default: all of them",
"type" : "array",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* To eliminate the risk of garbage-collecting a competing fork, which is later evicted,
the garbage collector sweeps start below the fork resistance depth (or a user-configured depth)
below the top.

0 comments on commit ccb9061

Please sign in to comment.