Merge remote-tracking branch 'vagabond/netsplit-tolerance'

commit 38902e10907f2b734ca3256dfa0fe0d82ad98fd5
2 parents: ea14513 + ad57a96
@garret-smith authored
Showing 3 changed files with 192 additions and 12 deletions.
  1. +3 −0  .gitignore
  2. +70 −0 include/gen_leader_specs.hrl
  3. +119 −12 src/gen_leader.erl
3  .gitignore
@@ -13,3 +13,6 @@ profile
# Vim
*.swp
*.swo
+
+# Emacs
+*~
70 include/gen_leader_specs.hrl
@@ -0,0 +1,70 @@
+
+-spec init (Args::term()) ->
+ {'ok', State::term()} |
+ 'ignore' |
+ {'stop', Reason::term()}.
+
+%% elected(_,_,node()) - captured a single candidate
+%% reply notifies only the new candidate of the
+%% sync state, ok notifies all candidates
+%% elected(_,_,undefined) - won a general election
+%% ok broadcasts sync to all candidates
+-spec elected(State, gen_leader:election(), node() | 'undefined') ->
+ {'ok', Sync::term(), State} |
+ {'reply', Sync::term(), State}.
+
+-spec surrendered(State, Sync::term(), gen_leader:election()) ->
+ {'ok', State}.
+
+-spec handle_leader_call(Request::term(),
+ From::{pid(), reference()},
+ State,
+ gen_leader:election()) ->
+ {'reply', Reply::term(), State} |
+ {'reply', Reply::term(), Broadcast::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), Reply::term(), State}.
+
+-spec handle_leader_cast(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'ok', Broadcast::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), State}.
+
+-spec from_leader(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'noreply', State} |
+ {'ok', State} |
+ {'stop', Reason::term(), State}.
+
+-spec handle_call(Request::term(),
+ From::{pid(), reference()},
+ State,
+ gen_leader:election()) ->
+ {'reply', Reply::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), State} |
+ {'stop', Reason::term(), Reply::term(), State}.
+
+-spec handle_cast(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'noreply', State} |
+ {'stop', Reason::term(), State}.
+
+-spec handle_DOWN(node(), State, gen_leader:election()) ->
+ {'ok', State} |
+ {'ok', Sync::term(), State}.
+
+-spec handle_info(Msg::term(), State) ->
+ {'noreply', State} |
+ {'stop', Reason::term(), State} |
+ {'ok', State}.
+
+-spec terminate(Reason::term(), State::term()) -> any().
+
+-spec code_change(OldVsn::term(), State, gen_leader:election(), Extra::term()) ->
+ {'ok', State} |
+ {'ok', State, gen_leader:election()}.
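For callback authors, this header is meant to be included so Dialyzer can check a callback module against these specs. A minimal sketch of such a module follows; the module name, include path, registered name, and the trivial list-based state are assumptions for illustration, not part of this commit.

    -module(my_leader_cb).
    -behaviour(gen_leader).

    %% include path is an assumption; adjust to wherever the header is installed
    -include_lib("gen_leader/include/gen_leader_specs.hrl").

    -export([start_link/1]).
    -export([init/1, elected/3, surrendered/3,
             handle_leader_call/4, handle_leader_cast/3, from_leader/3,
             handle_call/4, handle_cast/3, handle_DOWN/3,
             handle_info/2, terminate/2, code_change/4]).

    start_link(CandidateNodes) ->
        %% arguments follow gen_leader:start_link(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
        gen_leader:start_link(?MODULE, CandidateNodes, [], ?MODULE, [], []).

    init([]) -> {ok, []}.

    %% Won a general election: broadcast our state to every candidate.
    elected(State, Election, undefined) ->
        error_logger:info_msg("~p elected leader, alive: ~p~n",
                              [node(), gen_leader:alive(Election)]),
        {ok, State, State};
    %% Captured a single candidate: only that node needs the sync data.
    elected(State, _Election, _Node) ->
        {reply, State, State}.

    %% Lost the election: adopt the leader's synchronised state.
    surrendered(_State, Sync, _Election) -> {ok, Sync}.

    handle_leader_call(_Req, _From, State, _E) -> {reply, ok, State}.
    handle_leader_cast(_Msg, State, _E) -> {noreply, State}.
    from_leader(Sync, _State, _E) -> {ok, Sync}.
    handle_call(_Req, _From, State, _E) -> {reply, ok, State}.
    handle_cast(_Msg, State, _E) -> {noreply, State}.
    handle_DOWN(_Node, State, _E) -> {ok, State}.
    handle_info(_Msg, State) -> {noreply, State}.
    terminate(_Reason, _State) -> ok.
    code_change(_OldVsn, State, _E, _Extra) -> {ok, State}.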
131 src/gen_leader.erl
@@ -156,6 +156,10 @@
bcast_type :: bcast_type()
}).
+-opaque election() :: #election{}.
+
+-export_type([election/0]).
+
-record(server, {
parent,
mod,
@@ -242,27 +246,27 @@ start_link(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
%% Query functions to be used from the callback module
%% @doc Returns list of alive nodes.
--spec alive(#election{}) -> [node()].
+-spec alive(election()) -> [node()].
alive(E) ->
candidates(E) -- down(E).
%% @doc Returns list of down nodes.
--spec down(#election{}) -> [node()].
+-spec down(election()) -> [node()].
down(#election{down = Down}) ->
Down.
%% @doc Returns the current leader node.
--spec leader_node(#election{}) -> node() | 'none'.
+-spec leader_node(election()) -> node() | 'none'.
leader_node(#election{leadernode=Leader}) ->
Leader.
%% @doc Returns a list of known candidates.
--spec candidates(#election{}) -> [node()].
+-spec candidates(election()) -> [node()].
candidates(#election{candidate_nodes = Cands}) ->
Cands.
%% @doc Returns a list of known workers.
--spec workers(#election{}) -> [node()].
+-spec workers(election()) -> [node()].
workers(#election{worker_nodes = Workers}) ->
Workers.
@@ -344,23 +348,23 @@ leader_call(Name, Request, Timeout) ->
%% @equiv gen_server:cast/2
--spec cast(Name::name()|pid(), Request::term()) -> 'ok'.
+-spec cast(Name::server_ref(), Request::term()) -> 'ok'.
cast(Name, Request) ->
catch do_cast('$gen_cast', Name, Request),
ok.
%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
%% the leader via the local gen_leader instance.
--spec leader_cast(Name::name()|pid(), Request::term()) -> 'ok'.
+-spec leader_cast(Name::server_ref(), Request::term()) -> 'ok'.
leader_cast(Name, Request) ->
catch do_cast('$leader_cast', Name, Request),
ok.
-do_cast(Tag, Name, Request) when is_atom(Name) ->
- Name ! {Tag, Request};
-do_cast(Tag, Pid, Request) when is_pid(Pid) ->
- Pid ! {Tag, Request}.
+do_cast(Tag, {global, Name}, Request) ->
+ global:send(Name, {Tag, Request});
+do_cast(Tag, ServerRef, Request) ->
+ ServerRef ! {Tag, Request}.
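With the new do_cast/3 clauses, cast/2 and leader_cast/2 accept the same kinds of server references as the call functions, including globally registered instances, rather than only locally registered atoms and pids. A hypothetical usage sketch (the my_service name and the messages are illustrative only):

    %% fire-and-forget to the instance registered locally on this node
    ok = gen_leader:cast(my_service, refresh),
    %% forwarded to the current leader of a globally registered instance
    ok = gen_leader:leader_cast({global, my_service}, {store, key1, 42}).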
%% @equiv gen_server:reply/2
@@ -632,7 +636,9 @@ safe_loop(#server{mod = Mod, state = State} = Server, Role,
%% This process is no longer the leader!
%% The sender will notice this via a DOWN message
safe_loop(Server,Role,E,Msg);
-
+ {election} = Msg ->
+ %% We're already in an election, so this is likely an old message.
+ safe_loop(Server, Role, E, Msg);
{heartbeat, _Node} = Msg ->
safe_loop(Server,Role,E,Msg);
{candidate_timer} = Msg ->
@@ -642,6 +648,8 @@ safe_loop(#server{mod = Mod, state = State} = Server, Role,
timer:cancel(E#election.cand_timer),
E#election{cand_timer = undefined};
Down ->
+ %% get rid of any queued up candidate_timers, since we just handled one
+ flush_candidate_timers(),
%% Some of potential master candidate nodes are down.
%% Try to wake them up
F = fun(N) ->
@@ -651,6 +659,30 @@ safe_loop(#server{mod = Mod, state = State} = Server, Role,
E
end,
safe_loop(Server,Role,NewE,Msg);
+ {checklead, Node} = Msg ->
+ %% in the very exceptional case when a candidate comes up when the
+ %% elected leader is *behind* it in the candidate list *and* all nodes
+ %% before it in the candidate list are up, the candidate will be stuck in
+ %% the safe_loop forever. This is because gen_leader relies on either
+ %% one of the nodes being down, or the nodes responding to the heartbeat
+ %% sent as part of stage1. However, nodes that are up but are NOT the
+ %% leader do not respond to heartbeats. In this very exceptional case,
+ %% we send a heartbeat to the leader in response to the checklead it
+ %% sent us to bootstrap things and get out of this quagmire.
+ case lists:member(Node,E#election.candidate_nodes) and
+ (E#election.status == elec1) of
+ true ->
+ case ( pos(Node,E#election.candidate_nodes) >
+ pos(node(),E#election.candidate_nodes) ) of
+ true ->
+ {Name, Node} ! {heartbeat, self()};
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end,
+ safe_loop(Server,Role,E,Msg);
{ldr, 'DOWN', Node} = Msg when Role == waiting_worker ->
NewE =
case Node == E#election.leadernode of
@@ -829,6 +861,54 @@ loop(#server{parent = Parent,
false ->
loop(Server,Role,E,Msg)
end;
+ {election} ->
+ %% Told to do an election because of a leader conflict.
+ E1 = startStage1(E, Server),
+ safe_loop(Server, candidate, E1, Msg);
+ {checklead, Node} ->
+ case (E#election.leadernode == Node) of
+ true ->
+ %% Leaders match, nothing to do
+ loop(Server, Role, E, Msg);
+ false when E#election.leader == self() ->
+ %% We're a leader and we disagree with the other
+ %% leader. Tell everyone else to have an election.
+ Newdown = E#election.down -- [Node],
+ E1 = E#election{down = Newdown},
+ lists:foreach(
+ fun(N) ->
+ {Name, N} ! {election}
+ end, E1#election.candidate_nodes),
+ %% Start participating in the election ourselves.
+ E2 = startStage1(E1, Server),
+ safe_loop(Server, candidate, E2, Msg);
+ false ->
+ %% Not a leader, just wait to be told to do an
+ %% election, if applicable.
+ loop(Server, Role, E, Msg)
+ end;
+ {send_checklead} ->
+ case (E#election.leader == self()) of
+ true ->
+ case E#election.down of
+ [] ->
+ loop(Server, Role, E, Msg);
+ Down ->
+ %% For any nodes which are down, send them
+ %% a message comparing their leader to our
+ %% own. This allows us to trigger an
+ %% election after a netsplit is healed.
+ F = fun(N) ->
+ {Name, N} ! {checklead, node()}
+ end,
+ [F(N) || N <- Down],
+ %% schedule another checklead round
+ timer:send_after(E#election.cand_timer_int, {send_checklead}),
+ loop(Server, Role, E, Msg)
+ end;
+ false ->
+ loop(Server, Role, E, Msg)
+ end;
{heartbeat, _Node} ->
case (E#election.leader == self()) of
true ->
@@ -853,6 +933,9 @@ loop(#server{parent = Parent,
timer:cancel(E#election.cand_timer),
E#election{cand_timer=undefined};
true ->
+ %% get rid of any queued up candidate_timers,
+ %% since we just handled one
+ flush_candidate_timers(),
E
end,
%% This shouldn't happen in the leader - just ignore
@@ -886,6 +969,13 @@ loop(#server{parent = Parent,
{ok, Synch, NewState1} ->
{NewState1, broadcast({from_leader,Synch}, E1)}
end,
+ %% We're the leader and one of our
+ %% candidates has gone down. Start sending
+ %% out checklead messages to the downed
+ %% candidates so we can quickly trigger an
+ %% election if this was a netsplit, once
+ %% it heals.
+ {Name, node()} ! {send_checklead},
loop(Server#server{state=NewState}, Role, NewE, Msg);
false ->
loop(Server, Role, E1,Msg)
@@ -1282,6 +1372,7 @@ hasBecomeLeader(E,Server,Msg) ->
%% io:format("==> I am the leader! (acks: ~200p)\n", [E#election.acks]),
%% Set the internal timeout (corresponds to Periodically)
timer:send_after(E#election.cand_timer_int, {heartbeat, node()}),
+ {E#election.name, node()} ! {send_checklead},
%% trigger handle_DOWN callback if previous leader is down
PrevLeader = E#election.previous_leader,
@@ -1482,3 +1573,19 @@ mon_handle_down(Ref, Parent, Refs) ->
mon_reply(From, Reply) ->
From ! {mon_reply, Reply}.
+
+%% the heartbeat messages sent to the downed nodes when the candidate_timer
+%% message is received can take a very long time in the case of a partitioned
+%% network (7 seconds in my testing). Since the candidate_timer is generated
+%% by a send_interval, this means many candidate_timer messages can accumulate
+%% in the mailbox. This function is used to clear them out after handling one
+%% of the candidate_timers, so gen_leader doesn't spend all its time sending
+%% heartbeats.
+flush_candidate_timers() ->
+ receive
+ {candidate_timer} ->
+ flush_candidate_timers()
+ after
+ 0 ->
+ ok
+ end.
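Taken together, the new checklead, send_checklead and {election} messages let a leader notice a rival leader once a partition heals and force a fresh election. A rough way to exercise this from an Erlang shell, assuming three connected candidate nodes running the same gen_leader instance (node names here are assumptions):

    %% on the current leader node, simulate a netsplit
    erlang:disconnect_node('b@host').
    %% the leader marks 'b@host' as down and starts sending itself
    %% {send_checklead}, relaying {checklead, node()} to the downed candidate

    %% 'b@host' may elect its own leader while partitioned; heal the split
    net_kernel:connect_node('b@host').
    %% on the next checklead round the two leaders disagree, every candidate
    %% is told to run {election}, and a single leader wins the re-election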