Skip to content

Commit

Permalink
Re-use the stat calc funs in riak_core
Browse files Browse the repository at this point in the history
Use the cached riak_pipe stats for legacy stats

When a broken stat is detected, register it.

Fix bug where stats endpoints were calculating _all_ riak_kv stats

Since adding many more stats, and most of the infrastructure for
ad hoc querying of stats, the stat calculation code for the
(not yet legacy) endpoints was calculating all stats for riak_kv.

As there are about (ring_size * vnode stats) + (fsm stages * fsm stats)
more stats now, this calculation, understandbly, took a long time.

This patch instead only calculates the minimum subset of stats
needed to support the (not yet legacy) stats endpoints.
  • Loading branch information
russelldb committed Mar 21, 2013
1 parent 98aba59 commit fd2e527
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 37 deletions.
94 changes: 91 additions & 3 deletions src/riak_kv_stat.erl
Expand Up @@ -47,6 +47,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {repair_mon}).

-define(SERVER, ?MODULE).
-define(APP, riak_kv).

Expand Down Expand Up @@ -82,7 +84,8 @@ update(Arg) ->
ok
catch
ErrClass:Err ->
lager:error("~p:~p updating stat ~p.", [ErrClass, Err, Arg])
lager:error("~p:~p updating stat ~p.", [ErrClass, Err, Arg]),
gen_server:cast(?SERVER, {re_register_stat, Arg})
end.

track_bucket(Bucket) when is_binary(Bucket) ->
Expand All @@ -106,18 +109,32 @@ stop() ->

