diff --git a/src/hbbft.erl b/src/hbbft.erl index f7073fc..f2573c8 100644 --- a/src/hbbft.erl +++ b/src/hbbft.erl @@ -1,6 +1,7 @@ -module(hbbft). -export([init/5, + start_on_demand/1, input/2, finalize_round/3, next_round/1, @@ -22,6 +23,7 @@ j :: non_neg_integer(), round = 0 :: non_neg_integer(), buf = [] :: [binary()], + max_buf = infinity :: infinity, acs :: hbbft_acs:acs_data(), acs_init = false :: boolean(), sent_txns = false :: boolean(), @@ -40,6 +42,7 @@ j :: non_neg_integer(), round = 0 :: non_neg_integer(), buf = [] :: [binary()], + max_buf = infinity :: infinity, acs :: hbbft_acs:acs_serialized_data(), acs_init = false :: boolean(), sent_txns = false :: boolean(), @@ -63,6 +66,7 @@ status(HBBFTData) -> #{batch_size => HBBFTData#hbbft_data.batch_size, buf => length(HBBFTData#hbbft_data.buf), + max_buf => HBBFTData#hbbft_data.max_buf, round => HBBFTData#hbbft_data.round, acs_init => HBBFTData#hbbft_data.acs_init, acs => hbbft_acs:status(HBBFTData#hbbft_data.acs), @@ -76,12 +80,30 @@ status(HBBFTData) -> init(SK, N, F, J, BatchSize) -> #hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J)}. +%% start acs on demand +-spec start_on_demand(hbbft_data()) -> {hbbft_data(), already_started | {send, [rbc_wrapped_output()]}}. +start_on_demand(Data = #hbbft_data{buf=Buf, n=N, secret_key=SK, batch_size=BatchSize, acs_init=false}) -> + %% pick proposed whichever is lesser from batchsize/n or buffer + Proposed = hbbft_utils:random_n(min((BatchSize div N), length(Buf)), lists:sublist(Buf, BatchSize)), + %% encrypt x -> tpke.enc(pk, proposed) + EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary(Proposed)), + %% time to kick off a round + {NewACSState, {send, ACSResponse}} = hbbft_acs:input(Data#hbbft_data.acs, EncX), + %% add this to acs set in data and send out the ACS response(s) + {Data#hbbft_data{acs=NewACSState, acs_init=true}, + {send, hbbft_utils:wrap({acs, Data#hbbft_data.round}, ACSResponse)}}; +start_on_demand(Data) -> + {Data, already_started}. + %% someone submitting a transaction to the replica set --spec input(hbbft_data(), binary()) -> {hbbft_data(), ok | {send, [rbc_wrapped_output()]}}. -input(Data = #hbbft_data{buf=Buf}, Txn) -> +-spec input(hbbft_data(), binary()) -> {hbbft_data(), ok | {send, [rbc_wrapped_output()]} | full}. +input(Data = #hbbft_data{buf=Buf, max_buf=MaxBuf}, Txn) when length(Buf) < MaxBuf-> %% add this txn to the the buffer NewBuf = [Txn | Buf], - maybe_start_acs(Data#hbbft_data{buf=NewBuf}). + maybe_start_acs(Data#hbbft_data{buf=NewBuf}); +input(Data = #hbbft_data{buf=_Buf}, _Txn) -> + %% drop the txn + {Data, full}. %% The user has constructed something that looks like a block and is telling us which transactions %% to remove from the buffer (accepted or invalid). Transactions missing causal context @@ -274,6 +296,7 @@ deserialize(#hbbft_serialized_data{batch_size=BatchSize, j=J, round=Round, buf=Buf, + max_buf=MaxBuf, acs=ACSData, acs_init=ACSInit, sent_txns=SentTxns, @@ -295,6 +318,7 @@ deserialize(#hbbft_serialized_data{batch_size=BatchSize, j=J, round=Round, buf=Buf, + max_buf=MaxBuf, acs=hbbft_acs:deserialize(ACSData, SK), acs_init=ACSInit, sent_txns=SentTxns, @@ -321,6 +345,7 @@ serialize_hbbft_data(#hbbft_data{batch_size=BatchSize, j=J, round=Round, buf=Buf, + max_buf=MaxBuf, acs=ACSData, acs_init=ACSInit, sent_txns=SentTxns, @@ -341,6 +366,7 @@ serialize_hbbft_data(#hbbft_data{batch_size=BatchSize, f=F, round=Round, buf=Buf, + max_buf=MaxBuf, acs=hbbft_acs:serialize(ACSData), acs_init=ACSInit, sent_txns=SentTxns, diff --git a/test/hbbft_SUITE.erl b/test/hbbft_SUITE.erl index 263de38..a6fc574 100644 --- a/test/hbbft_SUITE.erl +++ b/test/hbbft_SUITE.erl @@ -10,7 +10,8 @@ two_actors_no_txns_test/1, one_actor_missing_test/1, two_actors_missing_test/1, - encrypt_decrypt_test/1 + encrypt_decrypt_test/1, + start_on_demand_test/1 ]). all() -> @@ -20,7 +21,8 @@ all() -> two_actors_no_txns_test, one_actor_missing_test, two_actors_missing_test, - encrypt_decrypt_test + encrypt_decrypt_test, + start_on_demand_test ]. init_per_testcase(_, Config) -> @@ -221,6 +223,62 @@ encrypt_decrypt_test(Config) -> ?assertEqual(PlainText, hbbft:decrypt(DecKey, Enc)), ok. +start_on_demand_test(Config) -> + N = proplists:get_value(n, Config), + F = proplists:get_value(f, Config), + BatchSize = proplists:get_value(batchsize, Config), + PubKey = proplists:get_value(pubkey, Config), + PrivateKeys = proplists:get_value(privatekeys, Config), + Workers = [ element(2, hbbft_worker:start_link(N, F, I, tpke_privkey:serialize(SK), BatchSize, false)) || {I, SK} <- enumerate(PrivateKeys) ], + + [W1, _W2 | RemainingWorkers] = Workers, + + Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*20)], + + KnownMsg = crypto:strong_rand_bytes(128), + %% feed the badgers some msgs + lists:foreach(fun(Msg) -> + Destinations = random_n(rand:uniform(length(RemainingWorkers)), RemainingWorkers), + io:format("destinations ~p~n", [Destinations]), + [ok = hbbft_worker:submit_transaction(Msg, D) || D <- Destinations] + end, Msgs), + + ok = hbbft_worker:submit_transaction(KnownMsg, W1), + + _ = hbbft_worker:start_on_demand(W1), + + %% wait for all the worker's mailboxes to settle and + %% wait for the chains to converge + ok = hbbft_ct_utils:wait_until(fun() -> + Chains = sets:from_list(lists:map(fun(W) -> + {ok, Blocks} = hbbft_worker:get_blocks(W), + Blocks + end, Workers)), + + 0 == lists:sum([element(2, erlang:process_info(W, message_queue_len)) || W <- Workers ]) andalso + 1 == sets:size(Chains) andalso + 0 /= length(hd(sets:to_list(Chains))) + end, 60*2, 500), + + + Chains = sets:from_list(lists:map(fun(W) -> + {ok, Blocks} = hbbft_worker:get_blocks(W), + Blocks + end, Workers)), + 1 = sets:size(Chains), + [Chain] = sets:to_list(Chains), + io:format("chain is of height ~p~n", [length(Chain)]), + %% verify they are cryptographically linked + true = hbbft_worker:verify_chain(Chain, PubKey), + %% check all the transactions are unique + BlockTxns = lists:flatten([ hbbft_worker:block_transactions(B) || B <- Chain ]), + true = lists:member(KnownMsg, BlockTxns), + true = length(BlockTxns) == sets:size(sets:from_list(BlockTxns)), + %% check they're all members of the original message list + true = sets:is_subset(sets:from_list(BlockTxns), sets:from_list([KnownMsg | Msgs])), + io:format("chain contains ~p distinct transactions~n", [length(BlockTxns)]), + ok. + %% helper functions enumerate(List) -> diff --git a/test/hbbft_worker.erl b/test/hbbft_worker.erl index 0e79e06..c9b6647 100644 --- a/test/hbbft_worker.erl +++ b/test/hbbft_worker.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). --export([start_link/6, submit_transaction/2, get_blocks/1]). +-export([start_link/6, submit_transaction/2, start_on_demand/1, get_blocks/1]). -export([verify_chain/2, block_transactions/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). @@ -30,6 +30,9 @@ start_link(N, F, ID, SK, BatchSize, ToSerialize) -> submit_transaction(Msg, Pid) -> gen_server:call(Pid, {submit_txn, Msg}, infinity). +start_on_demand(Pid) -> + gen_server:call(Pid, start_on_demand, infinity). + get_blocks(Pid) -> gen_server:call(Pid, get_blocks, infinity). @@ -88,6 +91,9 @@ init([N, F, ID, SK, BatchSize, ToSerialize]) -> %% store the serialized state and serialized SK {ok, #state{hbbft=HBBFT, blocks=[], id=ID, n=N, sk=DSK, ssk=SK, to_serialize=ToSerialize}}. +handle_call(start_on_demand, _From, State = #state{hbbft=HBBFT, sk=SK}) -> + NewState = dispatch(hbbft:start_on_demand(maybe_deserialize_hbbft(HBBFT, SK)), State), + {reply, ok, NewState}; handle_call({submit_txn, Txn}, _From, State = #state{hbbft=HBBFT, sk=SK}) -> NewState = dispatch(hbbft:input(maybe_deserialize_hbbft(HBBFT, SK), Txn), State), {reply, ok, NewState};