Skip to content

Commit

Permalink
Merge branch 'rdb-stats-re-register' into 1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
russelldb committed Jul 20, 2012
2 parents ea26977 + 46294e8 commit 5404bf8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
41 changes: 27 additions & 14 deletions src/riak_core.erl
Expand Up @@ -24,6 +24,7 @@
leave/0, remove_from_cluster/1]). leave/0, remove_from_cluster/1]).
-export([vnode_modules/0]). -export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]). -export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).
-export([stat_mods/0]).


-export([add_guarded_event_handler/3, add_guarded_event_handler/4]). -export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]). -export([delete_guarded_event_handler/3]).
Expand Down Expand Up @@ -115,7 +116,7 @@ standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
false -> false ->
standard_join(Node, Ring, Rejoin, Auto) standard_join(Node, Ring, Rejoin, Auto)
end; end;
_ -> _ ->
{error, unable_to_get_join_ring} {error, unable_to_get_join_ring}
end; end;
pang -> pang ->
Expand Down Expand Up @@ -160,11 +161,11 @@ legacy_join(Node) when is_atom(Node) ->
pong -> pong ->
case rpc:call(Node, case rpc:call(Node,
application, application,
get_env, get_env,
[riak_core, ring_creation_size]) of [riak_core, ring_creation_size]) of
{ok, OurRingSize} -> {ok, OurRingSize} ->
riak_core_gossip:send_ring(Node, node()); riak_core_gossip:send_ring(Node, node());
_ -> _ ->
{error, different_ring_sizes} {error, different_ring_sizes}
end; end;
pang -> pang ->
Expand All @@ -190,7 +191,7 @@ remove(Node) ->


standard_remove(Node) -> standard_remove(Node) ->
riak_core_ring_manager:ring_trans( riak_core_ring_manager:ring_trans(
fun(Ring2, _) -> fun(Ring2, _) ->
Ring3 = riak_core_ring:remove_member(node(), Ring2, Node), Ring3 = riak_core_ring:remove_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3), Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4} {new_ring, Ring4}
Expand All @@ -215,7 +216,7 @@ down(false, Node) ->
{error, only_member}; {error, only_member};
_ -> _ ->
riak_core_ring_manager:ring_trans( riak_core_ring_manager:ring_trans(
fun(Ring2, _) -> fun(Ring2, _) ->
Ring3 = riak_core_ring:down_member(node(), Ring2, Node), Ring3 = riak_core_ring:down_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3), Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4} {new_ring, Ring4}
Expand Down Expand Up @@ -246,7 +247,7 @@ leave() ->


standard_leave(Node) -> standard_leave(Node) ->
riak_core_ring_manager:ring_trans( riak_core_ring_manager:ring_trans(
fun(Ring2, _) -> fun(Ring2, _) ->
Ring3 = riak_core_ring:leave_member(Node, Ring2, Node), Ring3 = riak_core_ring:leave_member(Node, Ring2, Node),
{new_ring, Ring3} {new_ring, Ring3}
end, []), end, []),
Expand Down Expand Up @@ -289,6 +290,12 @@ bucket_validators() ->
{ok, Mods} -> Mods {ok, Mods} -> Mods
end. end.


stat_mods() ->
case application:get_env(riak_core, stat_mods) of
undefined -> [];
{ok, Mods} -> Mods
end.

%% Get the application name if not supplied, first by get_application %% Get the application name if not supplied, first by get_application
%% then by searching by module name %% then by searching by module name
get_app(undefined, Module) -> get_app(undefined, Module) ->
Expand Down Expand Up @@ -324,12 +331,18 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register(App, T); register(App, T);
register(App, [{bucket_validator, ValidationMod}|T]) -> register(App, [{bucket_validator, ValidationMod}|T]) ->
register_mod(get_app(App, ValidationMod), ValidationMod, bucket_validators), register_mod(get_app(App, ValidationMod), ValidationMod, bucket_validators),
register(App, T);
register(App, [{stat_mod, StatMod}|T]) ->
register_mod(App, StatMod, stat_mods),
register(App, T). register(App, T).



