Skip to content

Commit

Permalink
Pt 167781061 snapshot solo in ws api (#2647)
Browse files Browse the repository at this point in the history
* Working snapshot_solo test in sc_SUITE

* fix rebase-related error

* Improve snapshot test + refactor tests for better event scoping

* Remove unused vars & fix log printouts

* Improve error returns on snapshot errors

* Close potential event registation race condition re RPC + signing
  • Loading branch information
uwiger committed Aug 21, 2019
1 parent 5412325 commit d3381c7
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 104 deletions.
175 changes: 160 additions & 15 deletions apps/aechannel/src/aesc_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
%% API
-export([ start_link/1
, attach_responder/2 %% (fsm(), map())
, close_solo/1
, close_solo/1 %% (fsm())
, snapshot_solo/1 %% (fsm())
, reconnect_client/2 %% (fsm(), signed_tx())
, reconnect_client/3 %% (fsm(), Client :: pid(), signed_tx())
, connection_died/1 %% (fsm())
Expand Down Expand Up @@ -145,6 +146,10 @@ close_solo(Fsm) ->
lager:debug("close_solo(~p)", [Fsm]),
gen_statem:call(Fsm, close_solo).

snapshot_solo(Fsm) ->
lager:debug("snapshot_solo(~p)", [Fsm]),
gen_statem:call(Fsm, snapshot_solo).

-spec reconnect_client(pid(), aetx_sign:signed_tx()) -> ok | {error, any()}.
reconnect_client(Fsm, Tx) ->
reconnect_client(Fsm, self(), Tx).
Expand Down Expand Up @@ -749,6 +754,20 @@ awaiting_signature(cast, {?SIGNED, ?SHUTDOWN_ACK, SignedTx} = Msg,
report_on_chain_tx(close_mutual, SignedTx, D1),
close(close_mutual, D2)
end, D);
awaiting_signature(cast, {?SIGNED, snapshot_solo_tx, SignedTx} = Msg,
#data{op = #op_sign{ tag = snapshot_solo_tx
, data = OpData }} = D) ->
lager:debug("snapshot_solo_tx signed", []),
#op_data{updates = Updates} = OpData,
D1 = D#data{ log = log_msg(rcv, ?SIGNED, Msg, D#data.log)
, op = ?NO_OP },
case verify_signatures_onchain_check(pubkeys(me, D, SignedTx), SignedTx) of
ok ->
snapshot_solo_signed(SignedTx, Updates, D1);
{error,_} = Error ->
lager:debug("Failed signature check: ~p", [Error]),
next_state(open, D1)
end;
awaiting_signature(cast, {?SIGNED, close_solo_tx, SignedTx} = Msg,
#data{op = #op_sign{ tag = close_solo_tx
, data = OpData}} = D) ->
Expand Down Expand Up @@ -1383,37 +1402,48 @@ check_accept_msg(#{ chain_hash := ChainHash
end.

new_onchain_tx_for_signing(Type, Opts, D) ->
try new_onchain_tx_for_signing_(Type, Opts, D)
new_onchain_tx_for_signing(Type, Opts, fail, D).

new_onchain_tx_for_signing(Type, Opts, OnErr, D) when OnErr == fail;
OnErr == return ->
try new_onchain_tx_for_signing_(Type, Opts, OnErr, D)
catch
error:Reason ->
?LOG_CAUGHT(Reason),
error(Reason)
end.

new_onchain_tx_for_signing_(Type, Opts, D) ->
new_onchain_tx_for_signing_(Type, Opts, OnErr, D) ->
Defaults = tx_defaults(Type, Opts, D),
Opts1 = maps:merge(Defaults, Opts),
CurrHeight = curr_height(),
%% TODO PT-165214367: maybe set block_hash
BlockHash = ?NOT_SET_BLOCK_HASH,
{ok, Tx, Updates} = new_onchain_tx(Type, Opts1, D, CurrHeight),
case {aetx:min_fee(Tx, CurrHeight), aetx:fee(Tx)} of
{MinFee, Fee} when MinFee =< Fee ->
{ok, Tx, Updates, BlockHash};
{MinFee, Fee} ->
lager:debug("Fee (~p) is too low for ~p (Min: ~p)",
[Fee, Type, MinFee]),
error(too_low_fee)
case {new_onchain_tx(Type, Opts1, D, CurrHeight), OnErr} of
{{ok, Tx, Updates}, _} ->
case {aetx:min_fee(Tx, CurrHeight), aetx:fee(Tx)} of
{MinFee, Fee} when MinFee =< Fee ->
{ok, Tx, Updates, BlockHash};
{MinFee, Fee} ->
lager:debug("Fee (~p) is too low for ~p (Min: ~p)",
[Fee, Type, MinFee]),
error(too_low_fee)
end;
{{error, Reason}, fail} ->
error(Reason);
{{error, Reason}, return} ->
{error, Reason}
end.

-spec new_onchain_tx(channel_create_tx
| channel_close_mutual_tx
| channel_deposit_tx
| channel_withdraw_tx
| channel_snapshot_solo_tx
| channel_close_solo_tx
| channel_slash_tx
| channel_settle_tx, map(), #data{}, aec_blocks:height()) ->
{ok, aetx:tx(), [aesc_offchain_update:update()]}.
{ok, aetx:tx(), [aesc_offchain_update:update()]} | {error, atom()}.
new_onchain_tx(channel_close_mutual_tx, #{ acct := From } = Opts,
#data{opts = DOpts, on_chain_id = Chan, state = State}, _) ->
#{initiator := Initiator,
Expand Down Expand Up @@ -1541,6 +1571,30 @@ new_onchain_tx(channel_close_solo_tx, Opts,
, ttl => TTL},
{ok, Tx} = new_onchain_tx_(aesc_close_solo_tx, Opts1, CurrHeight),
{ok, Tx, []};
new_onchain_tx(channel_snapshot_solo_tx, Opts,
#data{ on_chain_id = ChanId
, state = State } = D, CurrHeight) ->
Account = my_account(D),
TTL = adjust_ttl(maps:get(ttl, Opts, 0)),
{Round, SignedTx} = aesc_offchain_state:get_latest_signed_tx(State),
case aesc_utils:is_offchain_tx(SignedTx) of
true ->
case later_round_than_onchain(ChanId, Round) of
true ->
Payload = aetx_sign:serialize_to_binary(SignedTx),
Opts1 =
Opts#{ channel_id => aeser_id:create(channel, ChanId)
, from_id => aeser_id:create(account, Account)
, payload => Payload
, ttl => TTL },
{ok, Tx} = new_onchain_tx_(aesc_snapshot_solo_tx, Opts1, CurrHeight),
{ok, Tx, []};
false ->
{error, already_onchain}
end;
false ->
{error, not_offchain_tx}
end;
new_onchain_tx(channel_slash_tx, Opts,
#data{ on_chain_id = ChanId
, opts = #{initiator := Initiator,
Expand All @@ -1564,6 +1618,7 @@ new_onchain_tx_(Mod, Opts, CurrHeight) when Mod =:= aesc_create_tx;
Mod =:= aesc_withdraw_tx;
Mod =:= aesc_deposit_tx;
Mod =:= aesc_close_solo_tx;
Mod =:= aesc_snapshot_solo_tx;
Mod =:= aesc_slash_tx;
Mod =:= aesc_settle_tx ->
case maps:is_key(fee, Opts) of
Expand Down Expand Up @@ -1664,6 +1719,11 @@ close_solo_tx_for_signing(D) ->
Account = my_account(D),
new_onchain_tx_for_signing(channel_close_solo_tx, #{acct => Account}, D).

snapshot_solo_tx_for_signing(D) ->
Account = my_account(D),
new_onchain_tx_for_signing(channel_snapshot_solo_tx, #{acct => Account},
_OnErr = return, D).

tx_defaults(Type, Opts, #data{ on_chain_id = ChanId } = D) ->
Default = tx_defaults_(Type, Opts, D),
Default#{ channel_id => ChanId
Expand All @@ -1678,6 +1738,8 @@ tx_defaults_(channel_withdraw_tx, Opts, D) ->
tx_defaults_(channel_deposit_tx, Opts, D); %% same as deposit defaults
tx_defaults_(channel_slash_tx = Tx, Opts, D) ->
default_ttl(Tx, Opts, D);
tx_defaults_(channel_snapshot_solo_tx = Tx, Opts, D) ->
default_ttl(Tx, Opts, D);
tx_defaults_(channel_close_solo_tx = Tx, Opts, D) ->
default_ttl(Tx, Opts, D);
tx_defaults_(channel_settle_tx = Tx, Opts, D) ->
Expand Down Expand Up @@ -2617,11 +2679,33 @@ report_leave(#data{state = State} = D) ->
cache_state(D).

report_on_chain_tx(Info, SignedTx, D) ->
{Type,_} = aetx:specialize_type(aetx_sign:innermost_tx(SignedTx)),
report_on_chain_tx(Info, signed_tx_type(SignedTx), SignedTx, D).

report_on_chain_tx(Info, Type, SignedTx, D) ->
report(on_chain_tx, #{ tx => SignedTx
, type => Type
, info => Info}, D).

maybe_act_on_tx(channel_snapshot_solo_tx, SignedTx, D) ->
MyPubkey = my_account(D),
case call_cb(aetx_sign:innermost_tx(SignedTx), origin, []) of
MyPubkey ->
lager:debug("snapshot_solo_tx from my client", []),
#data{op = OrigOp} = D,
%% We want to order a local min_depth watch, but not log it as a wait state op.
{ok, D1} = start_min_depth_watcher({?MIN_DEPTH, ?WATCH_SNAPSHOT_SOLO}, SignedTx, [], D),
D1#data{op = OrigOp};
OtherPubkey ->
lager:debug("snapshot_solo_tx from other client (~p)", [OtherPubkey]),
D
end;
maybe_act_on_tx(_, _, D) ->
D.

signed_tx_type(SignedTx) ->
{Type, _} = aetx:specialize_type(aetx_sign:innermost_tx(SignedTx)),
Type.

report(Tag, Info, D) ->
report_info(do_rpt(Tag, D), #{ type => report
, tag => Tag
Expand Down Expand Up @@ -2787,6 +2871,11 @@ verify_signatures_offchain(ChannelPubkey, Pubkeys, SignedTx) ->
_ -> {error, bad_signature}
end.

later_round_than_onchain(ChannelPubkey, Round) ->
{ok, Channel} = aec_chain:get_channel(ChannelPubkey),
OnChainRound = aesc_channels:round(Channel),
Round > OnChainRound.

check_tx_and_verify_signatures(SignedTx, Updates, Mod, Data, Pubkeys, ErrTypeMsg) ->
ChannelPubkey = cur_channel_id(Data),
MyPubkey = my_account(Data),
Expand Down Expand Up @@ -3274,6 +3363,28 @@ settle_signed(SignedTx, Updates, #data{ on_chain_id = ChId} = D) ->
close(Error, D)
end.

snapshot_solo_signed(SignedTx, _Updates, #data{ on_chain_id = ChId } = D) ->
lager:debug("SignedTx = ~p", [SignedTx]),
case aec_chain:get_channel(ChId) of
{ok, Ch} ->
case aesc_channels:is_active(Ch) of
true ->
case aec_tx_pool:push(SignedTx) of
ok ->
ok;
{error, Reason} ->
lager:debug("Snapshot_solo tx failed: ~p", [Reason]),
ok
end;
false ->
lager:debug("Snapshot_solo tx failed: channel not active", []),
ok
end,
next_state(open, D);
{error,_} = Error ->
close(Error, D)
end.

close_solo_signed(SignedTx, _Updates, #data{ on_chain_id = ChId } = D) ->
lager:debug("SignedTx = ~p", [SignedTx]),
case aec_chain:get_channel(ChId) of
Expand Down Expand Up @@ -3631,6 +3742,28 @@ handle_call_(St, close_solo, From, D) ->
keep_state(D)
end
end;
handle_call_(St, snapshot_solo = Req, From, D) ->
case St of
channel_closing ->
keep_state(D, [{reply, From, {error, channel_closing}}]);
_ ->
D1 = log(rcv, Req, Req, D),
case snapshot_solo_tx_for_signing(D1) of
{ok, SnapshotSoloTx, Updates, BlockHash} ->
case request_signing(
snapshot_solo_tx, SnapshotSoloTx, Updates, BlockHash, D1, defer) of
{ok, Send, D2, Actions} ->
%% reply before sending sig request
gen_statem:reply(From, ok),
Send(),
next_state(awaiting_signature, set_ongoing(D2), Actions);
{error, _} = Error ->
keep_state(D1, [{reply, From, Error}])
end;
{error, _} = Error ->
keep_state(D1, [{reply, From, Error}])
end
end;
handle_call_(St, slash, From, D) ->
case St of
channel_closing ->
Expand Down Expand Up @@ -3752,9 +3885,21 @@ handle_common_event_(cast, {?CHANNEL_CHANGED, #{tx_hash := TxHash} = Info} = Msg
close({error, unexpected_tx_on_chain}, D);
_ ->
lager:debug("Fsm notes channel change ~p", [Info]),
report_on_chain_tx(channel_changed, maps:get(tx, Info), D1),
keep_state(D1)
SignedTx = maps:get(tx, Info),
Type = signed_tx_type(SignedTx),
report_on_chain_tx(channel_changed, Type, SignedTx, D1),
keep_state(maybe_act_on_tx(Type, SignedTx, D1))
end;
handle_common_event_(cast, {?MIN_DEPTH_ACHIEVED, _ChainId,
?WATCH_SNAPSHOT_SOLO, TxHash}, _St, _, D) ->
%% This min_depth notification is handled unconventionally, since only the
%% requesting client is to get a min_depth notification, and the fsm is not
%% locked while waiting for it.
lager:debug("Received min_depth confirmation of snapshot_solo_tx ~p", [TxHash]),
report(info, #{ event => min_depth_achieved
, type => aesc_snapshot_solo_tx:type()
, tx_hash => TxHash }, D),
keep_state(D);
handle_common_event_(cast, {?CHANNEL_CLOSED, #{tx := SignedTx} = _Info} = Msg, _St, _, D) ->
%% Start a minimum-depth watch, then (if confirmed) terminate
report_on_chain_tx(channel_closed, SignedTx, D),
Expand Down
3 changes: 3 additions & 0 deletions apps/aechannel/src/aesc_fsm.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
, error => true
, debug => false
, on_chain_tx => true
, min_depth => true
}).

-define(TIMER_SUBST(State),
Expand Down Expand Up @@ -63,6 +64,7 @@
-define(WATCH_DEP, deposit).
-define(WATCH_WDRAW, withdraw).
-define(WATCH_CLOSED, closed).
-define(WATCH_SNAPSHOT_SOLO, snapshot_solo).
-define(MIN_DEPTH, min_depth).

-define(NO_OP, no_op).
Expand Down Expand Up @@ -178,6 +180,7 @@
| slash_tx
| deposit_tx
| withdraw_tx
| snapshot_solo_tx
| close_solo_tx
| settle_tx
| ?FND_CREATED
Expand Down
3 changes: 3 additions & 0 deletions apps/aehttp/src/sc_ws_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ process_fsm_(#{type := sign,
orelse Tag =:= deposit_created
orelse Tag =:= withdraw_tx
orelse Tag =:= withdraw_created
orelse Tag =:= snapshot_solo_tx
orelse Tag =:= shutdown
orelse Tag =:= shutdown_ack
orelse Tag =:= funding_created
Expand Down Expand Up @@ -196,6 +197,8 @@ process_fsm_(#{type := report,
case {Tag, Event} of
{info, {died, _}} -> #{event => <<"died">>};
{info, _} when is_atom(Event) -> #{event => atom_to_binary(Event, utf8)};
{info, #{event := _} = Info} ->
Info;
{on_chain_tx, #{tx := Tx} = Info} ->
EncodedTx = aeser_api_encoder:encode(
transaction,
Expand Down

0 comments on commit d3381c7

Please sign in to comment.