Skip to content

Commit

Permalink
Fixes for db_direct_access + optimizations (#4135)
Browse files Browse the repository at this point in the history
* Fixes for db_direct_access + optimizations

Run mempool gc synchronously from conductor

Fix dialyzer warning

revert accidental rebar.lock change

* GC mempool even if leader. Fix test suites
  • Loading branch information
uwiger committed Jun 21, 2023
1 parent bf1b8c0 commit a3c768f
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 105 deletions.
3 changes: 2 additions & 1 deletion apps/aecore/src/aec_chain_metrics_probe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ sample_(_, Acc) ->
Acc.

total_difficulty() ->
try {ok, V} = aec_chain:difficulty_at_top_block(),
try {ok, V} = aec_db:ensure_activity(
async_dirty, fun() -> aec_chain:difficulty_at_top_block() end),
V
catch
error:_ -> 0
Expand Down
19 changes: 10 additions & 9 deletions apps/aecore/src/aec_conductor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ handle_call({post_block, Block},_From, State) ->
handle_call(stop_block_production,_From, State = #state{ consensus = Cons }) ->
epoch_mining:info("Mining stopped"),
aec_block_generator:stop_generation(),
[ aec_tx_pool:garbage_collect() || is_record(Cons, consensus) andalso Cons#consensus.leader ],
aec_events:publish(stop_mining, []),
State1 = kill_all_workers(State),
State2 = State1#state{block_producing_state = 'stopped',
Expand Down Expand Up @@ -841,13 +840,15 @@ deregister_instance(Pid, #state{instances = MinerInstances0} = State) ->
preempt_on_new_top(#state{ top_block_hash = OldHash,
top_key_block_hash = OldKeyHash,
top_height = OldHeight,
consensus = Consensus,
mode = Mode
} = State, NewBlock, NewHash, Origin) ->
ConsensusModule = consensus_module(State),
#consensus{ consensus_module = ConsensusModule } = Consensus,
BlockType = aec_blocks:type(NewBlock),
PrevNewHash = aec_blocks:prev_hash(NewBlock),
Hdr = aec_blocks:to_header(NewBlock),
Height = aec_headers:height(Hdr),
maybe_gc_tx_pool(BlockType, Height, OldHeight),
aec_tx_pool:top_change(#{type => BlockType,
old_hash => OldHash,
new_hash => NewHash,
Expand Down Expand Up @@ -1386,7 +1387,7 @@ handle_add_block(Block, Hash, Prev, #state{top_block_hash = TopBlockHash} = Stat
{{error, Reason}, State}
end.

handle_successfully_added_block(Block, Hash, false, _, Events, State, Origin) ->
handle_successfully_added_block(Block, Hash, false, _PrevKeyHeader, Events, State, Origin) ->
maybe_publish_tx_events(Events, Hash, Origin),
maybe_publish_block(Origin, Block),
State1 = maybe_consensus_change(State, Block),
Expand All @@ -1407,15 +1408,15 @@ handle_successfully_added_block(Block, Hash, true, PrevKeyHeader, Events, State,
[ maybe_garbage_collect(NewTopBlock, Hash, true)
|| BlockType == key ],
IsLeader = is_leader(NewTopBlock, PrevKeyHeader, ConsensusModule),
case IsLeader of
true ->
ok; %% Don't spend time when we are the leader.
false ->
aec_tx_pool:garbage_collect()
end,
{ok, setup_loop(State2, true, IsLeader, Origin)}
end.

maybe_gc_tx_pool(key, Height, OldHeight) when Height > OldHeight ->
_ = aec_tx_pool_gc:sync_gc(Height),
ok;
maybe_gc_tx_pool(_, _, _) ->
ok.

maybe_consensus_change(State, Block) ->
%% When a new block got successfully inserted we need to check whether the next block
%% would use different consensus algorithm - This is the point where
Expand Down
20 changes: 16 additions & 4 deletions apps/aecore/src/aec_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
]).

-export([ensure_transaction/1,
ensure_dirty/1,
ensure_activity/2,
activity/2]).

Expand Down Expand Up @@ -506,6 +507,9 @@ backend_mode(<<"mnesia">>, #{persist := true } = M) ->
ensure_transaction(Fun) when is_function(Fun, 0) ->
ensure_activity(get_backend_module(), transaction, Fun).

ensure_dirty(Fun) when is_function(Fun, 0) ->
ensure_activity(get_backend_module(), async_dirty, Fun).

ensure_activity(Type, Fun) when is_function(Fun, 0) ->
ensure_activity(get_backend_module(), Type, Fun).

Expand Down Expand Up @@ -565,6 +569,8 @@ direct_api_activity(Type, Fun) ->
end;
#{activity := #{type := tx}} ->
Fun();
#{activity := #{type := batch}} ->
Fun();
_ when Type == async_dirty; Type == sync_dirty ->
Fun();
_ ->
Expand Down Expand Up @@ -643,7 +649,15 @@ walk_tstore_({{Table, Key}, _, delete} = E, {F, NF}) ->
%% {K, V} = erlang:fun_info(F, K),
%% V.

mrdb_activity(Type, Fun) ->
mrdb_activity(T, Fun) ->
Type = case T of
_ when T == tx;
T == transaction;
T == sync_transaction ->
{tx, #{retries => {0,50}}};
_ ->
T
end,
mrdb:activity(Type, rocksdb_copies, Fun).

%% ======================================================================
Expand Down Expand Up @@ -1339,9 +1353,7 @@ enter_tree_node_(Hash, Value, Tab, Rec) ->
%% and the data is immutable anyway.
promote_tree_node(Hash, Value, Tab, Rec) ->
Obj = mk_record(Rec, Hash, Value),
activity(async_dirty, fun() ->
write(Tab, Obj, write)
end).
ok = write(Tab, Obj, write).

mk_record(aec_account_state , K, V) -> #aec_account_state{key = K, value = V};
mk_record(aec_call_state , K, V) -> #aec_call_state{key = K, value = V};
Expand Down
25 changes: 15 additions & 10 deletions apps/aecore/src/aec_peer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

-import(aeu_debug, [pp/1]).

-define(dirty(Fun), aec_db:ensure_activity(async_dirty, Fun)).

-define(P2P_PROTOCOL_VSN, 1).

-define(DEFAULT_CONNECT_TIMEOUT, 1000).
Expand Down Expand Up @@ -173,7 +175,7 @@ cast_or_call(PeerId, Action, CastOrCall, Timeout) ->
accept_init(Ref, TcpSock, ranch_tcp, Opts) ->
ok = ranch:accept_ack(Ref),
Version = <<?P2P_PROTOCOL_VSN:64>>,
Genesis = aec_chain:genesis_hash(),
Genesis = ?dirty(fun() -> aec_chain:genesis_hash() end),
HSTimeout = noise_hs_timeout(),
case inet:peername(TcpSock) of
{error, Reason} ->
Expand Down Expand Up @@ -802,9 +804,12 @@ ping_obj_rsp(S, RemotePingObj) ->
[PeerId | [aec_peer:id(P) || P <- TheirPeers]]).

local_ping_obj(#{ kind := ConnKind, ext_sync_port := Port }) ->
GHash = aec_chain:genesis_hash(),
TopHash = aec_chain:top_key_block_hash(),
{ok, Difficulty} = aec_chain:difficulty_at_top_block(),
{GHash, TopHash, {ok, Difficulty}} =
?dirty(fun() ->
{aec_chain:genesis_hash(),
aec_chain:top_key_block_hash(),
aec_chain:difficulty_at_top_block()}
end),
#{ genesis_hash => GHash,
best_hash => TopHash,
difficulty => Difficulty,
Expand Down Expand Up @@ -841,7 +846,7 @@ get_header(hash, Hash) ->
get_header(height, N) ->
get_header(fun aec_chain:get_key_header_by_height/1, N);
get_header(Fun, Arg) ->
case Fun(Arg) of
case ?dirty(fun() -> Fun(Arg) end) of
{ok, Header} ->
HH = aec_headers:serialize_to_binary(Header),
{ok, #{ hdr => HH }};
Expand All @@ -857,8 +862,8 @@ handle_get_header_by_height(S, ?VSN_1, Msg) ->
S;
handle_get_header_by_height(S, ?GET_HEADER_BY_HEIGHT_VSN,
#{ height := H, top_hash := TopHash}) ->
case {aec_chain:get_key_header_by_height(H),
aec_chain:hash_is_in_main_chain(TopHash)} of
case ?dirty(fun() -> {aec_chain:get_key_header_by_height(H),
aec_chain:hash_is_in_main_chain(TopHash)} end) of
{{ok, Header}, true} ->
SerHeader = aec_headers:serialize_to_binary(Header),
send_response(S, header, {ok, #{ hdr => SerHeader }});
Expand Down Expand Up @@ -894,7 +899,7 @@ handle_get_n_successors(S, Vsn, Msg) ->
#{ from_hash := FromHash, target_hash := TargetHash, n := N }
when Vsn == ?GET_N_SUCCESSORS_VSN ->
Res = do_get_n_successors(FromHash, N),
case aec_chain:hash_is_in_main_chain(TargetHash) of
case ?dirty(fun() -> aec_chain:hash_is_in_main_chain(TargetHash) end) of
true -> Res;
false -> {error, not_on_chain}
end;
Expand Down Expand Up @@ -1075,13 +1080,13 @@ check_gossiped_header_height(S, Header) ->

handle_light_micro_block(_S, Header, TxHashes, PoF) ->
%% Before assembling the block, check if it is known, and valid
case pre_assembly_check(Header) of
case ?dirty(fun() -> pre_assembly_check(Header) end) of
known ->
ok;
E = {error, _} ->
{ok, HH} = aec_headers:hash_header(Header),
epoch_sync:debug("Dropping gossiped light micro_block (~s): ~p", [pp(HH), E]),
case aec_chain:get_header(aec_headers:prev_key_hash(Header)) of
case ?dirty(fun() -> aec_chain:get_header(aec_headers:prev_key_hash(Header)) end) of
{ok, PrevHeader} ->
epoch_sync:debug("miner beneficiary: ~p", [aec_headers:beneficiary(PrevHeader)]),
ok;
Expand Down
14 changes: 9 additions & 5 deletions apps/aecore/src/aec_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ agree_on_height(PeerId, #chain{ blocks = [#chain_block{ hash = TopHash, height =
true -> TopHash;
false -> get_header_by_height(PeerId, LocalHeight, TopHash)
end,
case aec_chain:hash_is_connected_to_genesis(RemoteHash) of
case hash_is_connected_to_genesis(RemoteHash) of
true ->
{ok, MinHeight, RemoteHash};
false ->
Expand All @@ -906,28 +906,31 @@ agree_on_height(PeerId, #chain{ blocks = [#chain_block{ hash = TopHash, height =
agree_on_height(PeerId, RemoteTop, Height, Step) ->
NewHeight = max(aec_block_genesis:height(), Height - Step),
RHash = get_header_by_height(PeerId, NewHeight, RemoteTop),
case aec_chain:hash_is_connected_to_genesis(RHash) of
case hash_is_connected_to_genesis(RHash) of
true ->
agree_on_height(PeerId, RemoteTop, NewHeight, Height, RHash);
false ->
agree_on_height(PeerId, RemoteTop, NewHeight, Step * 2)
end.

hash_is_connected_to_genesis(Hash) ->
aec_db:ensure_dirty(fun() -> aec_chain:hash_is_connected_to_genesis(Hash) end).

%% We agree on Hash at MinH and disagree at MaxH
agree_on_height(_PeerId, _RemoteTop, MinH, MaxH, Hash) when MaxH == MinH + 1 ->
{ok, MinH, Hash};
agree_on_height(PeerId, RemoteTop, MinH, MaxH, Hash) ->
H = (MinH + MaxH) div 2,
RHash = get_header_by_height(PeerId, H, RemoteTop),
case aec_chain:hash_is_connected_to_genesis(RHash) of
case hash_is_connected_to_genesis(RHash) of
true ->
agree_on_height(PeerId, RemoteTop, H, MaxH, RHash);
false ->
agree_on_height(PeerId, RemoteTop, MinH, H, Hash)
end.
get_header_by_height(PeerId, Height, RemoteTop) ->
case Height == aec_block_genesis:height() of
true -> aec_chain:genesis_hash(); %% Handshake ensure we agree on genesis
true -> aec_db:ensure_dirty(fun() -> aec_chain:genesis_hash() end); %% Handshake ensure we agree on genesis
false ->
case peer_get_header_by_height(PeerId, Height, RemoteTop) of
{ok, RemoteAtHeight} ->
Expand Down Expand Up @@ -1138,7 +1141,8 @@ gen_is_consecutive(backward, KB, MBs = [MB1, MB2 | _]) ->
%%%=============================================================================

has_generation(KeyBlockHash) ->
case aec_chain:get_header(KeyBlockHash) of
case aec_db:ensure_dirty(
fun() -> aec_chain:get_header(KeyBlockHash) end) of
error ->
false;
{ok, Header} ->
Expand Down
59 changes: 30 additions & 29 deletions apps/aecore/src/aec_tx_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
, stop/0
]).

-export([ garbage_collect/0
, get_candidate/2
-export([ get_candidate/2
, get_candidate/3
, get_max_nonce/1
, minimum_miner_gas_price/0
Expand All @@ -57,6 +56,7 @@

%% exports used by GC (should perhaps be in a common lib module)
-export([ dbs_/0
, gc_height_and_dbs/0
, gc_db/1
, origins_cache/0
, origins_cache_max_size/0
Expand All @@ -75,7 +75,6 @@
-include_lib("aecontract/include/hard_forks.hrl").

-ifdef(TEST).
-export([sync_garbage_collect/1]). %% Only for (Unit-)test
-export([restore_mempool/0]).
-export([peek_db/0]).
-export([peek_visited/0]).
Expand Down Expand Up @@ -242,18 +241,7 @@ get_max_nonce(Sender) ->
#dbs{nonce_db = NDb} = dbs(),
?TC(int_get_max_nonce(NDb, Sender), {max_nonce, Sender}).

-spec garbage_collect() -> ok.
garbage_collect() ->
lager:debug("garbage_collect()", []),
gen_server:cast(?SERVER, garbage_collect).

-ifdef(TEST).
-spec sync_garbage_collect(Height :: aec_blocks:height()) -> ok.
sync_garbage_collect(Height) ->
aec_tx_pool_gc:gc(Height, dbs()),
sys:get_status(aec_tx_pool_gc), %% sync point (gc is asynchronous)
ok.

restore_mempool() ->
revisit(dbs()).

Expand Down Expand Up @@ -363,6 +351,9 @@ origins_cache_max_size() -> ?ORIGINS_CACHE_MAX_SIZE.
dbs() ->
gen_server:call(?SERVER, dbs).

gc_height_and_dbs() ->
gen_server:call(?SERVER, gc_height_and_dbs).

raw_delete(#dbs{} = Dbs, Key) ->
pool_db_raw_delete(Dbs, Key).

Expand Down Expand Up @@ -492,22 +483,20 @@ handle_call_({failed_txs, FailedTxs}, _From, #state{dbs = Dbs} = State) ->
{reply, ok, State};
handle_call_(dbs, _From, #state{dbs = Dbs} = State) ->
{reply, Dbs, State};
handle_call_(gc_height_and_dbs, _From, #state{dbs = Dbs} = State) ->
case State of
#state{gc_height = undefined, sync_top_calc = P} when is_pid(P) ->
{reply, undefined, State};
#state{gc_height = H} when is_integer(H) ->
{reply, {H, Dbs}, State}
end;
handle_call_(Request, From, State) ->
lager:warning("Ignoring unknown call request from ~p: ~p", [From, Request]),
{noreply, State}.

handle_cast(Msg, St) ->
?TC(handle_cast_(Msg, St), Msg).

handle_cast_(garbage_collect, State) ->
case State of
#state{gc_height = undefined, sync_top_calc = P} when is_pid(P) ->
%% sync_top update will be followed by GC (in handle_info/2 below)
{noreply, State};
#state{gc_height = H} when is_integer(H) ->
State1 = do_update_sync_top_target(H, State),
{noreply, State1}
end;
handle_cast_(Msg, State) ->
lager:warning("Ignoring unknown cast message: ~p", [Msg]),
{noreply, State}.
Expand All @@ -516,7 +505,6 @@ handle_info(Msg, St) ->
?TC(handle_info_(Msg, St), Msg).

handle_info_({P, new_gc_height, GCHeight}, #state{sync_top_calc = P} = State) ->
aec_tx_pool_gc:gc(GCHeight, State#state.dbs),
{noreply, State#state{sync_top_calc = undefined, gc_height = GCHeight}};
handle_info_({'ETS-TRANSFER', _, _, _}, State) ->
{noreply, State};
Expand Down Expand Up @@ -560,8 +548,7 @@ int_get_max_nonce(NonceDb, Sender) ->
%% ... Unless no matching txs can be found in the regular mempool.
%%
int_get_candidate(MaxGas, IgnoreTxs, BlockHash, #dbs{db = Db} = DBs) ->
{ok, Trees} = aec_chain:get_block_state(BlockHash),
{ok, Header} = aec_chain:get_header(BlockHash),
{Trees, Header} = get_trees_and_header(BlockHash),
lager:debug("size(Db) = ~p", [ets:info(Db, size)]),
MinMinerGasPrice = aec_tx_pool:minimum_miner_gas_price(),
MinTxGas = aec_governance:min_tx_gas(),
Expand All @@ -582,6 +569,14 @@ int_get_candidate(MaxGas, IgnoreTxs, BlockHash, #dbs{db = Db} = DBs) ->

{ok, Txs}.

get_trees_and_header(BlockHash) ->
aec_db:ensure_dirty(
fun() ->
{ok, Trees} = aec_chain:get_block_state(BlockHash),
{ok, Header} = aec_chain:get_header(BlockHash),
{Trees, Header}
end).

int_get_candidate(Db, Gas, MinTxGas, MinMinerGasPrice, Trees, Header, DBs, Acc)
when Gas > MinTxGas ->
Pat = [{ '_', [], ['$_'] }],
Expand Down Expand Up @@ -895,8 +890,11 @@ add_to_origins_cache(OriginsCache, SignedTx) ->
ok = aec_tx_pool_gc:add_to_origins_cache(OriginsCache, Origin, Nonce).

-spec check_pool_db_put(aetx_sign:signed_tx(), tx_hash(), event()) ->
ignore | ok | {error, atom()}.
ignore | ok | {error, atom()}.
check_pool_db_put(Tx, TxHash, Event) ->
aec_db:ensure_dirty(fun() -> check_pool_db_put_(Tx, TxHash, Event) end).

check_pool_db_put_(Tx, TxHash, Event) ->
AllowReentryOfDeletedTx = allow_reentry(),
case aec_chain:find_tx_location(TxHash) of
BlockHash when is_binary(BlockHash) ->
Expand Down Expand Up @@ -1070,9 +1068,12 @@ nonce_baseline_check(TxNonce, _) ->
false -> {error, nonce_too_high}
end.

get_account(AccountKey, {account_trees, AccountsTrees}) ->
get_account(AccountKey, How) ->
aec_db:ensure_dirty(fun() -> get_account_(AccountKey, How) end).

get_account_(AccountKey, {account_trees, AccountsTrees}) ->
aec_accounts_trees:lookup(AccountKey, AccountsTrees);
get_account(AccountKey, {block_hash, BlockHash}) ->
get_account_(AccountKey, {block_hash, BlockHash}) ->
aec_chain:get_account_at_hash(AccountKey, BlockHash).

check_minimum_fee(Tx, _TxHash, Block, _BlockHash, _Trees, _Event) ->
Expand Down

0 comments on commit a3c768f

Please sign in to comment.