register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) -> register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case Type of case Type of
vnode_modules -> vnode_modules ->
riak_core_vnode_proxy_sup:start_proxies(Module); riak_core_vnode_proxy_sup:start_proxies(Module);
stat_mods ->
Module:register_stats();
_ -> _ ->
ok ok
end, end,
Expand All @@ -356,9 +369,9 @@ add_guarded_event_handler(HandlerMod, Handler, Args) ->
%% ExitFun = fun(Handler, Reason::term()) %% ExitFun = fun(Handler, Reason::term())
%% AddResult = ok | {error, Reason::term()} %% AddResult = ok | {error, Reason::term()}
%% %%
%% @doc Add a "guarded" event handler to a gen_event instance. %% @doc Add a "guarded" event handler to a gen_event instance.
%% A guarded handler is implemented as a supervised gen_server %% A guarded handler is implemented as a supervised gen_server
%% (riak_core_eventhandler_guard) that adds a supervised handler in its %% (riak_core_eventhandler_guard) that adds a supervised handler in its
%% init() callback and exits when the handler crashes so it can be %% init() callback and exits when the handler crashes so it can be
%% restarted by the supervisor. %% restarted by the supervisor.
add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) -> add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
Expand All @@ -372,13 +385,13 @@ add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
%% Reason = term() %% Reason = term()
%% %%
%% @doc Delete a guarded event handler from a gen_event instance. %% @doc Delete a guarded event handler from a gen_event instance.
%% %%
%% Args is an arbitrary term which is passed as one of the arguments to %% Args is an arbitrary term which is passed as one of the arguments to
%% Module:terminate/2. %% Module:terminate/2.
%% %%
%% The return value is the return value of Module:terminate/2. If the %% The return value is the return value of Module:terminate/2. If the
%% specified event handler is not installed, the function returns %% specified event handler is not installed, the function returns
%% {error,module_not_found}. If the callback function fails with Reason, %% {error,module_not_found}. If the callback function fails with Reason,
%% the function returns {'EXIT',Reason}. %% the function returns {'EXIT',Reason}.
delete_guarded_event_handler(HandlerMod, Handler, Args) -> delete_guarded_event_handler(HandlerMod, Handler, Args) ->
riak_core_eventhandler_sup:stop_guarded_handler(HandlerMod, Handler, Args). riak_core_eventhandler_sup:stop_guarded_handler(HandlerMod, Handler, Args).
Expand Down
2 changes: 1 addition & 1 deletion src/riak_core_app.erl
Expand Up @@ -75,7 +75,7 @@ start(_StartType, _StartArgs) ->
%% Spin up the supervisor; prune ring files as necessary %% Spin up the supervisor; prune ring files as necessary
case riak_core_sup:start_link() of case riak_core_sup:start_link() of
{ok, Pid} -> {ok, Pid} ->
riak_core_stat:register_stats(), riak_core:register(riak_core, [{stat_mod, riak_core_stat}]),
ok = riak_core_ring_events:add_guarded_handler(riak_core_ring_handler, []), ok = riak_core_ring_events:add_guarded_handler(riak_core_ring_handler, []),
%% App is running; search for latest ring file and initialize with it %% App is running; search for latest ring file and initialize with it
riak_core_ring_manager:prune_ringfiles(), riak_core_ring_manager:prune_ringfiles(),
Expand Down
15 changes: 8 additions & 7 deletions src/riak_core_stat.erl
Expand Up @@ -42,6 +42,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).


register_stats() -> register_stats() ->
[(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
[register_stat({?APP, Name}, Type) || {Name, Type} <- stats()], [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}). riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).


Expand All @@ -57,6 +58,13 @@ get_stats() ->
update(Arg) -> update(Arg) ->
gen_server:cast(?SERVER, {update, Arg}). gen_server:cast(?SERVER, {update, Arg}).


% @spec produce_stats(state(), integer()) -> proplist()
%% @doc Produce a proplist-formatted view of the current aggregation
%% of stats.
produce_stats() ->
lists:append([gossip_stats(),
vnodeq_stats()]).

%% gen_server %% gen_server


init([]) -> init([]) ->
Expand Down Expand Up @@ -124,13 +132,6 @@ register_stat(Name, spiral) ->
register_stat(Name, duration) -> register_stat(Name, duration) ->
folsom_metrics:new_duration(Name). folsom_metrics:new_duration(Name).


% @spec produce_stats(state(), integer()) -> proplist()
%% @doc Produce a proplist-formatted view of the current aggregation
%% of stats.
produce_stats() ->
lists:append([gossip_stats(),
vnodeq_stats()]).

gossip_stats() -> gossip_stats() ->
lists:flatten([backwards_compat(Stat, Type, folsom_metrics:get_metric_value({?APP, Stat})) || lists:flatten([backwards_compat(Stat, Type, folsom_metrics:get_metric_value({?APP, Stat})) ||
{Stat, Type} <- stats(), Stat /= riak_core_rejected_handoffs]). {Stat, Type} <- stats(), Stat /= riak_core_rejected_handoffs]).
Expand Down
5 changes: 4 additions & 1 deletion src/riak_core_stat_cache.erl
Expand Up @@ -68,7 +68,10 @@ stop() ->
init([]) -> init([]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
Tab = ets:new(?MODULE, [protected, set, named_table]), Tab = ets:new(?MODULE, [protected, set, named_table]),
{ok, #state{tab=Tab}}. TTL = app_helper:get_env(riak_core, stat_cache_ttl, ?TTL),
%% re-register mods, if this is a restart after a crash
RegisteredMods = [{App, {Mod, produce_stats, [], TTL}} || {App, Mod} <- riak_core:stat_mods()],
{ok, #state{tab=Tab, apps=orddict:from_list(RegisteredMods)}}.


handle_call({register, App, {Mod, Fun, Args}, TTL}, _From, State0=#state{apps=Apps0}) -> handle_call({register, App, {Mod, Fun, Args}, TTL}, _From, State0=#state{apps=Apps0}) ->
Apps = case registered(App, Apps0) of Apps = case registered(App, Apps0) of
Expand Down

0 comments on commit 5404bf8

Please sign in to comment.