Skip to content
Browse files

Add cluster capability system + API

-- Provides API to register supported protocols/modes associated
   with a given capability.

-- Auto-negotiates the mode across the cluster, choosing the most
   preferred mode supported by all members.
  • Loading branch information...
1 parent 07e68d6 commit d83fee8d5a453495d48330deea1a83572ec183f6 @jtuple jtuple committed Apr 24, 2012
Showing with 558 additions and 1 deletion.
  1. +1 −0 ebin/riak_core.app
  2. +2 −1 src/riak_core.erl
  3. +529 −0 src/riak_core_capability.erl
  4. +1 −0 src/riak_core_ring_handler.erl
  5. +1 −0 src/riak_core_sup.erl
  6. +24 −0 src/riak_core_util.erl
View
1 ebin/riak_core.app
@@ -22,6 +22,7 @@
riak_core_cinfo_core,
riak_core_claim,
riak_core_new_claim,
+ riak_core_capability,
riak_core_config,
riak_core_console,
riak_core_coverage_fsm,
View
3 src/riak_core.erl
@@ -134,7 +134,8 @@ standard_join(Node, Ring, Rejoin) ->
node(),
gossip_vsn,
GossipVsn),
- riak_core_ring_manager:set_my_ring(Ring4),
+ {_, Ring5} = riak_core_capability:publish_supported(Ring4),
+ riak_core_ring_manager:set_my_ring(Ring5),
riak_core_gossip:send_ring(Node, node())
end.
View
529 src/riak_core_capability.erl
@@ -0,0 +1,529 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_capability).
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0,
+ register/4,
+ register/3,
+ get/1,
+ all/0,
+ publish_supported/1,
+ ring_changed/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% A capability is identified by an `{Application, Name}' pair. The
+%% application half is what get_overrides/1 and override_capability/3 use
+%% to look up per-application overrides.
+-type capability() :: {atom(), atom()}.
+%% A mode is any term naming one way of operating a capability.
+-type mode() :: term().
+
+%% What the local node registered for one capability: its supported modes
+%% (earlier entries are preferred during negotiation), the default mode, and
+%% the optional legacy `{App, Var, Map}' application-variable mapping.
+-record(capability, {supported :: [mode()],
+                     default :: mode(),
+                     legacy}).
+
+-type registered() :: [{capability(), #capability{}}].
+
+%% Server state:
+%%   registered - capabilities registered on the local node
+%%   supported  - per-node view of supported modes for each capability
+%%   unknown    - nodes whose capabilities could not be resolved yet
+%%   negotiated - currently chosen mode for each capability
+-record(state, {registered :: registered(),
+                supported :: [{node(), [{capability(), [mode()]}]}],
+                unknown :: [node()],
+                negotiated :: [{capability(), mode()}]
+               }).
+
+%% Named ETS table holding negotiated modes plus the local '$supported' cache.
+-define(ETS, riak_capability_ets).
+%% Ring member-metadata key under which a node publishes its capabilities.
+-define(CAPS, '$riak_capabilities').
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Start the capability manager as a gen_server registered locally
+%% under this module's name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% @doc Register a capability together with its list of supported modes,
+%% its default mode, and an optional mapping that describes how a legacy
+%% application variable translates into modes. The call waits (without a
+%% timeout) for the capability server to process it and returns `ok'.
+register(Capability, Supported, Default, LegacyVar) ->
+    Request = {register,
+               Capability,
+               capability_info(Supported, Default, LegacyVar)},
+    gen_server:call(?MODULE, Request, infinity),
+    ok.
+
+%% @doc Register a capability by giving only its supported modes and its
+%% default mode; an empty list is passed as the legacy mapping.
+register(Capability, Supported, Default) ->
+    NoLegacyMapping = [],
+    register(Capability, Supported, Default, NoLegacyMapping).
+
+%% @doc Look up the mode currently negotiated for `Capability' directly in
+%% the ETS table. Returns `undefined' when there is no entry, and also when
+%% the lookup raises for any reason.
+get(Capability) ->
+    try
+        Found = ets:lookup(?ETS, Capability),
+        case Found of
+            [{Capability, Negotiated}] ->
+                Negotiated;
+            [] ->
+                undefined
+        end
+    catch
+        _:_ ->
+            undefined
+    end.
+
+%% @doc Return a list of all negotiated capabilities as
+%% `{Capability, Mode}' pairs.
+%%
+%% The ETS table also holds the local node's cached supported-mode list
+%% under the internal key '$supported' (written by update_local_cache/1).
+%% That row is not a negotiated capability, so it is filtered out here.
+all() ->
+    [Entry || {Key, _} = Entry <- ets:tab2list(?ETS), Key =/= '$supported'].
+
+%% @doc Add the local node's supported capabilities to the given
+%% ring. Currently used during the `riak-admin join' process.
+%%
+%% Returns `{Changed, Ring2}'. The supported list is read from the
+%% '$supported' row of the ETS table. That row is only written once a
+%% capability has been registered or reloaded, so when it is absent there
+%% is nothing to publish and the ring is returned unchanged instead of
+%% failing with a badmatch.
+publish_supported(Ring) ->
+    case ets:lookup(?ETS, '$supported') of
+        [{_, Supported}] ->
+            publish_supported(node(), Supported, Ring);
+        [] ->
+            {false, Ring}
+    end.
+
+%% @doc Callback invoked by `riak_core_ring_handler' to hand a new ring to
+%% the capability manager. Synchronous, with no timeout.
+%% @hidden
+ring_changed(Ring) ->
+    Notification = {ring_changed, Ring},
+    gen_server:call(?MODULE, Notification, infinity).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% gen_server init: create the named ETS table, schedule the first tick,
+%% then rebuild state from any previously saved registrations.
+init([]) ->
+    TableOpts = [named_table, {read_concurrency, true}],
+    ets:new(?ETS, TableOpts),
+    schedule_tick(),
+    Initial = init_state(load_registered()),
+    {ok, reload(Initial)}.
+
+%% Build a fresh server state that knows only the given registrations;
+%% every other field starts out empty.
+init_state(Registered) ->
+    Empty = #state{supported=[], unknown=[], negotiated=[]},
+    Empty#state{registered=Registered}.
+
+%% Synchronous requests.
+%%
+%% {register, Capability, Info}: record the registration for the local
+%% node, renegotiate, then publish to the ring, refresh the local ETS cache
+%% and persist the registrations.
+%%
+%% {ring_changed, Ring}: refresh the cluster view from the new ring.
+handle_call({register, Capability, Info}, _From, State0) ->
+    Registered = register_capability(node(), Capability, Info, State0),
+    Negotiated = renegotiate_capabilities(Registered),
+    update_ring(Negotiated),
+    update_local_cache(Negotiated),
+    save_registered(Negotiated#state.registered),
+    {reply, ok, Negotiated};
+
+handle_call({ring_changed, Ring}, _From, State0) ->
+    {reply, ok, update_supported(Ring, State0)}.
+
+%% Periodic tick: re-arm the timer, retry capability resolution for every
+%% node still marked unknown, then renegotiate.
+handle_cast(tick, State) ->
+    schedule_tick(),
+    Requery = fun(Node, Acc) -> add_node(Node, [], Acc) end,
+    Refreshed = lists:foldl(Requery, State, State#state.unknown),
+    {noreply, renegotiate_capabilities(Refreshed)}.
+
+%% Ask the timer module to cast `tick' to this server after 1000 ms.
+schedule_tick() ->
+    CastArgs = [?MODULE, tick],
+    timer:apply_after(1000, gen_server, cast, CastArgs).
+
+%% Out-of-band messages are ignored; the state is left untouched.
+handle_info(_Message, Unchanged) ->
+    {noreply, Unchanged}.
+
+%% Nothing to clean up on shutdown.
+terminate(_Why, _FinalState) ->
+    ok.
+
+%% Hot code upgrade: the state is carried over as-is.
+code_change(_FromVsn, Carried, _Extra) ->
+    {ok, Carried}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Pack the registration details of one capability into a #capability{}.
+capability_info(Supported, Default, Legacy) ->
+    #capability{legacy=Legacy,
+                default=Default,
+                supported=Supported}.
+
+%% Rebuild the server state from saved registrations; this is what restores
+%% capabilities after the riak_core_capability server has crashed and been
+%% restarted. With no saved registrations the state is returned untouched.
+%% Otherwise every saved capability is re-registered for the local node, the
+%% cluster view is refreshed from the current ring, and the local ETS cache
+%% and the saved registrations are rewritten.
+reload(#state{registered=[]} = State) ->
+    State;
+reload(State) ->
+    lager:info("Reloading capabilities"),
+    Restore = fun({Capability, Info}, Acc) ->
+                      register_capability(node(), Capability, Info, Acc)
+              end,
+    Restored = lists:foldl(Restore, State, State#state.registered),
+    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+    Refreshed = update_supported(Ring, Restored),
+    update_local_cache(Refreshed),
+    save_registered(Refreshed#state.registered),
+    Refreshed.
+
+%% Update this node's view of cluster capabilities based on a received ring
+%%
+%% Steps: read what every member published in the ring, drop state for
+%% nodes that are no longer members, fold the published data into the
+%% per-node view (the local node's own entry is never overwritten from the
+%% ring), and finally renegotiate. Returns the updated state.
+update_supported(Ring, State) ->
+ AllSupported = get_supported_from_ring(Ring),
+ State2 = remove_members(Ring, State),
+ State3 =
+ lists:foldl(fun({Node, _}, StateAcc) when Node == node() ->
+ %% Local node: our own registrations are authoritative.
+ StateAcc;
+ ({Node, Supported}, StateAcc) ->
+ Known = get_supported(Node, StateAcc),
+ %% Clause order matters: the empty cases must be tested
+ %% before the generic "unchanged" and "changed" cases.
+ case {Supported, Known} of
+ {[], []} ->
+ %% Nothing published and nothing known yet: add_node/3
+ %% with an empty list queries the node itself.
+ add_node(Node, Supported, StateAcc);
+ {[], _} ->
+ %% Nothing published, but we already hold information
+ %% for this node: keep what we have.
+ StateAcc;
+ {Same, Same} ->
+ %% Published data equals what we already know.
+ StateAcc;
+ {_, _} ->
+ %% Published data differs: store it for this node.
+ add_node(Node, Supported, StateAcc)
+ end
+ end, State2, AllSupported),
+ State4 = renegotiate_capabilities(State3),
+ State4.
+
+%% Record a capability registration and note that `Node' supports the
+%% modes listed in that registration.
+register_capability(Node, Capability, Info, State) ->
+    WithRegistration = add_registered(Capability, Info, State),
+    add_supported(Node, Capability, Info#capability.supported,
+                  WithRegistration).
+
+%% Store (or replace) the registration info kept for `Capability'.
+add_registered(Capability, Info, State) ->
+    Known = State#state.registered,
+    State#state{registered=orddict:store(Capability, Info, Known)}.
+
+%% Return the capability -> modes orddict known for `Node', or an empty
+%% orddict when nothing is known about that node.
+get_supported(Node, #state{supported=PerNode}) ->
+    case orddict:find(Node, PerNode) of
+        error ->
+            orddict:new();
+        {ok, NodeCaps} ->
+            NodeCaps
+    end.
+
+%% Set the list of modes `Node' supports for `Capability', replacing any
+%% list previously stored for that node/capability pair.
+add_supported(Node, Capability, Supported, State) ->
+    NodeCaps = orddict:store(Capability, Supported,
+                             get_supported(Node, State)),
+    PerNode = orddict:store(Node, NodeCaps, State#state.supported),
+    State#state{supported=PerNode}.
+
+%% Forget the capability information of every node that is not listed
+%% among the members of `Ring'.
+remove_members(Ring, #state{supported=Supported} = State) ->
+    Members = riak_core_ring:all_members(Ring),
+    IsMember = fun(Node, _Caps) -> lists:member(Node, Members) end,
+    State#state{supported=orddict:filter(IsMember, Supported)}.
+
+%% Add a member to the local view of cluster capabilities.
+%%
+%% A non-empty capability list (as published in the ring) is stored as
+%% given. An empty list means the node published nothing, so its
+%% capabilities are obtained through query_capabilities/2 instead. Whether
+%% that query fully resolved decides if the node is removed from, or added
+%% to, the `unknown' set; unknown nodes are retried on the next server tick.
+add_node(Node, [], #state{unknown=Unknown} = State) ->
+    {Capabilities, Resolved} = query_capabilities(Node, State),
+    StillUnknown =
+        case Resolved of
+            false ->
+                ordsets:add_element(Node, Unknown);
+            true ->
+                ordsets:del_element(Node, Unknown)
+        end,
+    add_node_capabilities(Node, Capabilities,
+                          State#state{unknown=StillUnknown});
+add_node(Node, Capabilities, State) ->
+    add_node_capabilities(Node, Capabilities, State).
+
+%% Store each `{Capability, Modes}' pair from `Capabilities' as supported
+%% by `Node'.
+add_node_capabilities(Node, Capabilities, State) ->
+    Store = fun({Capability, Modes}, Acc) ->
+                    add_supported(Node, Capability, Modes, Acc)
+            end,
+    lists:foldl(Store, State, Capabilities).
+
+%% Keep a copy of the local node's supported capabilities in the capability
+%% ETS table under the '$supported' key, so that publish_supported/1 can
+%% update rings without a round trip through the capability server.
+update_local_cache(State) ->
+    CacheEntry = {'$supported', get_supported(node(), State)},
+    ets:insert(?ETS, CacheEntry),
+    ok.
+
+%% Publish the local node's supported modes in the ring
+%%
+%% Builds a ring transaction fun and hands it to
+%% riak_core_ring_manager:ring_trans/2 from a freshly spawned process, then
+%% returns `ok' immediately without waiting for the transaction.
+%% NOTE(review): the spawn presumably keeps the capability server from
+%% blocking on the ring manager -- confirm. The spawned process is neither
+%% linked nor monitored, so a failure inside it is not reported here.
+update_ring(State) ->
+ Node = node(),
+ Supported = get_supported(Node, State),
+ %% Transaction fun: write the ring back only when publishing actually
+ %% changed this node's metadata; otherwise tell the manager to ignore.
+ F = fun(Ring, _) ->
+ {Changed, Ring2} = publish_supported(Node, Supported, Ring),
+ case Changed of
+ true ->
+ {new_ring, Ring2};
+ false ->
+ ignore
+ end
+ end,
+ spawn(fun() ->
+ riak_core_ring_manager:ring_trans(F, ok)
+ end),
+ ok.
+
+%% Write `Supported' into the ring as `Node''s capability metadata.
+%% Returns `{false, Ring}' when the ring already holds exactly that value,
+%% and `{true, UpdatedRing}' otherwise.
+publish_supported(Node, Supported, Ring) ->
+    Current = riak_core_ring:get_member_meta(Ring, Node, ?CAPS),
+    case Current =:= Supported of
+        true ->
+            {false, Ring};
+        false ->
+            Updated = riak_core_ring:update_member_meta(Node, Ring, Node,
+                                                        ?CAPS, Supported),
+            {true, Updated}
+    end.
+
+%% From the current per-node view, pick for every capability of `Node' the
+%% mode that `Node' prefers most among those supported by all nodes, after
+%% applying the given overrides. The result is stored in the `negotiated'
+%% field of the returned state.
+negotiate_capabilities(Node, Override,
+                       #state{registered=Registered,
+                              supported=PerNode} = State) ->
+    LocalCaps = orddict:fetch(Node, PerNode),
+    ByCapability = reformat_capabilities(Registered, PerNode),
+    Shared = intersect_capabilities(ByCapability),
+    Ranked = order_by_preference(LocalCaps, Shared),
+    Overridden = override_capabilities(Ranked, Override),
+    Chosen = [{Cap, hd(Candidates)} || {Cap, Candidates} <- Overridden],
+    State#state{negotiated=Chosen}.
+
+%% Re-run negotiation for the local node using the overrides configured in
+%% the application environment, then log and publish (to ETS) whatever
+%% changed. A state with no supported information at all is returned as-is.
+renegotiate_capabilities(#state{supported=[]} = State) ->
+    State;
+renegotiate_capabilities(State) ->
+    Local = node(),
+    LocalCaps = orddict:fetch(Local, State#state.supported),
+    Updated = negotiate_capabilities(Local, get_overrides(LocalCaps), State),
+    process_capability_changes(State#state.negotiated,
+                               Updated#state.negotiated),
+    Updated.
+
+%% Known capabilities are tracked per node:
+%%
+%% [{Node1, [{capability1, [x,y,z]},
+%%           {capability2, [x,y,z]}]},
+%%  {Node2, [{capability1, [a,b,z]}]}].
+%%
+%% This function turns that into a capability-centric structure in which
+%% every node's mode list also contains the registered default, and a node
+%% that does not list a registered capability gets just the default:
+%%
+%% [{capability1, [{Node1, [x,y,z,default]}, {Node2, [a,b,z,default]}]},
+%%  {capability2, [{Node1, [x,y,z,default]}, {Node2, [default]}]}]
+%%
+%% (The mode lists in the result are ordsets, so the element order shown
+%% above is illustrative only.)
+-spec reformat_capabilities(registered(),
+                            [{node(), [{capability(), [mode()]}]}])
+      -> [{capability(), [{node(), [mode()]}]}].
+reformat_capabilities(Registered, Capabilities) ->
+    Defaults = orddict:from_list([{Cap, [Info#capability.default]}
+                                  || {Cap, Info} <- Registered]),
+    Collect = fun({Node, NodeCaps}, Acc) ->
+                      update_capability(Node, NodeCaps, Defaults, Acc)
+              end,
+    lists:foldl(Collect, orddict:new(), Capabilities).
+
+%% Extend one node's capabilities with the defaults and append a
+%% `{Node, ModesAsOrdset}' entry under each capability in the accumulator.
+update_capability(Node, NodeCaps, Defaults, Acc0) ->
+    Extended = extend(orddict:from_list(NodeCaps), Defaults),
+    Append = fun({Cap, Modes}, Acc) ->
+                     Entry = {Node, ordsets:from_list(Modes)},
+                     orddict:append(Cap, Entry, Acc)
+             end,
+    lists:foldl(Append, Acc0, Extended).
+
+%% Merge orddict `B' into orddict `A'. For a key present in both, the
+%% result is B's list followed by A's list; other keys are kept unchanged.
+extend(A, B) ->
+    Combine = fun(_Key, FromA, FromB) -> FromB ++ FromA end,
+    orddict:merge(Combine, A, B).
+
+%% Reduce each capability's per-node mode sets to the modes that every
+%% node supports.
+-spec intersect_capabilities([{capability(), [{node(), [mode()]}]}])
+      -> [{capability(), [mode()]}].
+intersect_capabilities(Capabilities) ->
+    [intersect_supported(Entry) || Entry <- Capabilities].
+
+%% Intersect the mode ordsets of all nodes listed for one capability.
+%% The first node's set seeds the intersection.
+intersect_supported({Capability, NodeSupported}) ->
+    {_FirstNode, Seed} = hd(NodeSupported),
+    Narrow = fun({_Node, Modes}, Common) ->
+                     ordsets:intersection(Common, Modes)
+             end,
+    {Capability, lists:foldl(Narrow, Seed, tl(NodeSupported))}.
+
+%% Re-order the commonly supported modes of every local capability by the
+%% local node's own preference. In effect this is an order-preserving
+%% intersection of the local mode list with the computed common list.
+order_by_preference(MyCapabilities, Common) ->
+    [order_by_preference(Capability, Preferred, Common)
+     || {Capability, Preferred} <- MyCapabilities].
+
+%% Keep, in `Preferred' order, only the modes that also appear in the
+%% common list stored for `Capability'.
+order_by_preference(Capability, Preferred, Common) ->
+    Shared = orddict:fetch(Capability, Common),
+    IsShared = fun(Mode) -> lists:member(Mode, Shared) end,
+    {Capability, lists:filter(IsShared, Preferred)}.
+
+%% Apply the per-application overrides (taken from app.config settings) to
+%% every computed capability.
+override_capabilities(Caps, AppOver) ->
+    [override_capability(Capability, Candidates, AppOver)
+     || {Capability, Candidates} <- Caps].
+
+%% Apply the override options configured for capability `Cap' of
+%% application `App', if there are any; otherwise keep `Modes' as computed.
+override_capability({App, Cap} = AC, Modes, AppOver) ->
+    AppSettings = orddict:fetch(App, AppOver),
+    case orddict:find(Cap, AppSettings) of
+        {ok, Opts} ->
+            {AC, override_capability(Opts, Modes)};
+        error ->
+            {AC, Modes}
+    end.
+
+%% Apply one capability's override options to its candidate mode list.
+%%
+%%   {use, Val}    - force `Val', whether or not it is commonly supported;
+%%                   takes precedence over `prefer'.
+%%   {prefer, Val} - choose `Val' only if it is among the candidates.
+%%
+%% A preferred mode that is not among the candidates leaves `Modes'
+%% unchanged. Returning an empty list there would make the caller's hd/1
+%% fail, i.e. a mere preference would crash negotiation.
+override_capability(Opts, Modes) ->
+    Use = proplists:get_value(use, Opts),
+    Prefer = proplists:get_value(prefer, Opts),
+    case {Use, Prefer} of
+        {undefined, undefined} ->
+            Modes;
+        {undefined, Val} ->
+            case [Mode || Mode <- Modes, Mode == Val] of
+                [] ->
+                    Modes;
+                Preferred ->
+                    Preferred
+            end;
+        {Val, _} ->
+            [Val]
+    end.
+
+%% Collect the override settings of every application that owns at least
+%% one of the given `{{App, Name}, Modes}' capabilities.
+get_overrides(Caps) ->
+    Apps = ordsets:from_list([App || {{App, _Name}, _Modes} <- Caps]),
+    [{App, get_app_overrides(App)} || App <- Apps].
+
+%% Read the `override_capability' environment variable of `App' and return
+%% it as an orddict; an unset variable yields an empty list.
+get_app_overrides(App) ->
+    case application:get_env(App, override_capability) of
+        {ok, Settings} ->
+            orddict:from_list(Settings);
+        undefined ->
+            []
+    end.
+
+%% Compare the old and new negotiated modes; for every difference, update
+%% the capability ETS table and log the change. The ETS table is what lets
+%% other processes read current capabilities without going through the
+%% capability server.
+process_capability_changes(OldModes, NewModes) ->
+    Diff = riak_core_util:orddict_delta(OldModes, NewModes),
+    Apply =
+        fun(Capability, Change, _Acc) ->
+                case Change of
+                    {'$none', New} ->
+                        ets:insert(?ETS, {Capability, New}),
+                        lager:info("New capability: ~p = ~p",
+                                   [Capability, New]);
+                    {Old, '$none'} ->
+                        ets:delete(?ETS, Capability),
+                        lager:info("Removed capability ~p (previously: ~p)",
+                                   [Capability, Old]);
+                    {Old, New} ->
+                        ets:insert(?ETS, {Capability, New}),
+                        lager:info("Capability changed: ~p / ~p -> ~p",
+                                   [Capability, Old, New])
+                end
+        end,
+    orddict:fold(Apply, ok, Diff).
+
+%% Read what every cluster member published about its capabilities in the
+%% ring. A member that published nothing is reported with an empty list.
+get_supported_from_ring(Ring) ->
+    Published =
+        fun(Member) ->
+                case riak_core_ring:get_member_meta(Ring, Member, ?CAPS) of
+                    undefined ->
+                        {Member, []};
+                    Caps ->
+                        {Member, Caps}
+                end
+        end,
+    lists:map(Published, riak_core_ring:all_members(Ring)).
+
+%% Work out the capabilities of a legacy node from its app.config settings,
+%% using the app-var -> mode mapping supplied when each capability was
+%% registered. Returns `{Capabilities, Resolved}', where `Resolved' is true
+%% only if every registered capability could be resolved.
+query_capabilities(Node, #state{registered=Registered}) ->
+    Query =
+        fun({Capability, Info}, AllResolved) ->
+                {Resolved, Cap} = query_capability(Node,
+                                                   Capability,
+                                                   Info#capability.default,
+                                                   Info#capability.legacy),
+                {Cap, AllResolved and Resolved}
+        end,
+    lists:mapfoldl(Query, true, Registered).
+
+%% Determine what `Node' supports for a single capability.
+%%
+%% Returns `{Resolved, {Capability, [Mode]}}'. `Resolved' is false only
+%% when the RPC to `Node' failed, in which case the default mode is
+%% reported for now.
+%%
+%% A capability registered without a legacy mapping (`[]', as passed by
+%% register/3, or `undefined') has nothing to query on the remote node: the
+%% default is the definitive answer. Without this clause such a capability
+%% caused a function_clause error as soon as any node had to be queried.
+query_capability(_Node, Capability, DefaultSup, Legacy)
+  when Legacy =:= []; Legacy =:= undefined ->
+    {true, {Capability, [DefaultSup]}};
+query_capability(Node, Capability, DefaultSup, {App, Var, Map}) ->
+    Default = {Capability, [DefaultSup]},
+    %% Read the legacy application variable on the remote node.
+    Result = rpc:call(Node, application, get_env, [App, Var]),
+    case Result of
+        {badrpc, _} ->
+            {false, Default};
+        undefined ->
+            {true, Default};
+        {ok, Value} ->
+            %% Translate the legacy value into a mode; values that are not
+            %% in the mapping fall back to the default.
+            case lists:keyfind(Value, 1, Map) of
+                false ->
+                    {true, Default};
+                {Value, Supported} ->
+                    {true, {Capability, [Supported]}}
+            end
+    end.
+
+%% Keep the registrations in riak_core's application environment, under
+%% the `registered_capabilities' key, where load_registered/0 reads them.
+save_registered(Registered) ->
+    Key = registered_capabilities,
+    application:set_env(riak_core, Key, Registered).
+
+%% Fetch the registrations saved in riak_core's application environment;
+%% an empty list when nothing has been saved.
+load_registered() ->
+    Saved = application:get_env(riak_core, registered_capabilities),
+    case Saved of
+        {ok, Caps} ->
+            Caps;
+        undefined ->
+            []
+    end.
+
+%% ===================================================================
+%% EUnit tests
+%% ===================================================================
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+%% Exercises negotiation and overrides on hand-built states, with no server
+%% process, ring or ETS table involved. Each step builds on the state
+%% produced by the previous one.
+basic_test() ->
+ S1 = init_state([]),
+
+ %% n1 registers the capability with preference order x,a,c,y and
+ %% default y; n2 is recorded as supporting a,b,c,y.
+ S2 = register_capability(n1,
+ {riak_core, test},
+ capability_info([x,a,c,y], y, []),
+ S1),
+ S3 = add_node_capabilities(n2,
+ [{{riak_core, test}, [a,b,c,y]}],
+ S2),
+ %% Without overrides, n1's most preferred mode shared with n2 is `a'.
+ S4 = negotiate_capabilities(n1, [{riak_core, []}], S3),
+ ?assertEqual([{{riak_core, test}, a}], S4#state.negotiated),
+
+ %% A `prefer' override selects `c', which both nodes support.
+ S5 = negotiate_capabilities(n1,
+ [{riak_core, [{test, [{prefer, c}]}]}],
+ S4),
+ ?assertEqual([{{riak_core, test}, c}], S5#state.negotiated),
+
+ %% n3 lists only `b'; the expected result is the default mode `y'.
+ S6 = add_node_capabilities(n3,
+ [{{riak_core, test}, [b]}],
+ S5),
+ S7 = negotiate_capabilities(n1, [{riak_core, []}], S6),
+ ?assertEqual([{{riak_core, test}, y}], S7#state.negotiated),
+
+ %% A `use' override forces `x' even though n2 and n3 do not list it.
+ S8 = negotiate_capabilities(n1,
+ [{riak_core, [{test, [{use, x}]}]}],
+ S7),
+ ?assertEqual([{{riak_core, test}, x}], S8#state.negotiated),
+ ok.
+
+-endif.
View
1 src/riak_core_ring_handler.erl
@@ -37,6 +37,7 @@ handle_event({ring_update, Ring}, State) ->
%% Make sure all vnodes are started...
ensure_vnodes_started(Ring),
riak_core_vnode_manager:ring_changed(Ring),
+ riak_core_capability:ring_changed(Ring),
{ok, State}.
handle_call(_Event, State) ->
View
1 src/riak_core_sup.erl
@@ -69,6 +69,7 @@ init([]) ->
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
?CHILD(riak_core_vnode_manager, worker),
+ ?CHILD(riak_core_capability, worker),
?CHILD(riak_core_gossip, worker),
RiakWebs
]),
View
24 src/riak_core_util.erl
@@ -36,6 +36,7 @@
mkclientid/1,
start_app_deps/1,
build_tree/3,
+ orddict_delta/2,
rpc_every_member/4,
rpc_every_member_ann/4,
is_arch/1]).
@@ -263,6 +264,29 @@ build_tree(N, Nodes, Opts) ->
end, {[], tl(Expand)}, Nodes),
orddict:from_list(Tree).
+%% Compute the difference between two orddicts. The result is an orddict
+%% holding every key whose value differs between `A' and `B', or that is
+%% present on one side only, mapped to `{ValueInA, ValueInB}'. The atom
+%% '$none' stands in for the value of a key that is missing on that side.
+orddict_delta(A, B) ->
+    KeepOwn = fun(_Key, Own, _Filler) -> Own end,
+    %% Give Dict a '$none' entry for each key it lacks but Other has.
+    Pad = fun(Dict, Other) ->
+                  Filler = [{Key, '$none'} || {Key, _} <- Other],
+                  orddict:merge(KeepOwn, Dict, Filler)
+          end,
+    PaddedA = Pad(A, B),
+    PaddedB = Pad(B, A),
+
+    %% Both sides now share the same keys, so every key gets paired up.
+    Paired = orddict:merge(fun(_Key, AVal, BVal) -> {AVal, BVal} end,
+                           PaddedA, PaddedB),
+    Differs = fun(_Key, {Same, Same}) -> false;
+                 (_Key, _Changed) -> true
+              end,
+    orddict:filter(Differs, Paired).
+
+
%% Returns a forced-lowercase architecture for this node
-spec get_arch () -> string().
get_arch () -> string:to_lower(erlang:system_info(system_architecture)).

0 comments on commit d83fee8

Please sign in to comment.
Something went wrong with that request. Please try again.