This repository has been archived by the owner on Sep 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 96
/
logplex_realtime.erl
130 lines (106 loc) · 4.31 KB
/
logplex_realtime.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
%% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(logplex_realtime).
-behaviour(gen_server).
%% gen_server callbacks
-export([start_link/1, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
-export([incr/1, incr/2]).
-record(state, {instance_name,
opts,
conn}).
-include_lib("logplex.hrl").
%% API functions
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
incr(Key) ->
incr(Key, 1).
incr("work_queue_dropped", Inc) when is_integer(Inc) ->
incr(work_queue_dropped, Inc);
incr("drain_buffer_dropped", Inc) when is_integer(Inc) ->
incr(drain_buffer_dropped, Inc);
incr("redis_buffer_dropped", Inc) when is_integer(Inc) ->
incr(redis_buffer_dropped, Inc);
incr(Key, Inc) when is_integer(Inc) ->
ets:update_counter(?MODULE, Key, Inc).
%%====================================================================
%% gen_server callbacks
%%====================================================================
init([Opts]) ->
ets:new(?MODULE, [named_table, set, public]),
reset_stats(),
Self = self(),
spawn_link(fun() -> flush(Self) end),
spawn_link(fun() -> register() end),
case redis:start_link(undefined, Opts) of
{ok, Conn} ->
{ok, #state{conn=Conn, opts=Opts}};
Error ->
{stop, Error}
end.
handle_call(_Msg, _From, State) ->
{reply, {error, invalid_call}, State}.
handle_cast(_Msg, State) ->
io:format("realtime: recv'd ~p~n", [_Msg]),
{noreply, State}.
handle_info(flush, #state{instance_name=undefined}=State) ->
InstanceName = logplex_utils:instance_name(),
handle_info(flush, State#state{instance_name=InstanceName});
handle_info(flush, State) ->
Stats = ets:tab2list(?MODULE),
reset_stats(),
Stats1 = [proplists:lookup(message_received, Stats),
proplists:lookup(message_processed, Stats),
proplists:lookup(message_routed, Stats),
proplists:lookup(message_dropped, Stats),
proplists:lookup(work_queue_dropped, Stats),
proplists:lookup(drain_buffer_dropped, Stats),
proplists:lookup(redis_buffer_dropped, Stats)],
Json = {struct, [{Key,Val} || {Key,Val} <- Stats1, Val > 0]},
Json1 = iolist_to_binary(mochijson2:encode(Json)),
publish(Json1, State);
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
flush(Pid) ->
timer:sleep(1000),
Pid ! flush,
flush(Pid).
register() ->
redis_helper:register_stat_instance(),
timer:sleep(10 * 1000),
register().
reset_stats() ->
true = ets:insert(?MODULE, [{message_received, 0},
{message_processed, 0},
{message_routed, 0},
{message_dropped, 0},
{work_queue_dropped, 0},
{drain_buffer_dropped, 0},
{redis_buffer_dropped, 0}]).
publish(Json, #state{instance_name=InstanceName, conn=Conn}=State) ->
redis:q(Conn, [<<"PUBLISH">>, iolist_to_binary([<<"stats.">>, InstanceName]), Json], 60000),
{noreply, State}.