Skip to content

Commit

Permalink
Merge 4e91fd5 into f6fba96
Browse files Browse the repository at this point in the history
  • Loading branch information
vihu committed Jun 29, 2018
2 parents f6fba96 + 4e91fd5 commit 9371fac
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
40 changes: 33 additions & 7 deletions src/hbbft.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(hbbft).

-export([init/5,
-export([init/6,
start_on_demand/1,
input/2,
finalize_round/3,
next_round/1,
Expand All @@ -22,6 +23,7 @@
j :: non_neg_integer(),
round = 0 :: non_neg_integer(),
buf = [] :: [binary()],
max_buf = infinity :: infinity | pos_integer(),
acs :: hbbft_acs:acs_data(),
acs_init = false :: boolean(),
sent_txns = false :: boolean(),
Expand All @@ -40,6 +42,7 @@
j :: non_neg_integer(),
round = 0 :: non_neg_integer(),
buf = [] :: [binary()],
max_buf = infinity :: infinity | pos_integer(),
acs :: hbbft_acs:acs_serialized_data(),
acs_init = false :: boolean(),
sent_txns = false :: boolean(),
Expand All @@ -63,6 +66,7 @@
status(HBBFTData) ->
#{batch_size => HBBFTData#hbbft_data.batch_size,
buf => length(HBBFTData#hbbft_data.buf),
max_buf => HBBFTData#hbbft_data.max_buf,
round => HBBFTData#hbbft_data.round,
acs_init => HBBFTData#hbbft_data.acs_init,
acs => hbbft_acs:status(HBBFTData#hbbft_data.acs),
Expand All @@ -72,16 +76,34 @@ status(HBBFTData) ->
j => HBBFTData#hbbft_data.j
}.

-spec init(tpke_privkey:privkey(), pos_integer(), non_neg_integer(), non_neg_integer(), pos_integer()) -> hbbft_data().
init(SK, N, F, J, BatchSize) ->
#hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J)}.
-spec init(tpke_privkey:privkey(), pos_integer(), non_neg_integer(), non_neg_integer(), pos_integer(), infinity | pos_integer()) -> hbbft_data().
init(SK, N, F, J, BatchSize, MaxBuf) ->
#hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J), max_buf=MaxBuf}.

%% start acs on demand
-spec start_on_demand(hbbft_data()) -> {hbbft_data(), already_started | {send, [rbc_wrapped_output()]}}.
start_on_demand(Data = #hbbft_data{buf=Buf, n=N, secret_key=SK, batch_size=BatchSize, acs_init=false}) ->
%% pick proposed whichever is lesser from batchsize/n or buffer
Proposed = hbbft_utils:random_n(min((BatchSize div N), length(Buf)), lists:sublist(Buf, BatchSize)),
%% encrypt x -> tpke.enc(pk, proposed)
EncX = encrypt(tpke_privkey:public_key(SK), term_to_binary(Proposed)),
%% time to kick off a round
{NewACSState, {send, ACSResponse}} = hbbft_acs:input(Data#hbbft_data.acs, EncX),
%% add this to acs set in data and send out the ACS response(s)
{Data#hbbft_data{acs=NewACSState, acs_init=true},
{send, hbbft_utils:wrap({acs, Data#hbbft_data.round}, ACSResponse)}};
start_on_demand(Data) ->
{Data, already_started}.

%% someone submitting a transaction to the replica set
-spec input(hbbft_data(), binary()) -> {hbbft_data(), ok | {send, [rbc_wrapped_output()]}}.
input(Data = #hbbft_data{buf=Buf}, Txn) ->
-spec input(hbbft_data(), binary()) -> {hbbft_data(), ok | {send, [rbc_wrapped_output()]} | full}.
input(Data = #hbbft_data{buf=Buf, max_buf=MaxBuf}, Txn) when length(Buf) < MaxBuf->
%% add this txn to the the buffer
NewBuf = [Txn | Buf],
maybe_start_acs(Data#hbbft_data{buf=NewBuf}).
maybe_start_acs(Data#hbbft_data{buf=NewBuf});
input(Data = #hbbft_data{buf=_Buf}, _Txn) ->
%% drop the txn
{Data, full}.

%% The user has constructed something that looks like a block and is telling us which transactions
%% to remove from the buffer (accepted or invalid). Transactions missing causal context
Expand Down Expand Up @@ -274,6 +296,7 @@ deserialize(#hbbft_serialized_data{batch_size=BatchSize,
j=J,
round=Round,
buf=Buf,
max_buf=MaxBuf,
acs=ACSData,
acs_init=ACSInit,
sent_txns=SentTxns,
Expand All @@ -295,6 +318,7 @@ deserialize(#hbbft_serialized_data{batch_size=BatchSize,
j=J,
round=Round,
buf=Buf,
max_buf=MaxBuf,
acs=hbbft_acs:deserialize(ACSData, SK),
acs_init=ACSInit,
sent_txns=SentTxns,
Expand All @@ -321,6 +345,7 @@ serialize_hbbft_data(#hbbft_data{batch_size=BatchSize,
j=J,
round=Round,
buf=Buf,
max_buf=MaxBuf,
acs=ACSData,
acs_init=ACSInit,
sent_txns=SentTxns,
Expand All @@ -341,6 +366,7 @@ serialize_hbbft_data(#hbbft_data{batch_size=BatchSize,
f=F,
round=Round,
buf=Buf,
max_buf=MaxBuf,
acs=hbbft_acs:serialize(ACSData),
acs_init=ACSInit,
sent_txns=SentTxns,
Expand Down
70 changes: 64 additions & 6 deletions test/hbbft_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
two_actors_no_txns_test/1,
one_actor_missing_test/1,
two_actors_missing_test/1,
encrypt_decrypt_test/1
encrypt_decrypt_test/1,
start_on_demand_test/1
]).

all() ->
Expand All @@ -20,7 +21,8 @@ all() ->
two_actors_no_txns_test,
one_actor_missing_test,
two_actors_missing_test,
encrypt_decrypt_test
encrypt_decrypt_test,
start_on_demand_test
].

init_per_testcase(_, Config) ->
Expand Down Expand Up @@ -93,7 +95,7 @@ one_actor_no_txns_test(Config) ->
Module = proplists:get_value(module, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),

StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize)} || {J, Sk} <- lists:zip(lists:seq(0, N - 1), PrivateKeys)],
StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize, infinity)} || {J, Sk} <- lists:zip(lists:seq(0, N - 1), PrivateKeys)],
Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*10)],
%% send each message to a random subset of the HBBFT actors
{NewStates, Replies} = lists:foldl(fun(Msg, {States, Replies}) ->
Expand Down Expand Up @@ -127,7 +129,7 @@ two_actors_no_txns_test(Config) ->
Module = proplists:get_value(module, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),

StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize)} || {J, Sk} <- lists:zip(lists:seq(0, N - 1), PrivateKeys)],
StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize, infinity)} || {J, Sk} <- lists:zip(lists:seq(0, N - 1), PrivateKeys)],
Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*10)],
%% send each message to a random subset of the HBBFT actors
{NewStates, Replies} = lists:foldl(fun(Msg, {States, Replies}) ->
Expand Down Expand Up @@ -155,7 +157,7 @@ one_actor_missing_test(Config) ->
Module = proplists:get_value(module, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),

StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize)} || {J, Sk} <- lists:zip(lists:seq(0, N - 2), lists:sublist(PrivateKeys, N-1))],
StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize, infinity)} || {J, Sk} <- lists:zip(lists:seq(0, N - 2), lists:sublist(PrivateKeys, N-1))],
Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*10)],
%% send each message to a random subset of the HBBFT actors
{NewStates, Replies} = lists:foldl(fun(Msg, {States, Replies}) ->
Expand Down Expand Up @@ -189,7 +191,7 @@ two_actors_missing_test(Config) ->
Module = proplists:get_value(module, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),

StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize)} || {J, Sk} <- lists:zip(lists:seq(0, N - 3), lists:sublist(PrivateKeys, N-2))],
StatesWithIndex = [{J, hbbft:init(Sk, N, F, J, BatchSize, infinity)} || {J, Sk} <- lists:zip(lists:seq(0, N - 3), lists:sublist(PrivateKeys, N-2))],
Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*10)],
%% send each message to a random subset of the HBBFT actors
{NewStates, Replies} = lists:foldl(fun(Msg, {States, Replies}) ->
Expand Down Expand Up @@ -221,6 +223,62 @@ encrypt_decrypt_test(Config) ->
?assertEqual(PlainText, hbbft:decrypt(DecKey, Enc)),
ok.

start_on_demand_test(Config) ->
N = proplists:get_value(n, Config),
F = proplists:get_value(f, Config),
BatchSize = proplists:get_value(batchsize, Config),
PubKey = proplists:get_value(pubkey, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),
Workers = [ element(2, hbbft_worker:start_link(N, F, I, tpke_privkey:serialize(SK), BatchSize, false)) || {I, SK} <- enumerate(PrivateKeys) ],

[W1, _W2 | RemainingWorkers] = Workers,

Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*20)],

