Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Feuerlabs stat combo #487

Closed
wants to merge 33 commits into from

3 participants

@uwiger

Same functionality as in #465, but with the possibility to choose the old ('legacy') stats package at compile-time.

The function identifying which system is used is riak_core_stat:stat_system(). This function is hard-coded to 'exometer', but can be changed to 'legacy' by setting the OS environment variable RIAK_CORE_STAT_SYSTEM to "legacy", and ensuring that the module riak_core_stat.erl is recompiled.

uwiger and others added some commits
@uwiger uwiger added exometer & riak_exoport deps 0ef3e1e
@uwiger uwiger integrated exometer 632cea7
@uwiger uwiger further exometer fixes 0a0a896
@uwiger uwiger find_entries() new return value a56db2c
@uwiger uwiger corrected return value from exometer_entry:get_value() f247618
Ubuntu select only enabled metrics c718586
@uwiger uwiger Merge branch 'develop' of git://github.com/basho/riak_core into uw-ex…
…ometer-dev
9de6007
@uwiger uwiger counter datapoint extraction in backwards_compat 1b506f4
@uwiger uwiger simplifications for exometer a02a73c
@uwiger uwiger added exometer & riak_exoport deps d791c82
@uwiger uwiger integrated exometer 159a6d5
@uwiger uwiger further exometer fixes 714a94f
@uwiger uwiger find_entries() new return value 37fa4a8
@uwiger uwiger corrected return value from exometer_entry:get_value() f204cff
Ubuntu select only enabled metrics e6ddc2d
@uwiger uwiger counter datapoint extraction in backwards_compat 2d3f0d3
@uwiger uwiger simplifications for exometer a07b7e1
@uwiger uwiger Merge branch 'uw-exometer-dev' of github.com:Feuerlabs/riak_core into…
… uw-exometer-dev

Conflicts:
	src/riak_core.app.src
5f14d69
@uwiger uwiger fixing stats console outout e84693f
@uwiger uwiger Merge branch 'develop' of git://github.com/basho/riak_core into uw-ex…
…ometer-dev
69afc9e
@uwiger uwiger Started adding console 'stat' commands 01d063a
@uwiger uwiger JSON stats fixed 7794bf9
@uwiger uwiger remove debug printout 245733e
@uwiger uwiger Forgot prefix when updating stat 118c9c2
@uwiger uwiger wrong git protocol for exometer.git 5712ead
@uwiger uwiger export vnodeq_stats (for riak_kv_wm_stat) c4bd18b
@uwiger uwiger typo in vnodeq aggregate 084978d
@uwiger uwiger supporting both legacy and exometer, first draft 9cddbc8
@uwiger uwiger both stat systems seem to work 331a755
@uwiger uwiger add missing _TS to connection_mgr_stats:get_stats() 0171e12
@uwiger uwiger Merge branch 'develop' of git://github.com/basho/riak_core into feuer…
…labs-stat-combo
78daef8
@uwiger uwiger untabify rebar.config b7dc255
@uwiger uwiger recovered calc_stat_proc ca9f706
@jrwest

seems like 2.1 thing, someone with more knowledge can perhaps correct me

@jrwest jrwest added this to the 2.1 milestone
@jburwell

Superseded by #650

@jburwell jburwell closed this
@seancribbs seancribbs deleted the feuerlabs-stat-combo branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Oct 18, 2013
  1. @uwiger
  2. @uwiger

    integrated exometer

    uwiger authored
  3. @uwiger

    further exometer fixes

    uwiger authored
  4. @uwiger

    find_entries() new return value

    uwiger authored
  5. @uwiger
  6. @uwiger

    select only enabled metrics

    Ubuntu authored uwiger committed
Commits on Oct 28, 2013
  1. @uwiger
Commits on Oct 29, 2013
  1. @uwiger
Commits on Nov 13, 2013
  1. @uwiger

    simplifications for exometer

    uwiger authored
Commits on Nov 19, 2013
  1. @uwiger

    added exometer & riak_exoport deps

    uwiger authored Ulf Wiger committed
  2. @uwiger

    integrated exometer

    uwiger authored Ulf Wiger committed
  3. @uwiger

    further exometer fixes

    uwiger authored Ulf Wiger committed
  4. @uwiger

    find_entries() new return value

    uwiger authored Ulf Wiger committed
  5. @uwiger

    corrected return value from exometer_entry:get_value()

    uwiger authored Ulf Wiger committed
  6. select only enabled metrics

    Ubuntu authored Ulf Wiger committed
  7. @uwiger

    counter datapoint extraction in backwards_compat

    uwiger authored Ulf Wiger committed
  8. @uwiger

    simplifications for exometer

    uwiger authored Ulf Wiger committed
  9. @uwiger

    Merge branch 'uw-exometer-dev' of github.com:Feuerlabs/riak_core into…

    uwiger authored
    … uw-exometer-dev
    
    Conflicts:
    	src/riak_core.app.src
Commits on Nov 26, 2013
  1. @uwiger

    fixing stats console outout

    uwiger authored
Commits on Nov 28, 2013
  1. @uwiger
Commits on Nov 29, 2013
  1. @uwiger
Commits on Dec 3, 2013
  1. @uwiger

    JSON stats fixed

    uwiger authored
  2. @uwiger

    remove debug printout

    uwiger authored
Commits on Dec 5, 2013
  1. @uwiger
  2. @uwiger
Commits on Dec 8, 2013
  1. @uwiger
