Skip to content

Commit

Permalink
Change quorum_min/3 to quorum_max/3
Browse files Browse the repository at this point in the history
* Ensure rafter_config:quorum_max/3 supports even numbers of servers
* Make the code a bit clearer hopefully
* Add some comments
  • Loading branch information
andrewjstone committed Dec 28, 2013
1 parent 29f11dd commit 96da687
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
69 changes: 40 additions & 29 deletions src/rafter_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,38 @@
-include("rafter.hrl").

%% API
-export([quorum/3, quorum_min/3, voters/1, voters/2, followers/2,
-export([quorum/3, quorum_max/3, voters/1, voters/2, followers/2,
reconfig/2, allow_config/2, has_vote/2]).

%%====================================================================
%% API
%%====================================================================

-spec quorum_min(term(), #config{} | [], dict()) -> non_neg_integer().
quorum_min(_Me, #config{state=blank}, _) ->
-spec quorum_max(peer(), #config{} | [], dict()) -> non_neg_integer().
quorum_max(_Me, #config{state=blank}, _) ->
0;
quorum_min(Me, #config{state=stable, oldservers=OldServers}, Responses) ->
quorum_min(Me, OldServers, Responses);
quorum_min(Me, #config{state=staging, oldservers=OldServers}, Responses) ->
quorum_min(Me, OldServers, Responses);
quorum_min(Me, #config{state=transitional,
quorum_max(Me, #config{state=stable, oldservers=OldServers}, Responses) ->
quorum_max(Me, OldServers, Responses);
quorum_max(Me, #config{state=staging, oldservers=OldServers}, Responses) ->
quorum_max(Me, OldServers, Responses);
quorum_max(Me, #config{state=transitional,
oldservers=Old,
newservers=New}, Responses) ->
min(quorum_min(Me, Old, Responses), quorum_min(Me, New, Responses));
min(quorum_max(Me, Old, Responses), quorum_max(Me, New, Responses));

%% Responses doesn't contain the local response so it will be marked as 0
%% when it's a member of the consensus group. In this case we must
%% skip past it in a sorted list so we add 1 to the quorum offset.
quorum_min(_, [], _) ->
%% Sort the values received from the peers from lowest to highest
%% Peers that haven't responded have a 0 for their value.
%% This peer (Me) will always have the maximum value
quorum_max(_, [], _) ->
0;
quorum_min(Me, Servers, Responses) ->
Indexes = lists:sort(lists:map(fun(S) -> index(S, Responses) end, Servers)),
case lists:member(Me, Servers) of
true ->
lists:nth(length(Indexes) div 2 + 2, Indexes);
false ->
lists:nth(length(Indexes) div 2 + 1, Indexes)
end.
quorum_max(Me, Servers, Responses) when (length(Servers) rem 2) =:= 0->
Values = sorted_values(Me, Servers, Responses),
lists:nth(length(Values) div 2, Values);
quorum_max(Me, Servers, Responses) ->
Values = sorted_values(Me, Servers, Responses),
lists:nth(length(Values) div 2 + 1, Values).

-spec quorum(term(), #config{} | list(), dict()) -> boolean().
-spec quorum(peer(), #config{} | list(), dict()) -> boolean().
quorum(_Me, #config{state=blank}, _Responses) ->
false;
quorum(Me, #config{state=stable, oldservers=OldServers}, Responses) ->
Expand All @@ -63,7 +61,7 @@ quorum(Me, Servers, Responses) ->
end.

%% @doc list of voters excluding me
-spec voters(term(), #config{}) -> list().
-spec voters(peer(), #config{}) -> list().
voters(Me, Config) ->
lists:delete(Me, voters(Config)).

Expand All @@ -74,7 +72,7 @@ voters(#config{state=transitional, oldservers=Old, newservers=New}) ->
voters(#config{oldservers=Old}) ->
Old.

-spec has_vote(term(), #config{}) -> boolean().
-spec has_vote(peer(), #config{}) -> boolean().
has_vote(_Me, #config{state=blank}) ->
false;
has_vote(Me, #config{state=transitional, oldservers=Old, newservers=New})->
Expand All @@ -83,7 +81,7 @@ has_vote(Me, #config{oldservers=Old}) ->
lists:member(Me, Old).

%% @doc All followers. In staging, some followers are not voters.
-spec followers(term(), #config{}) -> list().
-spec followers(peer(), #config{}) -> list().
followers(Me, #config{state=transitional, oldservers=Old, newservers=New}) ->
lists:delete(Me, sets:to_list(sets:from_list(Old ++ New)));
followers(Me, #config{state=staging, oldservers=Old, newservers=New}) ->
Expand Down Expand Up @@ -114,11 +112,24 @@ allow_config(_Config, _NewServers) ->
%% Internal Functions
%%====================================================================

-spec index(atom() | {atom(), atom()}, dict()) -> non_neg_integer().
index(Peer, Responses) ->
-spec sorted_values(peer(), [peer()], dict()) -> [non_neg_integer()].
sorted_values(Me, Servers, Responses) ->
Vals = lists:sort(lists:map(fun(S) -> value(S, Responses) end, Servers)),
case lists:member(Me, Servers) of
true ->
%% Me is always in front because it is 0 from having no response
%% Strip it off the front, and add the max to the end of the list
[_ | T] = Vals,
lists:reverse([lists:max(Vals) | lists:reverse(T)]);
false ->
Vals
end.

-spec value(peer(), dict()) -> non_neg_integer().
value(Peer, Responses) ->
case dict:find(Peer, Responses) of
{ok, Index} ->
Index;
{ok, Value} ->
Value;
error ->
0
end.
4 changes: 2 additions & 2 deletions src/rafter_consensus_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ maybe_send_client_reply(_, _, State, _) ->
maybe_send_read_replies(#state{me=Me,
config=Config,
send_clock_responses=Responses}=State0) ->
Clock = rafter_config:quorum_min(Me, Config, Responses),
Clock = rafter_config:quorum_max(Me, Config, Responses),
{ok, Requests, State} = find_eligible_read_requests(Clock, State0),
NewState = send_client_read_replies(Requests, State),
NewState.
Expand Down Expand Up @@ -697,7 +697,7 @@ maybe_commit(#state{me=Me,
commit_index=CommitIndex,
config=Config,
responses=Responses}=State) ->
Min = rafter_config:quorum_min(Me, Config, Responses),
Min = rafter_config:quorum_max(Me, Config, Responses),
case Min > CommitIndex andalso safe_to_commit(Min, State) of
true ->
NewState = commit_entries(Min, State),
Expand Down
8 changes: 4 additions & 4 deletions test/rafter_config_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ eqc_test_() ->
?_assertEqual(true,
eqc:quickcheck(
?QC_OUT(eqc:numtests(50, eqc:conjunction(
[{prop_quorum_min,
prop_quorum_min()},
[{prop_quorum_max,
prop_quorum_max()},
{prop_config,
prop_config()}])))))}
]
Expand All @@ -68,11 +68,11 @@ cleanup(_) ->
%% EQC Properties
%% ====================================================================

prop_quorum_min() ->
prop_quorum_max() ->
?FORALL({Config, {Me, Responses}}, {config(), responses()},
begin
ResponsesDict = dict:from_list(Responses),
case rafter_config:quorum_min(Me, Config, ResponsesDict) of
case rafter_config:quorum_max(Me, Config, ResponsesDict) of
0 ->
true;
QuorumMin ->
Expand Down

0 comments on commit 96da687

Please sign in to comment.