Skip to content

Commit

Permalink
Hyperchains parent layer (#3335)
Browse files Browse the repository at this point in the history
* Parent connector behaviour; Chain simulator connector draft;

* Interface was changed accordingly to disscussion;

* Connector validation; Block subscription;

* Signing transactions;

* Add block events; Compatibility with OTP21;

* Tests for Chain Simulator connector;

* Address review comments;

* Add get_block_by_hash/1 to connector API; Move test connector to test dir;

* Add export type; Dializer fixes;

* Fix tests; Enable mocks from test utils;
  • Loading branch information
erlmachinedev authored and gorbak25 committed Feb 9, 2021
1 parent 1310749 commit ef02256
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 103 deletions.
1 change: 1 addition & 0 deletions apps/aecore/src/aec_events.erl
Expand Up @@ -31,6 +31,7 @@
| peers
| metric
| chain_sync
| {parent_chain, any()}
| oracle_query_tx_created
| oracle_response_tx_created
| {tx_event, any()}.
Expand Down
66 changes: 32 additions & 34 deletions apps/aecore/src/aec_plugin.erl
Expand Up @@ -6,24 +6,30 @@
, get_module/2
]).

%% Pre-OTP 21 ==================================================
-ifndef(OTP_RELEASE).

register(_Map) ->
error(requires_OTP21).

get_module(_Tag) ->
undefined.

get_module(_Tag, _Default) ->
undefined.

%% OTP 21 and later ============================================
-else.

tags() ->
[aec_tx_pool, aec_fork_block_settings].

valid_registry(R) when is_map(R) ->
lists:all(
fun({K, M}) ->
case (is_atom(M)
andalso
lists:member(K, tags())) of
true ->
module_is_loaded(M);
false ->
false
end
end, maps:to_list(R)).

module_is_loaded(M) ->
case code:ensure_loaded(M) of
{module, _} -> true;
{error, E} ->
lager:error("Module ~p not loaded: ~p", [M, E]),
false
end.

register(Map) when is_map(Map) ->
case valid_registry(Map) of
true ->
Expand All @@ -43,26 +49,18 @@ get_module(Tag, Default) ->
Default
end.

valid_registry(R) when is_map(R) ->
lists:all(
fun({K, M}) ->
case (is_atom(M)
andalso
lists:member(K, tags())) of
true ->
module_is_loaded(M);
false ->
false
end
end, maps:to_list(R)).

module_is_loaded(M) ->
case code:ensure_loaded(M) of
{module, _} -> true;
{error, E} ->
lager:error("Module ~p not loaded: ~p", [M, E]),
false
end.
%% Pre-OTP 21 ==================================================
-ifndef(OTP_RELEASE).

set_registry(R) ->
application:set_env(aecore, {?MODULE, registry}, R).

get_registry() ->
application:get_env(aecore, {?MODULE, registry}, undefined).

%% OTP 21 and later ============================================
-else.

%% We want to use persistent_term for storing the registry, since
%% fetching it will have the same overhead as a remote function call.
Expand Down
20 changes: 10 additions & 10 deletions apps/aecore/src/aecore_sup.erl
Expand Up @@ -9,16 +9,16 @@
%%% aecore_sup
%%% (one_for_one)
%%% |
%%% -----------------------------------------
%%% | | | | |
%%% | aec_metrics aec_keys aec_tx_pool |
%%% | |
%%% aec_connection_sup aec_conductor_sup
%%% (one_for_all) (rest_for_one)
%%% | |
%%% | ---------------------
%%% | | |
%%% | aec_block_generator aec_conductor
%%% |----------------------------------------------------------------------------------------------------------
%%% | | | | | | | | | |
%%% | watchdog aec_upnp aec_metrics aec_keys aec_tx_pool aec_tx_pool_gc aec_db_error_store aec_db_gc |
%%% | |
%%% aec_connection_sup aec_conductor_sup
%%% (one_for_all) (rest_for_one)
%%% | |
%%% | ---------------------
%%% | | |
%%% | aec_block_generator aec_conductor
%%% |
%%% -------------------------------------------------------------------
%%% | | | | |
Expand Down
109 changes: 77 additions & 32 deletions apps/aecore/test/aec_chain_sim.erl
Expand Up @@ -38,6 +38,9 @@

