Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Backport changes to join/claim implementation to model + minor changes

  • Loading branch information...
commit 1ffdd8b4a0a9a7aed4f96fb329a203aeae93a908 1 parent a335dbe
@jtuple jtuple authored
Showing with 202 additions and 224 deletions.
  1. +19 −5 src/riak_core.erl
  2. +18 −17 src/riak_core_ring.erl
  3. +165 −202 test/join_eqc.erl
View
24 src/riak_core.erl
@@ -49,14 +49,21 @@ stop(Reason) ->
join(NodeStr) when is_list(NodeStr) ->
join(riak_core_util:str_to_node(NodeStr));
join(Node) when is_atom(Node) ->
+ {ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
case net_adm:ping(Node) of
pong ->
case rpc:call(Node, riak_core_ring_manager, get_my_ring, []) of
{ok, Ring} ->
- Ring2 = riak_core_ring:add_member(node(), Ring, node()),
- Ring3 = riak_core_ring:set_owner(Ring2, node()),
- riak_core_ring_manager:set_my_ring(Ring3),
- riak_core_gossip:send_ring(Node, node());
+ case riak_core_ring:num_partitions(Ring) of
+ OurRingSize ->
+ Ring2 = riak_core_ring:add_member(node(), Ring,
+ node()),
+ Ring3 = riak_core_ring:set_owner(Ring2, node()),
+ riak_core_ring_manager:set_my_ring(Ring3),
+ riak_core_gossip:send_ring(Node, node());
+ _ ->
+ {error, different_ring_sizes}
+ end;
_ ->
{error, unable_to_get_join_ring}
end;
@@ -96,7 +103,14 @@ leave() ->
RandomNode ->
riak_core_gossip:send_ring(Node, RandomNode),
ok
- end
+ end;
+ invalid ->
+ io:format("~p isn't a member of the cluster.~n", [Node]),
+ ok;
+ _ ->
+ io:format("~p is in the process of leaving the cluster.~n",
+ [Node]),
+ ok
end.
%% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
View
35 src/riak_core_ring.erl
@@ -495,7 +495,8 @@ maybe_remove_exiting(Node, CState) ->
lists:foldl(fun(ENode, CState0) ->
%% Tell exiting node to shutdown.
riak_core_ring_manager:refresh_ring(ENode),
- set_member(Node, CState0, ENode, invalid)
+ set_member(Node, CState0, ENode,
+ invalid, same_vclock)
end, CState, Exiting),
{Changed, CState2};
_ ->
@@ -738,7 +739,8 @@ reconcile_next(Next1, Next2) ->
end, Next1, Next2).
%% @private
-reconcile_next2(Next1, Next2) ->
+reconcile_divergent_next(BaseNext, OtherNext) ->
+ MergedNext = substitute(1, BaseNext, OtherNext),
lists:zipwith(fun({Idx, Owner1, Node1, Transfers1, Status1},
{Idx, Owner2, Node2, Transfers2, Status2}) ->
Same = ({Owner1, Node1} =:= {Owner2, Node2}),
@@ -750,7 +752,7 @@ reconcile_next2(Next1, Next2) ->
ordsets:union(Transfers1, Transfers2),
merge_next_status(Status1, Status2)}
end
- end, Next1, Next2).
+ end, BaseNext, MergedNext).
%% @private
substitute(Idx, TL1, TL2) ->
@@ -777,24 +779,20 @@ reconcile_ring(StateA=#chstate{claimant=Claimant1, rvsn=VC1, next=Next1},
Next = reconcile_next(Next1, Next2),
StateA#chstate{next=Next};
{_, true, false} ->
- MergedNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MergedNext),
+ Next = reconcile_divergent_next(Next1, Next2),
StateA#chstate{next=Next};
{_, false, true} ->
- MergedNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MergedNext),
+ Next = reconcile_divergent_next(Next2, Next1),
StateB#chstate{next=Next};
{_, _, _} ->
CValid1 = lists:member(Claimant1, Members),
CValid2 = lists:member(Claimant2, Members),
case {CValid1, CValid2} of
{true, false} ->
- MergedNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MergedNext),
+ Next = reconcile_divergent_next(Next1, Next2),
StateA#chstate{next=Next};
{false, true} ->
- MergedNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MergedNext),
+ Next = reconcile_divergent_next(Next2, Next1),
StateB#chstate{next=Next};
{false, false} ->
throw("Neither claimant valid");
@@ -803,12 +801,10 @@ reconcile_ring(StateA=#chstate{claimant=Claimant1, rvsn=VC1, next=Next1},
%% But, we need to handle it for exceptional cases.
case Claimant1 < Claimant2 of
true ->
- MNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MNext),
+ Next = reconcile_divergent_next(Next1, Next2),
StateA#chstate{next=Next};
false ->
- MNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MNext),
+ Next = reconcile_divergent_next(Next2, Next1),
StateB#chstate{next=Next}
end
end
@@ -867,6 +863,12 @@ get_members(Members, Types) ->
%% @private
set_member(Node, CState, Member, Status) ->
+ VClock = vclock:increment(Node, CState#chstate.vclock),
+ CState2 = set_member(Node, CState, Member, Status, same_vclock),
+ CState2#chstate{vclock=VClock}.
+
+%% @private
+set_member(Node, CState, Member, Status, same_vclock) ->
Members2 = orddict:update(Member,
fun({_, VC}) ->
{Status, vclock:increment(Node, VC)}
@@ -874,8 +876,7 @@ set_member(Node, CState, Member, Status) ->
{Status, vclock:increment(Node,
vclock:fresh())},
CState#chstate.members),
- VClock = vclock:increment(Node, CState#chstate.vclock),
- CState#chstate{members=Members2, vclock=VClock}.
+ CState#chstate{members=Members2}.
%% @private
update_seen(Node, CState=#chstate{vclock=VClock, seen=Seen}) ->
View
367 test/join_eqc.erl
@@ -210,9 +210,9 @@ all_owners(#chstate{chring=Ring}) ->
chash:nodes(Ring).
all_members(#chstate{members=Members}) ->
- get_members(Members, [valid, leaving]).
+ get_members(Members, [valid, leaving, exiting]).
-all_valid_members(#chstate{members=Members}) ->
+claiming_members(#chstate{members=Members}) ->
get_members(Members, [valid]).
indices(CState, Node) ->
@@ -222,6 +222,9 @@ indices(CState, Node) ->
owner(#chstate{chring=Ring}, Idx) ->
chash:lookup(Idx, Ring).
+owner_node(CState) ->
+ CState#chstate.nodename.
+
init_node_state(State, Node) ->
Ring = chash:fresh(State#state.ring_size, Node),
Indices = indices(#chstate{chring=Ring}, Node),
@@ -490,7 +493,7 @@ postcondition(State, {call, _, read, [Node, Idx]}, _Result) ->
CState = get_cstate(State, Node),
Owner = owner(CState, Idx),
Comm = can_communicate(State, Node, Owner),
- Status = get_status(CState, Node),
+ Status = member_status(CState, Node),
case {Comm, Status} of
{_, exiting} ->
%% Exiting nodes don't respond to read requests. We consider
@@ -668,8 +671,8 @@ s_gossip(State, {Node, NState}, OtherNode, OtherCS) ->
InvalidNode = true,
Changed = false;
false ->
- {Changed, CState2} = merge_cstate(State, Node, CState, OtherCS),
- InvalidNode = (get_status(CState2, OtherNode) =:= invalid)
+ {Changed, CState2} = reconcile(State, CState, OtherCS),
+ InvalidNode = (member_status(CState2, OtherNode) =:= invalid)
end,
case {WrongCluster, InvalidNode, Changed} of
{true, _, _} ->
@@ -683,7 +686,7 @@ s_gossip(State, {Node, NState}, OtherNode, OtherCS) ->
State3 = maybe_shutdown(State2, Node),
State3;
{_, true, _} ->
- OtherStatus = get_status(OtherCS, OtherNode),
+ OtherStatus = member_status(OtherCS, OtherNode),
OtherRemoved = lists:member(OtherNode, State#state.removed),
case {OtherRemoved, OtherStatus} of
{_, exiting} ->
@@ -709,7 +712,7 @@ s_gossip(State, {Node, NState}, OtherNode, OtherCS) ->
State2;
_ ->
%% No legitimate reason this should occur in the model.
- ?debugFmt("S(~p): ~p~n", [OtherNode, get_status(OtherCS, OtherNode)]),
+ ?debugFmt("S(~p): ~p~n", [OtherNode, member_status(OtherCS, OtherNode)]),
throw(invalid_node_still_alive),
State2 = maybe_shutdown(State, Node),
State2
@@ -760,15 +763,7 @@ s_join(State, Node, NState, PNode) ->
end.
add_member(PNode, CState, Node) ->
- Members2 = orddict:update(Node,
- fun({_, VC}) ->
- {valid, vclock:increment(PNode, VC)}
- end,
- {valid, vclock:increment(PNode,
- vclock:fresh())},
- CState#chstate.members),
- VClock2 = vclock:increment(PNode, CState#chstate.vclock),
- CState#chstate{members=Members2, vclock=VClock2}.
+ set_member(PNode, CState, Node, valid).
s_leave(State, Node, PNode) ->
LeaveCS = get_cstate(State, PNode),
@@ -789,15 +784,10 @@ s_leave(State, Node, PNode) ->
end.
leave_member(PNode, CState, Node) ->
- Members2 = orddict:update(Node,
- fun({_, VC}) ->
- {leaving, vclock:increment(PNode, VC)}
- end,
- {leaving, vclock:increment(PNode,
- vclock:fresh())},
- CState#chstate.members),
- VClock2 = vclock:increment(PNode, CState#chstate.vclock),
- CState#chstate{members=Members2, vclock=VClock2}.
+ set_member(PNode, CState, Node, leaving).
+
+exit_member(PNode, CState, Node) ->
+ set_member(PNode, CState, Node, exiting).
all_pending(State, Node) ->
Pending = dict:fold(fun(_, NState, Pending0) ->
@@ -841,17 +831,14 @@ s_remove(State, Node, PNode, Shutdown) ->
end.
remove_member(PNode, CState, Node) ->
- Members2 = orddict:update(Node,
- fun({_, VC}) ->
- {invalid, vclock:increment(PNode, VC)}
- end,
- {invalid, vclock:increment(PNode,
- vclock:fresh())},
- CState#chstate.members),
- VClock2 = vclock:increment(PNode, CState#chstate.vclock),
- CState#chstate{members=Members2, vclock=VClock2}.
+ set_member(PNode, CState, Node, invalid).
set_member(Node, CState, Member, Status) ->
+ VClock = vclock:increment(Node, CState#chstate.vclock),
+ CState2 = set_member(Node, CState, Member, Status, same_vclock),
+ CState2#chstate{vclock=VClock}.
+
+set_member(Node, CState, Member, Status, same_vclock) ->
Members2 = orddict:update(Member,
fun({_, VC}) ->
{Status, vclock:increment(Node, VC)}
@@ -883,9 +870,8 @@ handle_cast(State=#state{random_ring=RRing, others=Others, nstates=NStates}, _,
handle_cast(State, _, Node, {remove_member, OtherNode}) ->
CState = get_cstate(State, Node),
- CState2 = set_member(Node, CState, OtherNode, invalid),
- CState3 = increment_cstate_vclock(Node, CState2),
- update_cstate(State, Node, CState3);
+ CState2 = remove_member(Node, CState, OtherNode),
+ update_cstate(State, Node, CState2);
handle_cast(State, OtherNode, Node, {gossip, OtherCS}) ->
NState = get_nstate(State, Node),
@@ -931,7 +917,7 @@ cast(State, Sender, Receiver, Msg) ->
s_maybe_handoff(State, Mod, Node, NState, Idx) ->
CState = NState#nstate.chstate,
Active0 = State#state.active_handoffs,
- {NextOwner, _} = get_next_owner(State, CState, Idx, Mod),
+ {_, NextOwner, _} = next_owner(State, CState, Idx, Mod),
Owner = owner(CState, Idx),
%%io:format("Owner/Next: ~p / ~p~n", [Owner, NextOwner]),
Ready = ring_ready(CState),
@@ -952,7 +938,7 @@ s_maybe_handoff(State, Mod, Node, NState, Idx) ->
s_finish_handoff(State, AH={Mod, Idx, Prev, New}) ->
PrevCS1 = get_cstate(State, Prev),
Owner = owner(PrevCS1, Idx),
- {NextOwner, Status} = get_next_owner(State, PrevCS1, Idx, Mod),
+ {_, NextOwner, Status} = next_owner(State, PrevCS1, Idx, Mod),
Active = State#state.active_handoffs -- [AH],
State2 = State#state{active_handoffs=Active},
@@ -961,10 +947,8 @@ s_finish_handoff(State, AH={Mod, Idx, Prev, New}) ->
{Prev, New, awaiting} ->
PrevNS1 = get_nstate(State2, Prev),
PrevNS2 = update_partition_status(Mod, PrevNS1, Idx, true, false),
- PrevCS2 = mark_transfer_complete(State, PrevCS1, Mod, Idx),
- VClock2 = vclock:increment(Prev, PrevCS1#chstate.vclock),
- PrevCS3 = PrevCS2#chstate{vclock=VClock2},
- PrevNS3 = PrevNS2#nstate{chstate=PrevCS3},
+ PrevCS2 = mark_transfer_complete(State, PrevCS1, Idx, Mod),
+ PrevNS3 = PrevNS2#nstate{chstate=PrevCS2},
State3 = update_nstate(State2, Prev, PrevNS3),
%% New owner is considered to now have data for test purposes
@@ -1014,18 +998,18 @@ maybe_shutdown(State, Node) ->
NoPendingIndices = ([] =:= ([Idx || {Idx, _, NextOwner, _} <- Next,
NextOwner =:= Node])),
Shutdown = (NoIndices and NoPartitions and NoPendingIndices),
- Status = get_status(CState, Node),
+ Status = member_status(CState, Node),
case {Ready, Shutdown, Status} of
{true, true, leaving} ->
?OUT("Exiting node ~p~n", [Node]),
- CState2 = set_member(Node, CState, Node, exiting),
- CState3 = increment_cstate_vclock(Node, CState2),
- update_cstate(State, Node, CState3);
+ CState2 = exit_member(Node, CState, Node),
+ update_cstate(State, Node, CState2);
_ ->
State
end.
-merge_cstate(State, VNode, CS01, CS02) ->
+reconcile(State, CS01, CS02) ->
+ VNode = owner_node(CS01),
CS03 = update_seen(VNode, CS01),
CS04 = update_seen(VNode, CS02),
Seen = reconcile_seen(CS03, CS04),
@@ -1052,22 +1036,16 @@ merge_cstate(State, VNode, CS01, CS02) ->
io:format("C1: ~p~nC2: ~p~n", [CS1, CS2]),
throw("Equal vclocks, but cstate unequal");
{_, false, false} ->
- CS3 = reconcile_cstate(State, VNode, CS1, CS2, VC1, VC2),
+ CS3 = reconcile_divergent(State, VNode, CS1, CS2),
{true, CS3#chstate{nodename=VNode}}
end.
-reconcile_cstate(State, VNode, CS1, CS2, VC1, VC2) ->
- VClock2 = case VNode of
- global ->
- vclock:merge([VC1, VC2]);
- _ ->
- vclock:increment(VNode, vclock:merge([VC1, VC2]))
- end,
- %%VClock2 = vclock:merge([VC1, VC2]),
-
+reconcile_divergent(State, VNode, CS1, CS2) ->
+ VClock = vclock:increment(VNode, vclock:merge([CS1#chstate.vclock,
+ CS2#chstate.vclock])),
Members = reconcile_members(CS1, CS2),
- CS3 = reconcile_ring(State, VNode, CS1, CS2, get_members(Members)),
- CS3#chstate{vclock=VClock2, members=Members}.
+ CS3 = reconcile_ring(State, CS1, CS2, get_members(Members)),
+ CS3#chstate{vclock=VClock, members=Members}.
reconcile_members(CS1, CS2) ->
%%?debugFmt("M1: ~p~nM2: ~p~n", [CS1#chstate.members, CS2#chstate.members]),
@@ -1108,7 +1086,8 @@ reconcile_next(Next1, Next2) ->
merge_next_status(Status1, Status2)}
end, Next1, Next2).
-reconcile_next2(Next1, Next2) ->
+reconcile_divergent_next(BaseNext, OtherNext) ->
+ MergedNext = substitute(1, BaseNext, OtherNext),
lists:zipwith(fun({Idx, Owner1, Node1, Transfers1, Status1},
{Idx, Owner2, Node2, Transfers2, Status2}) ->
Same = ({Owner1, Node1} =:= {Owner2, Node2}),
@@ -1120,7 +1099,7 @@ reconcile_next2(Next1, Next2) ->
ordsets:union(Transfers1, Transfers2),
merge_next_status(Status1, Status2)}
end
- end, Next1, Next2).
+ end, BaseNext, MergedNext).
substitute(Idx, TL1, TL2) ->
lists:map(fun(T) ->
@@ -1133,58 +1112,50 @@ substitute(Idx, TL1, TL2) ->
end
end, TL1).
-reconcile_ring(_State, _VNode,
- CS1=#chstate{claimant=Claimant1, rvsn=VC1},
- CS2=#chstate{claimant=Claimant2, rvsn=VC2}, Members) ->
+reconcile_ring(_State,
+ CS1=#chstate{claimant=Claimant1, rvsn=VC1, next=Next1},
+ CS2=#chstate{claimant=Claimant2, rvsn=VC2, next=Next2},
+ Members) ->
V1Newer = vclock:descends(VC1, VC2),
V2Newer = vclock:descends(VC2, VC1),
EqualVC = (vclock:equal(VC1, VC2) and (Claimant1 =:= Claimant2)),
- Next1 = CS1#chstate.next,
- Next2 = CS2#chstate.next,
%%io:format("Next1: ~p~nNext2: ~p~n", [Next1, Next2]),
- CS3 = case {EqualVC, V1Newer, V2Newer} of
- {true, _, _} ->
- ?assertEqual(CS1#chstate.chring, CS2#chstate.chring),
- Next = reconcile_next(Next1, Next2),
- CS1#chstate{next=Next};
- {_, true, false} ->
- MNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MNext),
- CS1#chstate{next=Next};
- {_, false, true} ->
- MNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MNext),
- CS2#chstate{next=Next};
- {_, _, _} ->
- CValid1 = lists:member(Claimant1, Members),
- CValid2 = lists:member(Claimant2, Members),
- case {CValid1, CValid2} of
- {true, false} ->
- MNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MNext),
- CS1#chstate{next=Next};
- {false, true} ->
- MNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MNext),
- CS2#chstate{next=Next};
- {false, false} ->
- throw("Neither claimant valid");
- {true, true} ->
- %% This should never happen in normal practice.
- %% But, we need to handle it in case of user error.
- case Claimant1 < Claimant2 of
- true ->
- MNext = substitute(1, Next1, Next2),
- Next = reconcile_next2(Next1, MNext),
- CS1#chstate{next=Next};
- false ->
- MNext = substitute(1, Next2, Next1),
- Next = reconcile_next2(Next2, MNext),
- CS2#chstate{next=Next}
- end
- end
- end,
- CS3.
+ case {EqualVC, V1Newer, V2Newer} of
+ {true, _, _} ->
+ ?assertEqual(CS1#chstate.chring, CS2#chstate.chring),
+ Next = reconcile_next(Next1, Next2),
+ CS1#chstate{next=Next};
+ {_, true, false} ->
+ Next = reconcile_divergent_next(Next1, Next2),
+ CS1#chstate{next=Next};
+ {_, false, true} ->
+ Next = reconcile_divergent_next(Next2, Next1),
+ CS2#chstate{next=Next};
+ {_, _, _} ->
+ CValid1 = lists:member(Claimant1, Members),
+ CValid2 = lists:member(Claimant2, Members),
+ case {CValid1, CValid2} of
+ {true, false} ->
+ Next = reconcile_divergent_next(Next1, Next2),
+ CS1#chstate{next=Next};
+ {false, true} ->
+ Next = reconcile_divergent_next(Next2, Next1),
+ CS2#chstate{next=Next};
+ {false, false} ->
+ throw("Neither claimant valid");
+ {true, true} ->
+ %% This should never happen in normal practice.
+ %% But, we need to handle it in case of user error.
+ case Claimant1 < Claimant2 of
+ true ->
+ Next = reconcile_divergent_next(Next1, Next2),
+ CS1#chstate{next=Next};
+ false ->
+ Next = reconcile_divergent_next(Next2, Next1),
+ CS2#chstate{next=Next}
+ end
+ end
+ end.
merge_status(invalid, _) ->
invalid;
@@ -1254,7 +1225,7 @@ update_nstate(State, Node, NState) ->
get_cstate(State, Node) ->
(get_nstate(State, Node))#nstate.chstate.
-get_status(CState, Node) ->
+member_status(CState, Node) ->
{Status, _} = orddict:fetch(Node, CState#chstate.members),
Status.
@@ -1264,15 +1235,12 @@ get_members(Members) ->
get_members(Members, Types) ->
[Node || {Node, {V, _}} <- Members, lists:member(V, Types)].
-update_seen(global, CState) ->
- CState;
-update_seen(Node, CState) ->
+update_seen(Node, CState=#chstate{vclock=VClock, seen=Seen}) ->
Seen2 = orddict:update(Node,
fun(SeenVC) ->
- vclock:merge([SeenVC, CState#chstate.vclock])
+ vclock:merge([SeenVC, VClock])
end,
- CState#chstate.vclock,
- CState#chstate.seen),
+ VClock, Seen),
CState#chstate{seen=Seen2}.
update_seen(State, Node, CState) ->
@@ -1343,14 +1311,15 @@ save_random() ->
random:seed(Seed),
Seed.
-ring_changed(State, _RRing, {Node, _NState}, CState) ->
+ring_changed(State, _RRing, {Node, _NState}, CState0) ->
+ CState = update_seen(node(), CState0),
case ring_ready(CState) of
false ->
update_cstate(State, Node, CState);
true ->
- {C1, State2, CState2} = n_maybe_update_claimant(State, Node, CState),
- {C2, State3, CState3} = n_maybe_update_ring(State2, Node, CState2),
- {C3, State4, CState4} = n_maybe_remove_exiting(State3, Node, CState3),
+ {C1, State2, CState2} = maybe_update_claimant(State, Node, CState),
+ {C2, State3, CState3} = maybe_update_ring(State2, Node, CState2),
+ {C3, State4, CState4} = maybe_remove_exiting(State3, Node, CState3),
case (C1 or C2 or C3) of
true ->
@@ -1361,7 +1330,7 @@ ring_changed(State, _RRing, {Node, _NState}, CState) ->
end
end.
-n_maybe_update_claimant(State, Node, CState) ->
+maybe_update_claimant(State, Node, CState) ->
Members = get_members(CState#chstate.members, [valid, leaving]),
Claimant = CState#chstate.claimant,
RVsn = CState#chstate.rvsn,
@@ -1379,25 +1348,35 @@ n_maybe_update_claimant(State, Node, CState) ->
{false, State, CState}
end.
-n_maybe_update_ring(State, Node, CState) ->
+maybe_update_ring(State, Node, CState) ->
Claimant = CState#chstate.claimant,
case Claimant of
Node ->
- RRing = State#state.random_ring,
- {Changed, State2, CState2} = update_ring(State, RRing, Node, CState),
- {Changed, State2, CState2};
+ case claiming_members(CState) of
+ [] ->
+ {false, State, CState};
+ _ ->
+ RRing = State#state.random_ring,
+ {Changed, State2, CState2} = update_ring(State, RRing, Node, CState),
+ {Changed, State2, CState2}
+ end;
_ ->
{false, State, CState}
end.
-n_maybe_remove_exiting(State, Node, CState) ->
+maybe_remove_exiting(State, Node, CState) ->
Claimant = CState#chstate.claimant,
case Claimant of
Node ->
Exiting = get_members(CState#chstate.members, [exiting]),
%%io:format("Claimant ~p removing exiting ~p~n", [Node, Exiting]),
Changed = (Exiting /= []),
- {State2, CState2} = remove_exiting(State, Node, CState),
+ {State2, CState2} =
+ lists:foldl(fun(ENode, {State0, CState0}) ->
+ {_, State02} = cast(State0, Node, ENode, shutdown),
+ CState02 = set_member(Node, CState0, ENode, invalid, same_vclock),
+ {State02, CState02}
+ end, {State, CState}, Exiting),
{Changed, State2, CState2};
_ ->
{false, State, CState}
@@ -1415,24 +1394,7 @@ get_counts(Nodes, Ring) ->
end, dict:from_list(Empty), Ring),
dict:to_list(Counts).
-remove_exiting(State0, Node, CState0) ->
- Exiting = get_members(CState0#chstate.members, [exiting]),
- {State2, CState2} =
- lists:foldl(fun(ENode, {State, CState}) ->
- {_, State2} = cast(State, Node, ENode, shutdown),
- CState2 = set_member(Node, CState, ENode, invalid),
- {State2, CState2}
- end, {State0, CState0}, Exiting),
- {State2, CState2}.
-
-%% TODO: Matching on active is bad when all leaving and want to remove indices
-update_ring(State, RRing, CNode, CState) ->
- Active = get_members(CState#chstate.members, [valid]),
- update_ring(State, RRing, Active, CNode, CState).
-
-update_ring(State, _RRing, [], _CNode, CState) ->
- {false, State, CState};
-update_ring(State, _RRing, Active, CNode, CState) ->
+update_ring(State, _RRing, CNode, CState) ->
Next0 = CState#chstate.next,
?ROUT("Members: ~p~n", [CState#chstate.members]),
@@ -1441,7 +1403,7 @@ update_ring(State, _RRing, Active, CNode, CState) ->
%% Remove tuples from next for removed nodes
InvalidMembers = get_members(CState#chstate.members, [invalid]),
Next2 = lists:filter(fun(NInfo) ->
- {NextOwner, Status} = get_next_owner(State, NInfo),
+ {_, NextOwner, Status} = next_owner(State, NInfo),
(Status =:= complete) or
not lists:member(NextOwner, InvalidMembers)
end, Next0),
@@ -1455,7 +1417,7 @@ update_ring(State, _RRing, Active, CNode, CState) ->
{RingChanged2, State2, CState4} = reassign_indices(State, CState3),
?ROUT("Updating ring :: next2 : ~p~n", [CState4#chstate.next]),
- Next3 = rebalance_ring(Active, CNode, CState4),
+ Next3 = rebalance_ring(CNode, CState4),
NextChanged = (Next0 /= Next3),
Changed = (NextChanged or RingChanged1 or RingChanged2),
case Changed of
@@ -1470,14 +1432,14 @@ update_ring(State, _RRing, Active, CNode, CState) ->
transfer_ownership(State, CState=#chstate{next=Next}) ->
%% Remove already completed and transferred changes
Next2 = lists:filter(fun(NInfo={Idx, _, _, _, _}) ->
- {NewOwner, S} = get_next_owner(State, NInfo),
+ {_, NewOwner, S} = next_owner(State, NInfo),
not ((S == complete) and
(owner(CState, Idx) =:= NewOwner))
end, Next),
CState2 = lists:foldl(fun(NInfo={Idx, _, _, _, _}, CState0) ->
- case get_next_owner(State, NInfo) of
- {Node, complete} ->
+ case next_owner(State, NInfo) of
+ {_, Node, complete} ->
riak_core_ring:transfer_node(Idx, Node, CState0);
_ ->
CState0
@@ -1513,8 +1475,8 @@ reassign_indices(State, CState=#chstate{next=Next}) ->
NextChanged = (Next /= CState3#chstate.next),
{RingChanged or NextChanged, State2, CState3}.
-rebalance_ring(_Active, _CNode, CState=#chstate{next=[]}) ->
- Members = get_members(CState#chstate.members, [valid]),
+rebalance_ring(_CNode, CState=#chstate{next=[]}) ->
+ Members = claiming_members(CState),
CState2 = lists:foldl(fun(Node, Ring0) ->
claim_until_balanced(Ring0, Node)
end, CState, Members),
@@ -1526,7 +1488,7 @@ rebalance_ring(_Active, _CNode, CState=#chstate{next=[]}) ->
PrevOwner /= NewOwner],
%% ?debugFmt("Next: ~p~n", [Next]),
Next;
-rebalance_ring(_Active, _CNode, _CState=#chstate{next=Next}) ->
+rebalance_ring(_CNode, _CState=#chstate{next=Next}) ->
Next.
claim_until_balanced(Ring, Node) ->
@@ -1543,11 +1505,7 @@ claim_until_balanced(Ring, Node) ->
claim_until_balanced(NewRing, Node)
end.
-remove_node(CState, Node, Status) ->
- Indices = indices(CState, Node),
- remove_node(CState, Node, Status, Indices).
-
-next_owners(#chstate{next=Next}) ->
+all_next_owners(#chstate{next=Next}) ->
[{Idx, NextOwner} || {Idx, _, NextOwner, _, _} <- Next].
change_owners(CState, Reassign) ->
@@ -1555,11 +1513,15 @@ change_owners(CState, Reassign) ->
riak_core_ring:transfer_node(Idx, NewOwner, CState0)
end, CState, Reassign).
+remove_node(CState, Node, Status) ->
+ Indices = indices(CState, Node),
+ remove_node(CState, Node, Status, Indices).
+
remove_node(CState, _Node, _Status, []) ->
CState;
remove_node(CState, Node, Status, Indices) ->
%% ?debugFmt("Reassigning from ~p: ~p~n", [Node, indices(CState, Node)]),
- CStateT1 = change_owners(CState, next_owners(CState)),
+ CStateT1 = change_owners(CState, all_next_owners(CState)),
CStateT2 = remove_from_cluster(CStateT1, Node),
Owners1 = all_owners(CState),
Owners2 = all_owners(CStateT2),
@@ -1576,9 +1538,9 @@ remove_node(CState, Node, Status, Indices) ->
|| {{Idx, PrevOwner}, {Idx, NewOwner}} <- Owners3,
PrevOwner /= NewOwner,
not lists:member(Idx, RemovedIndices)],
-
+
CState2 = change_owners(CState, Reassign),
- CState2#chstate{next=Next}. %% TODO: Could next already be set?
+ CState2#chstate{next=Next}.
remove_from_cluster(Ring, ExitingNode) ->
%% % Set the remote node to stop claiming.
@@ -1601,7 +1563,7 @@ remove_from_cluster(Ring, ExitingNode) ->
%% first hand off all claims to *any* one else,
%% just so rebalance doesn't include exiting node
%%Members = riak_core_ring:all_members(Ring),
- Members = all_valid_members(Ring),
+ Members = claiming_members(Ring),
Other = hd(lists:delete(ExitingNode, Members)),
TempRing = lists:foldl(
fun({I,N}, R) when N == ExitingNode ->
@@ -1628,7 +1590,7 @@ attempt_simple_transfer(Ring, Owners, ExitingNode) ->
TargetN,
ExitingNode, 0,
%%[{O,-TargetN} || O <- riak_core_ring:all_members(Ring),
- [{O,-TargetN} || O <- all_valid_members(Ring),
+ [{O,-TargetN} || O <- claiming_members(Ring),
O /= ExitingNode]).
attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
%% handoff
@@ -1687,33 +1649,33 @@ can_communicate(State, N1, N2) ->
dict:is_key(N2, State#state.nstates) and
(not dict:is_key({N1, N2}, State#state.split)).
-get_next_owner(State, CState, Idx) ->
+next_owner(State, CState, Idx) ->
case lists:keyfind(Idx, 1, CState#chstate.next) of
false ->
- {undefined, undefined};
+ {undefined, undefined, undefined};
NInfo ->
- get_next_owner(State, NInfo)
+ next_owner(State, NInfo)
end.
-get_next_owner(_State, {_, _Owner, NextOwner, _Transfers, Status}) ->
- {NextOwner, Status}.
-
-get_next_owner(_State, CState, Idx, Mod) ->
+next_owner(_State, CState, Idx, Mod) ->
case lists:keyfind(Idx, 1, CState#chstate.next) of
false ->
- {undefined, undefined};
- {_, _Owner, NextOwner, _Transfers, complete} ->
- {NextOwner, complete};
- {_, _Owner, NextOwner, Transfers, _Status} ->
+ {undefined, undefined, undefined};
+ {_, Owner, NextOwner, _Transfers, complete} ->
+ {Owner, NextOwner, complete};
+ {_, Owner, NextOwner, Transfers, _Status} ->
case ordsets:is_element(Mod, Transfers) of
true ->
- {NextOwner, complete};
+ {Owner, NextOwner, complete};
false ->
- {NextOwner, awaiting}
+ {Owner, NextOwner, awaiting}
end
end.
-mark_transfer_complete(State, CState=#chstate{next=Next}, Mod, Idx) ->
+next_owner(_State, {_, Owner, NextOwner, _Transfers, Status}) ->
+ {Owner, NextOwner, Status}.
+
+mark_transfer_complete(State, CState=#chstate{next=Next, vclock=VClock}, Idx, Mod) ->
{Idx, Owner, NextOwner, Transfers, Status} = lists:keyfind(Idx, 1, Next),
Transfers2 = ordsets:add_element(Mod, Transfers),
VNodeMods = vnode_modules(State, Owner),
@@ -1725,39 +1687,32 @@ mark_transfer_complete(State, CState=#chstate{next=Next}, Mod, Idx) ->
_ ->
awaiting
end,
- Next2 = lists:keyreplace(Idx, 1, Next, {Idx, Owner, NextOwner, Transfers2, Status2}),
- CState#chstate{next=Next2}.
+ Next2 = lists:keyreplace(Idx, 1, Next,
+ {Idx, Owner, NextOwner, Transfers2, Status2}),
+ VClock2 = vclock:increment(Owner, VClock),
+ CState#chstate{next=Next2, vclock=VClock2}.
%% VClock timestamps may be different for test generation versus
%% shrinking/checking phases. Normalize to test for equality.
equal_cstate(CS1, CS2) ->
T1 = equal_members(CS1#chstate.members, CS2#chstate.members),
- T2 = true, %%equal_vclock(CS1#chstate.vclock, CS2#chstate.vclock),
- T3 = equal_vclock(CS1#chstate.rvsn, CS2#chstate.rvsn),
- T4 = equal_seen(CS1, CS2),
- CS3=CS1#chstate{nodename=undefined, vclock=undefined, members=undefined, rvsn=undefined, seen=[]},
- CS4=CS2#chstate{nodename=undefined, vclock=undefined, members=undefined, rvsn=undefined, seen=[]},
- T5 = (CS3 =:= CS4),
- T1 and T2 and T3 and T4 and T5.
+ T2 = equal_vclock(CS1#chstate.rvsn, CS2#chstate.rvsn),
+ T3 = equal_seen(CS1, CS2),
+
+ %% Clear fields checked manually and test remaining through equality.
+ CS3=CS1#chstate{nodename=ok, members=ok, vclock=ok, rvsn=ok, seen=ok},
+ CS4=CS2#chstate{nodename=ok, members=ok, vclock=ok, rvsn=ok, seen=ok},
+ T4 = (CS3 =:= CS4),
+ T1 and T2 and T3 and T4.
equal_members(M1, M2) ->
- L = orddict:merge(fun(_, {V1, VC1}, {V2, VC2}) ->
- V1 =:= V2 andalso
+ L = orddict:merge(fun(_, {Status1, VC1}, {Status2, VC2}) ->
+ (Status1 =:= Status2) andalso
equal_vclock(VC1, VC2)
end, M1, M2),
{_, R} = lists:unzip(L),
lists:all(fun(X) -> X =:= true end, R).
-filtered_seen(CS) ->
- Members = get_members(CS#chstate.members),
- filtered_seen(Members, CS).
-
-filtered_seen([], CS) ->
- CS#chstate.seen;
-filtered_seen(Members, CS) ->
- orddict:filter(fun(N, _) -> lists:member(N, Members) end, CS#chstate.seen).
-
-
equal_seen(CS1, CS2) ->
Seen1 = filtered_seen(CS1),
Seen2 = filtered_seen(CS2),
@@ -1766,7 +1721,15 @@ equal_seen(CS1, CS2) ->
end, Seen1, Seen2),
{_, R} = lists:unzip(L),
lists:all(fun(X) -> X =:= true end, R).
-
+
+filtered_seen(CS=#chstate{seen=Seen}) ->
+ case get_members(CS#chstate.members) of
+ [] ->
+ Seen;
+ Members ->
+ orddict:filter(fun(N, _) -> lists:member(N, Members) end, Seen)
+ end.
+
equal_vclock(VC1, VC2) ->
VC3 = [{Node, {Count, 1}} || {Node, {Count, _TS}} <- VC1],
VC4 = [{Node, {Count, 1}} || {Node, {Count, _TS}} <- VC2],
@@ -1793,8 +1756,8 @@ check_read2(State, Owner, Idx) ->
%%io:format("Checking forward~n", []),
%% Check if we should forward
OwnerCS = get_cstate(State, Owner),
- case get_next_owner(State, OwnerCS, Idx, riak_kv) of
- {Node, complete} ->
+ case next_owner(State, OwnerCS, Idx, riak_kv) of
+ {_, Node, complete} ->
?assert(Node /= Owner),
case can_communicate(State, Owner, Node) of
true ->
Please sign in to comment.
Something went wrong with that request. Please try again.