Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
WHISTLE-42: deduplicate call events
  • Loading branch information
k-anderson committed Mar 3, 2012
1 parent 42266b4 commit 78f3514
Showing 1 changed file with 69 additions and 25 deletions.
94 changes: 69 additions & 25 deletions lib/whistle-1.0.0/src/gen_listener.erl
Expand Up @@ -92,6 +92,7 @@ behaviour_info(_) ->
,module_timeout_ref = 'undefined' :: 'undefined' | reference() % when the client sets a timeout, gen_listener calls shouldn't negate it, only calls that pass through to the client
,active_responders = [] :: [pid(),...] | [] %% list of pids processing requests
,other_queues = [] :: [{ne_binary(), bindings()},...] | [] %% {QueueName, Binding()}
,last_call_event = [] :: proplist()
}).

-define(TIMEOUT_RETRY_CONN, 1000).
Expand Down Expand Up @@ -312,12 +313,10 @@ handle_cast(Message, #state{module=Module, module_state=ModState, module_timeout
end.

-spec handle_info/2 :: (term(), #state{}) -> handle_info_ret().
handle_info({#'basic.deliver'{}=BD, #amqp_msg{props = #'P_basic'{content_type=CT}, payload = Payload}}, #state{active_responders=ARs}=State) ->
handle_info({#'basic.deliver'{}=BD, #amqp_msg{props = #'P_basic'{content_type=CT}, payload = Payload}}, State) ->
case catch handle_event(Payload, CT, BD, State) of
Pid when is_pid(Pid) ->
{noreply, State#state{active_responders=[Pid | ARs]}, hibernate};
ignore ->
{noreply, State};
#state{}=S ->
{noreply, S, hibernate};
{'EXIT', Why} ->
?LOG(alert, "exception: ~p", [Why]),
{stop, Why, State}
Expand Down Expand Up @@ -396,33 +395,29 @@ terminate(Reason, #state{module=Module, module_state=ModState}) ->
Module:terminate(Reason, ModState),
?LOG_END("~s terminated cleanly, going down", [Module]).

-spec handle_event/4 :: (ne_binary(), ne_binary(), #'basic.deliver'{}, #state{}) -> pid().
-spec handle_event/4 :: (ne_binary(), ne_binary(), #'basic.deliver'{}, #state{}) -> #state{}.
handle_event(Payload, <<"application/json">>, BD, State) ->
JObj = mochijson2:decode(Payload),
process_req(State, JObj, BD);
handle_event(Payload, <<"application/erlang">>, BD, State) ->
JObj = binary_to_term(Payload),
process_req(State, JObj, BD).

-spec process_req/3 :: (#state{}, wh_json:json_object(), #'basic.deliver'{}) -> pid().
process_req(#state{queue=Queue, responders=Responders, module=Module, module_state=ModState, other_queues=OtherQueues}, JObj, BD) ->
OtherQueueNames = [OtherQueueName || {OtherQueueName, _} <- OtherQueues],
Props1 = case catch Module:handle_event(JObj, ModState) of
{reply, Props} when is_list(Props) -> [{server, self()}
,{queue, Queue}
,{other_queues, OtherQueueNames}
| Props
];
{'EXIT', _Why} -> [{server, self()}
,{queue, Queue}
,{other_queues, OtherQueueNames}
];
ignore -> ignore
end,
case Props1 of
ignore -> ignore;
_Else ->
proc_lib:spawn_link(fun() -> _ = wh_util:put_callid(JObj), process_req(Props1, Responders, JObj, BD) end)

-spec process_req/3 :: (#state{}, wh_json:json_object(), #'basic.deliver'{}) -> #state{}.
process_req(#state{responders=Responders, active_responders=ARs}=State, JObj, BD) ->
case dedup_events(wh_util:get_event_type(JObj), JObj, State) of
duplicate -> State;
#state{}=S ->
case handle_callback_event(S, JObj) of
ignore -> State;
Props ->
Pid = proc_lib:spawn_link(fun() ->
_ = wh_util:put_callid(JObj),
process_req(Props, Responders, JObj, BD)
end),
S#state{active_responders=[Pid | ARs]}
end
end.

-spec process_req/4 :: (wh_proplist(), responders(), wh_json:json_object(), #'basic.deliver'{}) -> 'ok'.
Expand All @@ -441,6 +436,22 @@ process_req(Props, Responders, JObj, BD) ->
],
wait_for_handlers(Handlers).

-spec handle_callback_event/2 :: (#state{}, wh_json:json_object()) -> 'ignore' | proplist().
handle_callback_event(#state{module=Module, module_state=ModState, queue=Queue, other_queues=OtherQueues}, JObj) ->
OtherQueueNames = [OtherQueueName || {OtherQueueName, _} <- OtherQueues],
case catch Module:handle_event(JObj, ModState) of
{reply, Props} when is_list(Props) -> [{server, self()}
,{queue, Queue}
,{other_queues, OtherQueueNames}
| Props
];
{'EXIT', _Why} -> [{server, self()}
,{queue, Queue}
,{other_queues, OtherQueueNames}
];
ignore -> ignore
end.

%% allow wildcard (<<"*">>) in the Key to match either (or both) Category and Name
-spec maybe_event_matches_key/2 :: (responder_callback_mapping(), responder_callback_mapping()) -> boolean().
maybe_event_matches_key(Evt, Evt) -> true;
Expand Down Expand Up @@ -533,3 +544,36 @@ stop_timer(Ref) when is_reference(Ref) ->
start_timer(Timeout) when is_integer(Timeout) andalso Timeout >= 0 ->
erlang:send_after(Timeout, self(), ?CALLBACK_TIMEOUT_MSG);
start_timer(_) -> 'undefined'.

-spec dedup_events/3 :: ({ne_binary(), ne_binary()}, wh_json:json_object(), #state{}) -> 'duplicate' | #state{}.
dedup_events({<<"call_event">>, EventName}, JObj, #state{last_call_event=Props}=State) ->
{LastAppName, LastAppData, LastTimestamp} = props:get_value(EventName, Props, {undefined, undefined, 0}),
EventTimestamp = wh_json:get_integer_value(<<"Timestamp">>, JObj, 0),
EventAppName = wh_json:get_value(<<"Raw-Application-Name">>, JObj),
EventAppData = wh_json:get_value(<<"Raw-Application-Data">>, JObj, <<>>),

EventOccuredAfterLast = (EventTimestamp =:= 0
orelse
EventTimestamp > LastTimestamp
),
SameTimeDifferentEvent = (EventTimestamp >= LastTimestamp
andalso
(EventAppName =/= LastAppName
orelse
EventAppData =/= LastAppData
)
),

case EventOccuredAfterLast orelse SameTimeDifferentEvent of
true -> State#state{last_call_event=[{EventName, {EventAppName, EventAppData, EventTimestamp}}
| props:delete(EventName, Props)
]};
false when not EventOccuredAfterLast ->
?LOG("ignoring call event ~s for ~s(~s), same event has already been recieved", [EventName, EventAppName, EventAppData]),
duplicate;
false ->
?LOG("ignoring call event ~s for ~s(~s), timestamp is older than previous message", [EventName, EventAppName, EventAppData]),
duplicate
end;
dedup_events(_, _, State) ->
State.

0 comments on commit 78f3514

Please sign in to comment.