Integrate the Exometer metrics package #465

Closed
wants to merge 40 commits

3 participants

@uwiger

This PR is part of a set of PRs aimed at integrating the Exometer metrics package into Riak.

In our measurements so far, Exometer offers both higher throughput and a lower footprint than the previous metrics management, while also providing more flexible and uniform handling and better extensibility.

In addition to maintaining the console command riak-admin status and the HTTP JSON report (both of which aim to be backwards-compatible), a new console command, riak-admin stat <cmd>, has been added for selective reporting of statistics as well as some management (the ability to enable and disable metrics on the fly).
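
For illustration, a hypothetical session against the new console entry points (the riak-admin wiring itself is not part of this diff; the pattern grammar comes from parse_stat_entry/2 in the riak_core_console.erl changes below, where "*" matches a single name segment and a trailing "**" matches any suffix):

    %% riak-admin stat show riak.riak_kv.node.gets.*
    riak_core_console:stat_show(["riak.riak_kv.node.gets.*"]).
    %% riak-admin stat info -type -status riak.riak_core.**
    riak_core_console:stat_info(["-type -status riak.riak_core.**"]).
    %% riak-admin stat disable riak.riak_core.converge_delay
    riak_core_console:stat_disable(["riak.riak_core.converge_delay"]).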

Other noteworthy changes:

  • Exometer entry names are always lists. In Riak, only atoms and numbers should be used as list elements.
  • A top-level 'prefix' (default: riak) has been added in order to differentiate between stats from different riak-style products (e.g. when reporting stats to collectd). The function riak_core_stat:prefix() is generated as a constant expression through the parse transform riak_core_stat_xform, which in turn checks the OS environment variable RIAK_CORE_STAT_PREFIX. A typical entry would thus be e.g. [riak,riak_kv,node,gets,siblings] (see the sketch after this list).
  • Internally in Riak, stats are referred to symbolically, using the same (tuple-based) names as before. These often refer to more than one low-level metric, so it seemed reasonable to keep this naming scheme.
  • Exometer provides functionality similar to that of 'sidejob' and the riak_core stat cache, so these are no longer used for stats management (although sidejob still maintains some stats on its own, which are accessible via Exometer). Other apps still register with riak_core_stat, but need not provide callbacks in order to query the stats. The query style of Exometer is the same as that of riak_core_stat.
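
A minimal sketch (not part of the diff; the default prefix and an already-registered entry are assumed) of how the symbolic entry from the prefix example above maps onto Exometer's list-based names:

    P = riak_core_stat:prefix(),                  %% 'riak' unless overridden
    Name = [P, riak_kv, node, gets, siblings],    %% Exometer entry name (a list)
    _ = exometer:update(Name, 1),                 %% low-level update
    {ok, Datapoints} = exometer:get_value(Name).  %% read back its datapoints
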
@jrwest

Marked as 2.1 for the same reason as #487 (comment).

jrwest added this to the 2.1 milestone
uwiger and others added some commits
uwiger: added exometer & riak_exoport deps (def619f)
uwiger: integrated exometer (bbd1ce4)
uwiger: further exometer fixes (2a7258a)
uwiger: find_entries() new return value (d6ac01e)
Ubuntu: select only enabled metrics (c875d96)
uwiger: counter datapoint extraction in backwards_compat (464ceb3)
uwiger: simplifications for exometer (f954548)
uwiger: integrated exometer (2aa8e34)
uwiger: further exometer fixes (293a091)
uwiger: find_entries() new return value (8ac5e5d)
uwiger: Started adding console 'stat' commands (a071bbd)
uwiger: JSON stats fixed (ddc8fd1)
uwiger: remove debug printout (f19e5e2)
uwiger: Forgot prefix when updating stat (73603a8)
uwiger: wrong git protocol for exometer.git (8b5cfaf)
uwiger: export vnodeq_stats (for riak_kv_wm_stat) (64d69c2)
uwiger: typo in vnodeq aggregate (7a1cea8)
uwiger: testing after rebase (4753805)
uwiger: undo erroneous rebase results (c61a754)
uwiger: added exometer & riak_exoport deps (e3fb73b)
uwiger: integrated exometer (8399ef3)
uwiger: integrated exometer (187997e)
uwiger: further exometer fixes (3ea742f)
uwiger: further exometer fixes (4d943c2)
uwiger: find_entries() new return value (2940ab0)
uwiger: find_entries() new return value (a730c2e)
Ubuntu: select only enabled metrics (fbd3896)
uwiger: counter datapoint extraction in backwards_compat (90f0808)
uwiger: simplifications for exometer (b38c39f)
uwiger: Started adding console 'stat' commands (1e1d7f2)
uwiger: JSON stats fixed (eb08dc9)
uwiger: remove debug printout (54a0af4)
uwiger: Forgot prefix when updating stat (ed6ada8)
uwiger: wrong git protocol for exometer.git (d824085)
uwiger: export vnodeq_stats (for riak_kv_wm_stat) (102a84a)
uwiger: typo in vnodeq aggregate (e25b322)
uwiger: testing after rebase (75ff57f)
uwiger: undo erroneous rebase results (9ae73f4)
uwiger: add vnode stats (exometer) (4e976a4)
uwiger: Merge branch 'feuerlabs-exometer' of git://github.com/basho/riak_core… into feuerlabs-exometer (c51f5e5)
uwiger referenced this pull request: use exometer metrics #616 (Closed)
@jburwell

This PR has been superseded by PR 616.

jburwell closed this
seancribbs deleted the feuerlabs-exometer branch

Commits on Jun 17, 2014 (authored by uwiger unless noted)
  1. added exometer & riak_exoport deps
  2. integrated exometer
  3. further exometer fixes
  4. find_entries() new return value
  5. select only enabled metrics (authored by Ubuntu, committed by uwiger)
  6. counter datapoint extraction in backwards_compat
  7. simplifications for exometer
  8. integrated exometer
Commits on Jun 18, 2014 (authored by uwiger)
  1. further exometer fixes
  2. find_entries() new return value
  3. Started adding console 'stat' commands
  4. JSON stats fixed
  5. remove debug printout
  6. Forgot prefix when updating stat
  7. wrong git protocol for exometer.git
  8. export vnodeq_stats (for riak_kv_wm_stat)
  9. typo in vnodeq aggregate
Commits on Jun 24, 2014 (authored by uwiger)
  1. testing after rebase
  2. undo erroneous rebase results
Commits on Jul 16, 2014 (authored by uwiger and committed by tolbrino, unless noted)
  1. added exometer & riak_exoport deps
  2. integrated exometer
  3. integrated exometer
  4. further exometer fixes
  5. further exometer fixes
  6. find_entries() new return value
  7. find_entries() new return value
  8. select only enabled metrics (authored by Ubuntu)
  9. counter datapoint extraction in backwards_compat
  10. simplifications for exometer
  11. Started adding console 'stat' commands
  12. JSON stats fixed
  13. remove debug printout
  14. Forgot prefix when updating stat
  15. wrong git protocol for exometer.git
  16. export vnodeq_stats (for riak_kv_wm_stat)
  17. typo in vnodeq aggregate
  18. testing after rebase
  19. undo erroneous rebase results
  20. add vnode stats (exometer) (authored and committed by uwiger)
  21. Merge branch 'feuerlabs-exometer' of git://github.com/basho/riak_core… into feuerlabs-exometer (uwiger)

rebar.config (8 changed lines)
@@ -1,4 +1,5 @@
-{erl_first_files, ["src/gen_nb_server.erl", "src/riak_core_gen_server.erl"]}.
+{erl_first_files, ["src/gen_nb_server.erl", "src/riak_core_gen_server.erl",
+ "src/riak_core_stat_xform"]}.
{cover_enabled, true}.
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{edoc_opts, [{preprocess, true}]}.
@@ -12,8 +13,9 @@
{poolboy, ".*", {git, "git://github.com/basho/poolboy.git", {tag, "0.8.1p2"}}},
{basho_stats, ".*", {git, "git://github.com/basho/basho_stats.git", {tag, "1.0.3"}}},
{riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon.git", {branch, "develop"}}},
- {folsom, "0.7.4p4", {git, "git://github.com/basho/folsom.git", {tag, "0.7.4p4"}}},
+ {folsom, ".*", {git, "git://github.com/basho/folsom.git", {branch, "master"}}},
{riak_ensemble, ".*", {git, "git://github.com/basho/riak_ensemble", {branch, "develop"}}},
{pbkdf2, ".*", {git, "git://github.com/basho/erlang-pbkdf2.git", {tag, "2.0.0"}}},
- {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "develop"}}}
+ {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "develop"}}},
+ {exometer, ".*", {git, "git://github.com/Feuerlabs/exometer.git", {branch, "master"}}}
]}.
src/riak_core.app.src (9 changed lines)
@@ -18,7 +18,8 @@
basho_stats,
eleveldb,
pbkdf2,
- poolboy
+ poolboy,
+ exometer
]},
{mod, {riak_core_app, []}},
{env, [
@@ -55,11 +56,11 @@
%% Handoff IP/port
{handoff_port, 8099},
{handoff_ip, "0.0.0.0"},
-
+
%% Disterl buffer sizes in bytes.
%% These sizes (3*128*1024 & 6*128*1024) were
- %% derived from a limited amount of testing in a
- %% 10GE environment, and may need tuning for your
+ %% derived from a limited amount of testing in a
+ %% 10GE environment, and may need tuning for your
%% network and workload. In particular they're likely
%% too small to be optimal for larger object sizes.
{dist_send_buf_size, 393216},
src/riak_core_console.erl (144 changed lines)
@@ -28,7 +28,8 @@
add_source/1, del_source/1, grant/1, revoke/1,
print_users/1, print_user/1, print_sources/1,
print_groups/1, print_group/1, print_grants/1,
- security_enable/1, security_disable/1, security_status/1, ciphers/1]).
+ security_enable/1, security_disable/1, security_status/1, ciphers/1,
+ stat_show/1, stat_showall/1, stat_info/1, stat_enable/1, stat_disable/1]).
%% @doc Return for a given ring and node, percentage currently owned and
%% anticipated after the transitions have been completed.
@@ -1141,3 +1142,144 @@ parse_cidr(CIDR) ->
[IP, Mask] = string:tokens(CIDR, "/"),
{ok, Addr} = inet_parse:address(IP),
{Addr, list_to_integer(Mask)}.
+
+
+stat_show(Arg) ->
+ print_stats(find_entries(Arg)).
+
+stat_showall(Arg) ->
+ print_stats(find_entries(Arg, '_')).
+
+find_entries(Arg) ->
+ find_entries(Arg, enabled).
+
+find_entries(Arg, Status) ->
+ Patterns = lists:flatten([parse_stat_entry(S, Status) || S <- Arg]),
+ exometer:select(Patterns).
+
+print_stats(Entries) ->
+ io:fwrite(
+ [io_lib:fwrite("~p: ~p~n", [E, get_value(E, Status)]) || {E, _T, Status} <- Entries]).
+
+get_value(_, disabled) ->
+ disabled;
+get_value(E, _Status) ->
+ case exometer:get_value(E) of
+ {ok, V} -> V;
+ {error,_} -> unavailable
+ end.
+
+stat_enable(Arg) ->
+ [io:fwrite("~p: ~p~n", [N, change_status(N, enabled)])
+ || {N, _, _} <- find_entries(Arg, disabled)].
+
+stat_disable(Arg) ->
+ [io:fwrite("~p: ~p~n", [N, change_status(N, disabled)])
+ || {N, _, _} <- find_entries(Arg, enabled)].
+
+change_status(N, St) ->
+ case exometer:setopts(N, [{status, St}]) of
+ ok ->
+ St;
+ Error ->
+ Error
+ end.
+
+stat_info(Arg) ->
+ {Attrs, RestArg} = pick_info_attrs(split_arg(Arg)),
+ [print_info(E, Attrs) || E <- find_entries(RestArg, '_')].
+
+pick_info_attrs(Arg) ->
+ lists:foldr(
+ fun("-name" , {As, Ps}) -> {[name |As], Ps};
+ ("-type" , {As, Ps}) -> {[type |As], Ps};
+ ("-module" , {As, Ps}) -> {[module |As], Ps};
+ ("-value" , {As, Ps}) -> {[value |As], Ps};
+ ("-cache" , {As, Ps}) -> {[cache |As], Ps};
+ ("-status" , {As, Ps}) -> {[status |As], Ps};
+ ("-timestamp", {As, Ps}) -> {[timestamp|As], Ps};
+ ("-options" , {As, Ps}) -> {[options |As], Ps};
+ (P, {As, Ps}) -> {As, [P|Ps]}
+ end, {[], []}, Arg).
+
+print_info({N, _Type, _Status}, [A|Attrs]) ->
+ Hdr = lists:flatten(io_lib:fwrite("~p: ", [N])),
+ Pad = lists:duplicate(length(Hdr), $\s),
+ Info = exometer:info(N),
+ Body = [io_lib:fwrite("~w = ~p~n", [A, proplists:get_value(A, Info)])
+ | lists:map(fun(Ax) ->
+ io_lib:fwrite(Pad ++ "~w = ~p~n",
+ [Ax, proplists:get_value(Ax, Info)])
+ end, Attrs)],
+ io:fwrite([Hdr, Body]).
+
+split_arg([Str]) ->
+ re:split(Str, "\\s", [{return,list}]).
+
+parse_stat_entry([], Status) ->
+ {{[riak_core_stat:prefix() | '_'], '_', Status}, [], ['$_']};
+parse_stat_entry("[" ++ _ = Expr, _Status) ->
+ case erl_scan:string(ensure_trailing_dot(Expr)) of
+ {ok, Toks, _} ->
+ case erl_parse:parse_exprs(Toks) of
+ {ok, [Abst]} ->
+ partial_eval(Abst);
+ Error ->
+ io:fwrite("(Parse error for ~p: ~p~n", [Expr, Error]),
+ []
+ end;
+ ScanErr ->
+ io:fwrite("(Scan error for ~p: ~p~n", [Expr, ScanErr]),
+ []
+ end;
+parse_stat_entry(Str, Status) when Status==enabled; Status==disabled; Status=='_' ->
+ Parts = re:split(Str, "\\.", [{return,list}]),
+ {{replace_parts(Parts),'_',Status}, [], ['$_']};
+parse_stat_entry(_, Status) ->
+ io:fwrite("(Illegal status: ~p~n", [Status]).
+
+
+ensure_trailing_dot(Str) ->
+ case lists:reverse(Str) of
+ "." ++ _ ->
+ Str;
+ _ ->
+ Str ++ "."
+ end.
+
+partial_eval({cons,_,H,T}) ->
+ [partial_eval(H) | partial_eval(T)];
+%% partial_eval({nil,_}) ->
+%% [];
+partial_eval({tuple,_,Elems}) ->
+ list_to_tuple([partial_eval(E) || E <- Elems]);
+%% partial_eval({T,_,X}) when T==atom; T==integer; T==float ->
+%% X;
+partial_eval({op,_,'++',L1,L2}) ->
+ partial_eval(L1) ++ partial_eval(L2);
+partial_eval(X) ->
+ erl_parse:normalise(X).
+
+replace_parts([H|T]) ->
+ R = case H of
+ "*" -> '_';
+ "'" ++ _ ->
+ case erl_scan:string(H) of
+ {ok, [{atom, _, A}], _} ->
+ A;
+ Error ->
+ error(Error)
+ end;
+ [C|_] when C >= $0, C =< $9 ->
+ try list_to_integer(H)
+ catch
+ error:_ -> list_to_atom(H)
+ end;
+ _ -> list_to_atom(H)
+ end,
+ case T of
+ ["**"] -> [R] ++ '_';
+ _ -> [R|replace_parts(T)]
+ end;
+replace_parts([]) ->
+ [].
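
To make the pattern handling above concrete, here is a rough sketch (not part of the diff) of what find_entries/2 passes to exometer:select/1 for the pattern "riak.riak_kv.node.gets.**" with Status = enabled; a "*" segment becomes '_' and a trailing "**" becomes an improper-list tail that matches any remaining name segments:

    Pattern = {{[riak, riak_kv, node, gets | '_'], '_', enabled}, [], ['$_']},
    Enabled = exometer:select([Pattern]).
    %% Enabled is a list of {Name, Type, Status} tuples, as consumed by print_stats/1.
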
src/riak_core_stat.erl (177 changed lines)
@@ -23,13 +23,18 @@
-behaviour(gen_server).
%% API
--export([start_link/0, get_stats/0, update/1,
- register_stats/0, produce_stats/0]).
+-export([start_link/0, get_stats/0, get_stats/1, update/1,
+ register_stats/0, produce_stats/0, vnodeq_stats/0,
+ register_stats/2,
+ register_vnode_stats/3, unregister_vnode_stats/2,
+ prefix/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-compile({parse_transform, riak_core_stat_xform}).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
@@ -42,18 +47,55 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_stats() ->
- _ = [(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
- _ = [register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
- riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
+ register_stats(common, system_stats()),
+ register_stats(?APP, stats()).
+
+%% @spec register_stats(App, Stats) -> ok
+%% @doc (Re-)Register a list of metrics for App.
+register_stats(App, Stats) ->
+ P = prefix(),
+ lists:foreach(fun(Stat) ->
+ register_stat(P, App, Stat)
+ end, Stats).
+
+register_stat(P, App, Stat) ->
+ {Name, Type, Opts} = case Stat of
+ {N, T} -> {N, T, []};
+ {N, T, Os} -> {N, T, Os}
+ end,
+ exometer:re_register(stat_name(P,App,Name), Type, Opts).
+
+register_vnode_stats(Module, Index, Pid) ->
+ P = prefix(),
+ exometer:ensure([P, ?APP, vnodes_running, Module],
+ { function, exometer, select_count,
+ [[{ {[P, ?APP, vnodeq, Module, '_'], '_', '_'},
+ [], [true] }]], value, [value] }, []),
+ exometer:re_register(
+ [P, ?APP, vnodeq, Module, Index],
+ { function, erlang, process_info, [Pid, message_queue_len],
+ match, {'_', value} }, []).
+
+unregister_vnode_stats(Module, Index) ->
+ exometer:delete([riak_core_stat:prefix(), ?APP, vnodeq, Module, Index]).
+
+stat_name(P, App, N) when is_atom(N) ->
+ stat_name_([P, App, N]);
+stat_name(P, App, N) when is_list(N) ->
+ stat_name_([P, App | N]).
+
+stat_name_([P, [] | Rest]) -> [P | Rest];
+stat_name_(N) -> N.
+
%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
- case riak_core_stat_cache:get_stats(?APP) of
- {ok, Stats, _TS} ->
- Stats;
- Error -> Error
- end.
+ get_stats(?APP) ++ vnodeq_stats().
+
+get_stats(App) ->
+ P = prefix(),
+ exometer:get_values([P, App]).
update(Arg) ->
gen_server:cast(?SERVER, {update, Arg}).
@@ -65,6 +107,9 @@ produce_stats() ->
lists:append([gossip_stats(),
vnodeq_stats()]).
+prefix() ->
+ app_helper:get_env(riak_core, stat_prefix, riak).
+
%% gen_server
init([]) ->
@@ -75,7 +120,8 @@ handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({update, Arg}, State) ->
- ok = update1(Arg),
+ exometer:update([prefix(), ?APP, Arg], update_value(Arg)),
+ %% update1(Arg),
{noreply, State};
handle_cast(_Req, State) ->
{noreply, State}.
@@ -89,35 +135,11 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%% @spec update1(term()) -> ok
-%% @doc Update the given stat.
-update1(rejected_handoffs) ->
- folsom_metrics:notify_existing_metric({?APP, rejected_handoffs}, {inc, 1}, counter);
-
-update1(handoff_timeouts) ->
- folsom_metrics:notify_existing_metric({?APP, handoff_timeouts}, {inc, 1}, counter);
-
-update1(ignored_gossip) ->
- folsom_metrics:notify_existing_metric({?APP, ignored_gossip_total}, {inc, 1}, counter);
-
-update1(gossip_received) ->
- folsom_metrics:notify_existing_metric({?APP, gossip_received}, 1, spiral);
-
-update1(rings_reconciled) ->
- folsom_metrics:notify_existing_metric({?APP, rings_reconciled}, 1, spiral);
-
-update1(dropped_vnode_requests) ->
- folsom_metrics:notify_existing_metric({?APP, dropped_vnode_requests_total}, {inc, 1}, counter);
-
-update1(converge_timer_begin) ->
- folsom_metrics:notify_existing_metric({?APP, converge_delay}, timer_start, duration);
-update1(converge_timer_end) ->
- folsom_metrics:notify_existing_metric({?APP, converge_delay}, timer_end, duration);
-
-update1(rebalance_timer_begin) ->
- folsom_metrics:notify_existing_metric({?APP, rebalance_delay}, timer_start, duration);
-update1(rebalance_timer_end) ->
- folsom_metrics:notify_existing_metric({?APP, rebalance_delay}, timer_end, duration).
+update_value(converge_timer_begin) -> timer_begin;
+update_value(rebalance_timer_begin) -> timer_begin;
+update_value(converge_timer_end) -> timer_begin;
+update_value(rebalance_timer_end) -> timer_begin;
+update_value(_) -> 1.
%% private
stats() ->
@@ -130,12 +152,9 @@ stats() ->
{converge_delay, duration},
{rebalance_delay, duration}].
-register_stat(Name, counter) ->
- folsom_metrics:new_counter(Name);
-register_stat(Name, spiral) ->
- folsom_metrics:new_spiral(Name);
-register_stat(Name, duration) ->
- folsom_metrics:new_duration(Name).
+system_stats() ->
+ [{cpu_stats, cpu, [{sample_interval, 5000}]}].
+
gossip_stats() ->
lists:flatten([backwards_compat(Stat, Type, riak_core_stat_q:calc_stat({{?APP, Stat}, Type})) ||
@@ -143,13 +162,15 @@ gossip_stats() ->
backwards_compat(Name, Type, unavailable) when Type =/= counter ->
backwards_compat(Name, Type, []);
+backwards_compat(Name, Type, {error,not_found}) when Type =/= counter ->
+ backwards_compat(Name, Type, []);
backwards_compat(rings_reconciled, spiral, Stats) ->
[{rings_reconciled_total, proplists:get_value(count, Stats, unavailable)},
{rings_reconciled, safe_trunc(proplists:get_value(one, Stats, unavailable))}];
backwards_compat(gossip_received, spiral, Stats) ->
{gossip_received, safe_trunc(proplists:get_value(one, Stats, unavailable))};
backwards_compat(Name, counter, Stats) ->
- {Name, Stats};
+ {Name, proplists:get_value(value, Stats)};
backwards_compat(Name, duration, Stats) ->
[{join(Name, min), safe_trunc(proplists:get_value(min, Stats, unavailable))},
{join(Name, max), safe_trunc(proplists:get_value(max, Stats, unavailable))},
@@ -197,17 +218,15 @@ vnodeq_aggregate(Service, MQLs0) ->
1 ->
lists:nth(Len div 2 + 1, MQLs)
end,
- [{vnodeq_atom(Service, <<"s_running">>), Len},
- {vnodeq_atom(Service, <<"q_min">>), lists:nth(1, MQLs)},
- {vnodeq_atom(Service, <<"q_median">>), Median},
- {vnodeq_atom(Service, <<"q_mean">>), Mean},
- {vnodeq_atom(Service, <<"q_max">>), lists:nth(Len, MQLs)},
- {vnodeq_atom(Service, <<"q_total">>), Total}].
+ P = prefix(),
+ [{[P, riak_core, vnodeq_atom(Service,<<"s_running">>)], [{value, Len}]},
+ {[P, riak_core, vnodeq_atom(Service,<<"q">>)],
+ [{min, lists:nth(1, MQLs)}, {median, Median}, {mean, Mean},
+ {max, lists:nth(Len, MQLs)}, {total, Total}]}].
vnodeq_atom(Service, Desc) ->
binary_to_atom(<<(atom_to_binary(Service, latin1))/binary, Desc/binary>>, latin1).
-
-ifdef(TEST).
%% Check vnodeq aggregation function
@@ -215,48 +234,38 @@ vnodeq_aggregate_empty_test() ->
?assertEqual([], vnodeq_aggregate(service_vnode, [])).
vnodeq_aggregate_odd1_test() ->
- ?assertEqual([{service_vnodes_running, 1},
- {service_vnodeq_min, 10},
- {service_vnodeq_median, 10},
- {service_vnodeq_mean, 10},
- {service_vnodeq_max, 10},
- {service_vnodeq_total, 10}],
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 1},
+ {[P, riak_core, service_vnodeq],
+ [{min, 10}, {median, 10}, {mean, 10}, {max, 10}, {total, 10}]}],
vnodeq_aggregate(service_vnode, [10])).
vnodeq_aggregate_odd3_test() ->
- ?assertEqual([{service_vnodes_running, 3},
- {service_vnodeq_min, 1},
- {service_vnodeq_median, 2},
- {service_vnodeq_mean, 2},
- {service_vnodeq_max, 3},
- {service_vnodeq_total, 6}],
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 3},
+ {[P, riak_core, service_vnodeq],
+ [{min, 1}, {median, 2}, {mean, 2}, {max, 3}, {total, 6}]}],
vnodeq_aggregate(service_vnode, [1, 2, 3])).
vnodeq_aggregate_odd5_test() ->
- ?assertEqual([{service_vnodes_running, 5},
- {service_vnodeq_min, 0},
- {service_vnodeq_median, 1},
- {service_vnodeq_mean, 2},
- {service_vnodeq_max, 5},
- {service_vnodeq_total, 10}],
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 5},
+ {[P, riak_core, service_vnodeq],
+ [{min, 0}, {median, 1}, {mean, 2}, {max, 5}, {total, 10}]}],
vnodeq_aggregate(service_vnode, [1, 0, 5, 0, 4])).
vnodeq_aggregate_even2_test() ->
- ?assertEqual([{service_vnodes_running, 2},
- {service_vnodeq_min, 10},
- {service_vnodeq_median, 15},
- {service_vnodeq_mean, 15},
- {service_vnodeq_max, 20},
- {service_vnodeq_total, 30}],
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 2},
+ {[P, riak_core, service_vnodeq],
+ [{min, 10}, {median, 15}, {mean, 15}, {max, 20}, {total, 30}]}],
vnodeq_aggregate(service_vnode, [10, 20])).
vnodeq_aggregate_even4_test() ->
- ?assertEqual([{service_vnodes_running, 4},
- {service_vnodeq_min, 0},
- {service_vnodeq_median, 5},
- {service_vnodeq_mean, 7},
- {service_vnodeq_max, 20},
- {service_vnodeq_total, 30}],
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 4},
+ {[P, riak_core, service_vnodeq]
+ [{min, 0}, {median, 5}, {mean, 7}, {max, 20}, {total, 30}]}],
vnodeq_aggregate(service_vnode, [0, 10, 0, 20])).
-endif.
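
As a rough illustration (default prefix assumed; not part of the diff), register_stats/2 above turns the declarative stat lists into exometer:re_register/3 calls:

    %% {converge_delay, duration} from stats() becomes:
    exometer:re_register([riak, riak_core, converge_delay], duration, []),
    %% {cpu_stats, cpu, [{sample_interval, 5000}]} from system_stats() becomes:
    exometer:re_register([riak, common, cpu_stats], cpu, [{sample_interval, 5000}]).
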
src/riak_core_stat_cache.erl (243 changed lines)
@@ -30,28 +30,23 @@
-behaviour(gen_server).
%% API
--export([start_link/0, get_stats/1, register_app/2, register_app/3,
- clear_cache/1, stop/0]).
+-export([start_link/0, get_stats/1, register_app/2, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--type registered_app() :: {MFA::{module(), atom(), [term()]}, RerfreshRateMillis::non_neg_integer()}.
+-type registered_app() :: MFA::mfa().
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(SERVER, ?MODULE).
-%% @doc Cache item refresh rate in seconds
--define(REFRESH_RATE, 1).
--define(REFRSH_MILLIS(N), timer:seconds(N)).
--define(MAX_REFRESH, timer:seconds(60)).
-define(ENOTREG(App), {error, {not_registered, App}}).
--define(DEFAULT_REG(Mod, RefreshRateMillis), {{Mod, produce_stats, []}, RefreshRateMillis}).
+-define(DEFAULT_REG(Mod), {Mod, produce_stats, []}).
--record(state, {tab, active=orddict:new(), apps=orddict:new()}).
+-record(state, {active=orddict:new(), apps=orddict:new()}).
%%%===================================================================
%%% API
@@ -61,17 +56,15 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_app(App, {M, F, A}) ->
- RefreshRate = app_helper:get_env(riak_core, stat_cache_ttl, ?REFRESH_RATE),
- register_app(App, {M, F, A}, RefreshRate).
-
-register_app(App, {M, F, A}, RefreshRateSecs) ->
- gen_server:call(?SERVER, {register, App, {{M, F, A}, ?REFRSH_MILLIS(RefreshRateSecs)}}, infinity).
+ gen_server:call(?SERVER, {register, App, {M, F, A}}, infinity).
get_stats(App) ->
- gen_server:call(?SERVER, {get_stats, App}, infinity).
-
-clear_cache(App) ->
- gen_server:call(?SERVER, {clear, App}, infinity).
+ case gen_server:call(?SERVER, {get_stats_mfa, App}) of
+ {ok, MFA} ->
+ do_get_stats(App, MFA);
+ Error ->
+ Error
+ end.
stop() ->
gen_server:cast(?SERVER, stop).
@@ -80,92 +73,37 @@ stop() ->
init([]) ->
process_flag(trap_exit, true),
- Tab = ets:new(?MODULE, [protected, set, named_table]),
- RefreshRateSecs = app_helper:get_env(riak_core, stat_cache_ttl, ?REFRESH_RATE),
- RefreshRateMillis = ?REFRSH_MILLIS(RefreshRateSecs),
%% re-register mods, if this is a restart after a crash
RegisteredMods = lists:foldl(fun({App, Mod}, Registered) ->
- register_mod(App, ?DEFAULT_REG(Mod, RefreshRateMillis), Registered) end,
+ register_mod(App, ?DEFAULT_REG(Mod), Registered) end,
orddict:new(),
riak_core:stat_mods()),
- {ok, #state{tab=Tab, apps=orddict:from_list(RegisteredMods)}}.
+ {ok, #state{apps=orddict:from_list(RegisteredMods)}}.
-handle_call({register, App, {MFA, RefreshRateMillis}}, _From, State0=#state{apps=Apps0}) ->
+handle_call({register, App, MFA}, _From, State0=#state{apps=Apps0}) ->
Apps = case registered(App, Apps0) of
false ->
- register_mod(App,{MFA, RefreshRateMillis}, Apps0);
+ register_mod(App, MFA, Apps0);
{true, _} ->
Apps0
end,
{reply, ok, State0#state{apps=Apps}};
-handle_call({get_stats, App}, From, State0=#state{apps=Apps, active=Active0, tab=Tab}) ->
- Reply = case registered(App, Apps) of
- false ->
- {reply, ?ENOTREG(App), State0};
- {true, {MFA, _RefreshRateMillis}} ->
- case cache_get(App, Tab) of
- miss ->
- Active = maybe_get_stats(App, From, Active0, MFA),
- {noreply, State0#state{active=Active}};
- {hit, Stats, TS} ->
- FreshnessStat = make_freshness_stat(App, TS),
- {reply, {ok, [FreshnessStat | Stats], TS}, State0}
- end
- end,
- Reply;
-handle_call({clear, App}, _From, State=#state{apps=Apps, tab=Tab}) ->
+handle_call({get_stats_mfa, App}, _From, State0=#state{apps=Apps}) ->
case registered(App, Apps) of
- {true, _} ->
- true = ets:delete(Tab, App);
- _ -> ok
- end,
- {reply, ok, State};
+ false ->
+ {reply, ?ENOTREG(App), State0};
+ {true, MFA} ->
+ {reply, {ok, MFA}, State0}
+ end;
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
-%% @doc call back from process executig the stat calculation
-handle_cast({stats, App, Stats0, TS}, State0=#state{tab=Tab, active=Active, apps=Apps}) ->
- %% @TODO standardise stat mods return type with a behaviour
- Stats = case Stats0 of
- {App, Stats1} -> Stats1;
- Stats1 -> Stats1
- end,
- ets:insert(Tab, {App, TS, Stats}),
- State = case orddict:find(App, Active) of
- {ok, {_Pid, Awaiting}} ->
- _ = [gen_server:reply(From, {ok, [make_freshness_stat(App, TS) |Stats], TS}) || From <- Awaiting, From /= ?SERVER],
- State0#state{active=orddict:erase(App, Active)};
- error ->
- State0
- end,
- {ok, {MFA, RefreshRateMillis}} = orddict:find(App, Apps),
- schedule_get_stats(RefreshRateMillis, App, MFA),
- Apps2 = clear_fail_count(App, Apps),
- {noreply, State#state{apps=Apps2}};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-%% don't let a crashing stat mod crash the cache
-handle_info({'EXIT', FromPid, Reason}, State0=#state{active=Active, apps=Apps}) when Reason /= normal ->
- Reply = case awaiting_for_pid(FromPid, Active) of
- not_found ->
- {stop, Reason, State0};
- {ok, {App, Awaiting}} ->
- _ = [gen_server:reply(From, {error, Reason}) || From <- Awaiting, From /= ?SERVER],
- {ok, {MFA, RefreshRateMillis}} = orddict:find(App, Apps),
- Apps2 = update_fail_count(App, Apps),
- FailCnt = get_fail_count(App, Apps2),
- schedule_get_stats(RefreshRateMillis, App, MFA, FailCnt),
- {noreply, State0#state{active=orddict:erase(App, Active), apps=Apps2}}
- end,
- Reply;
-%% @doc callback on timer timeout to keep cache fresh
-handle_info({get_stats, {App, MFA}}, State) ->
- Active = maybe_get_stats(App, ?SERVER, State#state.active, MFA),
- {noreply, State#state{active=Active}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -175,48 +113,12 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%% internal
-get_fail_count(App, Apps) ->
- case orddict:find([App, fail], Apps) of
- {ok, Cnt} ->
- Cnt;
- error ->
- 0
- end.
-
-clear_fail_count(App, Apps) ->
- orddict:erase([App, fail], Apps).
-
-update_fail_count(App, Apps) ->
- orddict:update_counter([App, fail], 1, Apps).
-
-schedule_get_stats(After, App, MFA) ->
- Pid = self(),
- erlang:send_after(After, Pid, {get_stats, {App, MFA}}).
-
-schedule_get_stats(After, Apps, MFA, 0) ->
- schedule_get_stats(After, Apps, MFA);
-schedule_get_stats(After, Apps, MFA, FailCnt) ->
- Millis = back_off(After, FailCnt),
- schedule_get_stats(Millis, Apps, MFA).
-
-back_off(After, FailCnt) ->
- min(After * (1 bsl FailCnt), ?MAX_REFRESH).
-
-make_freshness_stat(App, TS) ->
- {make_freshness_stat_name(App), TS}.
-
-make_freshness_stat_name(App) ->
- list_to_atom(atom_to_list(App) ++ "_stat_ts").
-
-spec register_mod(atom(), registered_app(), orddict:orddict()) -> orddict:orddict().
-register_mod(App, AppRegistration, Apps0) ->
- {{Mod, _, _}=MFA, RefreshRateMillis} = AppRegistration,
- ok = folsom_metrics:new_histogram({?MODULE, Mod}),
- ok = folsom_metrics:new_meter({?MODULE, App}),
- Apps = orddict:store(App, AppRegistration, Apps0),
- schedule_get_stats(RefreshRateMillis, App, MFA),
- Apps.
+register_mod(App, {Mod, _, _} = MFA, Apps0) ->
+ P = riak_core_stat:prefix(),
+ exometer:new([P, ?MODULE, Mod], histogram),
+ exometer:new([P, ?MODULE, App], meter),
+ orddict:store(App, MFA, Apps0).
registered(App, Apps) ->
registered(orddict:find(App, Apps)).
@@ -226,74 +128,22 @@ registered(error) ->
registered({ok, Val}) ->
{true, Val}.
-cache_get(App, Tab) ->
- Res = case ets:lookup(Tab, App) of
- [] ->
- miss;
- [{App, TStamp, Stats}] ->
- {hit, Stats, TStamp}
- end,
- Res.
-
-maybe_get_stats(App, From, Active, MFA) ->
- %% if a get stats is not under way start one
- Awaiting = case orddict:find(App, Active) of
- error ->
- Pid = do_get_stats(App, MFA),
- {Pid, [From]};
- {ok, {Pid, Froms}} ->
- {Pid, [From|Froms]}
- end,
- orddict:store(App, Awaiting, Active).
-
do_get_stats(App, {M, F, A}) ->
- spawn_link(fun() ->
- Stats = folsom_metrics:histogram_timed_update({?MODULE, M}, M, F, A),
- ok = folsom_metrics:notify_existing_metric({?MODULE, App}, 1, meter),
- gen_server:cast(?MODULE, {stats, App, Stats, folsom_utils:now_epoch()})
- end).
-
-awaiting_for_pid(Pid, Active) ->
- case [{App, Awaiting} || {App, {Proc, Awaiting}} <- orddict:to_list(Active),
- Proc == Pid] of
- [] ->
- not_found;
- L -> {ok, hd(L)}
- end.
+ P = riak_core_stat:prefix(),
+ Stats = histogram_timed_update([P, ?MODULE, M], M, F, A),
+ exometer:update([P, ?MODULE, App], 1),
+ Stats.
+
+histogram_timed_update(Name, M, F, A) ->
+ {Time, Value} = timer:tc(M, F, A),
+ exometer:update(Name, Time),
+ Value.
-ifdef(TEST).
-define(MOCKS, [folsom_utils, riak_core_stat, riak_kv_stat]).
-define(STATS, [{stat1, 0}, {stat2, 1}, {stat3, 2}]).
-cached(App, Time) ->
- [make_freshness_stat(App, Time) | ?STATS].
-
-cache_test_() ->
- {setup,
- fun() ->
- folsom:start(),
- [meck:new(Mock, [non_strict, passthrough]) || Mock <- ?MOCKS],
- riak_core_stat_cache:start_link()
- end,
- fun(_) ->
- folsom:stop(),
- [meck:unload(Mock) || Mock <- ?MOCKS],
- riak_core_stat_cache:stop()
- end,
-
- [{"Register with the cache",
- fun register/0},
- {"Get cached value",
- fun get_cached/0},
- {"Expired cache, re-calculate",
- fun get_expired/0},
- {"Only a single process can calculate stats",
- fun serialize_calls/0},
- {"Crash test",
- fun crasher/0}
- ]}.
-
register() ->
[meck:expect(M, produce_stats, fun() -> ?STATS end)
|| M <- [riak_core_stat, riak_kv_stat]],
@@ -301,32 +151,15 @@ register() ->
riak_core_stat_cache:register_app(riak_core, {riak_core_stat, produce_stats, []}, 5),
riak_core_stat_cache:register_app(riak_kv, {riak_kv_stat, produce_stats, []}, 5),
NonSuch = riak_core_stat_cache:get_stats(nonsuch),
- ?assertEqual({ok, cached(riak_core, Now), Now}, riak_core_stat_cache:get_stats(riak_core)),
- ?assertEqual({ok, cached(riak_kv, Now), Now}, riak_core_stat_cache:get_stats(riak_kv)),
+
?assertEqual(?ENOTREG(nonsuch), NonSuch),
- %% and check the cache has the correct values
- [?assertEqual([{App, Now, ?STATS}], ets:lookup(riak_core_stat_cache, App))
- || App <- [riak_core, riak_kv]],
+
%% and that a meter and histogram has been registered for all registered modules
[?assertEqual([{{?MODULE, M}, [{type, histogram}]}], folsom_metrics:get_metric_info({?MODULE, M}))
|| M <- [riak_core_stat, riak_kv_stat]],
[?assertEqual([{{?MODULE, App}, [{type, meter}]}], folsom_metrics:get_metric_info({?MODULE, App}))
|| App <- [riak_core, riak_kv]].
-get_cached() ->
- Now = tick(1000, 0),
- [?assertEqual({ok, cached(riak_core, Now), Now}, riak_core_stat_cache:get_stats(riak_core))
- || _ <- lists:seq(1, 20)],
- ?assertEqual(1, meck:num_calls(riak_core_stat, produce_stats, [])).
-
-get_expired() ->
- CalcTime = 1000,
- _Expired = tick(CalcTime, ?REFRESH_RATE+?REFRESH_RATE),
- [?assertEqual({ok, cached(riak_core, CalcTime), CalcTime}, riak_core_stat_cache:get_stats(riak_core))
- || _ <- lists:seq(1, 20)],
- %% Stale stats should no longer trigger a stat calculation
- ?assertEqual(1, meck:num_calls(riak_core_stat, produce_stats, [])).
-
serialize_calls() ->
%% many processes can call get stats at once
%% they should not block the server
@@ -339,7 +172,6 @@ serialize_calls() ->
%% b) only one call to produce_stats
%% But ONLY in the case that the cache is empty. At any other time,
%% that cached answer should be returned.
- riak_core_stat_cache:clear_cache(riak_kv),
Procs = 20,
Then = 1000,
Now = tick(2000, 0),
@@ -367,7 +199,6 @@ serialize_calls() ->
?assertEqual(2, meck:num_calls(riak_kv_stat, produce_stats, [])).
crasher() ->
- riak_core_stat_cache:clear_cache(riak_kv),
Pid = whereis(riak_core_stat_cache),
Then = tick(1000, 0),
%% Now = tick(10000, 0),
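
The cache no longer stores results; do_get_stats/2 above simply times the producing MFA each time it is asked. A rough equivalent of what histogram_timed_update/4 does for riak_kv (hypothetical app/module, default prefix assumed):

    {Micros, Stats} = timer:tc(riak_kv_stat, produce_stats, []),
    exometer:update([riak, riak_core_stat_cache, riak_kv_stat], Micros),
    %% Stats is returned directly to the caller; nothing is cached any more.
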
src/riak_core_stat_q.erl (79 changed lines)
@@ -23,9 +23,7 @@
%% A `Path' is a list of atoms | binaries. The module creates a set
%% of `ets:select/1' guards, one for each element in `Path'
%% For each stat that has a key that matches `Path' we calculate the
-%% current value and return it. This module makes use of
-%% `riak_core_stat_calc_proc'
-%% to cache and limit stat calculations.
+%% current value and return it.
-module(riak_core_stat_q).
@@ -37,7 +35,7 @@
-type path() :: [] | [atom()|binary()].
-type stats() :: [stat()].
-type stat() :: {stat_name(), stat_value()}.
--type stat_name() :: tuple().
+-type stat_name() :: list().
-type stat_value() :: integer() | [tuple()].
%% @doc To allow for namespacing, and adding richer dimensions, stats
@@ -53,80 +51,37 @@
%% in `Path' as a wild card.
-spec get_stats(path()) -> stats().
get_stats(Path) ->
- %% get all the stats that are at Path
- NamesNTypes = names_and_types(Path),
- calculate_stats(NamesNTypes).
-
-%% @doc queries folsom's metrics table for stats that match our path
-names_and_types(Path) ->
- Guards = guards_from_path(Path),
- ets:select(folsom, [{{'$1','$2'}, Guards,['$_']}]).
-
-guards_from_path(Path) ->
- SizeGuard = size_guard(length(Path)),
- %% Going to reverse it is why this way around
- Guards = [SizeGuard, {is_tuple, '$1'}],
- add_guards(Path, Guards, 1).
-
-add_guards([], Guards, _Cnt) ->
- lists:reverse(Guards);
-add_guards(['_'|Path], Guards, Cnt) ->
- add_guards(Path, Guards, Cnt+1);
-add_guards([Elem|Path], Guards, Cnt) ->
- add_guards(Path, [guard(Elem, Cnt) | Guards], Cnt+1).
-
-guard(Elem, Cnt) when Cnt > 0 ->
- {'==', {element, Cnt, '$1'}, Elem}.
-
--spec size_guard(pos_integer()) -> tuple().
-size_guard(N) ->
- {'>=', {size, '$1'}, N}.
+ exometer:get_values(Path).
+ %% %% get all the stats that are at Path
+ %% calculate_stats(exometer:select(
+ %% [{ {Path ++ '_','_',enabled}, [], ['$_'] }])).
calculate_stats(NamesAndTypes) ->
- [{Name, get_stat({Name, Type})} || {Name, {metric, _, Type, _}} <- NamesAndTypes].
+ [{Name, get_stat(Name)} || {Name, _, _} <- NamesAndTypes].
%% Create/lookup a cache/calculation process
get_stat(Stat) ->
- Pid = riak_core_stat_calc_sup:calc_proc(Stat),
- riak_core_stat_calc_proc:value(Pid).
-
-throw_folsom_error({error, _, _} = Err) ->
- throw(Err);
-throw_folsom_error(Other) -> Other.
+ exometer:get_value(Stat).
-%% Encapsulate getting a stat value from folsom.
+%% Encapsulate getting a stat value from exometer.
%%
%% If for any reason we can't get a stats value
%% return 'unavailable'.
%% @TODO experience shows that once a stat is
%% broken it stays that way. Should we delete
%% stats that are broken?
-calc_stat({Name, gauge}) ->
- try
- GaugeVal = throw_folsom_error(folsom_metrics:get_metric_value(Name)),
- calc_gauge(GaugeVal)
- catch ErrClass:ErrReason ->
- log_error(Name, ErrClass, ErrReason),
- unavailable
- end;
-calc_stat({Name, histogram}) ->
- try
- throw_folsom_error(folsom_metrics:get_histogram_statistics(Name))
- catch ErrClass:ErrReason ->
- log_error(Name, ErrClass, ErrReason),
- unavailable
- end;
-calc_stat({Name, _Type}) ->
- try throw_folsom_error(folsom_metrics:get_metric_value(Name))
- catch ErrClass:ErrReason ->
- log_error(Name, ErrClass, ErrReason),
- unavailable
- end.
+calc_stat({Name, _Type}) when is_tuple(Name) ->
+ stat_return(exometer:get_value([riak_core_stat:prefix()|tuple_to_list(Name)]));
+calc_stat({[_|_] = Name, _Type}) ->
+ stat_return(exometer:get_value([riak_core_stat:prefix()|Name])).
+
+stat_return({error,not_found}) -> unavailable;
+stat_return({ok, Value}) -> Value.
log_error(StatName, ErrClass, ErrReason) ->
lager:warning("Failed to calculate stat ~p with ~p:~p", [StatName, ErrClass, ErrReason]).
-%% some crazy people put funs in folsom gauges
+%% some crazy people put funs in gauges (exometer has a 'function' metric)
%% so that they can have a consistent interface
%% to access stats from disperate sources
calc_gauge({function, Mod, Fun}) ->
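
A short, hedged example of the new path-based query style (default prefix assumed and included in the path): a path is simply a name prefix, and exometer:get_values/1 returns every enabled entry underneath it together with its datapoints:

    %% e.g. all GET-related riak_kv node stats:
    riak_core_stat_q:get_stats([riak, riak_kv, node, gets]).
    %% -> [{[riak,riak_kv,node,gets,siblings], [...]}, ...]   (shape only; values elided)
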
src/riak_core_stat_sup.erl (3 changed lines)
@@ -48,8 +48,7 @@ start_link() ->
init([]) ->
Children = lists:flatten(
[?CHILD(folsom_sup, supervisor),
- ?CHILD(riak_core_stats_sup, supervisor),
- ?CHILD(riak_core_stat_calc_sup, supervisor)
+ ?CHILD(riak_core_stats_sup, supervisor)
]),
{ok, {{rest_for_one, 10, 10}, Children}}.
src/riak_core_stat_xform.erl (41 changed lines, new file)
@@ -0,0 +1,41 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_stat_xform).
+
+-export([parse_transform/2]).
+
+
+parse_transform(Forms, _) ->
+ case os:getenv("RIAK_CORE_STAT_PREFIX") of
+ false ->
+ Forms;
+ Str ->
+ transform(Forms, list_to_atom(Str))
+ end.
+
+transform([{function,L,prefix,0,[_]}|T], Pfx) ->
+ [{function, L, prefix, 0,
+ [{clause, L, [], [], [{atom,L,Pfx}]}]}|T];
+transform([H|T], Pfx) ->
+ [H | transform(T, Pfx)];
+transform([], _) ->
+ [].
+
+
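
A sketch of the transform's effect (the prefix value 'myriak' is invented for illustration): when the environment variable is set at build time, prefix/0 no longer consults app_helper at runtime but compiles down to a constant:

    %% hypothetical build:
    %%   RIAK_CORE_STAT_PREFIX=myriak ./rebar compile
    %% afterwards, in the Erlang shell:
    1> riak_core_stat:prefix().
    myriak
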
src/riak_core_vnode_manager.erl (13 changed lines)
@@ -377,6 +377,7 @@ handle_cast({unregister, Index, Mod, Pid}, #state{idxtab=T} = State) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
State2 = update_forwarding({Mod, Index}, Ring, State),
ets:match_delete(T, {idxrec, {Index, Mod}, Index, Mod, Pid, '_'}),
+ unregister_vnode_stats(Mod, Index),
riak_core_vnode_proxy:unregister_vnode(Mod, Index, Pid),
{noreply, State2};
handle_cast({vnode_event, Mod, Idx, Pid, Event}, State) ->
@@ -553,7 +554,8 @@ idx2vnode(Idx, Mod, _State=#state{idxtab=T}) ->
%% @private
delmon(MonRef, _State=#state{idxtab=T}) ->
case ets:lookup(T, MonRef) of
- [#monrec{key=Key}] ->
+ [#monrec{key= {Index, Mod} = Key}] ->
+ unregister_vnode_stats(Mod, Index),
ets:match_delete(T, {idxrec, Key, '_', '_', '_', MonRef}),
ets:delete(T, MonRef);
[] ->
@@ -581,6 +583,7 @@ get_vnode(IdxList, Mod, State) ->
lager:debug("Will start VNode for partition ~p", [Idx]),
{ok, Pid} =
riak_core_vnode_sup:start_vnode(Mod, Idx, ForwardTo),
+ register_vnode_stats(Mod, Idx, Pid),
lager:debug("Started VNode, waiting for initialization to complete ~p, ~p ", [Pid, Idx]),
ok = riak_core_vnode:wait_for_init(Pid),
lager:debug("VNode initialization ready ~p, ~p", [Pid, Idx]),
@@ -708,7 +711,7 @@ should_handoff(Ring, _CHBin, Mod, Idx) ->
case app_for_vnode_module(Mod) of
undefined -> false;
{ok, App} ->
- case lists:member(TargetNode,
+ case lists:member(TargetNode,
riak_core_node_watcher:nodes(App)) of
false -> false;
true -> {true, TargetNode}
@@ -986,3 +989,9 @@ kill_repair(Repair, Reason) ->
riak_core_handoff_manager:kill_xfer(node(),
{Mod, undefined, Partition},
Reason).
+
+register_vnode_stats(Mod, Index, Pid) ->
+ riak_core_stat:register_vnode_stats(Mod, Index, Pid).
+
+unregister_vnode_stats(Mod, Index) ->
+ riak_core_stat:unregister_vnode_stats(Mod, Index).