diff --git a/apps/aechannel/src/aesc_fsm.erl b/apps/aechannel/src/aesc_fsm.erl index 133c047c0b..85dfb1b319 100644 --- a/apps/aechannel/src/aesc_fsm.erl +++ b/apps/aechannel/src/aesc_fsm.erl @@ -183,6 +183,8 @@ [aesc_offchain_update:update()]} | {reestablish, OffChainTx :: aetx_sign:signed_tx()}. +-type log() :: aesc_window:window({Op :: atom(), Type :: atom(), erlang:timestamp(), Msg :: term()}). + -opaque state_name() :: awaiting_open | awaiting_reestablish | initialized @@ -218,7 +220,7 @@ , latest = undefined :: latest_op() , ongoing_update = false :: boolean() , last_reported_update :: undefined | non_neg_integer() - , log :: aesc_window:window() + , log :: log() , strict_checks = true :: boolean() }). diff --git a/apps/aechannel/src/aesc_fsm_min_depth_watcher.erl b/apps/aechannel/src/aesc_fsm_min_depth_watcher.erl index 4d2ffdcf3e..8ee898f853 100644 --- a/apps/aechannel/src/aesc_fsm_min_depth_watcher.erl +++ b/apps/aechannel/src/aesc_fsm_min_depth_watcher.erl @@ -18,6 +18,19 @@ -export([get_txs_since/2]). -define(GEN_SERVER_OPTS, []). +-define(IS_INFO_OF_SCENARIO_HAS_TX(X), + ( is_tuple(X) + andalso (tuple_size(X) =:= 2) + andalso ( is_tuple(element(1, X)) + andalso (tuple_size(element(1, X)) =:= 2) + andalso is_binary(element(1, element(1, X))) + andalso is_binary(element(2, element(1, X))) + ) + andalso ( is_map(element(2, X)) + andalso (map_size(element(2, X)) =:= 3 + orelse map_size(element(2, X)) =:= 4) + ) + ) ). -define(IS_SCENARIO(S), ( (S =:= top) orelse (S =:= next_block) @@ -25,7 +38,7 @@ orelse ( is_tuple(S) andalso (tuple_size(S) =:= 2) andalso (element(1, S) =:= has_tx) - %% TODO Check element(2, S) + andalso ?IS_INFO_OF_SCENARIO_HAS_TX(element(2, S)) ) ) ). @@ -34,11 +47,21 @@ , chan_vsn , last_block %% last time we updated channel vsn , last_top %% the block hash of the last check - , tx_log = aesc_window:new() - , rpt_log = aesc_window:new() + , tx_log = aesc_window:new() :: tx_log() + , rpt_log = aesc_window:new() :: rpt_log() , closing = false , requests = [] }). +-record(tx_log_entry, + { key :: tx_log_entry_key() + , value :: tx_log_entry_value() + }). + +-record(rpt_log_entry, + { key :: {changed_on_chain | closing_on_chain | closed_on_chain, tx_hash()} + , value :: #{ atom() := term() } + }). + -type mode() :: close | unlock | tx_hash @@ -64,7 +87,11 @@ -type block_hash() :: binary(). -type tx_hash() :: binary(). --type scenario() :: top | next_block | fork_switch | {has_tx, tx_hash()}. +-type info_of_scenario_has_tx() :: {tx_log_entry_key(), tx_log_entry_value()}. +-type scenario() :: top + | next_block + | fork_switch + | {has_tx, info_of_scenario_has_tx()}. -type chan_vsn() :: undefined | { aesc_channels:round() , aesc_channels:solo_round() , aesc_channels:is_active() @@ -77,9 +104,18 @@ , lock_period := aesc_channels:lock_period() , locked_until := aesc_channels:locked_until() }. +-type tx_log_entry_key() :: {tx_hash(), block_hash()}. +-type tx_log_entry_value() :: #{ tx => aetx_sign:signed_tx() + , block_hash := block_hash() + , block_origin := chain + , type := aetx:tx_type() }. +-type tx_log() :: aesc_window:window(#tx_log_entry{}). + +-type rpt_log() :: aesc_window:window(#rpt_log_entry{}). + -type cache() :: #{ mode := mode() - , tx_log := aesc_window:window() - , rpt_log := aesc_window:window() + , tx_log := tx_log() + , rpt_log := rpt_log() , last_block := block_hash() , chan_vsn := chan_vsn() , block_hash => block_hash() @@ -341,11 +377,12 @@ init_cache_(#st{ last_block = LastBlock %% The TxLog relies on the custom tx events (remember that it's a bounded list) find_tx_in_block(BHash, TxLog) -> - case aesc_window:info_find([{block_hash, BHash}], 2, TxLog) of + Filter = [{block_hash, BHash}], + case aesc_window:info_find(Filter, #tx_log_entry.value, TxLog) of false -> false; - LastTx -> - LastTx + X = #tx_log_entry{} -> + {X#tx_log_entry.key, X#tx_log_entry.value} end. %% We do an async_dirty activity for minimal overhead. Note that the analysis @@ -475,9 +512,9 @@ check_req(#{mode := unlock} = R, #st{chan_id = ChId, chan_vsn = Vsn} = St, C) -> check_req(#{mode := watch} = R, #st{chan_id = ChId} = St, #{scenario := Scenario, top_hash := Hash } = C) when ?IS_SCENARIO(Scenario) -> + lager:debug("Scenario = ~p", [Scenario]), case Scenario of - {has_tx, {_, #{ type := TxType - , tx_hash := TxHash }}} -> + {has_tx, {{TxHash,_}, #{ type := TxType }}} -> {Ch, C1} = get_channel(ChId, Hash, C), {#{ height := H }, C2} = top_info(Hash, C1), case Ch of @@ -495,6 +532,7 @@ check_req(#{mode := watch} = R, #st{chan_id = ChId} = St, fork_switch -> watch_for_channel_change(R, St, C); _ -> + lager:debug("Other scenario - ignore (~p)", [Scenario]), {R, C} end; check_req(#{mode := tx_hash, tx_hash := TxHash, min_depth := MinDepth} = R, @@ -597,10 +635,11 @@ report_closed_on_chain(#{ tx_type := _TxType maybe_report(RptKey, Info, Rpt, C) when is_function(Rpt) -> RptLog = maps:get(rpt_log, C), - case aesc_window:keyfind(RptKey, 1, RptLog) of + case aesc_window:keyfind(RptKey, #rpt_log_entry.key, RptLog) of false -> C1 = call_rpt(Rpt, C), - C1#{rpt_log => aesc_window:add({RptKey, Info}, RptLog)}; + LogEntry = #rpt_log_entry{key = RptKey, value = Info}, + C1#{rpt_log => aesc_window:add(LogEntry, RptLog)}; _ -> C end. @@ -666,15 +705,16 @@ handle_ch_status_changed(ChId, #{ last_block := Last lager:debug("channel status changed, Hash=~p, Last=~p", [Hash,Last]), {TxLog, C1} = get_txs_since({any_after_block, Last}, Hash, ChId, C), lager:debug("New TxLog: ~p", [TxLog]), - #{tx_hash := Tx, block_hash := BlockHash} = get_latest_tx(TxLog), - lager:debug("Latest Tx = ~p, Hash = ~p", [Tx, BlockHash]), - {Status, C2} = get_basic_ch_status_(ChId, BlockHash, C1), + {TxHash, #{tx := Tx, block_hash := BlockHash}, C2} = + get_latest_tx(TxLog, C1), + lager:debug("Latest TxHash = ~p, Hash = ~p", [TxHash, BlockHash]), + {Status, C3} = get_basic_ch_status_(ChId, BlockHash, C2), lager:debug("ChStatus(~p) = ~p", [BlockHash, Status]), %% Assert that this is really a changed channel object - true = channel_status_changed(maps:get(vsn, Status), C2), + true = channel_status_changed(maps:get(vsn, Status), C3), lager:debug("asserted status changed", []), Status1 = Status#{tx => Tx}, - {Status1, C2#{{channel, BlockHash} => Status1}}. + {Status1, C3#{{channel, BlockHash} => Status1}}. get_basic_ch_status_(ChId, #{ top_hash := Hash } = C) -> get_basic_ch_status_(ChId, Hash, C). @@ -732,18 +772,40 @@ log_tx(#{ tx_hash := TxHash , block_origin := _Origin , type := _Type } = Info, #st{tx_log = TxLog} = St) -> Key = {TxHash, BlockHash}, - case aesc_window:keymember(Key, 1, TxLog) of + case aesc_window:keymember(Key, #tx_log_entry.key, TxLog) of true -> St; false -> - TxLog1 = aesc_window:add({Key, Info}, TxLog), + LogEntry = #tx_log_entry{key = Key, + value = maps:with([block_hash, + block_origin, + type], Info)}, + TxLog1 = aesc_window:add(LogEntry, TxLog), St#st{tx_log = TxLog1} end. -get_latest_tx(Log) -> - {{{_TxHash, _BlockHash}, Tx}, _TxLog1} = aesc_window:pop(Log), - lager:debug("Tx = ~p", [Tx]), - Tx. +get_latest_tx(Log, C) -> + {#tx_log_entry{key = {TxHash, _BlockHash}, + value = TxInfo}, + TxLog1} = + aesc_window:pop(Log), + lager:debug("TxInfo = ~p", [TxInfo]), + case ensure_signed_tx_included(TxHash, TxInfo, C) of + {#{tx := undefined}, C1} -> + lager:debug("No such signed tx (~p)", [TxHash]), + get_latest_tx(TxLog1, C1#{tx_log => TxLog1}); + {TxInfo1, C1} -> + {TxHash, TxInfo1, C1} + end. + +ensure_signed_tx_included(TxHash, TxInfo, C) -> + case maps:is_key(tx, TxInfo) of + true -> + {TxInfo, C}; + false -> + {SignedTx, C1} = get_signed_tx(TxHash, C), + {TxInfo#{tx => SignedTx}, C1} + end. %% Find recent txs. Two different stop conditions: %% - {any_after_block, BlockHash}: Stop as soon as a block is found with @@ -765,12 +827,17 @@ get_txs_since(StopCond, Hash, ChId, C) -> lists:foldl( fun({BlockHash, Txs}, Acc) -> lists:foldl( - fun(TxHash, Acc1) -> - aesc_window:add( - { {TxHash, BlockHash}, - #{ tx_hash => TxHash - , block_hash => BlockHash - , block_origin => chain } }, Acc1) + fun(SignedTx, Acc1) -> + Type = tx_type(SignedTx), + TxHash = aetx_sign:hash(SignedTx), + LogEntry = + #tx_log_entry{ + key = {TxHash, BlockHash}, + value = #{ tx => SignedTx + , block_hash => BlockHash + , block_origin => chain + , type => Type } }, + aesc_window:add(LogEntry, Acc1) end, Acc, Txs) end, TxLog, Found), {TxLog1, C2}; @@ -833,8 +900,8 @@ stop_cond({any_after_block, Hash}, PrevHash, Found) -> has_create_tx([]) -> false; has_create_tx([SignedTx|_] = Found) -> - case aetx:specialize_type(aetx_sign:tx(SignedTx)) of - {channel_create_tx, _} -> + case tx_type(SignedTx) of + channel_create_tx -> {true, Found}; _ -> false @@ -936,18 +1003,23 @@ tx_location(TxHash, C) -> cached_get({location, TxHash}, C, fun(C1) -> tx_location_(TxHash, C1) end). tx_location_(TxHash, C) -> - L = case aec_chain:find_tx_with_location(TxHash) of + {L, Type, SignedTx} = + case aec_chain:find_tx_with_location(TxHash) of none -> lager:debug("couldn't find tx hash", []), - undefined; - {mempool, _} -> + {undefined, undefined, undefined}; + {mempool, STx} -> lager:debug("tx still in mempool", []), - undefined; - {BlockHash, _} -> + {undefined, tx_type(STx), STx}; + {BlockHash, STx} -> lager:debug("tx in Block ~p", [BlockHash]), - BlockHash + {BlockHash, tx_type(STx), STx} end, - {L, update_tx_log(TxHash, L, C)}. + {L, update_tx_log(TxHash, SignedTx, L, Type, C)}. + +tx_type(SignedTx) -> + {Type, _} = aetx:specialize_type(aetx_sign:tx(SignedTx)), + Type. in_main_chain(BHash, C) -> %% We don't want to use aec_chain_state:hash_is_in_main_chain/1, since we want to @@ -959,14 +1031,18 @@ in_main_chain_(Hash, C) -> Res = aec_chain_state:hash_is_in_main_chain(Hash, TopHash), {Res, C1}. -update_tx_log(TxHash, BlockHash, #{tx_log := TxLog} =C) +update_tx_log(_, undefined, undefined, undefined, C) -> + %% don't add a non-existing tx + C; +update_tx_log(TxHash, SignedTx, BlockHash, Type, #{tx_log := TxLog} =C) when is_binary(BlockHash) -> - C#{tx_log => aesc_window:add( - { {TxHash, BlockHash}, - #{ tx_hash => TxHash - , block_hash => BlockHash - , block_origin => chain } }, TxLog)}; -update_tx_log(_, _, C) -> + LogEntry = #tx_log_entry{key = {TxHash, BlockHash}, + value = #{ tx => SignedTx + , block_hash => BlockHash + , block_origin => chain + , type => Type } }, + C#{tx_log => aesc_window:add(LogEntry, TxLog)}; +update_tx_log(_, _, _, _, C) -> C. top_height(C) -> diff --git a/apps/aechannel/src/aesc_window.erl b/apps/aechannel/src/aesc_window.erl index 23bc17b3a2..d89b14f94f 100644 --- a/apps/aechannel/src/aesc_window.erl +++ b/apps/aechannel/src/aesc_window.erl @@ -12,12 +12,12 @@ -export([record_fields/1]). --export_type([window/0]). +-export_type([window/1]). -define(KEEP, 10). -type size() :: non_neg_integer(). --type entry() :: any(). +-type entry() :: tuple(). %% This is a bounded buffer, optimized for performance. %% A counter, `na`, keeps track of the number of elements @@ -34,11 +34,14 @@ -record(w, { na = 0 :: non_neg_integer() , nb = 0 :: non_neg_integer() , keep = ?KEEP :: size() - , a = [] :: [entry()] - , b = [] :: [entry()] + , a = [] + , b = [] }). --type window() :: #w{}. +-type window(Entry) :: #w{ a :: [Entry] + , b :: [Entry] + }. +-type window() :: window(entry()). %% ================================================================== %% Tracing support @@ -58,17 +61,18 @@ new(Sz) when is_integer(Sz), Sz >= 0 -> %% When changing `keep`, we do not modify (e.g. truncate) the data set. %% This is for performance reasons, and because we don't strive to keep %% the exact size anyway: `keep` is an approximate number. --spec change_keep(size(), window()) -> window(). +-spec change_keep(size(), window(Entry)) -> window(Entry) when Entry :: entry(). change_keep(Keep, #w{} = W) when is_integer(Keep), Keep >= 0 -> W#w{keep = Keep}. --spec add(entry(), window()) -> window(). +-spec add(Entry, window(Entry)) -> window(Entry) when Entry :: entry(). add(Item, #w{na = N, a = A, keep = Keep} = W) when N < Keep -> W#w{na = N+1, a = [Item|A]}; add(Item, #w{na = PrevNa, a = A} = W) -> W#w{na = 1, a = [Item], nb = PrevNa, b = A}. --spec pop(window()) -> {entry(), window()} | error. +-spec pop(window(Entry)) -> {Entry, window(Entry)} | error + when Entry :: entry(). pop(#w{a = [], b = []}) -> error; pop(#w{a = [], b = [H|T], nb = N} = W) -> @@ -79,13 +83,14 @@ pop(#w{a = [H|T], na = N} = W) -> -spec size(window()) -> non_neg_integer(). size(#w{na = Na, nb = Nb}) -> Na + Nb. --spec to_list(window()) -> [entry()]. +-spec to_list(window(Entry)) -> [Entry] when Entry :: entry(). to_list(#w{a = A, b = B}) -> A ++ B. %% Like lists:keyfind/3. Finds the most recent match (if any), %% since items are essentially stored in LIFO fashion. --spec keyfind(any(), non_neg_integer(), window()) -> false | entry(). +-spec keyfind(any(), non_neg_integer(), window(Entry)) -> false | Entry + when Entry :: entry(). keyfind(K, Pos, #w{a = A, b = B}) -> case lists:keyfind(K, Pos, A) of false -> @@ -104,8 +109,8 @@ keymember(K, Pos, #w{a = A, b = B}) -> %% element is not a map are skipped). If a value in the `KVL' is `undefined', %% this will match either the value `undefined' or the key being missing. %% --spec info_find([{any(), any()}], non_neg_integer(), window()) -> - false | entry(). +-spec info_find([{any(), any()}], non_neg_integer(), window(Entry)) -> + false | Entry when Entry :: entry(). info_find(KVL, Pos, #w{a = A, b = B}) when is_list(KVL) -> case info_find_(KVL, Pos, A) of false ->