Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed May 16, 2013
0 parents commit 42c2eea
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 0 deletions.
39 changes: 39 additions & 0 deletions include/rafter.hrl
@@ -0,0 +1,39 @@
%% Transport Independent MESSAGES
-record(request_vote, {
msg_id :: binary(), %% for message uniqueness
term :: non_neg_integer(),
from :: term(),
last_log_index :: non_neg_integer(),
last_log_term :: non_neg_term()}).

-record(vote, {
msg_id :: binary(), %% Same Id a request_vote
from :: term(),
term :: non_neg_integer(),
success :: boolean()}).

-record(append_entries, {
msg_id :: binary(), %% for message uniqueness
term :: non_neg_integer(),
from :: term(),
last_log_index :: non_neg_integer(),
last_log_term :: non_neg_term(),
entries :: term(),
commitIndex :: non_neg_integer()}).

-record(append_entries_rpy, {
msg_id :: binary(), %% Same Id as append_entries
from :: term(),
term :: non_neg_integer(),
success :: boolean()});

%% Log Details
-record(log_state, {
current_term :: non_neg_integer(),
voted_for :: term(),
entries:: [term()]}).

-record(log_entry, {
term :: non_neg_integer(),
index :: non_neg_integer(),
command :: binary()}).
Binary file added rebar
Binary file not shown.
12 changes: 12 additions & 0 deletions src/rafter.app.src
@@ -0,0 +1,12 @@
{application, rafter,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { rafter_app, []}},
{env, []}
]}.
16 changes: 16 additions & 0 deletions src/rafter_app.erl
@@ -0,0 +1,16 @@
-module(rafter_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

%% ===================================================================
%% Application callbacks
%% ===================================================================

start(_StartType, _StartArgs) ->
rafter_sup:start_link().

stop(_State) ->
ok.
183 changes: 183 additions & 0 deletions src/rafter_consensus_fsm.erl
@@ -0,0 +1,183 @@
-module(rafter_consensus_fsm).

-behaviour(gen_fsm).

-define(ELECTION_TIMEOUT_MIN, 150).
-define(ELECTION_TIMEOUT_MAX, 300).

%% API
-export([start_link/0]).

%% gen_fsm callbacks
-export([init/1, code_change/4, handle_event/3, handle_info/3,
handle_sync_event/4, terminate/3]).

%% States
-export([leader/2, follower/2, candidate/2]).

-record(state, {
leader :: string(),
term = 0 :: non_neg_integer(),
voted_for :: term(),
last_log_term :: non_neg_integer(),
last_log_index :: non_neg_integer(),

%% The last time a timer was created
timer_start:: non_neg_integer(),

%% leader state
followers = dict:new() :: dict:dict(),

%% Responses from RPCs to other servers
responses = dict:new() :: dict:dict(),

%% All servers making up the ensemble
me :: string(),
peers :: list(string()),

%% Different Transports can be plugged in (erlang messaging, tcp, udp, etc...)
%% To get this thing implemented quickly, erlang messaging is hardcoded for now
transport = erlang :: erlang}).

start_link() ->
gen_fsm:start_link(?MODULE, [Self, Peers], []).

%%=============================================================================
%% gen_fsm callbacks
%%=============================================================================

init([]) ->
random:seed(),
Me = rafter_config:get_id(),
Peers = rafter_config:get_peers(),
State = #state{peers=Peers, me=Me, timer_created=os:timestamp()},
{ok, follower, State, election_timeout()}.

%%=============================================================================
%% States
%%=============================================================================

follower(timeout, State) ->
{ok, candidate, State, election_timeout()};
follower(#request_vote{from=CandidateId}=RequestVote, State) ->
NewState = set_term(RequestVote#request_vote.term, State),
{ok, Vote} = vote(RequestVote, NewState),
%% rafter_log:write(NewState),
transfer:send(CandidateId, Vote);
case Vote#vote.success of
true ->
{ok, follower, NewState, election_timeout()};
false ->
{ok, follower, NewState, election_timeout(State#state.timer_start)}
end;
follower(#append_entries{}=AppendEntries, State) ->
{ok, follower, State, election_timeout()}.

candidate(timeout, #state{term=CurrentTerm}=State) ->
NewState = State#state{term = CurrentTerm + 1,
responses = dict:new()},
request_votes(State),
{ok, candidate, NewState, election_timeout()};
candidate(#vote{term=VoteTerm, success=false, from=From}=Vote, #state{term=Term}=State)
when VoteTerm > Term ->
{ok, follower, State#state{responses=dict:new()}, election_timeout()};
candidate(#vote{success=false, from=From}=Vote, State#state{responses=Responses}) ->
NewState = State#state{responses = dict:store(From, false, Responses)},
{ok, candidate, NewState, election_timeout()};
candidate(#vote{success=true, term=Term}, State#state{responses=Responses}) ->
NewResponses = dict:store(From, true, Responses),
case is_leader(NewResponses) of
true ->
NewState = State#state{responses=dict:new()},
ok = gen_fsm:send_event(self(), become_leader),
{ok, leader, NewState};
false ->
NewState = State#state{responses=NewResponses},
{ok, candidate, NewState, election_timeout()}
end;
candidate(#request_vote{term=RequestTerm}, #state{term=Term}=State) when RequestTerm > Term ->
{ok, follower, State#state{term = RequestTerm, responses=dict:new()}, election_timeout()};
candidate(#request_vote{}, State) ->
{ok, candidate, State, election_timeout()};

candidate(#append_entries{term=RequestTerm}, State) ->
%% TODO: Should we reset the current term in State here?
{ok, follower, State, election_timeout()}.

leader(become_leader, State) ->
Followers = initialize_followers(State),
NewState = State#state{followers=Followers},
ok = send_empty_append_entries(NewState),
%% TODO: Put a timeout for retries here?
{ok, leader, NewState};
leader(#append_entries_rpy{from=From, success=false}, #state{followers=Followers}=State) ->
NextIndex = decrement_follower_index(From, Followers),
ok = send_append_entries(From, NextIndex),
NewState = State#state{followers=dict:store(From, NextIndex, Followers)},
{ok, leader, NewState};
leader(#append_entries_rpy{from=From, success=true}, #state{responses=Responses}=State) ->

%%=============================================================================
%% Internal Functions
%%=============================================================================

set_term(Term, #state{term=CurrentTerm}=State) when Term < CurrentTerm ->
State;
set_term(Term, #state{term=CurrentTerm}=State) when Term > CurrentTerm ->
State#state{term=Term, voted_for=undefined};
set_term(Term, #state{term=Term}=State) ->
State.

vote(#request_vote{term=Term}=RequestVote, #state{term=CurrentTerm, me=Me})
when Term < CurrentTerm ->
fail_vote(RequestVote, CurrentTerm, Me);
vote(#request_vote{from=CandidateId, term=CurrentTerm}=RequestVote,
#state{voted_for=CandidateId, term=CurrentTerm, me=Me}=State) ->
maybe_successful_vote(RequestVote, CurrentTerm, Me, State);
vote(#request_vote{term=CurrentTerm}=RequestVote,
#state{voted_for=undefined, term=CurrentTerm, me=Me}=State) ->
maybe_successful_vote(RequestVote, CurrentTerm, Me, State);
vote(#request_vote{from=CandidateId, term=CurrentTerm}=RequestVote,
#state{voted_for=AnotherId, term=CurrentTerm, me=Me})
when AnotherId != CandidateId ->
fail_vote(RequestVote, CurrentTerm, Me).

maybe_successful_vote(RequestVote, CurrentTerm, Me, State) ->
case check_log(RequestVote, State) of
true ->
successful_vote(RequestVote, CurrentTerm, Me);
false ->
fail_vote(RequestVote, CurrentTerm, Me)
end.

successful_vote(#request_vote{msg_id=MsgId}, CurrentTerm, Me) ->
{ok, #vote{msg_id=MsgId, term=CurrentTerm, from=Me, success=true}.

fail_vote(#request_vote{msg_id=MsgId}, CurrentTerm, Me) ->
{ok, #vote{msg_id=MsgId, term=CurrentTerm, from=Me, success=false}.

check_log(#request_vote{last_log_term=CandidateLogTerm},
#state{last_log_term=LogTerm}) when CandidateLogTerm > LogTerm ->
true;
check_log(#request_vote{last_log_term=CandidateLogTerm},
#state{last_log_term=LogTerm}) when CandidateLogTerm < LogTerm ->
false;
check_log(#request_vote{last_log_index=CandidateLogIndex}, #state{last_log_index=LogIndex})
when CandidateLogIndex > LogIndex->
true;
check_log(#request_vote{last_log_index=LogIndex}, #state{last_log_index=LogIndex})
true;
check_log(_RequestVote, _State) ->
false;

initialize_followers(State) ->
NextIndex = log:get_index() + 1,
Followers = [{Peer, NextIndex} || Peer <- Peers],
dict:from_list(Followers).

election_timeout(StartTime) ->
timer:diff(os:timestamp(), StartTime) div 1000.

election_timeout() ->
?ELECTION_TIMEOUT_MIN + random:uniform(?ELECTION_TIMEOUT_MAX -
?ELECTION_TIMEOUT_MIN).
4 changes: 4 additions & 0 deletions src/rafter_log.erl
@@ -0,0 +1,4 @@
-module(rafter_log).

%% API
-export([write/1, get_last_entry/0]).
31 changes: 31 additions & 0 deletions src/rafter_sup.erl
@@ -0,0 +1,31 @@

-module(rafter_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

%% ===================================================================
%% API functions
%% ===================================================================

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init([]) ->
ConsensusFsm = { rafter_consensus_fsm,
{rafter_consensus_fsm, start_link, []},
permanent, 5000, worker, [rafter_consensus_fsm]},
{ok, { {one_for_one, 5, 10}, [ConsensusFsm]} }.

0 comments on commit 42c2eea

Please sign in to comment.