-export([ next_nonce/1 %% (Acct) -> integer()
, push/1 %% (Tx) -> ok
, find_signed_tx/1 %% (TxHash) -> {value, STx)} | none
, top_block_hash/0 %% () -> Hash
, block_by_hash/1 %% (BlockHash) -> {ok, Block}
, add_keyblock/0 %% () -> {ok, Block}
, add_keyblock/1 %% (ForkId) -> {ok, Block}
, add_microblock/0 %% () -> {ok, Block}
Expand Down Expand Up @@ -66,6 +69,8 @@
, dict = #{}
, chain }).

-type simulator() :: default | parent_chain.

-type fork_id() :: main | term().
-type block_hash() :: aec_blocks:block_header_hash().

Expand Down Expand Up @@ -93,6 +98,7 @@ start() ->
%% Starts the simulator
%% Supported options:
%% - monitor => pid(): The simulator will terminate if the monitored process dies
%% - simulator => simulator(): The simulator mode (parent chain for Hyperchains or default)
%%
start(Opts) when is_map(Opts) ->
gen_server:start(?MODULE, Opts, []).
Expand All @@ -116,6 +122,17 @@ stop() ->
end.


-spec simulator(map()) -> simulator().
%%
%% Simulator type. default is used mostly in test suites.
%% parent_chain is used to run the process as attached "parent chain"
%% to reproduce various parent chain scenarios for Hyperchains;
%%
simulator(Opts) ->
Type = maps:get(simulator, Opts, default),
true = (Type == default orelse Type == parent_chain),
Type.

%% Chain simulator requests

-spec next_nonce(Acct :: aec_keys:pubkey()) -> integer().
Expand All @@ -139,6 +156,14 @@ push(Tx) ->
add_keyblock() ->
add_keyblock(main).

-spec block_by_hash(block_hash()) -> {ok, sim_keyblock() |sim_microblock()}.
block_by_hash(BlockHash) ->
chain_req({block_by_hash, BlockHash}).

-spec top_block_hash() -> binary().
top_block_hash() ->
chain_req(top_block_hash).

-spec add_keyblock( ForkId :: fork_id() ) -> {ok, sim_keyblock()}.
%%
%% Adds a keyblock. If ForkId == main, the keyblock is added to the main fork.
Expand Down Expand Up @@ -206,6 +231,13 @@ dict_set(Key, Value) ->
dict_get(Key, Default) ->
chain_req({dict_get, Key, Default}).

-spec find_signed_tx(binary()) -> {value, binary()} | none.
%%
%% Returns the transaction hash associated with Key
%%
find_signed_tx(TxHash) ->
chain_req({find_signed_tx, TxHash}).

