Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: basho/riak_core
base: develop
...
head fork: russelldb/riak_core
compare: use_nifs
  • 18 commits
  • 13 files changed
  • 0 commit comments
  • 1 contributor
8 ebin/riak_core.app
View
@@ -37,6 +37,14 @@
riak_core_handoff_sender,
riak_core_handoff_sender_sup,
riak_core_handoff_sup,
+ riak_core_metric,
+ riak_core_metric_counter,
+ riak_core_metric_duration,
+ riak_core_metric_histogram,
+ riak_core_metric_meter,
+ riak_core_metric_proc,
+ riak_core_metric_sup,
+ riak_core_metric_tmeter,
riak_core_nodeid,
riak_core_node_watcher,
riak_core_node_watcher_events,
33 src/riak_core.erl
View
@@ -24,6 +24,7 @@
remove_from_cluster/1]).
-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).
+-export([stat_specs/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
@@ -256,22 +257,24 @@ legacy_remove(Node) when is_atom(Node) ->
end.
vnode_modules() ->
- case application:get_env(riak_core, vnode_modules) of
- undefined -> [];
- {ok, Mods} -> Mods
- end.
+ get_mods(vnode_modules).
bucket_fixups() ->
- case application:get_env(riak_core, bucket_fixups) of
- undefined -> [];
- {ok, Mods} -> Mods
- end.
+ get_mods(bucket_fixups).
bucket_validators() ->
- case application:get_env(riak_core, bucket_validators) of
- undefined -> [];
- {ok, Mods} -> Mods
- end.
+ get_mods(bucket_validators).
+
+stat_specs() ->
+ get_mods(stat_specs).
+
+%% @private
+%% @doc get registered modules of Type
+-spec get_mods(vnode_modules | bucket_fixups |
+ bucket_validators | stat_specs) ->
+ [{App::atom(), module()}] | [].
+get_mods(Type) ->
+ app_helper:get_env(riak_core, Type, []).
%% Get the application name if not supplied, first by get_application
%% then by searching by module name
@@ -308,12 +311,18 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register(App, T);
register(App, [{bucket_validator, ValidationMod}|T]) ->
register_mod(get_app(App, ValidationMod), ValidationMod, bucket_validators),
+ register(App, T);
+register(App, [{stat_specs, StatsMod}|T]) ->
+ register_mod(get_app(App, StatsMod), StatsMod, stat_specs),
register(App, T).
+
register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case Type of
vnode_modules ->
riak_core_vnode_proxy_sup:start_proxies(Module);
+ stat_specs ->
+ riak_core_metric_sup:start_stats(App, Module);
_ ->
ok
end,
83 src/riak_core_metric.erl
View
@@ -0,0 +1,83 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc riak_core_metric is a behaviour that metrics conform to.
+%% it is part of the riak_core stats subsystem.
+%% current impls are Meter(spiral), Counter, Histogram(slide)
+
+-module(riak_core_metric).
+
+-export([behaviour_info/1, regname/2, join_as_atom/1]).
+
+-export_type([stat_specs/0]).
+
+-type stat_specs() :: [stat()].
+
+-type stat() :: {Name :: atom(),
+ Args :: [type() | group() | presentation()]
+ }.
+
+-type type() :: {type, counter | meter | histogram | duration}.
+-type group() :: {group, GroupName::atom()}.
+-type presentation() :: {presentation, [{Name :: atom(),
+ Spec :: riak_core_metric_duration:display_spec() |
+ riak_core_metric_histogram:display_spec()}]}.
+behaviour_info(callbacks) ->
+ [{new, 0},
+ {value, 2},
+ {value, 3},
+ {update, 2}
+ ].
+
+%% @doc generate the regsitered name of the stat
+%% name Name, for the application App. e.g.
+%% regname(riak_kv, fsm_gets) ->
+%% stats_riak_kv_fsm_gets.
+-spec regname(atom(), atom()) -> atom().
+regname(App, Name) when is_atom(App), is_atom(Name) ->
+ join_as_atom(['stats_', App, $_, Name]).
+
+%% @doc joins a list of terms into a single atom
+%% e.g. join_as_atom(["prefix", '_', atom, "suffix", 7]) ->
+%% 'prefix_atomsiffix7'
+-spec join_as_atom([atom() | string() | binary() | integer()]) ->
+ atom().
+join_as_atom(L) ->
+ join_as_atom(L, <<>>).
+
+-spec join_as_atom([atom() | string() | binary() | integer()],
+ binary()) -> atom().
+join_as_atom([], Acc) ->
+ binary_to_atom(Acc, latin1);
+join_as_atom([Elem|Rest], Acc) ->
+ Bin1 = to_binary(Elem),
+ join_as_atom(Rest, <<Acc/binary, Bin1/binary>>).
+
+%% @doc turn an atom, list or integer
+%% to a binary
+-spec to_binary(atom() | string() |
+ binary() | integer()) -> binary().
+to_binary(Atom) when is_atom(Atom) ->
+ atom_to_binary(Atom, latin1);
+to_binary(List) when is_list(List) ->
+ list_to_binary(List);
+to_binary(Bin) when is_binary(Bin) ->
+ Bin;
+to_binary(Int) when is_integer(Int) ->
+ <<Int>>.
79 src/riak_core_metric_counter.erl
View
@@ -0,0 +1,79 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc A counter. Wraps an integer()
+-module(riak_core_metric_counter).
+
+-behaviour(riak_core_metric).
+
+-export([new/0, value/2, value/3, update/2]).
+
+-export([increment/2, increment/3, decrement/2, decrement/3, total/2]).
+
+%% @doc increments the counter for App named Stat
+%% by Amount.
+-spec increment(atom(), atom(), integer()) ->
+ ok.
+increment(App, Stat, Amount) ->
+ riak_core_metric_proc:update(App, Stat, Amount).
+
+%% @doc increment the counter for App named Stat
+%% by 1.
+-spec increment(atom(), atom()) ->
+ ok.
+increment(App, Stat) ->
+ increment(App, Stat, 1).
+
+%% @doc decrement the counter for App named Stat
+%% by Amount.
+-spec decrement(atom(), atom(), integer()) ->
+ ok.
+decrement(App, Stat, Amount) ->
+ riak_core_metric_proc:update(App, Stat, Amount * -1).
+
+%% @doc decrement the counter for App named Stat
+%% by 1.
+-spec decrement(atom(), atom()) -> ok.
+decrement(App, Stat) ->
+ decrement(App, Stat, 1).
+
+%% @doc return the counter for App named Stat's
+%% current value.
+-spec total(atom(), atom()) -> non_neg_integer().
+total(App, Stat) ->
+ riak_core_metric_proc:value(App, Stat).
+
+%% Behaviour
+-spec new() -> 0.
+new() ->
+ 0.
+
+-spec value(atom(), non_neg_integer()) ->
+ {atom(), non_neg_integer()}.
+value(Name, Counter) ->
+ {Name, Counter}.
+
+-spec value(_, atom(), non_neg_integer()) ->
+ {atom(), non_neg_integer()}.
+value({display_name, Name}, _StatName, Counter) ->
+ value(Name, Counter).
+
+-spec update(integer(), integer()) -> non_neg_integer().
+update(Amount, Counter) when is_integer(Amount) ->
+ erlang:max(Counter + Amount, 0).
104 src/riak_core_metric_duration.erl
View
@@ -0,0 +1,104 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc tracks a duration, min, max, last, mean
+-module(riak_core_metric_duration).
+
+-behaviour(riak_core_metric).
+
+%% Behaviour API
+-export([new/0, value/2, value/3, update/2]).
+
+%% Public API
+-export([start/2, stop/2, cumulative/2]).
+
+-export_type([display_spec/0]).
+
+-record(cuml, {count = 0 :: integer(),
+ min :: integer(),
+ max = -1 :: integer(),
+ mean = 0 :: integer(),
+ last :: integer(),
+ start :: calendar:t_now()}).
+
+-type display() :: [{field(), integer()}].
+-type field() :: count | min | max | mean | last | start.
+-type display_spec() :: [field()].
+
+%% @doc start timing the duration
+-spec start(atom(), atom()) -> ok.
+start(App, Stat) ->
+ riak_core_metric_proc:update(App, Stat, start).
+
+%% @doc stop timing the duration
+-spec stop(atom(), atom()) -> ok.
+stop(App, Stat) ->
+ riak_core_metric_proc:update(App, Stat, stop).
+
+%% @doc get display the current value of
+%% the duration Stat. Value is
+%% returned as a proplist
+-spec cumulative(atom(), atom()) ->
+ display().
+cumulative(App, Stat) ->
+ riak_core_metric_proc:value(App, Stat).
+
+-spec new() -> #cuml{}.
+new() ->
+ #cuml{}.
+
+%% @doc format Dur as a proplist with
+%% default fields count, min, max, mean and last
+-spec value(atom(), #cuml{}) ->
+ display().
+value(Name, Dur) ->
+ display(Name, to_proplist(Dur), [count, min, max, mean, last], []).
+
+%% @doc fromat Dur as a proplist
+%% Fields is a list of values to display
+%% picked from count, min, max, mean, last
+-spec value(display_spec(), atom(), #cuml{}) ->
+ display().
+value(Fields, Name, Dur) ->
+ display(Name, to_proplist(Dur), Fields, []).
+
+-spec update(start, #cuml{}) -> #cuml{};
+ (stop, #cuml{}) -> #cuml{}.
+update(start, Dur) ->
+ Dur#cuml{start=erlang:now()};
+update(stop, #cuml{count=N, min=Min, max=Max, mean=Mean, start=T0}) ->
+ Duration = timer:now_diff(erlang:now(), T0),
+ Min2 = erlang:min(Min, Duration),
+ Max2 = erlang:max(Max, Duration),
+ Mean2 = ((N * Mean) + Duration) div (N+1),
+ #cuml{count=N+1, min=Min2, max=Max2, mean=Mean2, last=Duration, start=undefined}.
+
+%% internal
+-spec display(atom(), display(), [field()], display()) ->
+ display().
+display(_Stat, _Cuml, [], Acc) ->
+ lists:reverse(Acc);
+display(Stat, Cuml, [Field|Rest], Acc) ->
+ Name = riak_core_metric:join_as_atom([Stat, '_', Field]),
+ Value = proplists:get_value(Field, Cuml),
+ display(Stat, Cuml, Rest, [{Name, Value}|Acc]).
+
+-spec to_proplist(#cuml{}) -> display().
+to_proplist(Cuml) when is_record(Cuml, cuml) ->
+ lists:zip(record_info(fields, cuml), tl(tuple_to_list(Cuml))).
100 src/riak_core_metric_histogram.erl
View
@@ -0,0 +1,100 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc An histogram . Wraps slide.
+-module(riak_core_metric_histogram).
+
+-behaviour(riak_core_metric).
+
+-export([new/0, value/2, value/3, update/2]).
+
+-export_type([display_spec/0]).
+
+-type display() :: [{field(), integer()}].
+-type field() :: count | mean | median | '95' | '99' | '100'.
+-type display_spec() :: [ args() | fields() | prefix() ].
+-type args() :: {args, {Min::integer(), Max::integer(), Bins::integer(),
+ RoundingMode:: up | down}}.
+-type fields() :: {fields, [fields()]}.
+-type prefix() :: {prefix, atom() | string() | binary() | integer()}.
+
+
+%% Behaviour
+%% @doc a new, fresh histogram
+-spec new() -> slide:slide().
+new() ->
+ {ok, H} = basho_metrics_nifs:histogram_new(),
+ H.
+
+%% @doc Sum of readings from now to 'window size' seconds ago.
+%% Returns total number of readings and the sum of those
+%% readings.
+-spec value(atom(), slide:slide()) ->
+ {atom(), {non_neg_integer(), number()}}.
+value(Name, Histo) ->
+ Stats = basho_metrics_nifs:histogram_stats(Histo),
+ Count = proplists:get_value(count, Stats),
+ {Name, Count}.
+
+%% @doc returns the fields of the histogram defined in the
+%% display spec. Use the 'args' in the display spec
+%% to produce results.
+%% @see slide:mean_and_nines/6
+-spec value(display_spec(), atom(), slide:slide()) ->
+ display().
+value(DisplaySpec, Name, Histo) ->
+ Fields = proplists:get_value(fields, DisplaySpec),
+ Prefix = proplists:get_value(prefix, DisplaySpec),
+ Stats0 = basho_metrics_nifs:histogram_stats(Histo),
+ Stats = lists:map(fun({p50, V}) -> {median, V};
+ ({p95, V}) -> {'95', V};
+ ({p99, V}) -> {'99', V};
+ ({max, V}) -> {'100', V};
+ (E) -> E end,
+ Stats0),
+ FieldPrefix = field_prefix(Prefix, Name),
+ display(FieldPrefix, Fields, Stats, []).
+
+%% @doc update histogram with Reading for given Moment
+-spec update({integer(), integer()}, slide:slide()) ->
+ slide:slide().
+update({Reading, _Moment}, Histo) ->
+ ok = basho_metrics_nifs:histogram_update(Histo, Reading),
+ Histo.
+
+%% @doc add a prefix Prefix_ to the given Field
+-spec field_prefix(atom(), field()) ->
+ atom().
+field_prefix(undefined, Name) ->
+ Name;
+field_prefix(Prefix, Name) ->
+ riak_core_metric:join_as_atom([Prefix, '_', Name]).
+
+%% @doc produce a proplist containing only specified fields
+-spec display(atom(), [field()], display(), display()) ->
+ display().
+display(_Prefix, [], _Stat, Acc) ->
+ lists:reverse(Acc);
+display(Prefix, [{Field, Name}|Rest], Stats, Acc) ->
+ Item = {Name, proplists:get_value(Field, Stats)},
+ display(Prefix, Rest, Stats, [Item|Acc]);
+display(Prefix, [Field|Rest], Stats, Acc) ->
+ Name = riak_core_metric:join_as_atom([Prefix, '_', Field]),
+ Item = {Name, proplists:get_value(Field, Stats)},
+ display(Prefix, Rest, Stats, [Item|Acc]).
60 src/riak_core_metric_meter.erl
View
@@ -0,0 +1,60 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc A Meter. Wraps spiraltime
+-module(riak_core_metric_meter).
+
+-behaviour(riak_core_metric).
+
+%% Behaviour API
+-export([new/0, value/2, value/3, update/2]).
+
+-export([tick/1]).
+
+%% @doc create a new meter.
+-spec new() -> spiraltime:spiral().
+new() ->
+ {ok, M} = basho_metrics_nifs:meter_new(),
+ M.
+
+%% @doc format the number of entries in the last minute as
+%% {name, count}.
+-spec value(atom(), spiraltime:spiral()) ->
+ {atom(), integer()}.
+value(Name, Meter) ->
+ Stats = basho_metrics_nifs:meter_stats(Meter),
+ {Name, proplists:get_value(count, Stats)}.
+
+%% @doc format the number of entries in the last minute as
+%% {name, count}.
+-spec value(_, atom(), spiraltime:spiral()) ->
+ {atom(), integer()}.
+value({display_name, Name}, _StatName, Meter) ->
+ value(Name, Meter).
+
+%% @doc update the entry for the given Moment by Amount,
+%% in the given Meter
+-spec update({integer(), integer()}, spiraltime:spiral()) ->
+ spiraltime:spiral().
+update({Amount, _Moment}, Meter) ->
+ ok = basho_metrics_nifs:meter_update(Meter, Amount),
+ Meter.
+
+tick(Meter) ->
+ basho_metrics_nifs:meter_tick(Meter).
116 src/riak_core_metric_proc.erl
View
@@ -0,0 +1,116 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc process/state wrapper around a metric
+
+-module(riak_core_metric_proc).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, update/2, value/1, value/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {name, mod, mod_state, presentation, description}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+start_link(Name, Args) ->
+ gen_server:start_link({local, Name}, ?MODULE, [{name, Name}|Args], []).
+
+update(Name, Args) ->
+ gen_server:cast(Name, {update, Args}).
+
+value(Name) ->
+ value(Name, []).
+
+value(Name, Presentation) ->
+ {ok, Val} = gen_server:call(Name, {value, Presentation}),
+ Val.
+
+init(Args) ->
+ Name = proplists:get_value(name, Args),
+ {type, Type} = proplists:lookup(type, Args), %% Does mod need init args?
+ Mod = mod_from_type(Type),
+ Description = proplists:get_value(description, Args),
+ DisplaySpec = proplists:get_value(presentation, Args),
+ ModState = Mod:new(),
+ DoTicks = do_ticks(Mod),
+ if DoTicks == true ->
+ %% start a tick
+ timer:send_interval(5000, tick);
+ true ->
+ ok
+ end,
+ {ok, #state{name=Name, mod=Mod, mod_state=ModState,
+ description=Description, presentation=DisplaySpec}}.
+
+handle_call({value, _}, _From, #state{mod=Mod, mod_state=ModState,
+ presentation=undefined, name=Name}=State) ->
+ Stat = Mod:value(Name, ModState),
+ {reply, {ok, Stat}, State};
+handle_call({value, undefined}, _From, #state{mod=Mod, mod_state=ModState, name=Name}=State) ->
+ Stat = Mod:value(Name, ModState),
+ {reply, {ok, Stat}, State};
+handle_call({value, Presentation}, _From, #state{mod=Mod, mod_state=ModState,
+ presentation=DisplaySpecs, name=Name}=State) ->
+ Stat = case proplists:get_value(Presentation, DisplaySpecs) of
+ undefined ->
+ Mod:value(Name, ModState);
+ DisplaySpec ->
+ Mod:value(DisplaySpec, Name, ModState)
+ end,
+ {reply, {ok, Stat}, State}.
+
+handle_cast({update, Args}, #state{mod=Mod, mod_state=ModState0}=State) ->
+ ModState = Mod:update(Args, ModState0),
+ {noreply, State#state{mod_state=ModState}}.
+
+handle_info(tick, #state{mod=Mod, mod_state=ModState}=State) ->
+ Mod:tick(ModState),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+mod_from_type({mod, Mod}) ->
+ Mod;
+mod_from_type(ShortName) ->
+ list_to_atom("riak_core_metric_" ++ atom_to_list(ShortName)).
+
+do_ticks(Mod) ->
+ case proplists:get_value(tick, Mod:module_info(exports)) of
+ 1 ->
+ true;
+ _ ->
+ false
+ end.
75 src/riak_core_metric_sup.erl
View
@@ -0,0 +1,75 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc starts and supervises a process per stat
+%% register stats through riak_core:register/2
+
+-module(riak_core_metric_sup).
+-behaviour(supervisor).
+-export([start_link/0, start_link/2, init/1]).
+-export([start_stat/3, stop_stat/2, start_stats/2]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_link(App, Mod) ->
+ case start_link() of
+ {ok, _Pid}=Res ->
+ ok = start_stats(App, Mod),
+ Res;
+ Error -> Error
+ end.
+
+init([]) ->
+ remove_slide_private_dirs(),
+ %% Populate supervisor list with stats for already registered app,stat
+ %% modules. Ensures restart of stat procs after a crash of this supervisor.
+ Refs0 = [mod_refs(App, Mod) || {App, Mod} <- riak_core:stat_specs()],
+ Refs = lists:flatten(Refs0),
+ {ok, {{one_for_one, 5, 10}, Refs}}.
+
+start_stat(App, Stat, Args) ->
+ Ref = stat_ref(App, Stat, Args),
+ Pid = case supervisor:start_child(?MODULE, Ref) of
+ {ok, Child} -> Child;
+ {error, {already_started, Child}} -> Child
+ end,
+ Pid.
+
+stop_stat(App, Stat) ->
+ supervisor:terminate_child(?MODULE, {App, Stat}),
+ supervisor:delete_child(?MODULE, {App, Stat}),
+ ok.
+
+start_stats(App, Mod) ->
+ Stats = Mod:stat_specs(),
+ [start_stat(App, Stat, Args) || {Stat, Args} <- Stats],
+ ok.
+
+%% @private
+mod_refs(App, Mod) ->
+ Stats = Mod:stat_specs(),
+ [stat_ref(App, Stat, Args) || {Stat, Args} <- Stats].
+
+stat_ref(App, Stat, Args) ->
+ {{App, Stat}, {riak_core_metric_proc, start_link, [Stat, Args]},
+ permanent, 5000, worker, [riak_core_metric_proc]}.
+
+remove_slide_private_dirs() ->
+ os:cmd("rm -rf " ++ slide:private_dir()).
43 src/riak_core_metric_tmeter.erl
View
@@ -0,0 +1,43 @@
+%%
+%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc A meter and a total counter
+-module(riak_core_metric_tmeter).
+
+-behaviour(riak_core_metric).
+
+%% Behaviour API
+-export([new/0, value/2, value/3, update/2, tick/1]).
+
+new() ->
+ {0, riak_core_metric_meter:new()}.
+
+update({Cnt, Moment}, {Counter, Meter}) ->
+ {Counter+Cnt, riak_core_metric_meter:update({Cnt, Moment}, Meter)}.
+
+value(Name, Value) ->
+ CounterName = riak_core_metric:join_as_atom([Name, '_', total]),
+ value({CounterName, Name}, Name, Value).
+
+value({CounterName, MeterName}, _Name, {Counter, Meter}) ->
+ [{CounterName, Counter},
+ riak_core_metric_meter:value(MeterName, Meter)].
+
+tick({_Counter, Meter}) ->
+ riak_core_metric_meter:tick(Meter).
213 src/riak_core_stat.erl
View
@@ -19,191 +19,78 @@
%% -------------------------------------------------------------------
-module(riak_core_stat).
--behaviour(gen_server2).
%% API
--export([start_link/0, get_stats/0, get_stats/1, update/1]).
+-export([get_stats/0, get_stats/1, update/1]).
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+%% Metrics API
+-export([stat_specs/0]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
--record(cuml, {count = 0 :: integer(),
- min :: integer(),
- max = -1 :: integer(),
- mean = 0 :: integer(),
- last :: integer()}).
-
--record(state, {
- ignored_gossip_total :: integer(),
- rings_reconciled_total :: integer(),
- rejected_handoffs :: integer(),
- gossip_received :: spiraltime:spiral(),
- rings_reconciled :: spiraltime:spiral(),
- converge_epoch :: calendar:t_now(),
- converge_delay :: #cuml{},
- rebalance_epoch :: calendar:t_now(),
- rebalance_delay :: #cuml{}
- }).
-
-%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
-%% @doc Start the server.
-start_link() ->
- gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
+-spec stat_specs() -> riak_core_metric:stat_specs().
+stat_specs() ->
+ [{ignored_gossip_total, [{type, counter}, {group, gossip}]},
+ {rejected_handoffs, [{type, counter}, {group, gossip}]},
+ {handoff_timeouts, [{type, counter}, {group, gossip}]},
+ {gossip_received, [{type, meter}, {group, gossip}]},
+ {rings_reconciled, [{type, tmeter}, {group, gossip}]},
+ {converge_delay, [{type, duration}, {group, gossip},
+ {presentation,
+ [{legacy, [min, max, mean, last]}]}]},
+ {rebalance_delay, [{type, duration}, {group, gossip},
+ {presentation,
+ [{legacy, [min, max, mean, last]}]}]}
+ ].
%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
- get_stats(slide:moment()).
-
-get_stats(Moment) ->
- gen_server2:call(?MODULE, {get_stats, Moment}, infinity).
-
-%% @spec update(term()) -> ok
-%% @doc Update the given stat.
-update(Stat) ->
- gen_server2:cast(?MODULE, {update, Stat, slide:moment()}).
-
-%% @private
-init([]) ->
- %% Removing the slide directory here would conflict with riak_kv_stat.
- %% We will need to resolve if we ever use slide metrics in this module.
- %%
- %% process_flag(trap_exit, true),
- %% remove_slide_private_dirs(),
-
- {ok, #state{ignored_gossip_total=0,
- rings_reconciled_total=0,
- rejected_handoffs=0,
- gossip_received=spiraltime:fresh(),
- rings_reconciled=spiraltime:fresh(),
- converge_delay=#cuml{},
- rebalance_delay=#cuml{}
- }}.
-
-%% @private
-handle_call({get_stats, Moment}, _From, State) ->
- {reply, produce_stats(State, Moment), State};
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
-
-%% @private
-handle_cast({update, Stat, Moment}, State) ->
- {noreply, update(Stat, Moment, State)};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%% @private
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%% @private
-terminate(_Reason, _State) ->
+ produce_stats(legacy).
+
+get_stats(_Moment) ->
+ produce_stats(legacy).
+
+%% @doc Update the given stat
+-spec update(Stat::atom()) -> ok.
+update(converge_timer_begin) ->
+ riak_core_metric_proc:update(converge_delay, start);
+update(converge_timer_end) ->
+ riak_core_metric_proc:update(converge_delay, stop);
+update(rebalance_timer_begin) ->
+ riak_core_metric_proc:update(rebalance_delay, start);
+update(rebalance_timer_end) ->
+ riak_core_metric_proc:update(rebalance_delay, stop);
+update(rejected_handoffs) ->
+ riak_core_metric_proc:update(rejected_handoffs, 1);
+update(handoff_timeouts) ->
+ riak_core_metric_proc:update(handoff_timeouts, 1);
+update(ignored_gossip) ->
+ riak_core_metric_proc:update(ignored_gossip_totals, 1);
+update(gossip_received) ->
+ riak_core_metric_proc:update(gossip_received, {1, slide:moment()});
+update(rings_reconciled) ->
+ riak_core_metric_proc:update(rings_reconciled, {1, slide:moment()});
+update(_) ->
ok.
-%% @private
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-
-%% @doc Update the given stat in State, returning a new State.
--spec update(Stat::term(), integer(), #state{}) -> #state{}.
-update(converge_timer_begin, _Moment, State) ->
- State#state{converge_epoch=erlang:now()};
-update(converge_timer_end, _Moment, State=#state{converge_epoch=undefined}) ->
- State;
-update(converge_timer_end, _Moment, State=#state{converge_epoch=T0}) ->
- Duration = timer:now_diff(erlang:now(), T0),
- update_cumulative(#state.converge_delay, Duration,
- State#state{converge_epoch=undefined});
-
-update(rebalance_timer_begin, _Moment, State) ->
- State#state{rebalance_epoch=erlang:now()};
-update(rebalance_timer_end, _Moment, State=#state{rebalance_epoch=undefined}) ->
- State;
-update(rebalance_timer_end, _Moment, State=#state{rebalance_epoch=T0}) ->
- Duration = timer:now_diff(erlang:now(), T0),
- update_cumulative(#state.rebalance_delay, Duration,
- State#state{rebalance_epoch=undefined});
-
-update(rejected_handoffs, _Moment, State) ->
- int_incr(#state.rejected_handoffs, State);
-
-update(ignored_gossip, _Moment, State) ->
- int_incr(#state.ignored_gossip_total, State);
-
-update(gossip_received, Moment, State) ->
- spiral_incr(#state.gossip_received, Moment, State);
-
-update(rings_reconciled, Moment, State) ->
- spiral_incr(#state.rings_reconciled, Moment,
- int_incr(#state.rings_reconciled_total, State));
-
-update(_, _, State) ->
- State.
-
-%% @spec spiral_incr(integer(), integer(), state()) -> state()
-%% @doc Increment the value of a spiraltime structure at a given
-%% position of the State tuple.
-spiral_incr(Elt, Moment, State) ->
- setelement(Elt, State,
- spiraltime:incr(1, Moment, element(Elt, State))).
-
-%% @doc Increment the value at the given position of the State tuple.
-int_incr(Elt, State) ->
- int_incr(Elt, 1, State).
-int_incr(Elt, Amount, State) ->
- setelement(Elt, State, element(Elt, State) + Amount).
-
-%% @doc Add a value to a set, updating the cumulative min/max/mean
-update_cumulative(Elt, Value, State) ->
- #cuml{count=N, min=Min, max=Max, mean=Mean} = element(Elt, State),
- Min2 = erlang:min(Min, Value),
- Max2 = erlang:max(Max, Value),
- Mean2 = ((N * Mean) + Value) div (N+1),
- Stat2 = #cuml{count=N+1, min=Min2, max=Max2, mean=Mean2, last=Value},
- setelement(Elt, State, Stat2).
-
-%% @spec produce_stats(state(), integer()) -> proplist()
+%% @spec produce_stats(Presentation : atom()) -> proplist()
%% @doc Produce a proplist-formatted view of the current aggregation
%% of stats.
-produce_stats(State, Moment) ->
- lists:append([gossip_stats(Moment, State),
+produce_stats(Presentation) ->
+ lists:append([gossip_stats(Presentation),
vnodeq_stats()]).
-%% @spec spiral_minute(integer(), integer(), state()) -> integer()
-%% @doc Get the count of events in the last minute from the spiraltime
-%% structure at the given element of the state tuple.
-spiral_minute(_Moment, Elt, State) ->
- {_,Count} = spiraltime:rep_minute(element(Elt, State)),
- Count.
-
-%% @spec gossip_stats(integer(), state()) -> proplist()
+%% @spec gossip_stats(integer()) -> proplist()
%% @doc Get the gossip stats proplist.
-gossip_stats(Moment, State=#state{converge_delay=CDelay,
- rebalance_delay=RDelay}) ->
-
- [{ignored_gossip_total, State#state.ignored_gossip_total},
- {rings_reconciled_total, State#state.rings_reconciled_total},
- {rings_reconciled, spiral_minute(Moment, #state.rings_reconciled, State)},
- {gossip_received, spiral_minute(Moment, #state.gossip_received, State)},
- {converge_delay_min, CDelay#cuml.min},
- {converge_delay_max, CDelay#cuml.max},
- {converge_delay_mean, CDelay#cuml.mean},
- {converge_delay_last, CDelay#cuml.last},
- {rebalance_delay_min, RDelay#cuml.min},
- {rebalance_delay_max, RDelay#cuml.max},
- {rebalance_delay_mean, RDelay#cuml.mean},
- {rebalance_delay_last, RDelay#cuml.last}].
-
+gossip_stats(Presentation) ->
+ GossipStats = [riak_core_metric_proc:value(Name, Presentation) || {Name, Spec} <- stat_specs(), lists:keyfind(gossip, 2, Spec) /= false],
+ lists:flatten( GossipStats ).
%% Provide aggregate stats for vnode queues. Compute instantaneously for now,
%% may need to cache if stats are called heavily (multiple times per seconds)
3  src/riak_core_sup.erl
View
@@ -31,6 +31,7 @@
-export([init/1]).
%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type, Args, Timeout), {I, {I, start_link, Args}, permanent, Timeout, Type, [I]}).
-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
-define (IF (Bool, A, B), if Bool -> A; true -> B end).
@@ -59,7 +60,6 @@ init([]) ->
Children = lists:flatten(
[?CHILD(riak_core_sysmon_minder, worker),
- ?CHILD(riak_core_stat, worker),
?CHILD(riak_core_vnode_sup, supervisor, 305000),
?CHILD(riak_core_eventhandler_sup, supervisor),
?CHILD(riak_core_handoff_sup, supervisor),
@@ -70,6 +70,7 @@ init([]) ->
?CHILD(riak_core_node_watcher, worker),
?CHILD(riak_core_vnode_manager, worker),
?CHILD(riak_core_gossip, worker),
+ ?CHILD(riak_core_metric_sup, supervisor, [riak_core, riak_core_stat], 5000),
RiakWebs
]),
4 src/slide.erl
View
@@ -53,6 +53,8 @@
-export([mean_and_nines/2, mean_and_nines/6]).
-export([private_dir/0, sync/1]).
+-export_type([slide/0]).
+
-include_lib("kernel/include/file.hrl").
-include_lib("eunit/include/eunit.hrl").
@@ -68,6 +70,8 @@
readings_m %% moment associated with readings_fh
}).
+-type slide() :: #slide{}.
+
%% @spec fresh() -> slide()
%% @equiv fresh(60)
fresh() -> fresh(60).

No commit comments for this range

Something went wrong with that request. Please try again.