Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 314 lines (293 sloc) 15.368 kB
d036073 BossDB is now its own project
Evan Miller authored
1 -module(boss_news_controller).
2
3 -behaviour(gen_server).
4
5 -export([start_link/0, start_link/1]).
6
7 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
8
%% Server state. Every *_watchers field is an index from a key to the list of
%% watch ids registered under that key.
-record(state, {
        %% WatchId -> #watch{}
        watch_dict = dict:new(),
        %% Expiry queue of watch ids, maintained through tiny_pq.
        ttl_tree = gb_trees:empty(),

        %% Set topic (e.g. a plural model name) -> [WatchId]
        set_watchers = dict:new(),
        %% "module-id" topic -> [WatchId]
        id_watchers = dict:new(),

        %% Module name (from "module-*.attr" topics) -> [WatchId]
        set_attr_watchers = dict:new(),
        %% Record id (from "module-id.attr" topics) -> [WatchId]
        id_attr_watchers = dict:new(),

        %% Id that the next `watch' request will be assigned.
        watch_counter = 0
    }).
19
%% A single registered watch.
-record(watch, {
        %% Parsed topics: {set, Topic} | {id, Topic} |
        %% {id_attr, Id, Attr} | {set_attr, Module, Attr}
        watch_list = [],
        %% Fun of arity 2 or 3 invoked when a matching event occurs.
        callback,
        %% Opaque term handed to arity-3 callbacks as their last argument.
        user_info,
        %% Expiry time as computed by future_time/1.
        exp_time,
        %% Lifetime requested at registration; reused when the watch is extended.
        ttl
    }).
26
%% Starts the news server with an empty argument list.
start_link() ->
    DefaultArgs = [],
    start_link(DefaultArgs).
29
%% Starts the news server, registered globally as `boss_news'.
%% Args is passed through to init/1.
start_link(Args) ->
    ServerName = {global, boss_news},
    gen_server:start_link(ServerName, ?MODULE, Args, []).
32
%% gen_server callback: begins with no watches registered. Options are ignored.
init(_Options) ->
    EmptyState = #state{},
    {ok, EmptyState}.
35
%% gen_server callback handling every synchronous request of the news server.
%%
%% Requests:
%%   reset
%%       Discards all watches. Replies ok.
%%   dump
%%       Prunes expired watches and replies with the state record.
%%   {watch, TopicString, CallBack, UserInfo, TTL}
%%       Registers a watch under the next counter value.
%%       Replies {ok, WatchId}.
%%   {set_watch, WatchId, TopicString, CallBack, UserInfo, TTL}
%%       Registers (or replaces) a watch under a caller-chosen id. Replies ok.
%%   {cancel_watch, WatchId}
%%       Replies ok, or {error, not_found} for an unknown id.
%%   {extend_watch, WatchId}
%%       Moves the expiry to TTL from now. Replies ok | {error, not_found}.
%%   {created, Id, Attrs}
%%   {deleted, Id, OldAttrs}
%%   {updated, Id, OldAttrs, NewAttrs}
%%       Fires the callbacks of matching watches. Replies ok.
%%
%% TopicString is a list of topics separated by a comma and one or more
%% spaces. Each topic is one of "plural_model", "model-id", "model-id.attr" or
%% "model-*.attr". A malformed topic, or any request not listed above, crashes
%% the call; there is deliberately no catch-all clause.
handle_call(reset, _From, _State) ->
    {reply, ok, #state{}};
handle_call(dump, _From, State0) ->
    State = prune_expired_entries(State0),
    {reply, State, State};
handle_call({watch, TopicString, CallBack, UserInfo, TTL}, From, State0) ->
    WatchId = State0#state.watch_counter,
    SetWatch = {set_watch, WatchId, TopicString, CallBack, UserInfo, TTL},
    case handle_call(SetWatch, From, State0) of
        {reply, ok, State} ->
            {reply, {ok, WatchId}, State#state{ watch_counter = WatchId + 1 }};
        {reply, Other, _State} ->
            %% Registration failed: leave the state (and the counter) untouched.
            {reply, Other, State0}
    end;
handle_call({set_watch, WatchId, TopicString, CallBack, UserInfo, TTL}, From, State0) ->
    %% Remove any existing watch with this id first so that its topics are
    %% not indexed a second time.
    {reply, _, State} = handle_call({cancel_watch, WatchId}, From, State0),
    ExpTime = future_time(TTL),
    Topics = re:split(TopicString, ", +", [{return, list}]),
    {RetVal, NewState, WatchList} = lists:foldr(fun
            (SingleTopic, {ok, StateAcc, WatchListAcc}) ->
                {StateAcc1, WatchInfo} = register_topic(SingleTopic, WatchId, StateAcc),
                {ok, StateAcc1, [WatchInfo | WatchListAcc]};
            (_, Error) ->
                Error
        end, {ok, State, []}, Topics),
    case RetVal of
        ok ->
            Watch = #watch{
                watch_list = WatchList,
                callback = CallBack,
                user_info = UserInfo,
                exp_time = ExpTime,
                ttl = TTL},
            {reply, ok, NewState#state{
                    watch_dict = dict:store(WatchId, Watch, NewState#state.watch_dict),
                    ttl_tree = tiny_pq:insert_value(ExpTime, WatchId, NewState#state.ttl_tree)
                }};
        Error ->
            {reply, Error, State}
    end;
handle_call({cancel_watch, WatchId}, _From, State) ->
    {RetVal, NewState} = case dict:find(WatchId, State#state.watch_dict) of
        {ok, #watch{ exp_time = ExpTime }} ->
            %% Re-queue the watch at time 0 so the prune below removes it
            %% together with all of its index entries.
            NewTree = tiny_pq:move_value(ExpTime, 0, WatchId, State#state.ttl_tree),
            {ok, State#state{ ttl_tree = NewTree }};
        _ ->
            {{error, not_found}, State}
    end,
    {reply, RetVal, prune_expired_entries(NewState)};
handle_call({extend_watch, WatchId}, _From, State0) ->
    State = prune_expired_entries(State0),
    {RetVal, NewState} = case dict:find(WatchId, State#state.watch_dict) of
        {ok, #watch{ exp_time = ExpTime, ttl = TTL } = Watch} ->
            NewExpTime = future_time(TTL),
            NewTree = tiny_pq:move_value(ExpTime, NewExpTime, WatchId, State#state.ttl_tree),
            Extended = Watch#watch{ exp_time = NewExpTime },
            {ok, State#state{
                    ttl_tree = NewTree,
                    watch_dict = dict:store(WatchId, Extended, State#state.watch_dict) }};
        _ ->
            {{error, not_found}, State}
    end,
    {reply, RetVal, NewState};
handle_call({created, Id, Attrs}, _From, State0) ->
    State = prune_expired_entries(State0),
    [Module | _IdNum] = re:split(Id, "-", [{return, list}]),
    PluralModel = inflector:pluralize(Module),
    case dict:find(PluralModel, State#state.set_watchers) of
        {ok, SetWatchers} ->
            Record = activate_record(Id, Attrs),
            Matches = fun
                ({set, Topic}) -> Topic =:= PluralModel;
                (_) -> false
            end,
            notify_watchers(SetWatchers, State#state.watch_dict, Matches, created, Record);
        error ->
            ok
    end,
    {reply, ok, State};
handle_call({deleted, Id, OldAttrs}, _From, State0) ->
    State = prune_expired_entries(State0),
    [Module | _IdNum] = re:split(Id, "-", [{return, list}]),
    PluralModel = inflector:pluralize(Module),
    %% NOTE(review): only watches indexed under the plural model name are
    %% scanned here; the id_watchers index is never consulted, so a watch on
    %% just "model-id" is not notified of the deletion. Confirm whether that
    %% is intended before changing it.
    case dict:find(PluralModel, State#state.set_watchers) of
        {ok, SetWatchers} ->
            Record = activate_record(Id, OldAttrs),
            Matches = fun
                ({set, Topic}) -> Topic =:= PluralModel;
                ({id, Topic}) -> Topic =:= Id;
                (_) -> false
            end,
            notify_watchers(SetWatchers, State#state.watch_dict, Matches, deleted, Record);
        error ->
            ok
    end,
    {reply, ok, State};
handle_call({updated, Id, OldAttrs, NewAttrs}, _From, State0) ->
    State = prune_expired_entries(State0),
    [Module | _IdNum] = re:split(Id, "-", [{return, list}]),
    IdWatchers = watchers_for(Id, State#state.id_attr_watchers),
    WildcardWatchers = watchers_for(Module, State#state.set_attr_watchers),
    AllWatchers = IdWatchers ++ WildcardWatchers,
    OldRecord = activate_record(Id, OldAttrs),
    NewRecord = activate_record(Id, NewAttrs),
    %% Attributes are visited last-to-first, matching the original foldr.
    lists:foreach(fun({Key, OldVal}) ->
                KeyString = atom_to_list(Key),
                case NewRecord:Key() of
                    OldVal ->
                        %% Attribute unchanged: nothing to report.
                        ok;
                    NewVal ->
                        Matches = fun(Topic) ->
                                attr_topic_matches(Topic, Id, Module, KeyString)
                        end,
                        notify_watchers(AllWatchers, State#state.watch_dict, Matches,
                            updated, {NewRecord, Key, OldVal, NewVal})
                end
        end, lists:reverse(OldRecord:attributes())),
    {reply, ok, State}.

%% Parses one topic and records WatchId in the matching index of State.
%% Returns {NewState, WatchInfo}, where WatchInfo is the parsed form of the
%% topic that is stored in the watch's watch_list.
register_topic(Topic, WatchId, State) ->
    case re:split(Topic, "\\.", [{return, list}]) of
        [Id, Attr] ->
            register_attr_topic(Id, Attr, WatchId, State);
        _ ->
            register_record_topic(Topic, WatchId, State)
    end.

%% Handles "model-id.attr" and "model-*.attr" topics.
register_attr_topic(Id, Attr, WatchId, State) ->
    [Module, IdNum] = re:split(Id, "-", [{return, list}]),
    case IdNum of
        "*" ->
            SetAttrWatchers = add_watcher(Module, WatchId, State#state.set_attr_watchers),
            {State#state{ set_attr_watchers = SetAttrWatchers }, {set_attr, Module, Attr}};
        _ ->
            IdAttrWatchers = add_watcher(Id, WatchId, State#state.id_attr_watchers),
            {State#state{ id_attr_watchers = IdAttrWatchers }, {id_attr, Id, Attr}}
    end.

%% Handles "model-id" and "plural_model" topics.
register_record_topic(Topic, WatchId, State) ->
    case re:split(Topic, "-", [{return, list}]) of
        [_Module, _IdNum] ->
            %% The lookup goes through the state being accumulated (not the
            %% state from before the fold), so a topic repeated within one
            %% topic string keeps every registration.
            IdWatchers = add_watcher(Topic, WatchId, State#state.id_watchers),
            {State#state{ id_watchers = IdWatchers }, {id, Topic}};
        [_PluralModel] ->
            SetWatchers = add_watcher(Topic, WatchId, State#state.set_watchers),
            {State#state{ set_watchers = SetWatchers }, {set, Topic}}
    end.

%% Prepends WatchId to the watcher list stored under Key in Index.
add_watcher(Key, WatchId, Index) ->
    dict:update(Key, fun(WatchIds) -> [WatchId | WatchIds] end, [WatchId], Index).

%% Returns the watcher list stored under Key in Index, or [] if there is none.
watchers_for(Key, Index) ->
    case dict:find(Key, Index) of
        {ok, WatchIds} -> WatchIds;
        error -> []
    end.

%% True when a parsed topic subscribes to changes of attribute KeyString on
%% record Id of model Module, either by exact attribute name or via "*".
attr_topic_matches({id_attr, Id, Attr}, Id, _Module, KeyString) ->
    Attr =:= KeyString orelse Attr =:= "*";
attr_topic_matches({set_attr, Module, Attr}, _Id, Module, KeyString) ->
    Attr =:= KeyString orelse Attr =:= "*";
attr_topic_matches(_Topic, _Id, _Module, _KeyString) ->
    false.

%% Runs the callback of every watch in WatchIds once for each entry of its
%% watch_list that satisfies Matches. Both lists are walked last-to-first,
%% which is the order the original nested foldr calls used.
notify_watchers(WatchIds, WatchDict, Matches, Event, EventInfo) ->
    lists:foreach(fun(WatchId) ->
                #watch{ watch_list = WatchList,
                    callback = CallBack,
                    user_info = UserInfo } = dict:fetch(WatchId, WatchDict),
                lists:foreach(fun(Topic) ->
                            case Matches(Topic) of
                                true ->
                                    execute_callback(CallBack, Event, EventInfo, UserInfo, WatchId);
                                false ->
                                    ok
                            end
                    end, lists:reverse(WatchList))
        end, lists:reverse(WatchIds)).
226
%% gen_server callback: no asynchronous requests are defined, so every cast
%% is dropped and the state is returned as it was.
handle_cast(_Request, State) ->
    Unchanged = State,
    {noreply, Unchanged}.
229
%% gen_server callback: there is nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    Result = ok,
    Result.
232
%% gen_server callback: the state needs no conversion across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    Carried = State,
    {ok, Carried}.
235
%% gen_server callback: out-of-band messages are discarded, state is kept.
handle_info(_Info, State) ->
    Unchanged = State,
    {noreply, Unchanged}.
238
239
%% Returns the time TTL seconds from now, as whole seconds since the Unix
%% epoch. future_time(0) is therefore the current time.
%%
%% Reads the clock with os:timestamp/0 rather than the deprecated
%% erlang:now/0. Both return {MegaSecs, Secs, MicroSecs}; only second
%% resolution is used here, so now/0's guarantee of unique, strictly
%% increasing values is not needed.
future_time(TTL) ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    MegaSecs * 1000000 + Secs + TTL.
243
%% Builds a record of the model named by the part of Id before the first "-".
%% The constructor arguments follow the order of the model's attribute names:
%% the `id' attribute receives Id itself, every other attribute is looked up
%% in the Attrs proplist (undefined when absent).
activate_record(Id, Attrs) ->
    [ModuleName | _Rest] = re:split(Id, "-", [{return, list}]),
    Type = list_to_atom(ModuleName),
    Template = boss_record_lib:dummy_record(Type),
    ValueOf = fun
        (id) -> Id;
        (Name) -> proplists:get_value(Name, Attrs)
    end,
    ConstructorArgs = [ValueOf(Name) || Name <- Template:attribute_names()],
    apply(Type, new, ConstructorArgs).
252
%% Removes every watch whose expiry time has passed: the watch is taken out
%% of the expiry queue, out of watch_dict, and out of each topic index it was
%% registered in. Returns the updated state.
prune_expired_entries(#state{ ttl_tree = Tree } = State) ->
    Now = future_time(0),
    {PrunedState, PrunedTree} = tiny_pq:prune_collect_old(fun(WatchId, StateAcc) ->
                #watch{ watch_list = WatchList } = dict:fetch(WatchId, StateAcc#state.watch_dict),
                %% Each removal works on the state produced by the previous
                %% one. Reading the indexes from StateAcc on every step (as
                %% the code used to) loses all but the last removal when a
                %% watch has several topics in the same index, leaving a
                %% stale watch id behind that later crashes dict:fetch/2.
                Unindexed = lists:foldr(fun(Topic, Acc) ->
                            unindex_topic(Topic, WatchId, Acc)
                    end, StateAcc, WatchList),
                Unindexed#state{ watch_dict = dict:erase(WatchId, Unindexed#state.watch_dict) }
        end, State, Tree, Now),
    PrunedState#state{ ttl_tree = PrunedTree }.

%% Removes WatchId from the index that the parsed topic was registered in.
unindex_topic({id, TopicString}, WatchId, State) ->
    State#state{ id_watchers = drop_watcher(TopicString, WatchId, State#state.id_watchers) };
unindex_topic({set, TopicString}, WatchId, State) ->
    State#state{ set_watchers = drop_watcher(TopicString, WatchId, State#state.set_watchers) };
unindex_topic({id_attr, Id, _Attr}, WatchId, State) ->
    State#state{ id_attr_watchers = drop_watcher(Id, WatchId, State#state.id_attr_watchers) };
unindex_topic({set_attr, Module, _Attr}, WatchId, State) ->
    State#state{ set_attr_watchers = drop_watcher(Module, WatchId, State#state.set_attr_watchers) };
unindex_topic(_Other, _WatchId, State) ->
    State.

%% Deletes one occurrence of WatchId from the list stored under Key. The key
%% is erased once its list becomes empty; a key that is already absent is
%% left alone.
drop_watcher(Key, WatchId, Index) ->
    case dict:find(Key, Index) of
        {ok, Watchers} ->
            case lists:delete(WatchId, Watchers) of
                [] -> dict:erase(Key, Index);
                Remaining -> dict:store(Key, Remaining, Index)
            end;
        error ->
            Index
    end.
296
8076dda Spawn workers for BossNews callbacks
Evan Miller authored
%% Runs a watch callback in a freshly spawned, unlinked process, so a slow or
%% crashing callback cannot block or take down the caller. Returns the pid of
%% that process.
%%
%% A callback of arity 2 is called as Fun(Event, EventInfo); one of arity 3
%% also receives UserInfo. When the callback returns {ok, cancel_watch} or
%% {ok, extend_watch}, the matching boss_news call is made for WatchId; any
%% other return value is ignored.
execute_callback(Fun, Event, EventInfo, UserInfo, WatchId) when is_function(Fun) ->
    Worker = fun() ->
            {arity, Arity} = erlang:fun_info(Fun, arity),
            Outcome = case Arity of
                2 -> Fun(Event, EventInfo);
                3 -> Fun(Event, EventInfo, UserInfo)
            end,
            case Outcome of
                {ok, cancel_watch} -> boss_news:cancel_watch(WatchId);
                {ok, extend_watch} -> boss_news:extend_watch(WatchId);
                _ -> ok
            end
    end,
    erlang:spawn(Worker).
Something went wrong with that request. Please try again.