Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' into az462-join-claim-improvement

  • Loading branch information...
commit 69e4040eb435f52852e19c16779c5309b6cf7c01 2 parents fde0f37 + 945fd24
@jtuple jtuple authored
View
1  ebin/riak_core.app
@@ -36,6 +36,7 @@
riak_core_ring_handler,
riak_core_ring_manager,
riak_core_ring_util,
+ riak_core_status,
riak_core_sup,
riak_core_sysmon_handler,
riak_core_sysmon_minder,
View
32 src/riak_core.erl
@@ -2,7 +2,7 @@
%%
%% Riak: A lightweight, decentralized key-value store.
%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@@ -20,7 +20,7 @@
%%
%% -------------------------------------------------------------------
-module(riak_core).
--export([stop/0, stop/1]).
+-export([stop/0, stop/1, join/1, remove_from_cluster/1]).
-export([register_vnode_module/1, vnode_modules/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
@@ -43,6 +43,34 @@ stop(Reason) ->
init:stop().
-endif.
+%%
+%% @doc Join the ring found on the specified remote node
+%%
+join(NodeStr) when is_list(NodeStr) ->
+ join(riak_core_util:str_to_node(NodeStr));
+join(Node) when is_atom(Node) ->
+ {ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
+ case net_adm:ping(Node) of
+ pong ->
+ case rpc:call(Node,
+ application,
+ get_env,
+ [riak_core, ring_creation_size]) of
+ {ok, OurRingSize} ->
+ riak_core_gossip:send_ring(Node, node());
+ _ ->
+ {error, different_ring_sizes}
+ end;
+ pang ->
+ {error, not_reachable}
+ end.
+
+%% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
+%% @doc Cause all partitions owned by ExitingNode to be taken over
+%% by other nodes.
+remove_from_cluster(ExitingNode) when is_atom(ExitingNode) ->
+ riak_core_gossip:remove_from_cluster(ExitingNode).
+
vnode_modules() ->
case application:get_env(riak_core, vnode_modules) of
undefined -> [];
View
123 src/riak_core_status.erl
@@ -0,0 +1,123 @@
+%% -------------------------------------------------------------------
+%%
+%% Riak: A lightweight, decentralized key-value store.
+%%
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_status).
+-export([ringready/0, transfers/0]).
+
+-spec(ringready() -> {ok, [atom()]} | {error, any()}).
+ringready() ->
+ case get_rings() of
+ {[], Rings} ->
+ {N1,R1}=hd(Rings),
+ case rings_match(hash_ring(R1), tl(Rings)) of
+ true ->
+ Nodes = [N || {N,_} <- Rings],
+ {ok, Nodes};
+
+ {false, N2} ->
+ {error, {different_owners, N1, N2}}
+ end;
+
+ {Down, _Rings} ->
+ {error, {nodes_down, Down}}
+ end.
+
+
+-spec(transfers() -> {[atom()], [{waiting_to_handoff, atom(), integer()} |
+ {stopped, atom(), integer()}]}).
+transfers() ->
+ {Down, Rings} = get_rings(),
+
+ %% Work out which vnodes are running and which partitions they claim
+ F = fun({N,R}, Acc) ->
+ {_Pri, Sec, Stopped} = partitions(N, R),
+ Acc1 = case Sec of
+ [] ->
+ [];
+ _ ->
+ [{waiting_to_handoff, N, length(Sec)}]
+ end,
+ case Stopped of
+ [] ->
+ Acc1 ++ Acc;
+ _ ->
+ Acc1 ++ [{stopped, N, length(Stopped)} | Acc]
+ end
+ end,
+ {Down, lists:foldl(F, [], Rings)}.
+
+
+%% ===================================================================
+%% Internal functions
+%% ===================================================================
+
+%% Retrieve the rings for all other nodes by RPC
+get_rings() ->
+ {RawRings, Down} = riak_core_util:rpc_every_member(
+ riak_core_ring_manager, get_my_ring, [], 30000),
+ Rings = orddict:from_list([{riak_core_ring:owner_node(R), R} || {ok, R} <- RawRings]),
+ {lists:sort(Down), Rings}.
+
+%% Produce a hash of the 'chash' portion of the ring
+hash_ring(R) ->
+ erlang:phash2(riak_core_ring:all_owners(R)).
+
+%% Check if all rings match given a hash and a list of [{N,P}] to check
+rings_match(_, []) ->
+ true;
+rings_match(R1hash, [{N2, R2} | Rest]) ->
+ case hash_ring(R2) of
+ R1hash ->
+ rings_match(R1hash, Rest);
+ _ ->
+ {false, N2}
+ end.
+
+
+%% Get a list of active partition numbers - regardless of vnode type
+active_partitions(Node) ->
+ lists:foldl(fun({_,P}, Ps) ->
+ ordsets:add_element(P, Ps)
+ end, [], running_vnodes(Node)).
+
+%% Get a list of running vnodes for a node
+running_vnodes(Node) ->
+ Pids = vnode_pids(Node),
+ [rpc:call(Node, riak_core_vnode, get_mod_index, [Pid], 30000) || Pid <- Pids].
+
+%% Get a list of vnode pids for a node
+vnode_pids(Node) ->
+ [Pid || {_,Pid,_,_} <- supervisor:which_children({riak_core_vnode_sup, Node})].
+
+%% Return a list of active primary partitions, active secondary partitions (to be handed off)
+%% and stopped partitions that should be started
+partitions(Node, Ring) ->
+ Owners = riak_core_ring:all_owners(Ring),
+ Owned = ordsets:from_list(owned_partitions(Owners, Node)),
+ Active = ordsets:from_list(active_partitions(Node)),
+ Stopped = ordsets:subtract(Owned, Active),
+ Secondary = ordsets:subtract(Active, Owned),
+ Primary = ordsets:subtract(Active, Secondary),
+ {Primary, Secondary, Stopped}.
+
+%% Return the list of partitions owned by a node
+owned_partitions(Owners, Node) ->
+ [P || {P, Owner} <- Owners, Owner =:= Node].
View
35 src/spiraltime.erl
@@ -29,6 +29,14 @@
-module(spiraltime).
-author('Justin Sheehy <justin@basho.com>').
+
+-ifdef(TEST).
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-endif.
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-export([fresh/0,fresh/1,n/0,incr/2,incr/3,
rep_second/1,rep_minute/1,
test_spiraltime/0]).
@@ -84,7 +92,7 @@ incr(N, Moment, Spiral) when Spiral#spiral.moment =:= Moment ->
% common case -- updates for "now"
Spiral#spiral{seconds=[hd(Spiral#spiral.seconds)+N|
tl(Spiral#spiral.seconds)]};
-incr(_N, Moment, Spiral) when Spiral#spiral.moment - Moment > 60 ->
+incr(_N, Moment, Spiral) when Spiral#spiral.moment - Moment > 59 ->
Spiral; % updates more than a minute old are dropped! whee!
incr(N, Moment, Spiral) ->
S1 = update_moment(Moment, Spiral),
@@ -125,4 +133,29 @@ test_spiraltime() ->
S2 = incr(3, PlusOne, S1),
{PlusOne, 3} = rep_second(S2),
{PlusOne, 20} = rep_minute(S2),
+ %% Drops items 60 seconds or older
+ S2 = incr(1, PlusOne-60, S2),
true.
+
+-ifdef(TEST).
+
+all_test() ->
+ true = test_spiraltime().
+
+-ifdef(EQC).
+
+prop_dontcrash() ->
+ ?FORALL(Mods, list({choose(0, 65), choose(-10, 10)}),
+ begin
+ Start = n(),
+ lists:foldl(fun({When, Amt}, Sp) ->
+ incr(Amt, Start + When, Sp)
+ end, fresh(Start), Mods),
+ true
+ end).
+
+eqc_test() ->
+ eqc:quickcheck(eqc:numtests(5*1000, prop_dontcrash())).
+
+-endif.
+-endif.
Please sign in to comment.
Something went wrong with that request. Please try again.