KnownMsg = crypto:strong_rand_bytes(128),
%% feed the badgers some msgs
lists:foreach(fun(Msg) ->
Destinations = random_n(rand:uniform(length(RemainingWorkers)), RemainingWorkers),
io:format("destinations ~p~n", [Destinations]),
[ok = hbbft_worker:submit_transaction(Msg, D) || D <- Destinations]
end, Msgs),

ok = hbbft_worker:submit_transaction(KnownMsg, W1),

_ = hbbft_worker:start_on_demand(W1),

%% wait for all the worker's mailboxes to settle and
%% wait for the chains to converge
ok = hbbft_ct_utils:wait_until(fun() ->
Chains = sets:from_list(lists:map(fun(W) ->
{ok, Blocks} = hbbft_worker:get_blocks(W),
Blocks
end, Workers)),

0 == lists:sum([element(2, erlang:process_info(W, message_queue_len)) || W <- Workers ]) andalso
1 == sets:size(Chains) andalso
0 /= length(hd(sets:to_list(Chains)))
end, 60*2, 500),


Chains = sets:from_list(lists:map(fun(W) ->
{ok, Blocks} = hbbft_worker:get_blocks(W),
Blocks
end, Workers)),
1 = sets:size(Chains),
[Chain] = sets:to_list(Chains),
io:format("chain is of height ~p~n", [length(Chain)]),
%% verify they are cryptographically linked
true = hbbft_worker:verify_chain(Chain, PubKey),
%% check all the transactions are unique
BlockTxns = lists:flatten([ hbbft_worker:block_transactions(B) || B <- Chain ]),
true = lists:member(KnownMsg, BlockTxns),
true = length(BlockTxns) == sets:size(sets:from_list(BlockTxns)),
%% check they're all members of the original message list
true = sets:is_subset(sets:from_list(BlockTxns), sets:from_list([KnownMsg | Msgs])),
io:format("chain contains ~p distinct transactions~n", [length(BlockTxns)]),
ok.

