Permalink
Browse files

Merge branch 'gh412-pipe-stats'

  • Loading branch information...
2 parents 807c632 + 045a5a4 commit 0d965bbb304af61fedc7441a8a033debf9805651 @russelldb russelldb committed Nov 15, 2012
Showing with 127 additions and 1 deletion.
  1. +2 −1 src/riak_pipe_app.erl
  2. +3 −0 src/riak_pipe_builder_sup.erl
  3. +122 −0 src/riak_pipe_stat.erl
View
@@ -48,7 +48,8 @@ start(_StartType, _StartArgs) ->
case riak_pipe_sup:start_link() of
{ok, Pid} ->
riak_core:register(riak_pipe, [
- {vnode_module, riak_pipe_vnode}
+ {vnode_module, riak_pipe_vnode},
+ {stat_mod, riak_pipe_stat}
]),
{ok, Pid};
{error, Reason} ->
@@ -59,11 +59,14 @@ new_pipeline(Spec, Options) ->
{ok, Pid, Ref} ->
case riak_pipe_builder:pipeline(Pid) of
{ok, #pipe{sink=#fitting{ref=Ref}}=Pipe} ->
+ riak_pipe_stat:update({create, Pid}),
{ok, Pipe};
_ ->
+ riak_pipe_stat:update(create_error),
{error, startup_failure}
end;
Error ->
+ riak_pipe_stat:update(create_error),
Error
end.
View
@@ -0,0 +1,122 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+%%
+%% @doc Collector for various pipe stats.
+-module(riak_pipe_stat).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link /0, register_stats/0,
+ get_stats/0,
+ produce_stats/0,
+ update/1,
+ stats/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(APP, riak_pipe).
+
+%% -------------------------------------------------------------------
+%% API
+%% -------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register_stats() ->
+ [(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
+ [register_stat(stat_name(Stat), Type) || {Stat, Type} <- stats()],
+ riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
+
+%% @doc Return current aggregation of all stats.
+-spec get_stats() -> proplists:proplist().
+get_stats() ->
+ case riak_core_stat_cache:get_stats(?APP) of
+ {ok, Stats, _TS} ->
+ Stats;
+ Error -> Error
+ end.
+
+produce_stats() ->
+ {?APP, riak_core_stat_q:get_stats([riak_pipe])}.
+
+update(Arg) ->
+ gen_server:cast(?SERVER, {update, Arg}).
+
+%% gen_server
+
+init([]) ->
+ register_stats(),
+ {ok, ok}.
+
+handle_call(_Req, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast({update, Arg}, State) ->
+ do_update(Arg),
+ {noreply, State};
+handle_cast(_Req, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _Ref, process, _Pid, _Reason}, State) ->
+ do_update(destroy),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% @doc Update the given `Stat'.
+-spec do_update(term()) -> ok.
+do_update({create, Pid}) ->
+ folsom_metrics:notify_existing_metric({?APP, pipeline, create}, 1, spiral),
+ folsom_metrics:notify_existing_metric({?APP, pipeline, active}, {inc, 1}, counter),
+ erlang:monitor(process, Pid);
+do_update(create_error) ->
+ folsom_metrics:notify_existing_metric({?APP, pipeline, create, error}, 1, spiral);
+do_update(destroy) ->
+ folsom_metrics:notify_existing_metric({?APP, pipeline, active}, {dec, 1}, counter).
+
+%% -------------------------------------------------------------------
+%% Private
+%% -------------------------------------------------------------------
+stats() ->
+ [
+ {[pipeline, create], spiral},
+ {[pipeline, create, error], spiral},
+ {[pipeline, active], counter}
+ ].
+
+stat_name(Name) when is_list(Name) ->
+ list_to_tuple([?APP] ++ Name);
+stat_name(Name) when is_atom(Name) ->
+ {?APP, Name}.
+
+register_stat(Name, spiral) ->
+ folsom_metrics:new_spiral(Name);
+register_stat(Name, counter) ->
+ folsom_metrics:new_counter(Name).

0 comments on commit 0d965bb

Please sign in to comment.