Skip to content

Commit

Permalink
Register stat mods with the riak_core app
Browse files Browse the repository at this point in the history
When the stat cache crashes, we must re-register stat mods with
the cache so that it works when re-started.

Delete stats before register

This is to ensure that a restarted riak_core_stat will not
leave any orphaned folsom stats. Folsom needs some work to handle
crashing owners better. Some tables in folsom are owned
by the creating process, and some by folsom. If riak_core_stat
crashes, some folsom state can be left inconsistent. This cleans up
at start time.
  • Loading branch information
russelldb committed Jul 20, 2012
1 parent ea26977 commit 46294e8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
41 changes: 27 additions & 14 deletions src/riak_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
leave/0, remove_from_cluster/1]).
-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).
-export([stat_mods/0]).

-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
Expand Down Expand Up @@ -115,7 +116,7 @@ standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
false ->
standard_join(Node, Ring, Rejoin, Auto)
end;
_ ->
_ ->
{error, unable_to_get_join_ring}
end;
pang ->
Expand Down Expand Up @@ -160,11 +161,11 @@ legacy_join(Node) when is_atom(Node) ->
pong ->
case rpc:call(Node,
application,
get_env,
get_env,
[riak_core, ring_creation_size]) of
{ok, OurRingSize} ->
riak_core_gossip:send_ring(Node, node());
_ ->
_ ->
{error, different_ring_sizes}
end;
pang ->
Expand All @@ -190,7 +191,7 @@ remove(Node) ->

standard_remove(Node) ->
riak_core_ring_manager:ring_trans(
fun(Ring2, _) ->
fun(Ring2, _) ->
Ring3 = riak_core_ring:remove_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4}
Expand All @@ -215,7 +216,7 @@ down(false, Node) ->
{error, only_member};
_ ->
riak_core_ring_manager:ring_trans(
fun(Ring2, _) ->
fun(Ring2, _) ->
Ring3 = riak_core_ring:down_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4}
Expand Down Expand Up @@ -246,7 +247,7 @@ leave() ->

standard_leave(Node) ->
riak_core_ring_manager:ring_trans(
fun(Ring2, _) ->
fun(Ring2, _) ->
Ring3 = riak_core_ring:leave_member(Node, Ring2, Node),
{new_ring, Ring3}
end, []),
Expand Down Expand Up @@ -289,6 +290,12 @@ bucket_validators() ->
{ok, Mods} -> Mods
end.

