Skip to content

Commit

Permalink
Add an optional filter_fun to prevent invalid transactions in RBC
Browse files Browse the repository at this point in the history
To avoid wasting time proposing known invalid transactions into RBC, a
filter_fun can be used to pre-filter known invalid transactions before
proposing them. The transaction may turn out to be invalid later because
of other factors, but this reduces the incidence rate.
  • Loading branch information
Vagabond committed Jan 10, 2019
1 parent 44109bc commit 1343e7a
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 9 deletions.
47 changes: 39 additions & 8 deletions src/hbbft.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

-export([init/6,
init/7,
set_stamp_fun/4,
set_filter_fun/4,
start_on_demand/1,
input/2,
finalize_round/3,
Expand Down Expand Up @@ -38,7 +40,8 @@
sig_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), erlang_pbc:element()}},
thingtosign :: undefined | erlang_pbc:element(),
stampfun :: undefined | {atom(), atom(), list()},
stamps = [] :: [{non_neg_integer(), any()}]
stamps = [] :: [{non_neg_integer(), any()}],
filterfun :: undefined | {atom(), atom(), list()}
}).

-record(hbbft_serialized_data, {
Expand All @@ -59,7 +62,8 @@
dec_shares = #{} :: #{non_neg_integer() => {non_neg_integer(), binary()}},
thingtosign :: undefined | binary(),
stampfun :: undefined | {atom(), atom(), list()},
stamps = [] :: [{non_neg_integer(), any()}]
stamps = [] :: [{non_neg_integer(), any()}],
filterfun :: undefined | {atom(), atom(), list()}
}).

-type hbbft_data() :: #hbbft_data{}.
Expand Down Expand Up @@ -94,11 +98,19 @@ init(SK, N, F, J, BatchSize, MaxBuf) ->
init(SK, N, F, J, BatchSize, MaxBuf, {M, Fn, A}) ->
#hbbft_data{secret_key=SK, n=N, f=F, j=J, batch_size=BatchSize, acs=hbbft_acs:init(SK, N, F, J), max_buf=MaxBuf, stampfun={M, Fn, A}}.

-spec set_stamp_fun(atom(), atom(), list(), hbbft_data()) -> hbbft_data().
set_stamp_fun(M, F, A, Data) when is_atom(M), is_atom(F) ->
Data#hbbft_data{stampfun={M, F, A}}.

-spec set_filter_fun(atom(), atom(), list(), hbbft_data()) -> hbbft_data().
set_filter_fun(M, F, A, Data) when is_atom(M), is_atom(F) ->
Data#hbbft_data{filterfun={M, F, A}}.

