From bf3b58960830880e50b390112936261a57771594 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Fri, 21 Sep 2018 13:18:32 -0700 Subject: [PATCH 1/2] Add a event stamp function for attaching causality information to RBC To provide some semblence of causality we allow the user of hbbft to provide a 'stamp fun' that allows them to attach some causality information to the Reliable Broadcast payload. This can be a timestamp, a monotonic counter, a vector clock, whatever. The resulting 'stamp' is encrypted along with the other transactions and is thus subject to the consensus rules around agreeing on transactions. --- src/hbbft.erl | 40 ++++++++++++++++++++++++++++++---------- test/hbbft_SUITE.erl | 4 ++-- test/hbbft_worker.erl | 2 +- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/hbbft.erl b/src/hbbft.erl index 893a0cf..c771c39 100644 --- a/src/hbbft.erl +++ b/src/hbbft.erl @@ -1,6 +1,7 @@ -module(hbbft). -export([init/6, + init/7, start_on_demand/1, input/2, finalize_round/3, @@ -35,7 +36,9 @@ dec_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), erlang_pbc:element()}}, decrypted = #{} :: #{non_neg_integer() => [binary()]}, sig_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), erlang_pbc:element()}}, - thingtosign :: undefined | erlang_pbc:element() + thingtosign :: undefined | erlang_pbc:element(), + stampfun :: undefined | {atom(), atom(), list()}, + stamps = [] :: list() }). -record(hbbft_serialized_data, { @@ -54,7 +57,9 @@ decrypted = #{} :: #{non_neg_integer() => [binary()]}, sig_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), binary()}}, dec_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), binary()}}, - thingtosign :: undefined | binary() + thingtosign :: undefined | binary(), + stampfun :: undefined | {atom(), atom(), list()}, + stamps = [] :: list() }). -type hbbft_data() :: #hbbft_data{}. @@ -83,13 +88,21 @@ status(HBBFTData) -> init(SK, N, F, J, BatchSize, MaxBuf) -> #hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J), max_buf=MaxBuf}. +-spec init(tpke_privkey:privkey(), pos_integer(), non_neg_integer(), non_neg_integer(), pos_integer(), infinity | pos_integer(), {atom(), atom(), list()}) -> hbbft_data(). +init(SK, N, F, J, BatchSize, MaxBuf, {M, Fn, A}) -> + #hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J), max_buf=MaxBuf, stampfun={M, Fn, A}}. + %% start acs on demand -spec start_on_demand(hbbft_data()) -> {hbbft_data(), already_started | {send, [rbc_wrapped_output()]}}. start_on_demand(Data = #hbbft_data{buf=Buf, n=N, secret_key=SK, batch_size=BatchSize, acs_init=false}) -> %% pick proposed whichever is lesser from batchsize/n or buffer Proposed = hbbft_utils:random_n(min((BatchSize div N), length(Buf)), lists:sublist(Buf, BatchSize)), %% encrypt x -> tpke.enc(pk, proposed) - EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary(Proposed)), + Stamp = case Data#hbbft_data.stampfun of + undefined -> undefined; + {M, F, A} -> erlang:apply(M, F, A) + end, + EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary({Stamp, Proposed})), %% time to kick off a round {NewACSState, {send, ACSResponse}} = hbbft_acs:input(Data#hbbft_data.acs, EncX), %% add this to acs set in data and send out the ACS response(s) @@ -138,7 +151,7 @@ next_round(Data = #hbbft_data{secret_key=SK, n=N, f=F, j=J}) -> acs_init=false, acs_results=[], sent_txns=false, sent_sig=false, dec_shares=#{}, decrypted=#{}, - sig_shares=#{}, thingtosign=undefined}, + sig_shares=#{}, thingtosign=undefined, stamps=[]}, maybe_start_acs(NewData). -spec next_round(hbbft_data(), pos_integer(), [binary()]) -> {hbbft_data(), ok | {send, []}}. @@ -152,7 +165,7 @@ next_round(Data = #hbbft_data{secret_key=SK, n=N, f=F, j=J, buf=Buf}, NextRound, acs_init=false, acs_results=[], sent_txns=false, sent_sig=false, dec_shares=#{}, decrypted=#{}, buf=NewBuf, - sig_shares=#{}, thingtosign=undefined}, + sig_shares=#{}, thingtosign=undefined, stamps=[]}, maybe_start_acs(NewData). -spec round(hbbft_data()) -> non_neg_integer(). @@ -162,7 +175,7 @@ round(_Data=#hbbft_data{round=Round}) -> -spec handle_msg(hbbft_data(), non_neg_integer(), acs_msg() | dec_msg() | sign_msg()) -> {hbbft_data(), ok | defer | {send, [hbbft_utils:multicast(dec_msg() | sign_msg()) | rbc_wrapped_output() | bba_wrapped_output()]} | - {result, {transactions, [binary()]}} | + {result, {transactions, list(), [binary()]}} | {result, {signature, binary()}}}. handle_msg(Data = #hbbft_data{round=R}, _J, {{acs, R2}, _ACSMsg}) when R2 > R -> %% ACS requested we defer this message for now @@ -207,20 +220,23 @@ handle_msg(Data = #hbbft_data{round=R}, J, {dec, R, I, Share}) -> error -> {Data#hbbft_data{dec_shares=NewShares}, ok}; Decrypted -> - NewDecrypted = maps:put(I, binary_to_term(Decrypted), Data#hbbft_data.decrypted), + {Stamp, Transactions} = binary_to_term(Decrypted), + NewDecrypted = maps:put(I, Transactions, Data#hbbft_data.decrypted), + Stamps = [{I, Stamp} | Data#hbbft_data.stamps], case maps:size(NewDecrypted) == length(Data#hbbft_data.acs_results) andalso not Data#hbbft_data.sent_txns of true -> %% we did it! %% Combine all unique messages into a single list TransactionsThisRound = lists:usort(lists:flatten(maps:values(NewDecrypted))), + StampsThisRound = lists:usort(Stamps), %% return the transactions we agreed on to the user %% we have no idea which transactions are valid, invalid, out of order or missing %% causal context (eg. a nonce is not monotonic) so we return them to the user to let them %% figure it out. We expect the user to call finalize_round/3 once they've decided what they want to accept %% from this set of transactions. - {Data#hbbft_data{dec_shares=NewShares, decrypted=NewDecrypted, sent_txns=true}, {result, {transactions, TransactionsThisRound}}}; + {Data#hbbft_data{dec_shares=NewShares, decrypted=NewDecrypted, stamps=Stamps, sent_txns=true}, {result, {transactions, StampsThisRound, TransactionsThisRound}}}; false -> - {Data#hbbft_data{dec_shares=NewShares, decrypted=NewDecrypted}, ok} + {Data#hbbft_data{dec_shares=NewShares, decrypted=NewDecrypted, stamps=Stamps}, ok} end end end; @@ -268,7 +284,11 @@ maybe_start_acs(Data = #hbbft_data{n=N, secret_key=SK, batch_size=BatchSize}) -> %% pick a random B/N selection of them Proposed = hbbft_utils:random_n(BatchSize div N, lists:sublist(Data#hbbft_data.buf, length(Data#hbbft_data.buf) - BatchSize + 1, BatchSize)), %% encrypt x -> tpke.enc(pk, proposed) - EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary(Proposed)), + Stamp = case Data#hbbft_data.stampfun of + undefined -> undefined; + {M, F, A} -> erlang:apply(M, F, A) + end, + EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary({Stamp, Proposed})), %% time to kick off a round {NewACSState, {send, ACSResponse}} = hbbft_acs:input(Data#hbbft_data.acs, EncX), %% add this to acs set in data and send out the ACS response(s) diff --git a/test/hbbft_SUITE.erl b/test/hbbft_SUITE.erl index 892caf1..4abacac 100644 --- a/test/hbbft_SUITE.erl +++ b/test/hbbft_SUITE.erl @@ -112,7 +112,7 @@ one_actor_no_txns_test(Config) -> DistinctResults = sets:from_list([BVal || {result, {_, BVal}} <- sets:to_list(ConvergedResults)]), %% check all N actors returned the same result ?assertEqual(1, sets:size(DistinctResults)), - {_, AcceptedMsgs} = lists:unzip(lists:flatten(sets:to_list(DistinctResults))), + {_, _, AcceptedMsgs} = lists:unzip3(lists:flatten(sets:to_list(DistinctResults))), %% check all the Msgs are actually from the original set ?assert(sets:is_subset(sets:from_list(lists:flatten(AcceptedMsgs)), sets:from_list(Msgs))), ok. @@ -174,7 +174,7 @@ one_actor_missing_test(Config) -> DistinctResults = sets:from_list([BVal || {result, {_, BVal}} <- sets:to_list(ConvergedResults)]), %% check all N actors returned the same result ?assertEqual(1, sets:size(DistinctResults)), - {_, AcceptedMsgs} = lists:unzip(lists:flatten(sets:to_list(DistinctResults))), + {_, _, AcceptedMsgs} = lists:unzip3(lists:flatten(sets:to_list(DistinctResults))), %% check all the Msgs are actually from the original set ?assert(sets:is_subset(sets:from_list(lists:flatten(AcceptedMsgs)), sets:from_list(Msgs))), ok. diff --git a/test/hbbft_worker.erl b/test/hbbft_worker.erl index 7d4febd..a40d4c7 100644 --- a/test/hbbft_worker.erl +++ b/test/hbbft_worker.erl @@ -137,7 +137,7 @@ handle_info(Msg, State) -> dispatch({NewHBBFT, {send, ToSend}}, State) -> do_send(ToSend, State), State#state{hbbft=maybe_serialize_HBBFT(NewHBBFT, State#state.to_serialize)}; -dispatch({NewHBBFT, {result, {transactions, Txns}}}, State) -> +dispatch({NewHBBFT, {result, {transactions, _, Txns}}}, State) -> NewBlock = case State#state.blocks of [] -> %% genesis block From 560b3f0b4b34dc2b7f9236a3a42d18dd85f9944f Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Fri, 21 Sep 2018 14:35:42 -0700 Subject: [PATCH 2/2] Better spec for stamp list --- src/hbbft.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hbbft.erl b/src/hbbft.erl index c771c39..411de90 100644 --- a/src/hbbft.erl +++ b/src/hbbft.erl @@ -38,7 +38,7 @@ sig_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), erlang_pbc:element()}}, thingtosign :: undefined | erlang_pbc:element(), stampfun :: undefined | {atom(), atom(), list()}, - stamps = [] :: list() + stamps = [] :: [{non_neg_integer(), any()}] }). -record(hbbft_serialized_data, { @@ -59,7 +59,7 @@ dec_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), binary()}}, thingtosign :: undefined | binary(), stampfun :: undefined | {atom(), atom(), list()}, - stamps = [] :: list() + stamps = [] :: [{non_neg_integer(), any()}] }). -type hbbft_data() :: #hbbft_data{}.