%% helper functions

enumerate(List) ->
Expand Down
10 changes: 8 additions & 2 deletions test/hbbft_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

-behaviour(gen_server).

-export([start_link/6, submit_transaction/2, get_blocks/1]).
-export([start_link/6, submit_transaction/2, start_on_demand/1, get_blocks/1]).
-export([verify_chain/2, block_transactions/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
Expand Down Expand Up @@ -30,6 +30,9 @@ start_link(N, F, ID, SK, BatchSize, ToSerialize) ->
submit_transaction(Msg, Pid) ->
gen_server:call(Pid, {submit_txn, Msg}, infinity).

start_on_demand(Pid) ->
gen_server:call(Pid, start_on_demand, infinity).

get_blocks(Pid) ->
gen_server:call(Pid, get_blocks, infinity).

Expand Down Expand Up @@ -84,10 +87,13 @@ init([N, F, ID, SK, BatchSize, ToSerialize]) ->
%% deserialize the secret key once
DSK = tpke_privkey:deserialize(SK),
%% init hbbft
HBBFT = hbbft:init(DSK, N, F, ID, BatchSize),
HBBFT = hbbft:init(DSK, N, F, ID, BatchSize, infinity),
%% store the serialized state and serialized SK
{ok, #state{hbbft=HBBFT, blocks=[], id=ID, n=N, sk=DSK, ssk=SK, to_serialize=ToSerialize}}.

handle_call(start_on_demand, _From, State = #state{hbbft=HBBFT, sk=SK}) ->
NewState = dispatch(hbbft:start_on_demand(maybe_deserialize_hbbft(HBBFT, SK)), State),
{reply, ok, NewState};
handle_call({submit_txn, Txn}, _From, State = #state{hbbft=HBBFT, sk=SK}) ->
NewState = dispatch(hbbft:input(maybe_deserialize_hbbft(HBBFT, SK), Txn), State),
{reply, ok, NewState};
Expand Down

0 comments on commit 9371fac

Please sign in to comment.