Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 234 lines (199 sloc) 8.412 kb
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
1 %% Copyright (c) 2010 Jacob Vorreuter <jacob.vorreuter@gmail.com>
2 %%
3 %% Permission is hereby granted, free of charge, to any person
4 %% obtaining a copy of this software and associated documentation
5 %% files (the "Software"), to deal in the Software without
6 %% restriction, including without limitation the rights to use,
7 %% copy, modify, merge, publish, distribute, sublicense, and/or sell
8 %% copies of the Software, and to permit persons to whom the
9 %% Software is furnished to do so, subject to the following
10 %% conditions:
11 %%
12 %% The above copyright notice and this permission notice shall be
13 %% included in all copies or substantial portions of the Software.
14 %%
15 %% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 %% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
17 %% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 %% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
19 %% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 %% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
22 %% OTHER DEALINGS IN THE SOFTWARE.
23 -module(nsync_callback).
24 -export([handle/1]).
25
26 -include_lib("logplex.hrl").
27
8ae7193 @jkvor add drains to token record
jkvor authored
28 %% nsync callbacks
29
30 %% LOAD
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
31 handle({load, <<"ch:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
32 Id = list_to_integer(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
33 create_channel(Id, Dict);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
34
35 handle({load, <<"tok:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
36 Id = list_to_binary(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
37 create_token(Id, Dict);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
38
39 handle({load, <<"drain:", Rest/binary>>, Dict}) when is_tuple(Dict) ->
40 Id = list_to_integer(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
41 create_drain(Id, Dict);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
42
43 handle({load, _Key, _Val}) ->
8ae7193 @jkvor add drains to token record
jkvor authored
44 ok;
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
45
46 handle({load, eof}) ->
8ae7193 @jkvor add drains to token record
jkvor authored
47 populate_token_channel_data(ets:tab2list(tokens)),
48 populate_token_drain_data(ets:tab2list(drains)),
442ef3c @jkvor enable read-only mode during nsync resync
jkvor authored
49 error_logger:info_msg("NSYNC sync complete"),
50 application:set_env(logplex, read_only, false),
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
51 ok;
52
8ae7193 @jkvor add drains to token record
jkvor authored
53 %% STREAM
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
54 handle({cmd, "hmset", [<<"ch:", Rest/binary>> | Args]}) ->
55 Id = list_to_integer(parse_id(Rest)),
56 Dict = dict_from_list(Args),
8ae7193 @jkvor add drains to token record
jkvor authored
57 create_channel(Id, Dict);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
58
59 handle({cmd, "hmset", [<<"tok:", Rest/binary>> | Args]}) ->
60 Id = list_to_binary(parse_id(Rest)),
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
61 Dict = dict_from_list(Args),
8ae7193 @jkvor add drains to token record
jkvor authored
62 Token = create_token(Id, Dict),
63 populate_token_channel_data([Token]);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
64
65 handle({cmd, "hmset", [<<"drain:", Rest/binary>> | Args]}) ->
66 Id = list_to_integer(parse_id(Rest)),
67 Dict = dict_from_list(Args),
8ae7193 @jkvor add drains to token record
jkvor authored
68 Drain = create_drain(Id, Dict),
69 populate_token_drain_data([Drain]);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
70
71 handle({cmd, "del", [<<"ch:", Rest/binary>> | _Args]}) ->
72 Id = list_to_integer(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
73 ets:delete(channels, Id);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
74
75 handle({cmd, "del", [<<"tok:", Rest/binary>> | _Args]}) ->
76 Id = list_to_binary(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
77 ets:delete(tokens, Id);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
78
79 handle({cmd, "del", [<<"drain:", Rest/binary>> | _Args]}) ->
80 Id = list_to_integer(parse_id(Rest)),
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
81 ets:delete(drains, Id),
8ae7193 @jkvor add drains to token record
jkvor authored
82 remove_token_drain_data(Id);
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
83
84 handle({cmd, "hset", [<<"ch:", Rest/binary>>, <<"addon">>, Addon]}) ->
85 Id = list_to_integer(parse_id(Rest)),
8ae7193 @jkvor add drains to token record
jkvor authored
86 case logplex_channel:lookup(Id) of
87 undefined ->
88 error_logger:error_report([?MODULE, set_addon, undefined_channel, Id]);
89 Channel ->
90 ets:insert(channels, Channel#channel{addon=Addon}),
91 [ets:insert(tokens, Token#token{addon=Addon}) || Token <- logplex_channel:lookup_tokens(Id)]
92 end,
93 ok;
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
94
95 handle({cmd, _Cmd, _Args}) ->
8ae7193 @jkvor add drains to token record
jkvor authored
96 ok;
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
97
442ef3c @jkvor enable read-only mode during nsync resync
jkvor authored
98 handle({error, closed}) ->
99 error_logger:error_msg("NSYNC connection closed. Read-only mode enabled"),
100 application:set_env(logplex, read_only, true),
101 ok;
102
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
103 handle(_) ->
104 ok.
105
8ae7193 @jkvor add drains to token record
jkvor authored
106 %% Helper functions
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
107 create_channel(Id, Dict) ->
8ae7193 @jkvor add drains to token record
jkvor authored
108 case dict_find(<<"app_id">>, Dict) of
109 undefined ->
110 error_logger:error_report([?MODULE, create_channel, missing_app_id, Id, dict:to_list(Dict)]);
111 Val ->
112 AppId = list_to_integer(binary_to_list(Val)),
113 Channel = #channel{id=Id,
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
114 name=dict_find(<<"name">>, Dict),
115 app_id=AppId,
116 addon=dict_find(<<"addon">>, Dict)},
8ae7193 @jkvor add drains to token record
jkvor authored
117 ets:insert(channels, Channel),
118 Channel
119 end.
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
120
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
121 create_token(Id, Dict) ->
8ae7193 @jkvor add drains to token record
jkvor authored
122 case dict_find(<<"ch">>, Dict) of
123 undefined ->
124 error_logger:error_report([?MODULE, create_token, missing_ch, Id, dict:to_list(Dict)]);
125 Val1 ->
126 Ch = list_to_integer(binary_to_list(Val1)),
127 Name = dict_find(<<"name">>, Dict),
128 Token = #token{
129 id=Id,
130 channel_id=Ch,
131 name=Name
132 },
133 ets:insert(tokens, Token),
134 Token
135 end.
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
136
137 create_drain(Id, Dict) ->
8ae7193 @jkvor add drains to token record
jkvor authored
138 case dict_find(<<"ch">>, Dict) of
139 undefined ->
140 error_logger:error_report([?MODULE, create_drain, missing_ch, Id, dict:to_list(Dict)]);
141 Val1 ->
142 Ch = list_to_integer(binary_to_list(Val1)),
143 case dict_find(<<"port">>, Dict) of
144 undefined ->
145 error_logger:error_report([?MODULE, create_drain, missing_port, Id, dict:to_list(Dict)]);
146 Val2 ->
147 Port = list_to_integer(binary_to_list(Val2)),
148 case dict_find(<<"host">>, Dict) of
149 undefined ->
150 error_logger:error_report([?MODULE, create_drain, missing_host, Id, dict:to_list(Dict)]);
151 Host ->
152 Drain = #drain{
153 id=Id,
154 channel_id=Ch,
155 host=Host,
156 port=Port
157 },
158 ets:insert(drains, Drain),
159 Drain
160 end
161 end
162 end.
163
164 populate_token_channel_data([]) ->
165 ok;
166
167 populate_token_channel_data([Token|Tail]) when is_record(Token, token) ->
168 case logplex_channel:lookup(Token#token.channel_id) of
169 undefined ->
170 error_logger:error_report([?MODULE, populate_token_channel_data, undefined_channel, Token]);
171 #channel{app_id=AppId, addon=Addon} ->
172 ets:insert(tokens, Token#token{app_id=AppId, addon=Addon})
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
173 end,
8ae7193 @jkvor add drains to token record
jkvor authored
174 populate_token_channel_data(Tail);
175
176 populate_token_channel_data([_|Tail]) ->
177 populate_token_channel_data(Tail).
178
179 populate_token_drain_data([]) ->
180 ok;
181
182 populate_token_drain_data([Drain|Tail]) when is_record(Drain, drain) ->
183 case ets:match_object(tokens, #token{id='_', channel_id=Drain#drain.channel_id, name='_', app_id='_', addon='_'}) of
184 [] ->
185 error_logger:error_report([?MODULE, populate_token_drain_data, undefined_tokens, Drain]);
186 Tokens ->
187 Drain1 = Drain#drain{resolved_host=logplex_utils:resolve_host(Drain#drain.host)},
188 ets:insert(tokens, [Token#token{drains=[Drain1|Drains]} || #token{drains=Drains}=Token <- Tokens])
e39ef3a @jkvor remove mnesia in favor of nsync replication
jkvor authored
189 end,
8ae7193 @jkvor add drains to token record
jkvor authored
190 populate_token_drain_data(Tail);
191
192 populate_token_drain_data([_|Tail]) ->
193 populate_token_drain_data(Tail).
194
195 remove_token_drain_data(DrainId) ->
196 case logplex_drain:lookup(DrainId) of
197 undefined ->
198 error_logger:error_report([?MODULE, remove_token_drain_data, undefined_drain, DrainId]);
199 Drain ->
200 case ets:match_object(tokens, #token{id='_', channel_id=Drain#drain.channel_id, name='_', app_id='_', addon='_'}) of
201 [] ->
202 error_logger:error_report([?MODULE, remove_token_drain_data, undefined_tokens, Drain]);
203 Tokens ->
204 ets:insert(tokens, [begin
205 Drains1 = lists:filter(fun(#drain{id=Id}) -> Id =/= DrainId end, Drains),
206 Token#token{drains=Drains1}
207 end || #token{drains=Drains}=Token <- Tokens])
208 end
209 end.
210
211 parse_id(Bin) ->
212 parse_id(Bin, []).
213
214 parse_id(<<":", _/binary>>, Acc) ->
215 lists:reverse(Acc);
216
217 parse_id(<<C, Rest/binary>>, Acc) ->
218 parse_id(Rest, [C|Acc]).
6f29707 @jkvor use nsync to transition between redis and mnesia
jkvor authored
219
220 dict_from_list(List) ->
221 dict_from_list(List, dict:new()).
222
223 dict_from_list([], Dict) ->
224 Dict;
225
226 dict_from_list([Key, Val | Rest], Dict) ->
227 dict_from_list(Rest, dict:store(Key, Val, Dict)).
0f1b0b4 @jkvor add logging around db load
jkvor authored
228
229 dict_find(Key, Dict) ->
230 case dict:find(Key, Dict) of
231 {ok, Val} -> Val;
232 _ -> undefined
233 end.
Something went wrong with that request. Please try again.