Skip to content

Commit

Permalink
WHISTLE-840: be more aggressive about determining the status of a par…
Browse files Browse the repository at this point in the history
…ked call when updating the BLF
  • Loading branch information
k-anderson committed Feb 20, 2012
1 parent 50770c2 commit 1510c5f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 37 deletions.
7 changes: 7 additions & 0 deletions whistle_apps/apps/callflow/src/callflow_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

%% API
-export([start_link/0]).
-export([listener_proc/0]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -36,6 +37,12 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec listener_proc/0 :: () -> {'ok', pid()}.
listener_proc() ->
[P] = [P || {Mod, P, _, _} <- supervisor:which_children(?MODULE),
Mod =:= cf_listener],
{ok, P}.

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
Expand Down
74 changes: 63 additions & 11 deletions whistle_apps/apps/callflow/src/cf_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

%% API
-export([start_link/0, stop/1]).
-export([handle_call_status_resp/2]).
-export([get_call_status/1]).

%% gen_listener callbacks
-export([init/1, handle_call/3, handle_cast/2
Expand All @@ -26,6 +28,7 @@
-define(RESPONDERS, [{cf_route_req, [{<<"dialplan">>, <<"route_req">>}]}
,{cf_route_win, [{<<"dialplan">>, <<"route_win">>}]}
,{{cf_util, presence_probe}, [{<<"notification">>, <<"presence_probe">>}]}
,{{?MODULE, handle_call_status_resp}, [{<<"call_event">>, <<"channel_status_resp">>}]}
]).
-define(BINDINGS, [{route, []}
,{notifications, [{restrict_to, [presence_probe]}]}
Expand Down Expand Up @@ -57,6 +60,36 @@ start_link() ->
stop(Srv) ->
gen_listener:stop(Srv).

-spec get_call_status/1 :: (ne_binary()) -> {ok, wh_json:json_object()} | {error, timeout | wh_json:json_object()}.
get_call_status(CallId) ->
{ok, Srv} = callflow_sup:listener_proc(),
gen_server:cast(Srv, {add_consumer, CallId, self()}),
Command = [{<<"Call-ID">>, CallId}
| wh_api:default_headers(gen_listener:queue_name(Srv), ?APP_NAME, ?APP_VERSION)
],
wapi_call:publish_channel_status_req(CallId, Command),
Result = receive
{call_status_resp, JObj} ->
case wh_json:get_value(<<"Status">>, JObj) of
<<"active">> -> {ok, JObj};
_Else -> {error, JObj}
end
after
2000 -> {error, timeout}
end,
gen_server:cast(Srv, {remove_consumer, self()}),
Result.

-spec handle_call_status_resp/2 :: (wh_json:json_object(), proplist()) -> ok.
handle_call_status_resp(JObj, Props) ->
Consumers = props:get_value(consumers, Props),
StatusCallId = wh_json:get_value(<<"Call-ID">>, JObj),
[Consumer ! {call_status_resp, JObj}
|| {CallId, Consumer, _} <- Consumers
,CallId =:= StatusCallId
],
ok.

%%%===================================================================
%%% gen_listener callbacks
%%%===================================================================
Expand All @@ -73,8 +106,9 @@ stop(Srv) ->
%% @end
%%--------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
?LOG_SYS("starting new callflow listener"),
{ok, ok}.
{ok, []}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -90,8 +124,8 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(_Msg, _From, State) ->
{noreply, State}.
handle_call(_Msg, _From, Consumers) ->
{noreply, Consumers}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -103,8 +137,19 @@ handle_call(_Msg, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
handle_cast({add_consumer, CallId, Consumer}, Consumers) ->
MRef = erlang:monitor(process, Consumer),
?LOG("added call status response consumer (~p) for ~s", [Consumer, CallId]),
{noreply, [{CallId, Consumer, MRef}|Consumers]};
handle_cast({remove_consumer, Consumer}, Consumers) ->
{noreply, lists:filter(fun({_, C, MRef}) when C =:= Consumer ->
?LOG("removed call status response consumer (~p): status sent", [Consumer]),
erlang:demonitor(MRef, [flush]),
false;
(_) -> true
end, Consumers)};
handle_cast(_Msg, Consumers) ->
{noreply, Consumers}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -116,8 +161,15 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
handle_info({'DOWN', _, _, Consumer, _R}, Consumers) ->
{noreply, lists:filter(fun({_, C, MRef}) when C =:= Consumer ->
?LOG("removed call status response consumer (~p): ~p", [Consumer, _R]),
erlang:demonitor(MRef, flush),
false;
(_) -> true
end, Consumers)};
handle_info(_Info, Consumers) ->
{noreply, Consumers}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -127,8 +179,8 @@ handle_info(_Info, State) ->
%% @spec handle_event(JObj, State) -> {reply, Props}
%% @end
%%--------------------------------------------------------------------
handle_event(_JObj, _State) ->
{reply, []}.
handle_event(_JObj, Consumers) ->
{reply, [{consumers, Consumers}]}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -153,8 +205,8 @@ terminate(_Reason, _) ->
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
code_change(_OldVsn, Consumers, _Extra) ->
{ok, Consumers}.

%%%===================================================================
%%% Internal functions
Expand Down
69 changes: 43 additions & 26 deletions whistle_apps/apps/callflow/src/module/cf_park.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ update_presence(SlotNumber, PresenceId, AccountDb) ->
ParkedCalls = get_parked_calls(#cf_call{account_db=AccountDb, account_id=AccountId}),
{State, CallId} = case wh_json:get_value([<<"slots">>, SlotNumber, <<"Call-ID">>], ParkedCalls) of
undefined -> {<<"terminated">>, undefined};
Else -> {<<"early">>, Else}
CId ->
case cf_listener:get_call_status(CId) of
{ok, _} -> {<<"early">>, CId};
{error, _} ->
cleanup_slot(SlotNumber, CId, #cf_call{account_db=AccountDb}),
{<<"terminated">>, CId}
end
end,
cf_call_command:presence(State, PresenceId, CallId).

Expand All @@ -57,17 +63,18 @@ handle(Data, #cf_call{channel_vars=CCVs}=Call) ->
<<"retrieve">> ->
?LOG("action is to retrieve a parked call"),
case retrieve(SlotNumber, ParkedCalls, Call) of
false ->
{ok, _} -> ok;
_Else ->
cf_call_command:b_answer(Call),
cf_call_command:b_prompt(<<"park-no_caller">>, Call),
cf_exe:continue(Call);
_ -> ok
cf_exe:continue(Call)
end;
<<"auto">> ->
?LOG("action is to automatically determine if we should retrieve or park"),
case retrieve(SlotNumber, ParkedCalls, Call) of
false -> park_call(SlotNumber, ParkedCalls, undefined, Call);
_ -> ok
{hungup, JObj} -> park_call(SlotNumber, JObj, undefined, Call);
{error, _} -> park_call(SlotNumber, ParkedCalls, undefined, Call);
{ok, _} -> ok
end
end;
nomatch ->
Expand Down Expand Up @@ -105,24 +112,28 @@ get_switch_hostname(CallId, Call) ->
%% Determine the appropriate action to retrieve a parked call
%% @end
%%--------------------------------------------------------------------
-spec retrieve/3 :: (ne_binary(), wh_json:json_object(), #cf_call{}) -> boolean().
-spec retrieve/3 :: (ne_binary(), wh_json:json_object(), #cf_call{}) -> {ok, wh_json:json_object()} |
{hungup, wh_json:json_object()} |
{error, term()}.
retrieve(SlotNumber, ParkedCalls, #cf_call{to_user=ToUser, to_realm=ToRealm}=Call) ->
CallerHost = get_switch_hostname(Call),
case wh_json:get_value([<<"slots">>, SlotNumber], ParkedCalls) of
undefined ->
?LOG("They hungup? play back nobody here message", []),
false;
{error, slot_empty};
Slot ->
CallerHost = get_switch_hostname(Call),
ParkedCall = wh_json:get_ne_value(<<"Call-ID">>, Slot),
case get_switch_hostname(ParkedCall, Call) of
undefined ->
?LOG("the parked caller node is undefined"),
cleanup_slot(SlotNumber, ParkedCall, Call),
false;
case cleanup_slot(SlotNumber, ParkedCall, Call) of
{ok, JObj} -> {hungup, JObj};
{error, _} -> {hungup, ParkedCalls}
end;
CallerHost ->
ParkedCall = wh_json:get_ne_value(<<"Call-ID">>, Slot),
case cleanup_slot(SlotNumber, ParkedCall, Call) of
true ->
{ok, _}=Ok ->
publish_usurp_control(ParkedCall, Call),
?LOG("pickup call id ~s", [ParkedCall]),
Name = wh_json:get_value(<<"CID-Name">>, Slot, <<"Parking Slot ", SlotNumber/binary>>),
Expand All @@ -134,16 +145,18 @@ retrieve(SlotNumber, ParkedCalls, #cf_call{to_user=ToUser, to_realm=ToRealm}=Cal
],
cf_call_command:set(wh_json:from_list(Update), undefined, Call),
cf_call_command:b_pickup(ParkedCall, Call),
cf_exe:continue(Call);
cf_exe:continue(Call),
Ok;
%% if we cant clean up the slot then someone beat us to it
false -> false
{error, _}=E -> E
end;
OtherNode ->
IP = get_node_ip(OtherNode),
Contact = <<"sip:", ToUser/binary, "@", ToRealm/binary>>,
Server = <<"sip:", IP/binary, ":5060">>,
cf_call_command:redirect(Contact, Server, Call),
cf_exe:transfer(Call)
cf_exe:transfer(Call),
{ok, ParkedCalls}
end
end.

Expand Down Expand Up @@ -287,9 +300,9 @@ save_slot(SlotNumber, Slot, ParkedCalls, Call) ->

do_save_slot(SlotNumber, Slot, ParkedCalls, #cf_call{account_db=Db}=Call) ->
case couch_mgr:save_doc(Db, wh_json:set_value([<<"slots">>, SlotNumber], Slot, ParkedCalls)) of
{ok, Parked} ->
{ok, _}=Ok ->
?LOG("successfully stored call parking data in slot ~p", [SlotNumber]),
{ok, Parked};
Ok;
{error, conflict} ->
save_slot(SlotNumber, Slot, get_parked_calls(Call), Call)
end.
Expand Down Expand Up @@ -400,7 +413,7 @@ get_parked_calls(#cf_call{account_db=Db, account_id=Id}) ->
%%
%% @end
%%--------------------------------------------------------------------
-spec cleanup_slot/3 :: (ne_binary(), ne_binary(), #cf_call{}) -> boolean().
-spec cleanup_slot/3 :: (ne_binary(), ne_binary(), #cf_call{}) -> {ok, wh_json:json_object()} | {error, term()}.
cleanup_slot(SlotNumber, ParkedCall, #cf_call{account_db=Db}=Call) ->
case couch_mgr:open_doc(Db, ?DB_DOC_NAME) of
{ok, JObj} ->
Expand All @@ -412,19 +425,19 @@ cleanup_slot(SlotNumber, ParkedCall, #cf_call{account_db=Db}=Call) ->
?LOG("update presence-id '~s' with state: terminated", [PresenceId]),
cf_call_command:presence(<<"terminated">>, PresenceId, ParkedCallId),
case couch_mgr:save_doc(Db, wh_json:delete_key([<<"slots">>, SlotNumber], JObj)) of
{ok, _} -> true;
{ok, _}=Ok -> Ok;
{error, conflict} -> cleanup_slot(SlotNumber, ParkedCall, Call);
{error, _R} ->
{error, _R}=E ->
?LOG("failed to clean up our slot: ~p", [_R]),
false
E
end;
_Else ->
?LOG("call parked in slot ~s is ~s and we expected ~s, skipping clean up", [SlotNumber, _Else, ParkedCall]),
false
{error, unexpected_callid}
end;
{error, _R} ->
{error, _R}=E ->
?LOG("failed to open the parked calls doc: ~p", [_R]),
false
E
end.

%%--------------------------------------------------------------------
Expand All @@ -442,9 +455,13 @@ wait_for_pickup(SlotNumber, RingbackId, Call) ->
case cf_call_command:b_hold(?DEFAULT_RINGBACK_TM, Call) of
{error, timeout} ->
TmpCID = <<"Parking slot ", SlotNumber/binary>>,
case ringback_parker(RingbackId, SlotNumber, TmpCID, Call) of
Hungup = get_switch_hostname(Call) =/= undefined,
case Hungup andalso ringback_parker(RingbackId, SlotNumber, TmpCID, Call) of
answered -> cf_exe:continue(Call);
failed -> wait_for_pickup(SlotNumber, RingbackId, Call)
failed -> wait_for_pickup(SlotNumber, RingbackId, Call);
false ->
cleanup_slot(SlotNumber, cf_exe:callid(Call), Call),
cf_exe:stop(Call)
end;
_ ->
?LOG("parked caller has been picked up or hungup"),
Expand Down

0 comments on commit 1510c5f

Please sign in to comment.