Skip to content

Commit

Permalink
Pt 159406473 sc continuous chain monitoring 4 -- review (#2488)
Browse files Browse the repository at this point in the history
* Document entries in aesc_window used in channels code

* Prefer record to tuple in internal watcher logs so to avoid magic indexes

* Fix type of scenario `has_tx` in channel watcher

Also make checkes on scenario more strict.

Using records could make code easier to read and maintain though would
require more invasive changes.

* code changes due to stricter types (#2493)

* take pains to avoid non-existing txs in the txlog
  • Loading branch information
lucafavatella authored and uwiger committed Jun 5, 2019
1 parent f9afc7b commit c39748a
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 59 deletions.
4 changes: 3 additions & 1 deletion apps/aechannel/src/aesc_fsm.erl
Expand Up @@ -183,6 +183,8 @@
[aesc_offchain_update:update()]}
| {reestablish, OffChainTx :: aetx_sign:signed_tx()}.

-type log() :: aesc_window:window({Op :: atom(), Type :: atom(), erlang:timestamp(), Msg :: term()}).

-opaque state_name() :: awaiting_open
| awaiting_reestablish
| initialized
Expand Down Expand Up @@ -218,7 +220,7 @@
, latest = undefined :: latest_op()
, ongoing_update = false :: boolean()
, last_reported_update :: undefined | non_neg_integer()
, log :: aesc_window:window()
, log :: log()
, strict_checks = true :: boolean()
}).

Expand Down
168 changes: 122 additions & 46 deletions apps/aechannel/src/aesc_fsm_min_depth_watcher.erl
Expand Up @@ -18,14 +18,27 @@
-export([get_txs_since/2]).

-define(GEN_SERVER_OPTS, []).
-define(IS_INFO_OF_SCENARIO_HAS_TX(X),
( is_tuple(X)
andalso (tuple_size(X) =:= 2)
andalso ( is_tuple(element(1, X))
andalso (tuple_size(element(1, X)) =:= 2)
andalso is_binary(element(1, element(1, X)))
andalso is_binary(element(2, element(1, X)))
)
andalso ( is_map(element(2, X))
andalso (map_size(element(2, X)) =:= 3
orelse map_size(element(2, X)) =:= 4)
)
) ).
-define(IS_SCENARIO(S),
( (S =:= top)
orelse (S =:= next_block)
orelse (S =:= fork_switch)
orelse ( is_tuple(S)
andalso (tuple_size(S) =:= 2)
andalso (element(1, S) =:= has_tx)
%% TODO Check element(2, S)
andalso ?IS_INFO_OF_SCENARIO_HAS_TX(element(2, S))
)
) ).

Expand All @@ -34,11 +47,21 @@
, chan_vsn
, last_block %% last time we updated channel vsn
, last_top %% the block hash of the last check
, tx_log = aesc_window:new()
, rpt_log = aesc_window:new()
, tx_log = aesc_window:new() :: tx_log()
, rpt_log = aesc_window:new() :: rpt_log()
, closing = false
, requests = [] }).

-record(tx_log_entry,
{ key :: tx_log_entry_key()
, value :: tx_log_entry_value()
}).

-record(rpt_log_entry,
{ key :: {changed_on_chain | closing_on_chain | closed_on_chain, tx_hash()}
, value :: #{ atom() := term() }
}).

-type mode() :: close
| unlock
| tx_hash
Expand All @@ -64,7 +87,11 @@

-type block_hash() :: binary().
-type tx_hash() :: binary().
-type scenario() :: top | next_block | fork_switch | {has_tx, tx_hash()}.
-type info_of_scenario_has_tx() :: {tx_log_entry_key(), tx_log_entry_value()}.
-type scenario() :: top
| next_block
| fork_switch
| {has_tx, info_of_scenario_has_tx()}.
-type chan_vsn() :: undefined | { aesc_channels:round()
, aesc_channels:solo_round()
, aesc_channels:is_active()
Expand All @@ -77,9 +104,18 @@
, lock_period := aesc_channels:lock_period()
, locked_until := aesc_channels:locked_until() }.