Commits on Dec 9, 2013
  1. @uwiger

    typo in vnodeq aggregate

    uwiger authored
Commits on Dec 20, 2013
  1. @uwiger
  2. @uwiger

    both stat systems seem to work

    uwiger authored
Commits on Dec 21, 2013
  1. @uwiger
  2. @uwiger
  3. @uwiger

    untabify rebar.config

    uwiger authored
  4. @uwiger

    recovered calc_stat_proc

    uwiger authored
This page is out of date. Refresh to see the latest.
View
6 rebar.config
@@ -1,4 +1,5 @@
-{erl_first_files, ["src/gen_nb_server.erl", "src/riak_core_gen_server.erl"]}.
+{erl_first_files, ["src/gen_nb_server.erl", "src/riak_core_gen_server.erl",
+ "src/riak_core_stat_xform"]}.
{cover_enabled, true}.
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{edoc_opts, [{preprocess, true}]}.
@@ -13,5 +14,6 @@
{folsom, "0.7.4p4", {git, "git://github.com/basho/folsom.git", {tag, "0.7.4p4"}}},
{riak_ensemble, ".*", {git, "git://github.com/basho/riak_ensemble", {branch, "jdb-wip-2.0"}}},
{pbkdf2, ".*", {git, "git://github.com/basho/erlang-pbkdf2.git", {branch, "adt-cleanups"}}},
- {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "master"}}}
+ {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "master"}}},
+ {exometer, ".*", {git, "git://github.com/Feuerlabs/exometer.git", {branch, "master"}}}
]}.
View
1  src/riak_core.app.src
@@ -15,6 +15,7 @@
crypto,
riak_sysmon,
os_mon,
+ exometer,
basho_stats,
eleveldb,
pbkdf2,
View
167 src/riak_core_console.erl
@@ -24,7 +24,8 @@
stage_force_replace/1, print_staged/1, commit_staged/1,
clear_staged/1, transfer_limit/1, pending_claim_percentage/2,
transfers/1, add_user/1, add_source/1, grant/1, revoke/1,
- print_users/1, print_user/1, print_sources/1, ciphers/1]).
+ print_users/1, print_user/1, print_sources/1, ciphers/1,
+ stat_show/1, stat_showall/1, stat_info/1, stat_enable/1, stat_disable/1]).
%% @doc Return for a given ring and node, percentage currently owned and
%% anticipated after the transitions have been completed.
@@ -968,3 +969,167 @@ parse_cidr(CIDR) ->
[IP, Mask] = string:tokens(CIDR, "/"),
{ok, Addr} = inet_parse:address(IP),
{Addr, list_to_integer(Mask)}.
+
+-define(only_exometer(Expr),
+ case riak_core_stat:stat_system() of
+ legacy ->
+ io:fwrite(
+ "Error: only available for the Exometer stat system~n", []);
+ exometer ->
+ Expr
+ end).
+
+stat_show(Arg) ->
+ ?only_exometer(print_stats(find_entries(Arg))).
+
+stat_showall(Arg) ->
+ ?only_exometer(print_stats(find_entries(Arg, '_'))).
+
+find_entries(Arg) ->
+ ?only_exometer(find_entries(Arg, enabled)).
+
+find_entries(Arg, Status) ->
+ Patterns = lists:flatten([parse_stat_entry(S, Status) || S <- Arg]),
+ exometer:select(Patterns).
+
+print_stats(Entries) ->
+ io:fwrite(
+ [io_lib:fwrite("~p: ~p~n", [E, get_value(E, Status)]) || {E, _T, Status} <- Entries]).
+
+get_value(_, disabled) ->
+ disabled;
+get_value(E, _Status) ->
+ case exometer:get_value(E) of
+ {ok, V} -> V;
+ {error,_} -> unavailable
+ end.
+
+stat_enable(Arg) ->
+ ?only_exometer(
+ [io:fwrite("~p: ~p~n", [N, change_status(N, enabled)])
+ || {N, _, _} <- find_entries(Arg, disabled)]).
+
+stat_disable(Arg) ->
+ ?only_exometer(
+ [io:fwrite("~p: ~p~n", [N, change_status(N, disabled)])
+ || {N, _, _} <- find_entries(Arg, enabled)]).
+
+change_status(N, St) ->
+ case exometer:setopts(N, [{status, St}]) of
+ ok ->
+ St;
+ Error ->
+ Error
+ end.
+
+stat_info(Arg) ->
+ ?only_exometer(
+ begin
+ {Attrs, RestArg} = pick_info_attrs(split_arg(Arg)),
+ [print_info(E, Attrs) || E <- find_entries(RestArg, '_')]
+ end).
+
+pick_info_attrs(Arg) ->
+ lists:foldr(
+ fun("-name" , {As, Ps}) -> {[name |As], Ps};
+ ("-type" , {As, Ps}) -> {[type |As], Ps};
+ ("-module" , {As, Ps}) -> {[module |As], Ps};
+ ("-value" , {As, Ps}) -> {[value |As], Ps};
+ ("-cache" , {As, Ps}) -> {[cache |As], Ps};
+ ("-status" , {As, Ps}) -> {[status |As], Ps};
+ ("-timestamp", {As, Ps}) -> {[timestamp|As], Ps};
+ ("-options" , {As, Ps}) -> {[options |As], Ps};
+ (P, {As, Ps}) -> {As, [P|Ps]}
+ end, {[], []}, Arg).
+
+print_info({N, _Type, _Status}, [A|Attrs]) ->
+ Hdr = lists:flatten(io_lib:fwrite("~p: ", [N])),
+ Pad = lists:duplicate(length(Hdr), $\s),
+ Info = exometer:info(N),
+ Body = [io_lib:fwrite("~w = ~p~n", [A, proplists:get_value(A, Info)])
+ | lists:map(fun(Ax) ->
+ io_lib:fwrite(Pad ++ "~w = ~p~n",
+ [Ax, proplists:get_value(Ax, Info)])
+ end, Attrs)],
+ io:fwrite([Hdr, Body]).
+
+split_arg([Str]) ->
+ re:split(Str, "\\s", [{return,list}]).
+
+parse_stat_entry("[" ++ _ = Expr, _Status) ->
+ case erl_scan:string(ensure_trailing_dot(Expr)) of
+ {ok, Toks, _} ->
+ case erl_parse:parse_exprs(Toks) of
+ {ok, [Abst]} ->
+ partial_eval(Abst);
+ Error ->
+ io:fwrite("(Parse error for ~p: ~p~n", [Expr, Error]),
+ []
+ end;
+ ScanErr ->
+ io:fwrite("(Scan error for ~p: ~p~n", [Expr, ScanErr]),
+ []
+ end;
+parse_stat_entry(Str, Status) when Status==enabled; Status==disabled; Status=='_' ->
+ Parts = re:split(Str, "\\.", [{return,list}]),
+ {{replace_parts(Parts),'_',Status}, [], ['$_']};
+parse_stat_entry(_, Status) ->
+ io:fwrite("(Illegal status: ~p~n", [Status]).
+
+
+ensure_trailing_dot(Str) ->
+ case lists:reverse(Str) of
+ "." ++ _ ->
+ Str;
+ _ ->
+ Str ++ "."
+ end.
+
+partial_eval({cons,_,H,T}) ->
+ [partial_eval(H) | partial_eval(T)];
+%% partial_eval({nil,_}) ->
+%% [];
+partial_eval({tuple,_,Elems}) ->
+ list_to_tuple([partial_eval(E) || E <- Elems]);
+%% partial_eval({T,_,X}) when T==atom; T==integer; T==float ->
+%% X;
+partial_eval({op,_,'++',L1,L2}) ->
+ partial_eval(L1) ++ partial_eval(L2);
+partial_eval(X) ->
+ erl_parse:normalise(X).
+
+replace_parts([H|T]) ->
+ R = case H of
+ "*" -> '_';
+ "'" ++ _ ->
+ case erl_scan:string(H) of
+ {ok, [{atom, _, A}], _} ->
+ A;
+ Error ->
+ error(Error)
+ end;
+ [C|_] when C >= $0, C =< $9 ->
+ try list_to_integer(H)
+ catch
+ error:_ -> list_to_atom(H)
+ end;
+ _ -> list_to_atom(H)
+ end,
+ case T of
+ ["**"] -> [R] ++ '_';
+ _ -> [R|replace_parts(T)]
+ end;
+replace_parts([]) ->
+ [].
+
+
+
+
+
+
+
+
+
+
+
+
View
246 src/riak_core_stat.erl
@@ -23,13 +23,18 @@
-behaviour(gen_server).
%% API
--export([start_link/0, get_stats/0, update/1,
- register_stats/0, produce_stats/0]).
+-export([start_link/0, get_stats/0, get_stats/1, update/1,
+ register_stats/0, produce_stats/0, vnodeq_stats/0,
+ register_stats/2,
+ prefix/0,
+ stat_system/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-compile({parse_transform, riak_core_stat_xform}).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
@@ -42,19 +47,63 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_stats() ->
+ case stat_system() of
+ legacy -> register_stats_legacy();
+ exometer -> register_stats_exometer()
+ end.
+
+register_stats_legacy() ->
[(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
[register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
+register_stats_exometer() ->
+ register_stats(common, system_stats()),
+ register_stats(?APP, stats()).
+
+%% @spec register_stats(App, Stats) -> ok
+%% @doc (Re-)Register a list of metrics for App.
+register_stats(App, Stats) ->
+ P = prefix(),
+ [register_stat_int(P, App, Stat) || Stat <- Stats].
+
+register_stat_int(P, App, Stat) ->
+ {Name, Type, Opts} = case Stat of
+ {N, T} -> {N, T, []};
+ {N, T, Os} -> {N, T, Os}
+ end,
+ exometer:re_register(stat_name(P,App,Name), Type, Opts).
+
+stat_name(P, App, N) when is_atom(N) ->
+ stat_name_([P, App, N]);
+stat_name(P, App, N) when is_list(N) ->
+ stat_name_([P, App | N]).
+
+stat_name_([P, [] | Rest]) -> [P | Rest];
+stat_name_(N) -> N.
+
+
%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
+ case stat_system() of
+ legacy -> get_stats_legacy();
+ exometer ->
+ get_stats(?APP) ++ vnodeq_stats()
+ end.
+
+get_stats_legacy() ->
case riak_core_stat_cache:get_stats(?APP) of
{ok, Stats, _TS} ->
Stats;
Error -> Error
end.
+
+get_stats(App) ->
+ P = prefix(),
+ exometer:get_values([P, App]).
+
update(Arg) ->
gen_server:cast(?SERVER, {update, Arg}).
@@ -65,6 +114,25 @@ produce_stats() ->
lists:append([gossip_stats(),
vnodeq_stats()]).
+%% @doc (Exometer) stat name prefix.
+%%
+%% This function can be transformed at compile-time by setting the
+%% OS environment variable `RIAK_CORE_STAT_PREFIX'. The result will be
+%% converted to an atom. Default is `riak'.
+%% @end
+prefix() ->
+ riak.
+
+%% @doc Switch function to determine which stats system to use.
+%%
+%% This function can be transformed at compile-time by setting the
+%% OS environment variable `RIAK_CORE_STAT_SYSTEM' to either "legacy"
+%% or "exometer". The result will be converted to an atom.
+%% Default is `exometer'.
+%% @end
+stat_system() ->
+ exometer.
+
%% gen_server
init([]) ->
@@ -75,7 +143,10 @@ handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({update, Arg}, State) ->
- update1(Arg),
+ case stat_system() of
+ legacy -> update1(Arg);
+ exometer -> exometer:update([prefix(), ?APP, Arg], update_value(Arg))
+ end,
{noreply, State};
handle_cast(_Req, State) ->
{noreply, State}.
@@ -119,6 +190,13 @@ update1(rebalance_timer_begin) ->
update1(rebalance_timer_end) ->
folsom_metrics:notify_existing_metric({?APP, rebalance_delay}, timer_end, duration).
+update_value(converge_timer_begin) -> timer_begin;
+update_value(rebalance_timer_begin) -> timer_begin;
+update_value(converge_timer_end) -> timer_begin;
+update_value(rebalance_timer_end) -> timer_begin;
+update_value(_) -> 1.
+
+
%% private
stats() ->
[{ignored_gossip_total, counter},
@@ -128,8 +206,10 @@ stats() ->
{handoff_timeouts, counter},
{dropped_vnode_requests_total, counter},
{converge_delay, duration},
- {rebalance_delay, duration}].
+ {rebalance_delay, duration}
+ ].
+%% used by legacy
register_stat(Name, counter) ->
folsom_metrics:new_counter(Name);
register_stat(Name, spiral) ->
@@ -137,23 +217,36 @@ register_stat(Name, spiral) ->
register_stat(Name, duration) ->
folsom_metrics:new_duration(Name).
+%% used by exometer
+system_stats() ->
+ [{cpu_stats, cpu, [{sample_interval, 5000}]}].
+
gossip_stats() ->
lists:flatten([backwards_compat(Stat, Type, riak_core_stat_q:calc_stat({{?APP, Stat}, Type})) ||
{Stat, Type} <- stats(), Stat /= riak_core_rejected_handoffs]).
backwards_compat(Name, Type, unavailable) when Type =/= counter ->
backwards_compat(Name, Type, []);
+backwards_compat(Name, Type, {error,not_found}) when Type =/= counter ->
+ backwards_compat(Name, Type, []);
backwards_compat(rings_reconciled, spiral, Stats) ->
[{rings_reconciled_total, proplists:get_value(count, Stats, unavailable)},
{rings_reconciled, safe_trunc(proplists:get_value(one, Stats, unavailable))}];
backwards_compat(gossip_received, spiral, Stats) ->
{gossip_received, safe_trunc(proplists:get_value(one, Stats, unavailable))};
backwards_compat(Name, counter, Stats) ->
- {Name, Stats};
+ case stat_system() of
+ legacy -> {Name, Stats};
+ exometer -> {Name, proplists:get_value(value, Stats)}
+ end;
backwards_compat(Name, duration, Stats) ->
+ Mean = case stat_system() of
+ legacy -> arithmetic_mean;
+ exometer -> mean
+ end,
[{join(Name, min), safe_trunc(proplists:get_value(min, Stats, unavailable))},
{join(Name, max), safe_trunc(proplists:get_value(max, Stats, unavailable))},
- {join(Name, mean), safe_trunc(proplists:get_value(arithmetic_mean, Stats, unavailable))},
+ {join(Name, mean), safe_trunc(proplists:get_value(Mean, Stats, unavailable))},
{join(Name, last), proplists:get_value(last, Stats, unavailable)}].
join(Atom1, Atom2) ->
@@ -197,17 +290,25 @@ vnodeq_aggregate(Service, MQLs0) ->
1 ->
lists:nth(Len div 2 + 1, MQLs)
end,
- [{vnodeq_atom(Service, <<"s_running">>), Len},
- {vnodeq_atom(Service, <<"q_min">>), lists:nth(1, MQLs)},
- {vnodeq_atom(Service, <<"q_median">>), Median},
- {vnodeq_atom(Service, <<"q_mean">>), Mean},
- {vnodeq_atom(Service, <<"q_max">>), lists:nth(Len, MQLs)},
- {vnodeq_atom(Service, <<"q_total">>), Total}].
+ case stat_system() of
+ legacy ->
+ [{vnodeq_atom(Service, <<"s_running">>), Len},
+ {vnodeq_atom(Service, <<"q_min">>), lists:nth(1, MQLs)},
+ {vnodeq_atom(Service, <<"q_median">>), Median},
+ {vnodeq_atom(Service, <<"q_mean">>), Mean},
+ {vnodeq_atom(Service, <<"q_max">>), lists:nth(Len, MQLs)},
+ {vnodeq_atom(Service, <<"q_total">>), Total}];
+ exometer ->
+ P = prefix(),
+ [{[P, riak_core, vnodeq_atom(Service,<<"s_running">>)], [{value, Len}]},
+ {[P, riak_core, vnodeq_atom(Service,<<"q">>)],
+ [{min, lists:nth(1, MQLs)}, {median, Median}, {mean, Mean},
+ {max, lists:nth(Len, MQLs)}, {total, Total}]}]
+ end.
vnodeq_atom(Service, Desc) ->
binary_to_atom(<<(atom_to_binary(Service, latin1))/binary, Desc/binary>>, latin1).
-
-ifdef(TEST).
%% Check vnodeq aggregation function
@@ -215,48 +316,97 @@ vnodeq_aggregate_empty_test() ->
?assertEqual([], vnodeq_aggregate(service_vnode, [])).
vnodeq_aggregate_odd1_test() ->
- ?assertEqual([{service_vnodes_running, 1},
- {service_vnodeq_min, 10},
- {service_vnodeq_median, 10},
- {service_vnodeq_mean, 10},
- {service_vnodeq_max, 10},
- {service_vnodeq_total, 10}],
- vnodeq_aggregate(service_vnode, [10])).
+ case system_stat() ->
+ ?assertEqual([{service_vnodes_running, 1},
+ {service_vnodeq_min, 10},
+ {service_vnodeq_median, 10},
+ {service_vnodeq_mean, 10},
+ {service_vnodeq_max, 10},
+ {service_vnodeq_total, 10}],
+ vnodeq_aggregate(service_vnode, [10]));
+ exometer ->
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 1},
+ {[P, riak_core, service_vnodeq],
+ [{min, 10}, {median, 10}, {mean, 10}, {max, 10},
+ {total, 10}]}],
+ vnodeq_aggregate(service_vnode, [10]))
+ end.
vnodeq_aggregate_odd3_test() ->
- ?assertEqual([{service_vnodes_running, 3},
- {service_vnodeq_min, 1},
- {service_vnodeq_median, 2},
- {service_vnodeq_mean, 2},
- {service_vnodeq_max, 3},
- {service_vnodeq_total, 6}],
- vnodeq_aggregate(service_vnode, [1, 2, 3])).
+ case stat_system() of
+ legacy ->
+ ?assertEqual([{service_vnodes_running, 3},
+ {service_vnodeq_min, 1},
+ {service_vnodeq_median, 2},
+ {service_vnodeq_mean, 2},
+ {service_vnodeq_max, 3},
+ {service_vnodeq_total, 6}],
+ vnodeq_aggregate(service_vnode, [1, 2, 3]));
+ exometer ->
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 3},
+ {[P, riak_core, service_vnodeq],
+ [{min, 1}, {median, 2}, {mean, 2}, {max, 3},
+ {total, 6}]}],
+ vnodeq_aggregate(service_vnode, [1, 2, 3]))
+ end.
vnodeq_aggregate_odd5_test() ->
- ?assertEqual([{service_vnodes_running, 5},
- {service_vnodeq_min, 0},
- {service_vnodeq_median, 1},
- {service_vnodeq_mean, 2},
- {service_vnodeq_max, 5},
- {service_vnodeq_total, 10}],
- vnodeq_aggregate(service_vnode, [1, 0, 5, 0, 4])).
+ case stat_system() of
+ legacy ->
+ ?assertEqual([{service_vnodes_running, 5},
+ {service_vnodeq_min, 0},
+ {service_vnodeq_median, 1},
+ {service_vnodeq_mean, 2},
+ {service_vnodeq_max, 5},
+ {service_vnodeq_total, 10}],
+ vnodeq_aggregate(service_vnode, [1, 0, 5, 0, 4]));
+ exometer ->
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 5},
+ {[P, riak_core, service_vnodeq],
+ [{min, 0}, {median, 1}, {mean, 2}, {max, 5},
+ {total, 10}]}],
+ vnodeq_aggregate(service_vnode, [1, 0, 5, 0, 4]))
+ end.
vnodeq_aggregate_even2_test() ->
- ?assertEqual([{service_vnodes_running, 2},
- {service_vnodeq_min, 10},
- {service_vnodeq_median, 15},
- {service_vnodeq_mean, 15},
- {service_vnodeq_max, 20},
- {service_vnodeq_total, 30}],
- vnodeq_aggregate(service_vnode, [10, 20])).
+ case stat_system() of
+ legacy ->
+ ?assertEqual([{service_vnodes_running, 2},
+ {service_vnodeq_min, 10},
+ {service_vnodeq_median, 15},
+ {service_vnodeq_mean, 15},
+ {service_vnodeq_max, 20},
+ {service_vnodeq_total, 30}],
+ vnodeq_aggregate(service_vnode, [10, 20]));
+ exometer ->
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 2},
+ {[P, riak_core, service_vnodeq],
+ [{min, 10}, {median, 15}, {mean, 15}, {max, 20},
+ {total, 30}]}],
+ vnodeq_aggregate(service_vnode, [10, 20]))
+ end.
vnodeq_aggregate_even4_test() ->
- ?assertEqual([{service_vnodes_running, 4},
- {service_vnodeq_min, 0},
- {service_vnodeq_median, 5},
- {service_vnodeq_mean, 7},
- {service_vnodeq_max, 20},
- {service_vnodeq_total, 30}],
- vnodeq_aggregate(service_vnode, [0, 10, 0, 20])).
+ case stat_system() of
+ legacy ->
+ ?assertEqual([{service_vnodes_running, 4},
+ {service_vnodeq_min, 0},
+ {service_vnodeq_median, 5},
+ {service_vnodeq_mean, 7},
+ {service_vnodeq_max, 20},
+ {service_vnodeq_total, 30}],
+ vnodeq_aggregate(service_vnode, [0, 10, 0, 20]));
+ exometer ->
+ P = prefix(),
+ ?assertEqual([{[P, riak_core, service_vnodes_running], 4},
+ {[P, riak_core, service_vnodeq]
+ [{min, 0}, {median, 5}, {mean, 7}, {max, 20},
+ {total, 30}]}],
+ vnodeq_aggregate(service_vnode, [0, 10, 0, 20]))
+ end.
-endif.
View
128 src/riak_core_stat_cache.erl
@@ -37,7 +37,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--type registered_app() :: {MFA::mfa(), RerfreshRateMillis::non_neg_integer()}.
+-type registered_app() :: MFA::mfa()
+ | {MFA::mfa(), RerfreshRateMillis::non_neg_integer()}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -50,6 +51,7 @@
-define(MAX_REFRESH, timer:seconds(60)).
-define(ENOTREG(App), {error, {not_registered, App}}).
-define(DEFAULT_REG(Mod, RefreshRateMillis), {{Mod, produce_stats, []}, RefreshRateMillis}).
+-define(DEFAULT_REG(Mod), {Mod, produce_stats, []}).
-record(state, {tab, active=orddict:new(), apps=orddict:new()}).
@@ -61,15 +63,28 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_app(App, {M, F, A}) ->
- RefreshRate = app_helper:get_env(riak_core, stat_cache_ttl, ?REFRESH_RATE),
- register_app(App, {M, F, A}, RefreshRate).
+ gen_server:call(?SERVER, {register, App, {M, F, A}}, infinity).
register_app(App, {M, F, A}, RefreshRateSecs) ->
gen_server:call(?SERVER, {register, App, {{M, F, A}, ?REFRSH_MILLIS(RefreshRateSecs)}}, infinity).
get_stats(App) ->
+ case riak_core_stat:stat_system() of
+ legacy -> get_stats_legacy(App);
+ exometer -> get_stats_exometer(App)
+ end.
+
+get_stats_legacy(App) ->
gen_server:call(?SERVER, {get_stats, App}, infinity).
+get_stats_exometer(App) ->
+ case gen_server:call(?SERVER, {get_stats_mfa, App}) of
+ {ok, MFA} ->
+ do_get_stats(App, MFA);
+ Error ->
+ Error
+ end.
+
clear_cache(App) ->
gen_server:call(?SERVER, {clear, App}, infinity).
@@ -81,23 +96,58 @@ stop() ->
init([]) ->
process_flag(trap_exit, true),
Tab = ets:new(?MODULE, [protected, set, named_table]),
+ %% re-register mods, if this is a restart after a crash
+ RegisteredMods = registered_mods(riak_core_stat:stat_system()),
+ {ok, #state{tab = Tab, apps=orddict:from_list(RegisteredMods)}}.
+
+registered_mods(legacy) ->
RefreshRateSecs = app_helper:get_env(riak_core, stat_cache_ttl, ?REFRESH_RATE),
RefreshRateMillis = ?REFRSH_MILLIS(RefreshRateSecs),
- %% re-register mods, if this is a restart after a crash
- RegisteredMods = lists:foldl(fun({App, Mod}, Registered) ->
- register_mod(App, ?DEFAULT_REG(Mod, RefreshRateMillis), Registered) end,
- orddict:new(),
- riak_core:stat_mods()),
- {ok, #state{tab=Tab, apps=orddict:from_list(RegisteredMods)}}.
+ lists:foldl(fun({App, Mod}, Registered) ->
+ register_mod_legacy(App, ?DEFAULT_REG(Mod, RefreshRateMillis), Registered) end,
+ orddict:new(),
+ riak_core:stat_mods());
+registered_mods(exometer) ->
+ lists:foldl(fun({App, Mod}, Registered) ->
+ register_mod_exometer(App, ?DEFAULT_REG(Mod), Registered) end,
+ orddict:new(),
+ riak_core:stat_mods()).
handle_call({register, App, {MFA, RefreshRateMillis}}, _From, State0=#state{apps=Apps0}) ->
Apps = case registered(App, Apps0) of
false ->
- register_mod(App,{MFA, RefreshRateMillis}, Apps0);
+ case riak_core_stat:stat_system() of
+ legacy ->
+ register_mod_legacy(App,{MFA, RefreshRateMillis}, Apps0);
+ exometer ->
+ register_mod_exometer(App, MFA, Apps0)
+ end;
+ {true, _} ->
+ Apps0
+ end,
+ {reply, ok, State0#state{apps=Apps}};
+handle_call({register, App, MFA}, _From, State0=#state{apps=Apps0}) ->
+ Apps = case registered(App, Apps0) of
+ false ->
+ case riak_core_stat:stat_system() of
+ legacy ->
+ RefreshRateSecs = app_helper:get_env(riak_core, stat_cache_ttl, ?REFRESH_RATE),
+ RefreshRateMillis = ?REFRSH_MILLIS(RefreshRateSecs),
+ register_mod_legacy(App, {MFA, RefreshRateMillis}, Apps0);
+ exometer ->
+ register_mod_exometer(App, MFA, Apps0)
+ end;
{true, _} ->
Apps0
end,
{reply, ok, State0#state{apps=Apps}};
+handle_call({get_stats_mfa, App}, _From, State0=#state{apps=Apps}) ->
+ case registered(App, Apps) of
+ false ->
+ {reply, ?ENOTREG(App), State0};
+ {true, MFA} ->
+ {reply, {ok, MFA}, State0}
+ end;
handle_call({get_stats, App}, From, State0=#state{apps=Apps, active=Active0, tab=Tab}) ->
Reply = case registered(App, Apps) of
false ->
@@ -209,8 +259,8 @@ make_freshness_stat(App, TS) ->
make_freshness_stat_name(App) ->
list_to_atom(atom_to_list(App) ++ "_stat_ts").
--spec register_mod(atom(), registered_app(), orddict:orddict()) -> orddict:orddict().
-register_mod(App, AppRegistration, Apps0) ->
+-spec register_mod_legacy(atom(), registered_app(), orddict:orddict()) -> orddict:orddict().
+register_mod_legacy(App, AppRegistration, Apps0) ->
{{Mod, _, _}=MFA, RefreshRateMillis} = AppRegistration,
folsom_metrics:new_histogram({?MODULE, Mod}),
folsom_metrics:new_meter({?MODULE, App}),
@@ -218,6 +268,13 @@ register_mod(App, AppRegistration, Apps0) ->
schedule_get_stats(RefreshRateMillis, App, MFA),
Apps.
+-spec register_mod_exometer(atom(), registered_app(), orddict:orddict()) -> orddict:orddict().
+register_mod_exometer(App, {Mod, _, _} = MFA, Apps0) ->
+ P = riak_core_stat:prefix(),
+ exometer:new([P, ?MODULE, Mod], histogram),
+ exometer:new([P, ?MODULE, App], meter),
+ orddict:store(App, MFA, Apps0).
+
registered(App, Apps) ->
registered(orddict:find(App, Apps)).
@@ -246,12 +303,29 @@ maybe_get_stats(App, From, Active, MFA) ->
end,
orddict:store(App, Awaiting, Active).
-do_get_stats(App, {M, F, A}) ->
+do_get_stats(App, MFA) ->
+ case riak_core_stat:stat_system() of
+ legacy -> do_get_stats_legacy(App, MFA);
+ exometer -> do_get_stats_exometer(App, MFA)
+ end.
+
+do_get_stats_legacy(App, {M, F, A}) ->
spawn_link(fun() ->
Stats = folsom_metrics:histogram_timed_update({?MODULE, M}, M, F, A),
folsom_metrics:notify_existing_metric({?MODULE, App}, 1, meter),
gen_server:cast(?MODULE, {stats, App, Stats, folsom_utils:now_epoch()}) end).
+do_get_stats_exometer(App, {M, F, A}) ->
+ P = riak_core_stat:prefix(),
+ Stats = histogram_timed_update([P, ?MODULE, M], M, F, A),
+ exometer:update([P, ?MODULE, App], 1),
+ Stats.
+
+histogram_timed_update(Name, M, F, A) ->
+ {Time, Value} = timer:tc(M, F, A),
+ exometer:update(Name, Time),
+ Value.
+
awaiting_for_pid(Pid, Active) ->
case [{App, Awaiting} || {App, {Proc, Awaiting}} <- orddict:to_list(Active),
Proc == Pid] of
@@ -265,34 +339,6 @@ awaiting_for_pid(Pid, Active) ->
-define(MOCKS, [folsom_utils, riak_core_stat, riak_kv_stat]).
-define(STATS, [{stat1, 0}, {stat2, 1}, {stat3, 2}]).
-cached(App, Time) ->
- [make_freshness_stat(App, Time) | ?STATS].
-
-cache_test_() ->
- {setup,
- fun() ->
- folsom:start(),
- [meck:new(Mock, [non_strict, passthrough]) || Mock <- ?MOCKS],
- riak_core_stat_cache:start_link()
- end,
- fun(_) ->
- folsom:stop(),
- [meck:unload(Mock) || Mock <- ?MOCKS],
- riak_core_stat_cache:stop()
- end,
-
- [{"Register with the cache",
- fun register/0},
- {"Get cached value",
- fun get_cached/0},
- {"Expired cache, re-calculate",
- fun get_expired/0},
- {"Only a single process can calculate stats",
- fun serialize_calls/0},
- {"Crash test",
- fun crasher/0}
- ]}.
-
register() ->
[meck:expect(M, produce_stats, fun() -> ?STATS end)
|| M <- [riak_core_stat, riak_kv_stat]],
View
47 src/riak_core_stat_q.erl
@@ -23,9 +23,7 @@
%% A `Path' is a list of atoms | binaries. The module creates a set
%% of `ets:select/1' guards, one for each element in `Path'
%% For each stat that has a key that matches `Path' we calculate the
-%% current value and return it. This module makes use of
-%% `riak_core_stat_calc_proc'
-%% to cache and limit stat calculations.
+%% current value and return it.
-module(riak_core_stat_q).
@@ -37,7 +35,7 @@
-type path() :: [] | [atom()|binary()].
-type stats() :: [stat()].
-type stat() :: {stat_name(), stat_value()}.
--type stat_name() :: tuple().
+-type stat_name() :: list().
-type stat_value() :: integer() | [tuple()].
%% @doc To allow for namespacing, and adding richer dimensions, stats
@@ -53,6 +51,12 @@
%% in `Path' as a wild card.
-spec get_stats(path()) -> stats().
get_stats(Path) ->
+ case riak_core_stat:stat_system() of
+ legacy -> get_stats_legacy(Path);
+ exometer -> get_stats_exometer(Path)
+ end.
+
+get_stats_legacy(Path) ->
%% get all the stats that are at Path
NamesNTypes = names_and_types(Path),
calculate_stats(NamesNTypes).
@@ -82,14 +86,39 @@ guard(Elem, Cnt) when Cnt > 0 ->
size_guard(N) ->
{'>=', {size, '$1'}, N}.
+
+get_stats_exometer(Path) ->
+ exometer:get_values(Path).
+
calculate_stats(NamesAndTypes) ->
- [{Name, get_stat({Name, Type})} || {Name, {metric, _, Type, _}} <- NamesAndTypes].
+ [{Name, get_stat(Name)} || {Name, _, _} <- NamesAndTypes].
%% Create/lookup a cache/calculation process
get_stat(Stat) ->
Pid = riak_core_stat_calc_sup:calc_proc(Stat),
riak_core_stat_calc_proc:value(Pid).
+%% Encapsulate getting a stat value from exometer.
+%%
+%% If for any reason we can't get a stats value
+%% return 'unavailable'.
+%% @TODO experience shows that once a stat is
+%% broken it stays that way. Should we delete
+%% stats that are broken?
+calc_stat(Stat) ->
+ case riak_core_stat:stat_system() of
+ legacy -> calc_stat_legacy(Stat);
+ exometer -> calc_stat_exometer(Stat)
+ end.
+
+calc_stat_exometer({Name, _Type}) when is_tuple(Name) ->
+ stat_return(exometer:get_value([riak_core_stat:prefix()|tuple_to_list(Name)]));
+calc_stat_exometer({[_|_] = Name, _Type}) ->
+ stat_return(exometer:get_value([riak_core_stat:prefix()|Name])).
+
+stat_return({error,not_found}) -> unavailable;
+stat_return({ok, Value}) -> Value.
+
throw_folsom_error({error, _, _} = Err) ->
throw(Err);
throw_folsom_error(Other) -> Other.
@@ -101,7 +130,7 @@ throw_folsom_error(Other) -> Other.
%% @TODO experience shows that once a stat is
%% broken it stays that way. Should we delete
%% stats that are broken?
-calc_stat({Name, gauge}) ->
+calc_stat_legacy({Name, gauge}) ->
try
GaugeVal = throw_folsom_error(folsom_metrics:get_metric_value(Name)),
calc_gauge(GaugeVal)
@@ -109,14 +138,14 @@ calc_stat({Name, gauge}) ->
log_error(Name, ErrClass, ErrReason),
unavailable
end;
-calc_stat({Name, histogram}) ->
+calc_stat_legacy({Name, histogram}) ->
try
throw_folsom_error(folsom_metrics:get_histogram_statistics(Name))
catch ErrClass:ErrReason ->
log_error(Name, ErrClass, ErrReason),
unavailable
end;
-calc_stat({Name, _Type}) ->
+calc_stat_legacy({Name, _Type}) ->
try throw_folsom_error(folsom_metrics:get_metric_value(Name))
catch ErrClass:ErrReason ->
log_error(Name, ErrClass, ErrReason),
@@ -126,7 +155,7 @@ calc_stat({Name, _Type}) ->
log_error(StatName, ErrClass, ErrReason) ->
lager:warning("Failed to calculate stat ~p with ~p:~p", [StatName, ErrClass, ErrReason]).
-%% some crazy people put funs in folsom gauges
+%% some crazy people put funs in gauges (exometer has a 'function' metric)
%% so that they can have a consistent interface
%% to access stats from disperate sources
calc_gauge({function, Mod, Fun}) ->
View
12 src/riak_core_stat_sup.erl
@@ -48,8 +48,16 @@ start_link() ->
init([]) ->
Children = lists:flatten(
[?CHILD(folsom_sup, supervisor),
- ?CHILD(riak_core_stats_sup, supervisor),
- ?CHILD(riak_core_stat_calc_sup, supervisor)
+ ?CHILD(riak_core_stats_sup, supervisor)
+ | extra_stat_procs()
]),
{ok, {{rest_for_one, 10, 10}, Children}}.
+
+extra_stat_procs() ->
+ case riak_core_stat:stat_system() of
+ legacy ->
+ [?CHILD(riak_core_stat_calc_sup, supervisor)];
+ exometer ->
+ []
+ end.
View
52 src/riak_core_stat_xform.erl
@@ -0,0 +1,52 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_stat_xform).
+
+-export([parse_transform/2]).
+
+
+parse_transform(Forms, _) ->
+ L = [{prefix , env("RIAK_CORE_STAT_PREFIX")},
+ {stat_system, env("RIAK_CORE_STAT_SYSTEM")}],
+ L1 = [X || {_,V} = X <- L, V =/= undefined],
+ transform(Forms, L1).
+
+env(Var) ->
+ case os:getenv(Var) of
+ false -> undefined;
+ Str ->
+ list_to_atom(Str)
+ end.
+transform([{function,L,F,0,_} = Form|T], Env) ->
+ NewForm =
+ case lists:keyfind(F, 1, Env) of
+ {_, V} when is_atom(V) ->
+ {function, L, F, 0,
+ [{clause, L, [], [], [{atom,L,V}]}]};
+ false ->
+ Form
+ end,
+ [NewForm | transform(T, Env)];
+transform([H|T], Env) ->
+ [H | transform(T, Env)];
+transform([], _) ->
+ [].
+
+
Something went wrong with that request. Please try again.