%% start acs on demand
-spec start_on_demand(hbbft_data()) -> {hbbft_data(), already_started | {send, [rbc_wrapped_output()]}}.
start_on_demand(Data = #hbbft_data{buf=Buf, n=N, secret_key=SK, batch_size=BatchSize, acs_init=false}) ->
start_on_demand(Data0 = #hbbft_data{secret_key=SK, acs_init=false}) ->
%% pick proposed whichever is lesser from batchsize/n or buffer
Proposed = hbbft_utils:random_n(min((BatchSize div N), length(Buf)), lists:sublist(Buf, BatchSize)),
{Proposed, Data} = proposed(Data0),
%% encrypt x -> tpke.enc(pk, proposed)
Stamp = case Data#hbbft_data.stampfun of
undefined -> undefined;
Expand Down Expand Up @@ -286,13 +298,13 @@ handle_msg(_Data, _J, _Msg) ->
ignore.

-spec maybe_start_acs(hbbft_data()) -> {hbbft_data(), ok | {send, [rbc_wrapped_output()]}}.
maybe_start_acs(Data = #hbbft_data{n=N, secret_key=SK, batch_size=BatchSize}) ->
case length(Data#hbbft_data.buf) > BatchSize andalso Data#hbbft_data.acs_init == false of
maybe_start_acs(Data0 = #hbbft_data{secret_key=SK, batch_size=BatchSize}) ->
case length(Data0#hbbft_data.buf) > BatchSize andalso Data0#hbbft_data.acs_init == false of
true ->
%% compose a transaction bundle
%% get the top b elements from buf
%% pick a random B/N selection of them
Proposed = hbbft_utils:random_n(BatchSize div N, lists:sublist(Data#hbbft_data.buf, length(Data#hbbft_data.buf) - BatchSize + 1, BatchSize)),
{Proposed, Data} = proposed(Data0),
%% encrypt x -> tpke.enc(pk, proposed)
Stamp = case Data#hbbft_data.stampfun of
undefined -> undefined;
Expand All @@ -306,7 +318,7 @@ maybe_start_acs(Data = #hbbft_data{n=N, secret_key=SK, batch_size=BatchSize}) ->
{send, hbbft_utils:wrap({acs, Data#hbbft_data.round}, ACSResponse)}};
false ->
%% not enough transactions for this round yet
{Data, ok}
{Data0, ok}
end.

-spec encrypt(tpke_pubkey:pubkey(), binary()) -> binary().
Expand Down Expand Up @@ -461,3 +473,22 @@ group_by([], D) ->
lists:keysort(1, [{K, lists:sort(V)} || {K, V} <- dict:to_list(D)]);
group_by([{K, V}|T], D) ->
group_by(T, dict:append(K, V, D)).


proposed(Data = #hbbft_data{n=N, batch_size=BatchSize, buf=Buf}) ->
Proposed = hbbft_utils:random_n(min((BatchSize div N), length(Buf)), lists:sublist(Buf, BatchSize)),
case Data#hbbft_data.filterfun of
undefined ->
%% everything is valid
{Proposed, Data};
{M, F, A} ->
case lists:partition(fun(E) -> erlang:apply(M, F, [E|A]) end, Proposed) of
{Res, []} ->
%% no invalid transactions detected
{Res, Data};
{_, Invalid} ->
%% remove the invalid transactions from the buffer and retry
NewBuf = Data#hbbft_data.buf -- Invalid,
proposed(Data#hbbft_data{buf= NewBuf})
end
end.
56 changes: 55 additions & 1 deletion test/hbbft_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
-export([all/0, init_per_testcase/2, end_per_testcase/2]).
-export([
init_test/1,
filter_fun_test/1,
one_actor_no_txns_test/1,
two_actors_no_txns_test/1,
one_actor_missing_test/1,
two_actors_missing_test/1,
encrypt_decrypt_test/1,
start_on_demand_test/1
start_on_demand_test/1,
filter_fun/1
]).

all() ->
Expand Down Expand Up @@ -83,6 +85,56 @@ init_test(Config) ->
io:format("chain contains ~p distinct transactions~n", [length(BlockTxns)]),
ok.

filter_fun_test(Config) ->
N = proplists:get_value(n, Config),
F = proplists:get_value(f, Config),
BatchSize = proplists:get_value(batchsize, Config),
PubKey = proplists:get_value(pubkey, Config),
PrivateKeys = proplists:get_value(privatekeys, Config),
Workers = [ element(2, hbbft_worker:start_link(N, F, I, tpke_privkey:serialize(SK), BatchSize, false)) || {I, SK} <- enumerate(PrivateKeys) ],
[ hbbft_worker:set_filter_fun(?MODULE, filter_fun, [], W) || W<- Workers ],
Msgs = [ crypto:strong_rand_bytes(128) || _ <- lists:seq(1, N*20)],
%% feed the badgers some msgs
lists:foreach(fun(Msg) ->
Destinations = random_n(rand:uniform(N), Workers),
io:format("destinations ~p~n", [Destinations]),
[ok = hbbft_worker:submit_transaction(Msg, D) || D <- Destinations]
end, Msgs),

%% wait for all the worker's mailboxes to settle and
%% wait for the chains to converge
ok = hbbft_ct_utils:wait_until(fun() ->
Chains = sets:from_list(lists:map(fun(W) ->
{ok, Blocks} = hbbft_worker:get_blocks(W),
Blocks
end, Workers)),

0 == lists:sum([element(2, erlang:process_info(W, message_queue_len)) || W <- Workers ]) andalso
1 == sets:size(Chains) andalso
0 /= length(hd(sets:to_list(Chains)))
end, 60*2, 500),


Chains = sets:from_list(lists:map(fun(W) ->
{ok, Blocks} = hbbft_worker:get_blocks(W),
Blocks
end, Workers)),
1 = sets:size(Chains),
[Chain] = sets:to_list(Chains),
io:format("chain is of height ~p~n", [length(Chain)]),
%% verify they are cryptographically linked
true = hbbft_worker:verify_chain(Chain, PubKey),
%% check all the transactions are unique
BlockTxns = lists:flatten([ hbbft_worker:block_transactions(B) || B <- Chain ]),
true = length(BlockTxns) == sets:size(sets:from_list(BlockTxns)),
%% check they're all members of the original message list
true = sets:is_subset(sets:from_list(BlockTxns), sets:from_list(Msgs)),
%% check they all passed the filter fun
true = sets:is_subset(sets:from_list(BlockTxns), sets:from_list(lists:filter(fun filter_fun/1, Msgs))),
io:format("chain contains ~p distinct transactions~n", [length(BlockTxns)]),
ok.


one_actor_no_txns_test(Config) ->
N = proplists:get_value(n, Config),
F = proplists:get_value(f, Config),
Expand Down Expand Up @@ -303,3 +355,5 @@ merge_replies(N, NewReplies, Replies) ->
merge_replies(N-1, lists:keydelete(N, 1, NewReplies), lists:keystore(N, 1, Replies, NewSend))
end.

filter_fun(<<X:8/integer, _/binary>>) when X rem 2 == 1 -> false;
filter_fun(_) -> true.
7 changes: 7 additions & 0 deletions test/hbbft_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

-export([start_link/6, submit_transaction/2, start_on_demand/1, get_blocks/1]).
-export([verify_chain/2, block_transactions/1]).
-export([set_filter_fun/4]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

Expand Down Expand Up @@ -83,6 +84,9 @@ verify_block_fit([A, B | _], PubKey) ->
block_transactions(Block) ->
Block#block.transactions.

set_filter_fun(M, F, A, Pid) ->
gen_server:call(Pid, {set_filter_fun, M, F, A}).

init([N, F, ID, SK, BatchSize, ToSerialize]) ->
%% deserialize the secret key once
DSK = tpke_privkey:deserialize(SK),
Expand All @@ -99,6 +103,9 @@ handle_call({submit_txn, Txn}, _From, State = #state{hbbft=HBBFT, sk=SK}) ->
{reply, ok, NewState};
handle_call(get_blocks, _From, State) ->
{reply, {ok, State#state.blocks}, State};
handle_call({set_filter_fun, M, F, A}, _From, State = #state{hbbft=HBBFT}) ->
NewHBBFT = hbbft:set_filter_fun(M, F, A, HBBFT),
{reply, ok, State#state{hbbft=NewHBBFT}};
handle_call(Msg, _From, State) ->
io:format("unhandled msg ~p~n", [Msg]),
{reply, ok, State}.
Expand Down

0 comments on commit 1343e7a

Please sign in to comment.