%% @doc Return the value stored under the `stat_mods' key of the
%% riak_core application environment, or the empty list when that key
%% is not set.
stat_mods() ->
    StatModsEnv = application:get_env(riak_core, stat_mods),
    case StatModsEnv of
        {ok, Registered} -> Registered;
        undefined -> []
    end.

%% Get the application name if not supplied, first by get_application
%% then by searching by module name
get_app(undefined, Module) ->
Expand Down Expand Up @@ -324,12 +331,18 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register(App, T);
register(App, [{bucket_validator, ValidationMod}|T]) ->
register_mod(get_app(App, ValidationMod), ValidationMod, bucket_validators),
register(App, T);
register(App, [{stat_mod, StatMod}|T]) ->
register_mod(App, StatMod, stat_mods),
register(App, T).


register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case Type of
vnode_modules ->
riak_core_vnode_proxy_sup:start_proxies(Module);
stat_mods ->
Module:register_stats();
_ ->
ok
end,
Expand All @@ -356,9 +369,9 @@ add_guarded_event_handler(HandlerMod, Handler, Args) ->
%% ExitFun = fun(Handler, Reason::term())
%% AddResult = ok | {error, Reason::term()}
%%
%% @doc Add a "guarded" event handler to a gen_event instance.
%% A guarded handler is implemented as a supervised gen_server
%% (riak_core_eventhandler_guard) that adds a supervised handler in its
%% @doc Add a "guarded" event handler to a gen_event instance.
%% A guarded handler is implemented as a supervised gen_server
%% (riak_core_eventhandler_guard) that adds a supervised handler in its
%% init() callback and exits when the handler crashes so it can be
%% restarted by the supervisor.
add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
Expand All @@ -372,13 +385,13 @@ add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
%% Reason = term()
%%
%% @doc Delete a guarded event handler from a gen_event instance.
%%
%% Args is an arbitrary term which is passed as one of the arguments to
%%
%% Args is an arbitrary term which is passed as one of the arguments to
%% Module:terminate/2.
%%
%% The return value is the return value of Module:terminate/2. If the
%% specified event handler is not installed, the function returns
%% {error,module_not_found}. If the callback function fails with Reason,
%% The return value is the return value of Module:terminate/2. If the
%% specified event handler is not installed, the function returns
%% {error,module_not_found}. If the callback function fails with Reason,
%% the function returns {'EXIT',Reason}.
%% This is a direct delegation to
%% riak_core_eventhandler_sup:stop_guarded_handler/3; its result is
%% returned unchanged.
delete_guarded_event_handler(HandlerMod, Handler, Args) ->
riak_core_eventhandler_sup:stop_guarded_handler(HandlerMod, Handler, Args).
Expand Down
2 changes: 1 addition & 1 deletion src/riak_core_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ start(_StartType, _StartArgs) ->
%% Spin up the supervisor; prune ring files as necessary
case riak_core_sup:start_link() of
{ok, Pid} ->
riak_core_stat:register_stats(),
riak_core:register(riak_core, [{stat_mod, riak_core_stat}]),
ok = riak_core_ring_events:add_guarded_handler(riak_core_ring_handler, []),
%% App is running; search for latest ring file and initialize with it
riak_core_ring_manager:prune_ringfiles(),
Expand Down
15 changes: 8 additions & 7 deletions src/riak_core_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% @doc (Re-)create this application's folsom metrics and register the
%% stats producer with the stat cache.
%%
%% Every metric named in stats() is deleted before it is created so
%% that a restart does not leave orphaned folsom metrics behind.
%% Returns the result of riak_core_stat_cache:register_app/2.
register_stats() ->
    %% Best-effort cleanup: whatever delete_metric/1 returns or raises
    %% is deliberately ignored.
    lists:foreach(fun(Name) ->
                          catch folsom_metrics:delete_metric({?APP, Name})
                  end,
                  [Name || {Name, _Type} <- stats()]),
    %% Only well-formed {Name, Type} entries are registered, as before.
    lists:foreach(fun({Name, Type}) ->
                          register_stat({?APP, Name}, Type)
                  end,
                  [Entry || {_, _} = Entry <- stats()]),
    Producer = {?MODULE, produce_stats, []},
    riak_core_stat_cache:register_app(?APP, Producer).

Expand All @@ -57,6 +58,13 @@ get_stats() ->
%% @doc Send `Arg' to the locally registered stat server as an
%% asynchronous `{update, Arg}' cast.
update(Arg) ->
    UpdateMsg = {update, Arg},
    gen_server:cast(?SERVER, UpdateMsg).

%% @spec produce_stats() -> proplist()
%% @doc Produce a proplist-formatted view of the current stats: the
%% result of gossip_stats/0 followed by the result of vnodeq_stats/0.
produce_stats() ->
    GossipStats = gossip_stats(),
    VnodeqStats = vnodeq_stats(),
    GossipStats ++ VnodeqStats.

%% gen_server

init([]) ->
Expand Down Expand Up @@ -124,13 +132,6 @@ register_stat(Name, spiral) ->
register_stat(Name, duration) ->
folsom_metrics:new_duration(Name).

%% @spec produce_stats() -> proplist()
%% @doc Produce a proplist-formatted view of the current aggregation
%% of stats: the result of gossip_stats/0 followed by the result of
%% vnodeq_stats/0.
produce_stats() ->
lists:append([gossip_stats(),
vnodeq_stats()]).

%% @doc Build the gossip portion of the stats output.
%%
%% For every {Name, Kind} entry of stats() except
%% riak_core_rejected_handoffs, the metric's current folsom value is
%% passed through backwards_compat/3; the results are flattened into a
%% single list.
gossip_stats() ->
    Entries = [begin
                   Value = folsom_metrics:get_metric_value({?APP, Name}),
                   backwards_compat(Name, Kind, Value)
               end || {Name, Kind} <- stats(),
                      Name =/= riak_core_rejected_handoffs],
    lists:flatten(Entries).
Expand Down
5 changes: 4 additions & 1 deletion src/riak_core_stat_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ stop() ->
init([]) ->
process_flag(trap_exit, true),
Tab = ets:new(?MODULE, [protected, set, named_table]),
{ok, #state{tab=Tab}}.
TTL = app_helper:get_env(riak_core, stat_cache_ttl, ?TTL),
%% re-register mods, if this is a restart after a crash
RegisteredMods = [{App, {Mod, produce_stats, [], TTL}} || {App, Mod} <- riak_core:stat_mods()],
{ok, #state{tab=Tab, apps=orddict:from_list(RegisteredMods)}}.

handle_call({register, App, {Mod, Fun, Args}, TTL}, _From, State0=#state{apps=Apps0}) ->
Apps = case registered(App, Apps0) of
Expand Down

0 comments on commit 46294e8

Please sign in to comment.