init([]) ->
register_stats(),
{ok, ok}.
RepairMonitor = spawn_monitor(fun() -> stat_repair_loop() end),
{ok, #state{repair_mon=RepairMonitor}}.

handle_call({register, Name, Type}, _From, State) ->
Rep = do_register_stat(Name, Type),
{reply, Rep, State}.


handle_cast({re_register_stat, Arg}, State) ->
%% To avoid massive message queues
%% riak_kv stats are updated in the calling process
%% @see `update/1'.
%% The downside is that errors updating a stat don't crash
%% the server, so broken stats stay broken.
%% This re-creates the same behaviour as when a brokwn stat
%% crashes the gen_server by re-registering that stat.
#state{repair_mon={Pid, _Mon}} = State,
Pid ! {re_register_stat, Arg},
{noreply, State};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Req, State) ->
{noreply, State}.

handle_info({'DOWN', MonRef, process, Pid, _Cause}, State=#state{repair_mon={Pid, MonRef}}) ->
RepairMonitor = spawn_monitor(fun() -> stat_repair_loop() end),
{noreply, State#state{repair_mon=RepairMonitor}};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -368,3 +385,74 @@ rbe_val(undefined) ->
undefined;
rbe_val(Bin) ->
list_to_integer(binary_to_list(Bin)).

%% All stat creation is serialized through riak_kv_stat.
%% Some stats are created on demand as part of the call to `update/1'.
%% When a stat error is caught, the stat must be deleted and recreated.
%% Since stat updates can happen from many processes concurrently
%% a stat that throws an error may already have been deleted and
%% recreated. To protect against needlessly deleting and recreating
%% an already 'fixed stat' first retry the stat update. There is a chance
%% that the retry succeeds as the stat has been recreated, but some on
%% demand stat it uses has not yet. Since stat creates are serialized
%% in riak_kv_stat re-registering a stat could cause a deadlock.
%% This loop is spawned as a process to avoid that.
stat_repair_loop() ->
receive
{re_register_stat, Arg} ->
re_register_stat(Arg),
stat_repair_loop();
_ ->
stat_repair_loop()
end.

re_register_stat(Arg) ->
case (catch do_update(Arg)) of
{'EXIT', _} ->
Stats = stats_from_update_arg(Arg),
[begin
(catch folsom_metrics:delete_metric(Name)),
do_register_stat(Name, Type)
end || {Name, {metric, _, Type, _}} <- Stats];
ok ->
ok
end.

%% Map from application argument used in call to `update/1' to
%% folsom stat names and types.
%% Updates that create dynamic stats must select all
%% related stats.
stats_from_update_arg({vnode_get, _, _}) ->
riak_core_stat_q:names_and_types([?APP, vnode, gets]);
stats_from_update_arg({vnode_put, _, _}) ->
riak_core_stat_q:names_and_types([?APP, vnode, puts]);
stats_from_update_arg(vnode_index_read) ->
riak_core_stat_q:names_and_types([?APP, vnode, index, reads]);
stats_from_update_arg({vnode_index_write, _, _}) ->
riak_core_stat_q:names_and_types([?APP, vnode, index, writes]) ++
riak_core_stat_q:names_and_types([?APP, vnode, index, deletes]);
stats_from_update_arg({vnode_index_delete, _}) ->
riak_core_stat_q:names_and_types([?APP, vnode, index, deletes]);
stats_from_update_arg({get_fsm, _, _, _, _, _, _}) ->
riak_core_stat_q:names_and_types([?APP, node, gets]);
stats_from_update_arg({put_fsm_time, _, _, _, _}) ->
riak_core_stat_q:names_and_types([?APP, node, puts]);
stats_from_update_arg({read_repairs, _, _}) ->
riak_core_stat_q:names_and_types([?APP, nodes, gets, read_repairs]);
stats_from_update_arg(coord_redirs) ->
[{?APP, node, puts, coord_redirs}, {metric,[],counter,undefined}];
stats_from_update_arg(mapper_start) ->
[{?APP, mapper_count}, {metric,[],counter,undefined}];
stats_from_update_arg(mapper_end) ->
stats_from_update_arg(mapper_start);
stats_from_update_arg(precommit_fail) ->
[{?APP, precommit_fail}, {metric,[],counter,undefined}];
stats_from_update_arg(postcommit_fail) ->
[{?APP, postcommit_fail}, {metric,[],counter,undefined}];
stats_from_update_arg({fsm_spawned, Type}) ->
[{?APP, node, Type, fsm, active}, {metric,[],counter,undefined}];
stats_from_update_arg({fsm_exit, Type}) ->
stats_from_update_arg({fsm_spawned, Type});
stats_from_update_arg({fsm_error, Type}) ->
stats_from_update_arg({fsm_spawned, Type}) ++
[{?APP, node, Type, fsm, errors}, {metric,[], spiral, undefined}].
89 changes: 55 additions & 34 deletions src/riak_kv_stat_bc.erl
Expand Up @@ -143,8 +143,7 @@
%% of stats.
produce_stats() ->
lists:append(
[lists:flatten(backwards_compat(riak_core_stat_q:get_stats([riak_kv]))),
backwards_compat_pb(riak_core_stat_q:get_stats([riak_api])),
[lists:flatten(legacy_stats()),
read_repair_stats(),
level_stats(),
pipe_stats(),
Expand All @@ -163,30 +162,56 @@ produce_stats() ->
%% naming constraints the new names are not simply the old names
%% with commas for underscores. Uses legacy_stat_map to generate
%% legacys stats from the new list of stats.
backwards_compat(Stats) ->
[bc_stat(Old, New, Type, Stats) || {Old, New, Type} <- legacy_stat_map()].
legacy_stats() ->
{Legacy, _Calculated} = lists:foldl(fun({Old, New, Type}, {Acc, Cache}) ->
bc_stat({Old, New, Type}, Acc, Cache) end,
{[], []},
legacy_stat_map()),
lists:reverse(Legacy).

bc_stat(Old, {New, Field}, histogram_percentile, Stats) ->
Stat = proplists:get_value(New, Stats),
Percentile = proplists:get_value(percentile, Stat),
Val = proplists:get_value(Field, Percentile),
{Old, trunc(Val)};
bc_stat(Old, {New, Field}, histogram, Stats) ->
Stat = proplists:get_value(New, Stats),
Val = proplists:get_value(Field, Stat),
{Old, trunc(Val)};
bc_stat(Old, {New, Field}, spiral, Stats) ->
Stat = proplists:get_value(New, Stats),
Val = proplists:get_value(Field, Stat),
{Old, Val};
bc_stat(Old, New, counter, Stats) ->
Stat = proplists:get_value(New, Stats),
{Old, Stat}.
%% @doc legacy stats uses multifield stats for multiple stats
%% don't calculate the same stat many times
get_stat(Name, Type, Cache) ->
get_stat(Name, Type, Cache, fun(S) -> S end).

get_stat(Name, Type, Cache, ValFun) ->
case proplists:get_value(Name, Cache) of
undefined ->
case riak_core_stat_q:calc_stat({Name, Type}) of
unavailable -> {unavailable, Cache};
Stat ->
{ValFun(Stat), [{Name, Stat} | Cache]}
end;
Cached -> {ValFun(Cached), Cache}
end.

bc_stat({Old, {NewName, Field}, histogram}, Acc, Cache) ->
ValFun = fun(Stat) -> trunc(proplists:get_value(Field, Stat)) end,
{Val, Cache1} = get_stat(NewName, histogram, Cache, ValFun),
{[{Old, Val} | Acc], Cache1};
bc_stat({Old, {NewName, Field}, histogram_percentile}, Acc, Cache) ->
ValFun = fun(Stat) ->
Percentile = proplists:get_value(percentile, Stat),
Val = proplists:get_value(Field, Percentile),
trunc(Val) end,
{Val, Cache1} = get_stat(NewName, histogram, Cache, ValFun),
{[{Old, Val} | Acc], Cache1};
bc_stat({Old, {NewName, Field}, spiral}, Acc, Cache) ->
ValFun = fun(Stat) ->
proplists:get_value(Field, Stat)
end,
{Val, Cache1} = get_stat(NewName, spiral, Cache, ValFun),
{[{Old, Val} | Acc], Cache1};
bc_stat({Old, NewName, counter}, Acc, Cache) ->
{Val, Cache1} = get_stat(NewName, counter, Cache),
{[{Old, Val} | Acc], Cache1};
bc_stat({Old, NewName, function}, Acc, Cache) ->
{Val, Cache1} = get_stat(NewName, gauge, Cache),
{[{Old, Val} | Acc], Cache1}.

%% hard coded mapping of stats to legacy format
%% There was a enough variation in the old names that a simple
%% concatenation of the elements in the new stat key would not suffice
%% concatenation of the elements in the new stat key would not suffice
%% applications depend on these exact legacy names.
legacy_stat_map() ->
[{vnode_gets, {{riak_kv, vnode, gets}, one}, spiral},
Expand Down Expand Up @@ -232,17 +257,12 @@ legacy_stat_map() ->
{coord_redirs_total, {riak_kv,node,puts,coord_redirs}, counter},
{executing_mappers, {riak_kv,mapper_count}, counter},
{precommit_fail, {riak_kv, precommit_fail}, counter},
{postcommit_fail, {riak_kv, postcommit_fail}, counter}
{postcommit_fail, {riak_kv, postcommit_fail}, counter},
{pbc_active, {riak_api, pbc_connects, active}, function},
{pbc_connects, {{riak_api, pbc_connects}, one}, spiral},
{pbc_connects_total, {{riak_api, pbc_connects}, count}, spiral}
].

%% PB stats are now under riak_api. In the past they were part of riak_kv.
%% This function maps those new values to the old names.
backwards_compat_pb(Stats) ->
[bc_stat(Old, New, Type, Stats) || {Old, New, Type} <-
[{pbc_active, {riak_api, pbc_connects, active}, counter},
{pbc_connects, {{riak_api, pbc_connects}, one}, spiral},
{pbc_connects_total, {{riak_api, pbc_connects}, count}, spiral}]].

%% @spec cpu_stats() -> proplist()
%% @doc Get stats on the cpu, as given by the cpu_sup module
%% of the os_mon application.
Expand Down Expand Up @@ -319,14 +339,15 @@ config_stats() ->
%% @doc add the pipe stats to the blob in a style consistent
%% with those stats already in the blob
pipe_stats() ->
Stats = riak_core_stat_q:get_stats([riak_pipe]),
lists:flatten([bc_stat(Name, Val) || {Name, Val} <- Stats]).
lists:flatten([bc_stat(Name, Val) || {Name, Val} <- riak_pipe_stat:get_stats()]).

%% old style blob stats don't have the app name
%% and they have underscores, not commas
bc_stat(Name, Val) ->
bc_stat(Name, Val) when is_tuple(Name) ->
StatName = join(tl(tuple_to_list(Name))),
bc_stat_val(StatName, Val).
bc_stat_val(StatName, Val);
bc_stat(Name, Val) ->
bc_stat_val(Name, Val).

%% Old style stats don't have tuple lists as values
%% they have an entry per element in the complex stats tuple list
Expand Down

0 comments on commit fd2e527

Please sign in to comment.