Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
WHISTLE-1217: use the amqp worker pool for channel/call status requests
  • Loading branch information
k-anderson committed Jul 17, 2012
1 parent b0504b8 commit c91f52c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 214 deletions.
6 changes: 1 addition & 5 deletions whistle_apps/apps/callflow/src/cf_exe.erl
Expand Up @@ -387,11 +387,7 @@ handle_info(_, State) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_event(JObj, #state{cf_module_pid=Pid, call=Call}) -> handle_event(JObj, #state{cf_module_pid=Pid, call=Call}) ->
CallId = whapps_call:call_id_direct(Call), CallId = whapps_call:call_id_direct(Call),
case {whapps_util:get_event_type(JObj), wh_json:get_value(<<"Call-ID">>, JObj)}of case {whapps_util:get_event_type(JObj), wh_json:get_value(<<"Call-ID">>, JObj)} of
{{<<"call_event">>, <<"channel_status_resp">>}, _} ->
{reply, [{cf_module_pid, Pid}]};
{{<<"call_event">>, <<"call_status_resp">>}, _} ->
{reply, [{cf_module_pid, Pid}]};
{{<<"call_event">>, <<"call_id_update">>},_} -> {{<<"call_event">>, <<"call_id_update">>},_} ->
NewCallId = wh_json:get_value(<<"Call-ID">>, JObj), NewCallId = wh_json:get_value(<<"Call-ID">>, JObj),
NewCtrlQ = wh_json:get_value(<<"Control-Queue">>, JObj), NewCtrlQ = wh_json:get_value(<<"Control-Queue">>, JObj),
Expand Down
60 changes: 16 additions & 44 deletions whistle_apps/apps/callflow/src/cf_listener.erl
Expand Up @@ -10,16 +10,16 @@


-behaviour(gen_listener). -behaviour(gen_listener).


%% API
-export([start_link/0]). -export([start_link/0]).
-export([stop/0]). -export([stop/0]).
-export([pause/0]). -export([pause/0]).
-export([resume/0]). -export([resume/0]).
-export([handle_call_status_resp/2]). -export([init/1

,handle_call/3
%% gen_listener callbacks ,handle_cast/2
-export([init/1, handle_call/3, handle_cast/2 ,handle_info/2
,handle_info/2, handle_event/2, terminate/2 ,handle_event/2
,terminate/2
,code_change/3 ,code_change/3
]). ]).


Expand All @@ -31,7 +31,6 @@
,{cf_route_win, [{<<"dialplan">>, <<"route_win">>}]} ,{cf_route_win, [{<<"dialplan">>, <<"route_win">>}]}
,{{cf_util, presence_probe}, [{<<"notification">>, <<"presence_probe">>}]} ,{{cf_util, presence_probe}, [{<<"notification">>, <<"presence_probe">>}]}
,{{cf_util, presence_mwi_query}, [{<<"notification">>, <<"mwi_query">>}]} ,{{cf_util, presence_mwi_query}, [{<<"notification">>, <<"mwi_query">>}]}
,{{?MODULE, handle_call_status_resp}, [{<<"call_event">>, <<"channel_status_resp">>}]}
]). ]).
-define(BINDINGS, [{route, []} -define(BINDINGS, [{route, []}
,{self, []} ,{self, []}
Expand Down Expand Up @@ -74,15 +73,6 @@ stop() ->
{ok, Srv} = callflow_sup:listener_proc(), {ok, Srv} = callflow_sup:listener_proc(),
gen_listener:stop(Srv). gen_listener:stop(Srv).


-spec handle_call_status_resp/2 :: (wh_json:json_object(), proplist()) -> any().
handle_call_status_resp(JObj, Props) ->
Consumers = props:get_value(consumers, Props),
StatusCallId = wh_json:get_value(<<"Call-ID">>, JObj),
[Consumer ! {call_status_resp, JObj}
|| {CallId, Consumer, _} <- Consumers
,CallId =:= StatusCallId
].

%%%=================================================================== %%%===================================================================
%%% gen_listener callbacks %%% gen_listener callbacks
%%%=================================================================== %%%===================================================================
Expand Down Expand Up @@ -127,8 +117,8 @@ init([]) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_call(_Msg, _From, Consumers) -> handle_call(_Msg, _From, State) ->
{noreply, Consumers}. {noreply, State}.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
Expand All @@ -140,19 +130,8 @@ handle_call(_Msg, _From, Consumers) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_cast({add_consumer, CallId, Consumer}, Consumers) -> handle_cast(_Msg, State) ->
MRef = erlang:monitor(process, Consumer), {noreply, State}.
lager:debug("added call status response consumer (~p) for ~s", [Consumer, CallId]),
{noreply, [{CallId, Consumer, MRef}|Consumers]};
handle_cast({remove_consumer, Consumer}, Consumers) ->
{noreply, lists:filter(fun({_, C, MRef}) when C =:= Consumer ->
lager:debug("removed call status response consumer (~p): response sent", [Consumer]),
erlang:demonitor(MRef, [flush]),
false;
(_) -> true
end, Consumers)};
handle_cast(_Msg, Consumers) ->
{noreply, Consumers}.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
Expand All @@ -164,16 +143,9 @@ handle_cast(_Msg, Consumers) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_info({'DOWN', _, _, Consumer, _R}, Consumers) -> handle_info(_Info, State) ->
{noreply, lists:filter(fun({_, C, MRef}) when C =:= Consumer ->
lager:debug("removed call status response consumer (~p): ~p", [Consumer, _R]),
erlang:demonitor(MRef, [flush]),
false;
(_) -> true
end, Consumers)};
handle_info(_Info, Consumers) ->
lager:debug("unhandled message: ~p", [_Info]), lager:debug("unhandled message: ~p", [_Info]),
{noreply, Consumers}. {noreply, State}.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
Expand All @@ -183,8 +155,8 @@ handle_info(_Info, Consumers) ->
%% @spec handle_event(JObj, State) -> {reply, Props} %% @spec handle_event(JObj, State) -> {reply, Props}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_event(_JObj, Consumers) -> handle_event(_JObj, _State) ->
{reply, [{consumers, Consumers}]}. {reply, []}.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
Expand All @@ -209,8 +181,8 @@ terminate(_Reason, _) ->
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} %% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
code_change(_OldVsn, Consumers, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, Consumers}. {ok, State}.


%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
Expand Down
27 changes: 0 additions & 27 deletions whistle_apps/apps/callflow/src/cf_util.erl
Expand Up @@ -14,7 +14,6 @@
-export([presence_probe/2]). -export([presence_probe/2]).
-export([presence_mwi_query/2]). -export([presence_mwi_query/2]).
-export([update_mwi/1, update_mwi/2, update_mwi/4]). -export([update_mwi/1, update_mwi/2, update_mwi/4]).
-export([get_call_status/1]).
-export([alpha_to_dialpad/1, ignore_early_media/1]). -export([alpha_to_dialpad/1, ignore_early_media/1]).
-export([correct_media_path/2]). -export([correct_media_path/2]).
-export([lookup_callflow/1, lookup_callflow/2]). -export([lookup_callflow/1, lookup_callflow/2]).
Expand Down Expand Up @@ -243,32 +242,6 @@ update_mwi(New, Saved, OwnerId, AccountDb) ->
ok ok
end. end.


%%--------------------------------------------------------------------
%% @public
%% @doc
%%
%% @end
%%--------------------------------------------------------------------
-spec get_call_status/1 :: (ne_binary()) -> {ok, wh_json:json_object()} | {error, timeout | wh_json:json_object()}.
get_call_status(CallId) ->
{ok, Srv} = callflow_sup:listener_proc(),
gen_server:cast(Srv, {add_consumer, CallId, self()}),
Command = [{<<"Call-ID">>, CallId}
| wh_api:default_headers(gen_listener:queue_name(Srv), ?APP_NAME, ?APP_VERSION)
],
wapi_call:publish_channel_status_req(CallId, Command),
Result = receive
{call_status_resp, JObj} ->
case wh_json:get_value(<<"Status">>, JObj) of
<<"active">> -> {ok, JObj};
_Else -> {error, JObj}
end
after
2000 -> {error, timeout}
end,
gen_server:cast(Srv, {remove_consumer, self()}),
Result.

%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @public %% @public
%% @doc %% @doc
Expand Down
29 changes: 13 additions & 16 deletions whistle_apps/apps/callflow/src/module/cf_park.erl
Expand Up @@ -32,7 +32,7 @@ update_presence(SlotNumber, PresenceId, AccountDb) ->
State = case wh_json:get_value([<<"slots">>, SlotNumber, <<"Call-ID">>], ParkedCalls) of State = case wh_json:get_value([<<"slots">>, SlotNumber, <<"Call-ID">>], ParkedCalls) of
undefined -> <<"terminated">>; undefined -> <<"terminated">>;
ParkedCallId -> ParkedCallId ->
case cf_util:get_call_status(ParkedCallId) of case whapps_call_command:channel_status(ParkedCallId) of
{ok, _} -> <<"early">>; {ok, _} -> <<"early">>;
{error, _} -> <<"terminated">> {error, _} -> <<"terminated">>
end end
Expand Down Expand Up @@ -94,18 +94,12 @@ handle(Data, Call) ->
%% Determine the hostname of the switch %% Determine the hostname of the switch
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec get_switch_hostname/1 :: (whapps_call:call()) -> 'undefined' | ne_binary(). -spec get_switch_nodename/1 :: ('undefined' | ne_binary() | whapps_call:call()) -> 'undefined' | ne_binary().
-spec get_switch_hostname/2 :: ('undefined' | ne_binary(), whapps_call:call()) -> 'undefined' | ne_binary(). get_switch_nodename(CallId) ->

case whapps_call_command:channel_status(CallId) of
get_switch_hostname(Call) -> {error, _} -> undefined;
get_switch_hostname(undefined, Call). {ok, JObj} ->

wh_json:get_ne_value(<<"Switch-Nodename">>, JObj)
get_switch_hostname(CallId, Call) ->
case whapps_call_command:b_channel_status(CallId, Call) of
{ok, CallerStatus} ->
wh_json:get_ne_value(<<"Switch-Nodename">>, CallerStatus);
_Else ->
undefined
end. end.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
Expand All @@ -126,7 +120,7 @@ retrieve(SlotNumber, ParkedCalls, Call) ->
CallerNode = whapps_call:switch_nodename(Call), CallerNode = whapps_call:switch_nodename(Call),
ParkedCall = wh_json:get_ne_value(<<"Call-ID">>, Slot), ParkedCall = wh_json:get_ne_value(<<"Call-ID">>, Slot),
lager:debug("the parking slot ~s currently has a parked call ~s, attempting to retrieve caller", [SlotNumber, ParkedCall]), lager:debug("the parking slot ~s currently has a parked call ~s, attempting to retrieve caller", [SlotNumber, ParkedCall]),
case get_switch_hostname(ParkedCall, Call) of case get_switch_nodename(ParkedCall) of
undefined -> undefined ->
lager:debug("the parked call has hungup, but is was still listed in the slot", []), lager:debug("the parked call has hungup, but is was still listed in the slot", []),
case cleanup_slot(SlotNumber, ParkedCall, whapps_call:account_db(Call)) of case cleanup_slot(SlotNumber, ParkedCall, whapps_call:account_db(Call)) of
Expand Down Expand Up @@ -304,7 +298,7 @@ save_slot(SlotNumber, Slot, ParkedCalls, Call) ->
lager:debug("slot has parked call '~s' by parker '~s', it is available", [ParkedCallId, ParkerCallId]), lager:debug("slot has parked call '~s' by parker '~s', it is available", [ParkedCallId, ParkerCallId]),
do_save_slot(SlotNumber, Slot, ParkedCalls, Call); do_save_slot(SlotNumber, Slot, ParkedCalls, Call);
false -> false ->
case whapps_call_command:b_channel_status(ParkedCallId, Call) of case whapps_call_command:channel_status(ParkedCallId) of
{ok, _} -> {ok, _} ->
lager:debug("slot has active call '~s' in it, denying use of slot", [ParkedCallId]), lager:debug("slot has active call '~s' in it, denying use of slot", [ParkedCallId]),
{error, occupied}; {error, occupied};
Expand Down Expand Up @@ -497,7 +491,10 @@ wait_for_pickup(SlotNumber, RingbackId, Call) ->
case whapps_call_command:b_hold(?DEFAULT_RINGBACK_TM, Call) of case whapps_call_command:b_hold(?DEFAULT_RINGBACK_TM, Call) of
{error, timeout} -> {error, timeout} ->
TmpCID = <<"Parking slot ", SlotNumber/binary>>, TmpCID = <<"Parking slot ", SlotNumber/binary>>,
Hungup = get_switch_hostname(Call) =/= undefined, Hungup = case whapps_call_command:channel_status(Call) of
{ok, _} -> false;
{error, _} -> true
end,
case Hungup andalso ringback_parker(RingbackId, SlotNumber, TmpCID, Call) of case Hungup andalso ringback_parker(RingbackId, SlotNumber, TmpCID, Call) of
answered -> answered ->
lager:debug("parked caller ringback was answered"), lager:debug("parked caller ringback was answered"),
Expand Down

0 comments on commit c91f52c

Please sign in to comment.