Skip to content

Commit

Permalink
Refactor ring_status into riak_core_status and enhance console output
Browse files Browse the repository at this point in the history
  • Loading branch information
jtuple committed Aug 16, 2011
1 parent 925506e commit 745f9bc
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 56 deletions.
149 changes: 95 additions & 54 deletions src/riak_core_console.erl
Expand Up @@ -19,81 +19,122 @@
%% -------------------------------------------------------------------

-module(riak_core_console).
-compile(export_all).
-export([member_status/0, ring_status/0]).

%% @doc Print the cluster membership table to the console: a banner, one row
%% per member (status, share of the ring, node name) and a closing summary
%% line with the number of members in each status.
%%
%% NOTE(review): this span appears to come from a rendered commit diff whose
%% +/- markers and indentation were stripped, so removed lines (the
%% Active*/"is ACTIVE" variants) and added lines (the Valid*/RingPercent
%% variants) are interleaved here. It is not compilable as shown; confirm
%% against the real file before relying on it.
member_status() ->
io:format("~33..=s Membership ~34..=s~n", ["", ""]),
io:format("Status Ring Node~n"),
io:format("~79..-s~n", [""]),
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
AllStatus = riak_core_ring:all_member_status(Ring),
{Active, Leaving, Exiting} =
lists:foldl(fun({Node, Status}, {Active0, Leaving0, Exiting0}) ->
%% Sort the {Node, Status} pairs on element 2 (the status) so that members
%% with the same status are listed together.
AllStatus = lists:keysort(2, riak_core_ring:all_member_status(Ring)),
RingSize = riak_core_ring:num_partitions(Ring),

{Valid, Leaving, Exiting} =
lists:foldl(fun({Node, Status}, {Valid0, Leaving0, Exiting0}) ->
Indices = riak_core_ring:indices(Ring, Node),
%% Share of the ring held by this node, as a percentage of all partitions.
RingPercent = length(Indices) * 100 / RingSize,

io:format("~-7s ~5.1f% ~p~n",
[Status, RingPercent, Node]),
%% Bump the counter that corresponds to the member's status.
case Status of
valid ->
io:format("~p is ACTIVE~n", [Node]),
{Active0 + 1, Leaving0, Exiting0};
{Valid0 + 1, Leaving0, Exiting0};
leaving ->
io:format("~p is LEAVING~n", [Node]),
{Active0, Leaving0 + 1, Exiting0};
{Valid0, Leaving0 + 1, Exiting0};
exiting ->
io:format("~p is EXITING~n", [Node]),
{Active0, Leaving0, Exiting0 + 1}
{Valid0, Leaving0, Exiting0 + 1}
end
end, {0,0,0}, AllStatus),
io:format("~79..-s~n", [""]),
io:format("Active: ~p~n"
"Leaving: ~p~n"
"Exiting: ~p~n", [Active, Leaving, Exiting]),
io:format("Valid:~b / Leaving:~b / Exiting:~b~n",
[Valid, Leaving, Exiting]),
ok.

%% @doc Print the ring status report to the console in three sections:
%% claimant, ownership handoff and unreachable nodes. The data comes from
%% riak_core_status:ring_status/0 as {Claimant, RingReady, Down, Changes}.
%%
%% NOTE(review): this span appears to interleave the removed body (the TODO
%% comment and the Ring/Changes/AllMods bindings) with the added body of a
%% rendered diff; confirm against the real file.
ring_status() ->
%% TODO: Change this for cases where not all nodes in the
%% cluster are running the same vnode modules.
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Changes = riak_core_ring:pending_changes(Ring),
AllMods = [Mod || {_App, Mod} <- riak_core:vnode_modules()],
{Claimant, RingReady, Down, Changes} = riak_core_status:ring_status(),
claimant_status(Claimant, RingReady),
ownership_status(Down, Changes),
unreachable_status(Down),
ok.

%% @doc Print the "Claimant" section: the claimant node followed by its
%% up/down status and ring-ready flag. A RingReady of `undefined' is reported
%% as "Status: down" with an unknown ring-ready state; any other value is
%% reported as "Status: up" together with that value.
%%
%% NOTE(review): this span appears to interleave removed diff lines (the
%% riak_core_ring:claimant/1 lookup, the rpc:call and its {badrpc, _} /
%% RingReady clauses) with the added lines; confirm against the real file.
claimant_status(Claimant, RingReady) ->
io:format("~34..=s Claimant ~35..=s~n", ["", ""]),
Claimant = riak_core_ring:claimant(Ring),
io:format("Claimant: ~p~n", [Claimant]),
case rpc:call(Claimant, riak_core_ring, ring_ready, [], 5000) of
{badrpc, _} ->
case RingReady of
undefined ->
io:format("Status: down~n"
"Ring Ready: unknown~n", []);
RingReady ->
_ ->
io:format("Status: up~n"
"Ring Ready: ~p~n", [RingReady])
end,
io:format("~n", []),
io:format("~n", []).

%% @doc Print the "Ownership Handoff" section. With no pending changes a
%% single "No pending changes." line is printed; otherwise every
%% {Owner, NextOwner} group in Changes is printed by
%% print_ownership_status/3, with Down threaded through as the fold
%% accumulator.
%%
%% NOTE(review): the `ring_status(Changes, AllMods).' line appears to be a
%% removed diff line left in place by the page rendering; confirm against the
%% real file.
ownership_status(Down, Changes) ->
io:format("~30..=s Ownership Handoff ~30..=s~n", ["", ""]),
ring_status(Changes, AllMods).
case Changes of
[] ->
io:format("No pending changes.~n");
_ ->
orddict:fold(fun print_ownership_status/3, Down, Changes)
end,
io:format("~n", []).

%% NOTE(review): this span appears to interleave two definitions from a
%% rendered diff: the removed ring_status/2 (table header plus a foldl over
%% the pending changes) and the added print_ownership_status/3. It is not
%% compilable as shown; confirm against the real file.
%%
%% print_ownership_status({Owner, NextOwner}, Transfers, Down) prints the
%% owner and next owner of one group of transfers. If either node is a member
%% of Down, a "!!! Node is DOWN !!!" banner is printed for it and only the
%% partition indices are listed; otherwise the full status of each transfer
%% is printed. It returns Down unchanged, which lets it serve as the fold
%% function in ownership_status/2.
ring_status([], _) ->
io:format("No pending changes.~n"),
ok;
ring_status(Changes, AllMods) ->
io:format("~-49s ~-14s ~-14s~n", ["Index", "Owner", "NextOwner"]),
print_ownership_status({Owner, NextOwner}, Transfers, Down) ->
io:format("Owner: ~s~n"
"Next Owner: ~s~n", [Owner, NextOwner]),
%% Decide how much detail to print based on which endpoints are down.
case {lists:member(Owner, Down),
lists:member(NextOwner, Down)} of
{true, true} ->
io:format("~n"),
io:format("!!! ~s is DOWN !!!~n", [Owner]),
io:format("!!! ~s is DOWN !!!~n~n", [NextOwner]),
lists:foreach(fun print_index/1, Transfers);
{true, _} ->
io:format("~n"),
io:format("!!! ~s is DOWN !!!~n~n", [Owner]),
lists:foreach(fun print_index/1, Transfers);
{_, true} ->
io:format("~n"),
io:format("!!! ~s is DOWN !!!~n~n", [NextOwner]),
lists:foreach(fun print_index/1, Transfers);
_ ->
lists:foreach(fun print_transfer_status/1, Transfers)
end,
io:format("~n"),
io:format("~79..-s~n", [""]),
lists:foldl(
fun({Idx, Owner, NextOwner, Mods, Status}, Acc) ->
case Status of
complete ->
io:format("~-49b ~-14s ~-14s~n"
" All transfers complete. Waiting for "
"claimant to change ownership.~n",
[Idx, Owner, NextOwner]),
Acc;
awaiting ->
Waiting = AllMods -- Mods,
io:format("~-49b ~-14s ~-14s~n",
[Idx, Owner, NextOwner]),
io:format(" Waiting on: ~p~n", [Waiting]),
case Mods of
[] ->
ok;
_ ->
io:format(" Complete: ~p~n", [Mods])
end,
io:format("~79..-s~n", [""]),
Acc
end
end, [], Changes),
ok.
Down.

%% Print only the partition index of a transfer entry. The waiting modules,
%% completed modules and status carried by the tuple are ignored.
print_index({PartitionIndex, _, _, _}) ->
    io:format("Index: ~b~n", [PartitionIndex]).

%% Print the handoff progress of a single partition transfer.
%%
%% The entry is {Index, WaitingMods, CompletedMods, Status}. The index is
%% always printed first. A `complete' transfer reports that it is waiting on
%% the claimant; an `awaiting' transfer lists the modules still outstanding
%% and, when there are any, the modules that have already finished.
print_transfer_status({Index, WaitingMods, CompletedMods, Status}) ->
    io:format("~nIndex: ~b~n", [Index]),
    case Status of
        awaiting ->
            io:format(" Waiting on: ~p~n", [WaitingMods]),
            if
                CompletedMods =:= [] ->
                    ok;
                true ->
                    io:format(" Complete: ~p~n", [CompletedMods])
            end;
        complete ->
            io:format(" All transfers complete. Waiting for "
                      "claimant to change ownership.~n")
    end.

%% Print the "Unreachable Nodes" section. An empty list yields a single
%% all-clear line; anything else yields the list of unreachable nodes followed
%% by a warning explaining that the cluster cannot converge until they are
%% back (or are force-removed).
unreachable_status(Unreachable) ->
    io:format("~30..=s Unreachable Nodes ~30..=s~n", ["", ""]),
    case Unreachable of
        [] ->
            io:format("All nodes are up and reachable~n", []),
            io:format("~n", []);
        _ ->
            io:format("The following nodes are down/unreachable: ~p~n",
                      [Unreachable]),
            io:format("~n", []),
            io:format("WARNING: The cluster state will not converge until all nodes~n"
                      "are up. Once the above nodes come back online, convergence~n"
                      "will continue. If the outages are permanent or long-term, you~n"
                      "can forcibly remove the nodes from the cluster (riak-admin~n"
                      "force-remove NODE) to allow the remaining nodes to settle.~n"),
            ok
    end.
49 changes: 48 additions & 1 deletion src/riak_core_status.erl
Expand Up @@ -20,7 +20,7 @@
%%
%% -------------------------------------------------------------------
-module(riak_core_status).
-export([ringready/0, transfers/0]).
-export([ringready/0, transfers/0, ring_status/0]).

-spec(ringready() -> {ok, [atom()]} | {error, any()}).
ringready() ->
Expand Down Expand Up @@ -64,6 +64,53 @@ transfers() ->
end,
{Down, lists:foldl(F, [], Rings)}.

%% @doc Collect the data needed to report on ring convergence.
%%
%% Returns `{Claimant, RingReady, Down, TransferStatus}' where:
%%   Claimant       - the claimant node recorded in the local ring;
%%   RingReady      - whatever the claimant's riak_core_ring:ring_ready/0
%%                    returned, or `undefined' if the claimant could not be
%%                    reached;
%%   Down           - nodes that did not answer the vnode_modules RPC, plus
%%                    the claimant when the call to it failed;
%%   TransferStatus - the pending ownership changes grouped by owner pair:
%%                    [{{Owner, NextOwner},
%%                      [{Index, WaitingMods, CompletedMods, Status}]}]
%%                    WaitingMods is the atom `down' when the owner's vnode
%%                    modules could not be determined.
ring_status() ->
    %% Determine which nodes are reachable as well as what vnode modules
    %% are running on each node.
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    {AllMods, Down} =
        riak_core_util:rpc_every_member_ann(riak_core, vnode_modules, [], 1000),

    %% Check if the claimant is running and if it believes the ring is ready.
    %% Both results are bound from a single tuple rather than being exported
    %% from the case branches.
    Claimant = riak_core_ring:claimant(Ring),
    {RingReady, Down2} =
        case rpc:call(Claimant, riak_core_ring, ring_ready, [], 5000) of
            {badrpc, _} ->
                {undefined, lists:usort([Claimant | Down])};
            Ready ->
                {Ready, Down}
        end,

    %% Get the list of pending ownership changes
    Changes = riak_core_ring:pending_changes(Ring),
    %% Group pending changes by (Owner, NextOwner)
    Merged = lists:foldl(
               fun({Idx, Owner, NextOwner, Mods, Status}, Acc) ->
                       orddict:append({Owner, NextOwner},
                                      {Idx, Mods, Status},
                                      Acc)
               end, [], Changes),

    %% For each pending transfer, determine which vnode modules have completed
    %% handoff and which we are still waiting on.
    %%
    %% AllMods is a plain [{Node, Result}] list built with lists:zip/2, not a
    %% guaranteed orddict, so it is searched with lists:keyfind/3. A Result
    %% that is not a list (rpc:multicall reports failing calls on reachable
    %% nodes as {badrpc, Reason}) is treated like a missing node instead of
    %% crashing the comprehension below.
    TransferStatus =
        orddict:map(
          fun({Owner, _NextOwner}, Transfers) ->
                  case lists:keyfind(Owner, 1, AllMods) of
                      {Owner, OwnerMods} when is_list(OwnerMods) ->
                          NodeMods = [Mod || {_App, Mod} <- OwnerMods],
                          [{Idx, NodeMods -- Mods, Mods, Status}
                           || {Idx, Mods, Status} <- Transfers];
                      _ ->
                          [{Idx, down, Mods, Status}
                           || {Idx, Mods, Status} <- Transfers]
                  end
          end, Merged),

    {Claimant, RingReady, Down2, TransferStatus}.

%% ===================================================================
%% Internal functions
Expand Down
15 changes: 14 additions & 1 deletion src/riak_core_util.erl
Expand Up @@ -35,7 +35,8 @@
chash_bucketonly_keyfun/1,
mkclientid/1,
start_app_deps/1,
rpc_every_member/4]).
rpc_every_member/4,
rpc_every_member_ann/4]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down Expand Up @@ -224,6 +225,18 @@ rpc_every_member(Module, Function, Args, Timeout) ->
Nodes = riak_core_ring:all_members(MyRing),
rpc:multicall(Nodes, Module, Function, Args, Timeout).

%% @doc Variant of rpc_every_member/4 that pairs each result with the node
%% it came from. The call is issued to every member of the local ring; the
%% nodes reported as bad by rpc:multicall/5 are returned separately and the
%% remaining nodes are zipped, in order, with the returned results.
-spec rpc_every_member_ann(module(), atom(), [term()], integer()|infinity)
-> {Results::[{node(), term()}], Down::[node()]}.
rpc_every_member_ann(Module, Function, Args, Timeout) ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Members = riak_core_ring:all_members(Ring),
    {Replies, BadNodes} =
        rpc:multicall(Members, Module, Function, Args, Timeout),
    Responders = Members -- BadNodes,
    {lists:zip(Responders, Replies), BadNodes}.

%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down

0 comments on commit 745f9bc

Please sign in to comment.