-type tx_log_entry_key() :: {tx_hash(), block_hash()}.
-type tx_log_entry_value() :: #{ tx => aetx_sign:signed_tx()
, block_hash := block_hash()
, block_origin := chain
, type := aetx:tx_type() }.
-type tx_log() :: aesc_window:window(#tx_log_entry{}).

-type rpt_log() :: aesc_window:window(#rpt_log_entry{}).

-type cache() :: #{ mode := mode()
, tx_log := aesc_window:window()
, rpt_log := aesc_window:window()
, tx_log := tx_log()
, rpt_log := rpt_log()
, last_block := block_hash()
, chan_vsn := chan_vsn()
, block_hash => block_hash()
Expand Down Expand Up @@ -341,11 +377,12 @@ init_cache_(#st{ last_block = LastBlock

%% The TxLog relies on the custom tx events (remember that it's a bounded list)
find_tx_in_block(BHash, TxLog) ->
case aesc_window:info_find([{block_hash, BHash}], 2, TxLog) of
Filter = [{block_hash, BHash}],
case aesc_window:info_find(Filter, #tx_log_entry.value, TxLog) of
false ->
false;
LastTx ->
LastTx
X = #tx_log_entry{} ->
{X#tx_log_entry.key, X#tx_log_entry.value}
end.

%% We do an async_dirty activity for minimal overhead. Note that the analysis
Expand Down Expand Up @@ -475,9 +512,9 @@ check_req(#{mode := unlock} = R, #st{chan_id = ChId, chan_vsn = Vsn} = St, C) ->
check_req(#{mode := watch} = R, #st{chan_id = ChId} = St,
#{scenario := Scenario, top_hash := Hash } = C)
when ?IS_SCENARIO(Scenario) ->
lager:debug("Scenario = ~p", [Scenario]),
case Scenario of
{has_tx, {_, #{ type := TxType
, tx_hash := TxHash }}} ->
{has_tx, {{TxHash,_}, #{ type := TxType }}} ->
{Ch, C1} = get_channel(ChId, Hash, C),
{#{ height := H }, C2} = top_info(Hash, C1),
case Ch of
Expand All @@ -495,6 +532,7 @@ check_req(#{mode := watch} = R, #st{chan_id = ChId} = St,
fork_switch ->
watch_for_channel_change(R, St, C);
_ ->
lager:debug("Other scenario - ignore (~p)", [Scenario]),
{R, C}
end;
check_req(#{mode := tx_hash, tx_hash := TxHash, min_depth := MinDepth} = R,
Expand Down Expand Up @@ -597,10 +635,11 @@ report_closed_on_chain(#{ tx_type := _TxType

maybe_report(RptKey, Info, Rpt, C) when is_function(Rpt) ->
RptLog = maps:get(rpt_log, C),
case aesc_window:keyfind(RptKey, 1, RptLog) of
case aesc_window:keyfind(RptKey, #rpt_log_entry.key, RptLog) of
false ->
C1 = call_rpt(Rpt, C),
C1#{rpt_log => aesc_window:add({RptKey, Info}, RptLog)};
LogEntry = #rpt_log_entry{key = RptKey, value = Info},
C1#{rpt_log => aesc_window:add(LogEntry, RptLog)};
_ ->
C
end.
Expand Down Expand Up @@ -666,15 +705,16 @@ handle_ch_status_changed(ChId, #{ last_block := Last
lager:debug("channel status changed, Hash=~p, Last=~p", [Hash,Last]),
{TxLog, C1} = get_txs_since({any_after_block, Last}, Hash, ChId, C),
lager:debug("New TxLog: ~p", [TxLog]),
#{tx_hash := Tx, block_hash := BlockHash} = get_latest_tx(TxLog),
lager:debug("Latest Tx = ~p, Hash = ~p", [Tx, BlockHash]),
{Status, C2} = get_basic_ch_status_(ChId, BlockHash, C1),
{TxHash, #{tx := Tx, block_hash := BlockHash}, C2} =
get_latest_tx(TxLog, C1),
lager:debug("Latest TxHash = ~p, Hash = ~p", [TxHash, BlockHash]),
{Status, C3} = get_basic_ch_status_(ChId, BlockHash, C2),
lager:debug("ChStatus(~p) = ~p", [BlockHash, Status]),
%% Assert that this is really a changed channel object
true = channel_status_changed(maps:get(vsn, Status), C2),
true = channel_status_changed(maps:get(vsn, Status), C3),
lager:debug("asserted status changed", []),
Status1 = Status#{tx => Tx},
{Status1, C2#{{channel, BlockHash} => Status1}}.
{Status1, C3#{{channel, BlockHash} => Status1}}.

get_basic_ch_status_(ChId, #{ top_hash := Hash } = C) ->
get_basic_ch_status_(ChId, Hash, C).
Expand Down Expand Up @@ -732,18 +772,40 @@ log_tx(#{ tx_hash := TxHash
, block_origin := _Origin
, type := _Type } = Info, #st{tx_log = TxLog} = St) ->
Key = {TxHash, BlockHash},
case aesc_window:keymember(Key, 1, TxLog) of
case aesc_window:keymember(Key, #tx_log_entry.key, TxLog) of
true ->
St;
false ->
TxLog1 = aesc_window:add({Key, Info}, TxLog),
LogEntry = #tx_log_entry{key = Key,
value = maps:with([block_hash,
block_origin,
type], Info)},
TxLog1 = aesc_window:add(LogEntry, TxLog),
St#st{tx_log = TxLog1}
end.

get_latest_tx(Log) ->
{{{_TxHash, _BlockHash}, Tx}, _TxLog1} = aesc_window:pop(Log),
lager:debug("Tx = ~p", [Tx]),
Tx.
get_latest_tx(Log, C) ->
{#tx_log_entry{key = {TxHash, _BlockHash},
value = TxInfo},
TxLog1} =
aesc_window:pop(Log),
lager:debug("TxInfo = ~p", [TxInfo]),
case ensure_signed_tx_included(TxHash, TxInfo, C) of
{#{tx := undefined}, C1} ->
lager:debug("No such signed tx (~p)", [TxHash]),
get_latest_tx(TxLog1, C1#{tx_log => TxLog1});
{TxInfo1, C1} ->
{TxHash, TxInfo1, C1}
end.

ensure_signed_tx_included(TxHash, TxInfo, C) ->
case maps:is_key(tx, TxInfo) of
true ->
{TxInfo, C};
false ->
{SignedTx, C1} = get_signed_tx(TxHash, C),
{TxInfo#{tx => SignedTx}, C1}
end.

%% Find recent txs. Two different stop conditions:
%% - {any_after_block, BlockHash}: Stop as soon as a block is found with
Expand All @@ -765,12 +827,17 @@ get_txs_since(StopCond, Hash, ChId, C) ->
lists:foldl(
fun({BlockHash, Txs}, Acc) ->
lists:foldl(
fun(TxHash, Acc1) ->
aesc_window:add(
{ {TxHash, BlockHash},
#{ tx_hash => TxHash
, block_hash => BlockHash
, block_origin => chain } }, Acc1)
fun(SignedTx, Acc1) ->
Type = tx_type(SignedTx),
TxHash = aetx_sign:hash(SignedTx),
LogEntry =
#tx_log_entry{
key = {TxHash, BlockHash},
value = #{ tx => SignedTx
, block_hash => BlockHash
, block_origin => chain
, type => Type } },
aesc_window:add(LogEntry, Acc1)
end, Acc, Txs)
end, TxLog, Found),
{TxLog1, C2};
Expand Down Expand Up @@ -833,8 +900,8 @@ stop_cond({any_after_block, Hash}, PrevHash, Found) ->
has_create_tx([]) ->
false;
has_create_tx([SignedTx|_] = Found) ->
case aetx:specialize_type(aetx_sign:tx(SignedTx)) of
{channel_create_tx, _} ->
case tx_type(SignedTx) of
channel_create_tx ->
{true, Found};
_ ->
false
Expand Down Expand Up @@ -936,18 +1003,23 @@ tx_location(TxHash, C) ->
cached_get({location, TxHash}, C, fun(C1) -> tx_location_(TxHash, C1) end).

tx_location_(TxHash, C) ->
L = case aec_chain:find_tx_with_location(TxHash) of
{L, Type, SignedTx} =
case aec_chain:find_tx_with_location(TxHash) of
none ->
lager:debug("couldn't find tx hash", []),
undefined;
{mempool, _} ->
{undefined, undefined, undefined};
{mempool, STx} ->
lager:debug("tx still in mempool", []),
undefined;
{BlockHash, _} ->
{undefined, tx_type(STx), STx};
{BlockHash, STx} ->
lager:debug("tx in Block ~p", [BlockHash]),
BlockHash
{BlockHash, tx_type(STx), STx}
end,
{L, update_tx_log(TxHash, L, C)}.
{L, update_tx_log(TxHash, SignedTx, L, Type, C)}.

tx_type(SignedTx) ->
{Type, _} = aetx:specialize_type(aetx_sign:tx(SignedTx)),
Type.

in_main_chain(BHash, C) ->
%% We don't want to use aec_chain_state:hash_is_in_main_chain/1, since we want to
Expand All @@ -959,14 +1031,18 @@ in_main_chain_(Hash, C) ->
Res = aec_chain_state:hash_is_in_main_chain(Hash, TopHash),
{Res, C1}.

update_tx_log(TxHash, BlockHash, #{tx_log := TxLog} =C)
update_tx_log(_, undefined, undefined, undefined, C) ->
%% don't add a non-existing tx
C;
update_tx_log(TxHash, SignedTx, BlockHash, Type, #{tx_log := TxLog} =C)
when is_binary(BlockHash) ->
C#{tx_log => aesc_window:add(
{ {TxHash, BlockHash},
#{ tx_hash => TxHash
, block_hash => BlockHash
, block_origin => chain } }, TxLog)};
update_tx_log(_, _, C) ->
LogEntry = #tx_log_entry{key = {TxHash, BlockHash},
value = #{ tx => SignedTx
, block_hash => BlockHash
, block_origin => chain
, type => Type } },
C#{tx_log => aesc_window:add(LogEntry, TxLog)};
update_tx_log(_, _, _, _, C) ->
C.

top_height(C) ->
Expand Down
29 changes: 17 additions & 12 deletions apps/aechannel/src/aesc_window.erl
Expand Up @@ -12,12 +12,12 @@

-export([record_fields/1]).

-export_type([window/0]).
-export_type([window/1]).

-define(KEEP, 10).

-type size() :: non_neg_integer().
-type entry() :: any().
-type entry() :: tuple().

%% This is a bounded buffer, optimized for performance.
%% A counter, `na`, keeps track of the number of elements
Expand All @@ -34,11 +34,14 @@
-record(w, { na = 0 :: non_neg_integer()
, nb = 0 :: non_neg_integer()
, keep = ?KEEP :: size()
, a = [] :: [entry()]
, b = [] :: [entry()]
, a = []
, b = []
}).

-type window() :: #w{}.
-type window(Entry) :: #w{ a :: [Entry]
, b :: [Entry]
}.
-type window() :: window(entry()).

%% ==================================================================
%% Tracing support
Expand All @@ -58,17 +61,18 @@ new(Sz) when is_integer(Sz), Sz >= 0 ->
%% When changing `keep`, we do not modify (e.g. truncate) the data set.
%% This is for performance reasons, and because we don't strive to keep
%% the exact size anyway: `keep` is an approximate number.
-spec change_keep(size(), window()) -> window().
-spec change_keep(size(), window(Entry)) -> window(Entry) when Entry :: entry().
change_keep(Keep, #w{} = W) when is_integer(Keep), Keep >= 0 ->
W#w{keep = Keep}.

-spec add(entry(), window()) -> window().
-spec add(Entry, window(Entry)) -> window(Entry) when Entry :: entry().
add(Item, #w{na = N, a = A, keep = Keep} = W) when N < Keep ->
W#w{na = N+1, a = [Item|A]};
add(Item, #w{na = PrevNa, a = A} = W) ->
W#w{na = 1, a = [Item], nb = PrevNa, b = A}.

-spec pop(window()) -> {entry(), window()} | error.
-spec pop(window(Entry)) -> {Entry, window(Entry)} | error
when Entry :: entry().
pop(#w{a = [], b = []}) ->
error;
pop(#w{a = [], b = [H|T], nb = N} = W) ->
Expand All @@ -79,13 +83,14 @@ pop(#w{a = [H|T], na = N} = W) ->
-spec size(window()) -> non_neg_integer().
size(#w{na = Na, nb = Nb}) -> Na + Nb.

-spec to_list(window()) -> [entry()].
-spec to_list(window(Entry)) -> [Entry] when Entry :: entry().
to_list(#w{a = A, b = B}) ->
A ++ B.

%% Like lists:keyfind/3. Finds the most recent match (if any),
%% since items are essentially stored in LIFO fashion.
-spec keyfind(any(), non_neg_integer(), window()) -> false | entry().
-spec keyfind(any(), non_neg_integer(), window(Entry)) -> false | Entry
when Entry :: entry().
keyfind(K, Pos, #w{a = A, b = B}) ->
case lists:keyfind(K, Pos, A) of
false ->
Expand All @@ -104,8 +109,8 @@ keymember(K, Pos, #w{a = A, b = B}) ->
%% element is not a map are skipped). If a value in the `KVL' is `undefined',
%% this will match either the value `undefined' or the key being missing.
%%
-spec info_find([{any(), any()}], non_neg_integer(), window()) ->
false | entry().
-spec info_find([{any(), any()}], non_neg_integer(), window(Entry)) ->
false | Entry when Entry :: entry().
info_find(KVL, Pos, #w{a = A, b = B}) when is_list(KVL) ->
case info_find_(KVL, Pos, A) of
false ->
Expand Down

0 comments on commit c39748a

Please sign in to comment.