-spec setup_meck() -> ok.
%%
%% Installs the mocks needed for the simulator. The assumption is that the mocks
Expand Down Expand Up @@ -269,7 +301,7 @@ remove_meck() ->
init(Opts) when is_map(Opts) ->
gproc:reg({n,l,{?MODULE, chain_process}}),
Chain = new_chain(),
?LOG("Initial chain: ~p", [Chain]),
?LOG("Initial chain (~p simulator): ~p", [simulator(Opts), Chain]),
{ok, maybe_monitor(#st{opts = Opts, chain = Chain})}.

maybe_monitor(#st{opts = #{monitor := Pid}} = St) when is_pid(Pid) ->
Expand All @@ -278,20 +310,20 @@ maybe_monitor(#st{opts = #{monitor := Pid}} = St) when is_pid(Pid) ->
maybe_monitor(St) ->
St.

handle_call({add_micro, ForkId}, _From, #st{chain = Chain} = St) ->
{Res, Chain1} = add_microblock_(ForkId, Chain),
handle_call({add_micro, ForkId}, _From, #st{opts = Opts, chain = Chain} = St) ->
{Res, Chain1} = add_microblock_(ForkId, Chain, Opts),
{reply, Res, St#st{chain = Chain1}};
handle_call({clone_micro_on_fork, Hash, ForkId}, _From, #st{chain = Chain} = St) ->
{Res, Chain1} = clone_micro_on_fork_(Hash, ForkId, Chain),
handle_call({clone_micro_on_fork, Hash, ForkId}, _From, #st{opts = Opts, chain = Chain} = St) ->
{Res, Chain1} = clone_micro_on_fork_(Hash, ForkId, Chain, Opts),
{reply, Res, St#st{chain = Chain1}};
handle_call({add_key, ForkId}, _From, #st{chain = Chain} = St) ->
{Res, Chain1} = add_keyblock_(ForkId, Chain),
handle_call({add_key, ForkId}, _From, #st{opts = Opts, chain = Chain} = St) ->
{Res, Chain1} = add_keyblock_(ForkId, Chain, Opts),
{reply, Res, St#st{chain = Chain1}};
handle_call({fork_from_hash, ForkId, FromHash}, _From, #st{chain = Chain} = St) ->
{Res, Chain1} = fork_from_hash_(ForkId, FromHash, Chain),
handle_call({fork_from_hash, ForkId, FromHash}, _From, #st{opts = Opts, chain = Chain} = St) ->
{Res, Chain1} = fork_from_hash_(ForkId, FromHash, Chain, Opts),
{reply, Res, St#st{chain = Chain1}};
handle_call({fork_switch, ForkId}, _From, #st{chain = Chain} = St) ->
{Res, Chain1} = fork_switch_(ForkId, Chain),
handle_call({fork_switch, ForkId}, _From, #st{opts = Opts, chain = Chain} = St) ->
{Res, Chain1} = fork_switch_(ForkId, Chain, Opts),
{reply, Res, St#st{chain = Chain1}};
handle_call({push, Tx}, _From, #st{chain = #{mempool := Pool} = Chain} = St) ->
%% TODO: not yet asserting increasing nonces
Expand All @@ -302,6 +334,8 @@ handle_call({next_nonce, Acct}, _From, #st{chain = #{nonces := Nonces} = Chain}
{reply, NewN, St#st{chain = Chain#{nonces => Nonces#{ Acct => NewN }}}};
handle_call(top_block_hash, _From, #st{chain = Chain} = St) ->
{reply, top_block_hash_(Chain), St};
handle_call({block_by_hash, Hash},_From, #st{chain = Chain} = St) ->
{reply, get_block_(Hash, Chain), St};
handle_call({get_block_state, Hash}, _From, #st{chain = Chain} = St) ->
{reply, get_block_state_(Hash, Chain), St};
handle_call({get_header, Hash}, _From, #st{chain = Chain} = St) ->
Expand Down Expand Up @@ -340,11 +374,11 @@ code_change(_FromVsn, C, _Extra) ->
{ok, C}.

%% Called from the chain process
add_microblock_(ForkId, #{mempool := Pool} = Chain) ->
add_microblock_(ForkId, #{mempool := Pool} = Chain, Opts) ->
Txs = lists:reverse(Pool),
add_microblock_(ForkId, Txs, Chain#{mempool => []}).
add_microblock_(ForkId, Txs, Chain#{mempool => []}, Opts).

add_microblock_(ForkId, Txs, #{forks := Forks} = Chain) ->
add_microblock_(ForkId, Txs, #{forks := Forks} = Chain, Opts) ->
?LOG("add_microblock(Txs = ~p", [Txs]),
#{blocks := Blocks} = F = maps:get(ForkId, Forks),
#{hash := PrevHash, header := TopHdr} = hd(Blocks),
Expand All @@ -363,19 +397,19 @@ add_microblock_(ForkId, Txs, #{forks := Forks} = Chain) ->
NewFork = F#{blocks => [Block | Blocks]},
?LOG("NewFork(~p): ~p", [ForkId, NewFork]),
NewForks = Forks#{ForkId => NewFork},
NewChain = announce(ForkId, Txs, Chain#{ forks => NewForks}),
NewChain = announce(ForkId, Txs, Chain#{ forks => NewForks}, Opts),
{{ok, Block}, NewChain}.

clone_micro_on_fork_(Hash, ForkId, #{forks := Forks} = Chain) ->
clone_micro_on_fork_(Hash, ForkId, #{forks := Forks} = Chain, Opts) ->
#{blocks := Blocks} = maps:get(main, Forks),
case lists_mapfind(Hash, hash, Blocks) of
false ->
error({no_such_hash, Hash});
#{txs := Txs} ->
add_microblock_(ForkId, Txs, Chain)
add_microblock_(ForkId, Txs, Chain, Opts)
end.

fork_from_hash_(ForkId, Hash, #{ forks := Forks } = Chain) ->
fork_from_hash_(ForkId, Hash, #{ forks := Forks } = Chain, Opts) ->
case maps:is_key(ForkId, Forks) of
true ->
error({fork_id_exists, ForkId});
Expand All @@ -389,11 +423,11 @@ fork_from_hash_(ForkId, Hash, #{ forks := Forks } = Chain) ->
, blocks => BlocksUntilHash },
NewForks = Forks#{ForkId => NewFork},
%% Automatically add a keyblock as instigator of the fork.
add_keyblock_(ForkId, Chain#{forks => NewForks})
add_keyblock_(ForkId, Chain#{forks => NewForks}, Opts)
end
end.

fork_switch_(ForkId, #{forks := Forks, mempool := Pool, orphans := Orphans} = Chain) ->
fork_switch_(ForkId, #{forks := Forks, mempool := Pool, orphans := Orphans} = Chain, Opts) ->
#{ main := #{blocks := Blocks} = M
, ForkId := #{fork_point := ForkPoint, blocks := FBlocks}} = Forks,
Evict = lists:takewhile(
Expand All @@ -407,28 +441,31 @@ fork_switch_(ForkId, #{forks := Forks, mempool := Pool, orphans := Orphans} = Ch
NewChain = Chain#{ forks => NewForks
, mempool => NewPool
, orphans => Evict ++ Orphans },
{ok, announce(main, [], NewChain)}.
{ok, announce(main, [], NewChain, Opts)}.

%% Announce top_changed and tx events
announce(ForkId, Txs, #{ forks := Forks } = Chain) ->
announce(ForkId, Txs, #{ forks := Forks } = Chain, Opts) ->
#{ ForkId := #{ blocks := [#{ hash := TopHash
, prev := PrevHash
, header := Hdr } | _] = Blocks} } = Forks,
SimulatorT = simulator(Opts),
Height = length(Blocks) + 1,
Type = aec_headers:type(Hdr),
Origin = origin(Type),
Info = #{ block_hash => TopHash
Info = #{ pid => self(),
block_hash => TopHash
, block_type => Type
, block_origin => Origin
, prev_hash => PrevHash
, height => Height },
send_tx_events(Txs, Info),
case ForkId of
main ->
?LOG("Publishing top_changed, I = ~p", [Info]),
aec_events:publish(top_changed, Info);
_ ->
ignore
if SimulatorT == parent_chain ->
aec_events:publish({parent_chain, top_changed}, Info#{txs => Txs});
true ->
send_tx_events(Txs, Info),
(ForkId == main) andalso
begin ?LOG("Publishing top_changed, I = ~p", [Info]),
aec_events:publish(top_changed, Info)
end
end,
Chain.

Expand All @@ -454,7 +491,7 @@ new_chain() ->
, blocks => Blocks } }}.

%% Called from the chain process
add_keyblock_(ForkId, #{forks := Forks, miner := #{pubkey := Miner}} = Chain) ->
add_keyblock_(ForkId, #{forks := Forks, miner := #{pubkey := Miner}} = Chain, Opts) ->
?LOG("add_keyblock(~p)", [ForkId]),
#{ ForkId := #{blocks := Blocks} = F } = Forks,
#{hash := PrevHash, header := TopHdr} = hd(Blocks),
Expand All @@ -469,7 +506,7 @@ add_keyblock_(ForkId, #{forks := Forks, miner := #{pubkey := Miner}} = Chain) ->
, header => NewHdr
, txs => [] },
NewChain = Chain#{ forks => Forks#{ ForkId => F#{blocks => [Block | Blocks]} } },
announce(ForkId, [], NewChain),
announce(ForkId, [], NewChain, Opts),
{{ok, Block}, NewChain}.

send_tx_events(Txs, #{ block_hash := BlockHash
Expand Down Expand Up @@ -637,6 +674,14 @@ get_header_(Hash, Chain) ->
error
end.

get_block_(Hash, Chain) ->
case blocks_until_hash(Hash, blocks(Chain)) of
[Block|_] ->
{ok, Block};
[] ->
error
end.

get_channel_(ChId, Chain) ->
case trees(blocks(Chain)) of
{ok, Trees} ->
Expand Down
8 changes: 8 additions & 0 deletions apps/aehyperchains/src/aehc_app.erl
Expand Up @@ -9,6 +9,10 @@
, check_env/0
]).

-export([get_connector_id/0]).

-define(DEFAULT_CONNECTOR_ID, <<"aehc_aeternity_connector">>).

start(_StartType, _StartArgs) ->
aehc_sup:start_link().

Expand Down Expand Up @@ -46,3 +50,7 @@ set_env({set_env, K}, V) when is_atom(K) ->
application:set_env(aehyperchains, K, V);
set_env(F, V) when is_function(F, 1) ->
F(V).

get_connector_id() ->
aeu_env:config_value([<<"chain">>, <<"hyperchains">>, <<"connector">>, <<"id">>],
aehyperchains, [hyperchains, connector, id], ?DEFAULT_CONNECTOR_ID).

0 comments on commit ef02256

Please sign in to comment.