Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

WHISTLE-824: refactor post-hangup events to ensure that any commands …

…that can not be run create an error message (thereby advancing waiting whapps)
  • Loading branch information...
commit 80cb006a5e16bda721e41eb4b9df44c507849512 1 parent 60a1b14
@k-anderson k-anderson authored
Showing with 85 additions and 43 deletions.
  1. +85 −43 ecallmgr/src/ecallmgr_call_control.erl
View
128 ecallmgr/src/ecallmgr_call_control.erl
@@ -77,7 +77,9 @@
,evtpid = 'undefined' :: 'undefined' | pid()
,command_q = queue:new() :: queue()
,current_app = 'undefined' :: ne_binary() | 'undefined'
+ ,current_cmd = 'undefined' :: wh_json:json_object() | 'undefined'
,start_time = erlang:now() :: wh_now()
+ ,is_call_up = 'true' :: boolean()
,is_node_up = 'true' :: boolean()
,keep_alive_ref = 'undefined' :: 'undefined' | reference()
,other_legs = [] :: [] | [ne_binary(),...]
@@ -191,7 +193,7 @@ handle_call_events(JObj, Props) ->
put(callid, CallId),
case wh_json:get_value(<<"Event-Name">>, JObj) of
<<"CHANNEL_EXECUTE_COMPLETE">> ->
- Application = wh_json:get_value(<<"Raw-Application-Name">>, JObj),
+ Application = wh_json:get_value(<<"Raw-Application-Name">>, JObj, wh_json:get_value(<<"Application-Name">>, JObj)),
?LOG("control queue ~p channel execute completion for '~s'", [Srv, Application]),
gen_server:cast(Srv, {event_execute_complete, CallId, Application});
<<"CHANNEL_DESTROY">> ->
@@ -339,23 +341,30 @@ handle_cast({rm_leg, JObj}, #state{other_legs=Legs, callid=CallId}=State) ->
end),
{noreply, State#state{other_legs=lists:delete(LegId, Legs), last_removed_leg=LegId}}
end;
-handle_cast({channel_destroyed, _}, #state{command_q=CmdQ, sanity_check_tref=SCTRef}=State) ->
+handle_cast({channel_destroyed, _}, #state{is_call_up=true, sanity_check_tref=SCTRef, current_app=CurrentApp
+ ,current_cmd=CurrentCmd, callid=CallId, node=Node}=State) ->
?LOG("our channel has been destroyed, executing any post-hangup commands"),
- lists:foreach(fun(Cmd) -> execute_control_request(Cmd, State) end, post_hangup_commands(CmdQ)),
%% if our sanity check timer is running stop it, it will always return false
%% now that the channel is gone
- catch (erlang:cancel_timer(SCTRef)),
- %% if the keep-alive timer is not already running start it
- %% different then get_keep_alive_ref (which resets it)
- KATRef = case State#state.keep_alive_ref of
- undefined ->
- erlang:send_after(?KEEP_ALIVE, self(), keep_alive_expired);
- Else ->
- Else
- end,
- {noreply, State#state{keep_alive_ref=KATRef}, hibernate};
-handle_cast({dialplan, JObj}
- ,#state{keep_alive_ref=Ref, command_q=CmdQ, current_app=CurrApp, is_node_up=INU}=State) ->
+ catch (erlang:cancel_timer(SCTRef)),
+ %% since this is not attached to a call the node status doesnt matter anymore
+ erlang:monitor_node(Node, false),
+ %% if the current application can not be run without a channel and we have received the
+ %% channel_destory (the last event we will ever receive from freeswitch for this call)
+ %% then create an error and force advance. This will happen with dialplan actions that
+ %% have not been executed on freeswitch but were already queued (for example in xferext).
+ %% Commonly events like masquerade, noop, ect
+ case CurrentApp =:= undefined orelse is_post_hangup_command(CurrentApp) of
+ true -> ok;
+ false ->
+ send_error_resp(CallId, CurrentCmd),
+ self() ! {force_queue_advance, CallId}
+ end,
+ {noreply, State#state{keep_alive_ref=get_keep_alive_ref(State#state{is_call_up=false}), is_call_up=false, is_node_up=true}, hibernate};
+handle_cast({channel_destroyed, _}, #state{is_call_up=false}=State) ->
+ {noreply, State};
+handle_cast({dialplan, JObj}, #state{callid=CallId, is_node_up=INU, is_call_up=CallUp
+ ,command_q=CmdQ, current_app=CurrApp}=State) ->
NewCmdQ = try
insert_command(State, wh_util:to_atom(wh_json:get_value(<<"Insert-At">>, JObj, 'tail')), JObj)
catch _T:_R ->
@@ -365,19 +374,24 @@ handle_cast({dialplan, JObj}
case INU andalso (not queue:is_empty(NewCmdQ)) andalso CurrApp =:= undefined of
true ->
{{value, Cmd}, NewCmdQ1} = queue:out(NewCmdQ),
- execute_control_request(Cmd, State),
AppName = wh_json:get_value(<<"Application-Name">>, Cmd),
- ?LOG("new app name: ~s", [AppName]),
- {noreply, State#state{command_q = NewCmdQ1, current_app = AppName, keep_alive_ref=get_keep_alive_ref(Ref)}, hibernate};
+ case CallUp orelse is_post_hangup_command(AppName) of
+ true -> execute_control_request(Cmd, State);
+ false ->
+ ?LOG("command '~s' is not valid after hangup, ignoring", [AppName]),
+ send_error_resp(CallId, Cmd),
+ self() ! {force_queue_advance, CallId}
+ end,
+ {noreply, State#state{command_q=NewCmdQ1, current_app=AppName, current_cmd=Cmd
+ ,keep_alive_ref=get_keep_alive_ref(State)}, hibernate};
false ->
- ?LOG("curr app remains: ~s", [CurrApp]),
- {noreply, State#state{command_q = NewCmdQ, keep_alive_ref=get_keep_alive_ref(Ref)}, hibernate}
+ is_binary(CurrApp) andalso ?LOG("curr app remains: ~s", [CurrApp]),
+ {noreply, State#state{command_q=NewCmdQ, keep_alive_ref=get_keep_alive_ref(State)}, hibernate}
end;
-handle_cast({event_execute_complete, CallId, EvtName}
- ,#state{callid=CallId, command_q=CmdQ, current_app=CurrApp, is_node_up=INU}=State) ->
+handle_cast({event_execute_complete, CallId, EvtName},#state{callid=CallId, is_node_up=INU, is_call_up=CallUp
+ ,command_q=CmdQ, current_app=CurrApp}=State) ->
case lists:member(EvtName, ecallmgr_util:convert_whistle_app_name(CurrApp)) of
false ->
- ?LOG("exec_complete for ~s not related to ~s", [EvtName, CurrApp]),
{noreply, State};
true ->
?LOG("completed execution of command '~s'", [CurrApp]),
@@ -390,10 +404,15 @@ handle_cast({event_execute_complete, CallId, EvtName}
?LOG("no call commands remain queued, hibernating"),
{noreply, State#state{current_app=undefined}, hibernate};
{{value, Cmd}, CmdQ1} ->
- execute_control_request(Cmd, State),
AppName = wh_json:get_value(<<"Application-Name">>, Cmd),
- ?LOG("new app name: ~s", [AppName]),
- {noreply, State#state{command_q = CmdQ1, current_app = AppName}, hibernate}
+ case CallUp orelse is_post_hangup_command(AppName) of
+ true -> execute_control_request(Cmd, State);
+ false ->
+ ?LOG("command '~s' is not valid after hangup, skipping", [AppName]),
+ send_error_resp(CallId, Cmd),
+ self() ! {force_queue_advance, CallId}
+ end,
+ {noreply, State#state{command_q = CmdQ1, current_app = AppName, current_cmd = Cmd}, hibernate}
end
end;
handle_cast(_Msg, State) ->
@@ -431,7 +450,8 @@ handle_info({is_node_up, Timeout}, #state{node=Node, is_node_up=false}=State) ->
end,
{noreply, State}
end;
-handle_info({force_queue_advance, CallId}, #state{callid=CallId, command_q=CmdQ, is_node_up=INU, current_app=CurrApp}=State) ->
+handle_info({force_queue_advance, CallId}, #state{callid=CallId, current_app=CurrApp, command_q=CmdQ
+ ,is_node_up=INU, is_call_up=CallUp}=State) ->
?LOG("received control queue unconditional advance, skipping wait for command completion of '~s'", [CurrApp]),
case INU andalso queue:out(CmdQ) of
false ->
@@ -442,9 +462,17 @@ handle_info({force_queue_advance, CallId}, #state{callid=CallId, command_q=CmdQ,
?LOG("no call commands remain queued, hibernating"),
{noreply, State#state{current_app = undefined}, hibernate};
{{value, Cmd}, CmdQ1} ->
- execute_control_request(Cmd, State),
AppName = wh_json:get_value(<<"Application-Name">>, Cmd),
- {noreply, State#state{command_q = CmdQ1, current_app = AppName}, hibernate}
+ case CallUp orelse is_post_hangup_command(AppName) of
+ true ->
+ execute_control_request(Cmd, State);
+ false ->
+ ?LOG("command '~s' is not valid after hangup, skipping", [AppName]),
+ send_error_resp(CallId, Cmd),
+ self() ! {force_queue_advance, CallId}
+ end,
+ {noreply, State#state{command_q=CmdQ1, current_app=AppName, current_cmd=Cmd
+ ,keep_alive_ref=get_keep_alive_ref(State)}, hibernate}
end;
handle_info(keep_alive_expired, State) ->
?LOG("no new commands received after channel destruction, our job here is done"),
@@ -457,7 +485,8 @@ handle_info({sanity_check}, #state{node=Node, callid=CallId, keep_alive_ref=unde
{'noreply', State#state{sanity_check_tref=TRef}};
_ ->
?LOG("call uuid does not exist, executing post-hangup events and terminating"),
- {stop, normal, State#state{sanity_check_tref=undefined}}
+ gen_server:cast(self(), {channel_destroyed, wh_json:new()}),
+ {'noreply', State}
end;
handle_info(_Msg, State) ->
{noreply, State}.
@@ -574,18 +603,12 @@ queue_insert_fun(tail) ->
queue_insert_fun(head) ->
fun queue:in_r/2.
--spec post_hangup_commands/1 :: (queue()) -> wh_json:json_objects().
-post_hangup_commands(CmdQ) ->
- [ JObj || JObj <- queue:to_list(CmdQ),
- is_post_hangup_command(wh_json:get_value(<<"Application-Name">>, JObj))
- ].
-
-spec is_post_hangup_command/1 :: (ne_binary()) -> boolean().
is_post_hangup_command(AppName) ->
lists:member(AppName, ?POST_HANGUP_COMMANDS).
-spec execute_control_request/2 :: (wh_json:json_object(), #state{}) -> 'ok'.
-execute_control_request(Cmd, #state{node=Node, callid=CallId}) ->
+execute_control_request(Cmd, #state{node=Node, callid=CallId, self=Srv}) ->
put(callid, CallId),
try
@@ -603,7 +626,15 @@ execute_control_request(Cmd, #state{node=Node, callid=CallId}) ->
,CallId/binary
," not found for "
,(wh_json:get_value(<<"Application-Name">>, Cmd))/binary>>),
- self() ! {hangup, undefined, CallId},
+ Srv ! {force_queue_advance, CallId},
+ ok;
+ error:{badmatch, {error, nosession}} ->
+ ?LOG("unable to execute command, no session"),
+ send_error_resp(CallId, Cmd, <<"Session "
+ ,CallId/binary
+ ," not found for "
+ ,(wh_json:get_value(<<"Application-Name">>, Cmd))/binary>>),
+ Srv ! {force_queue_advance, CallId},
ok;
error:{badmatch, {error, ErrMsg}} ->
ST = erlang:get_stacktrace(),
@@ -611,7 +642,12 @@ execute_control_request(Cmd, #state{node=Node, callid=CallId}) ->
?LOG("stacktrace:"),
_ = [?LOG("~p", [Line]) || Line <- ST],
send_error_resp(CallId, Cmd),
- self() ! {force_queue_advance, CallId},
+ Srv ! {force_queue_advance, CallId},
+ ok;
+ throw:{msg, ErrMsg} ->
+ ?LOG("error while executing command ~s: ~p", [wh_json:get_value(<<"Application-Name">>, Cmd), ErrMsg]),
+ send_error_resp(CallId, Cmd),
+ Srv ! {force_queue_advance, CallId},
ok;
_A:_B ->
ST = erlang:get_stacktrace(),
@@ -619,7 +655,7 @@ execute_control_request(Cmd, #state{node=Node, callid=CallId}) ->
?LOG("stacktrace:"),
_ = [?LOG("~p", [Line]) || Line <- ST],
send_error_resp(CallId, Cmd),
- self() ! {force_queue_advance, CallId},
+ Srv ! {force_queue_advance, CallId},
ok
end.
@@ -631,21 +667,27 @@ send_error_resp(CallId, Cmd) ->
send_error_resp(CallId, Cmd, Msg) ->
Resp = [{<<"Msg-ID">>, wh_json:get_value(<<"Msg-ID">>, Cmd, <<>>)}
,{<<"Error-Message">>, Msg}
+ ,{<<"Request">>, Cmd}
| wh_api:default_headers(<<>>, <<"error">>, <<"dialplan">>, ?APP_NAME, ?APP_VERSION)
],
{ok, Payload} = wapi_dialplan:error(Resp),
?LOG("sending execution error: ~s", [Payload]),
wapi_dialplan:publish_event(CallId, Payload).
--spec get_keep_alive_ref/1 :: ('undefined' | reference()) -> 'undefined' | reference().
-get_keep_alive_ref(undefined) -> undefined;
-get_keep_alive_ref(TRef) ->
+-spec get_keep_alive_ref/1 :: (#state{}) -> 'undefined' | reference().
+get_keep_alive_ref(#state{is_call_up=true}) ->
+ undefined;
+get_keep_alive_ref(#state{keep_alive_ref=undefined, is_call_up=false}) ->
+ ?LOG("started post hangup keep alive timer for ~bms", [?KEEP_ALIVE]),
+ erlang:send_after(?KEEP_ALIVE, self(), keep_alive_expired);
+get_keep_alive_ref(#state{keep_alive_ref=TRef, is_call_up=false}) ->
_ = case erlang:cancel_timer(TRef) of
false -> ok;
_ -> %% flush the receive buffer of expiration messages
receive keep_alive_expired -> ok
after 0 -> ok end
end,
+ ?LOG("reset post hangup keep alive timer"),
erlang:send_after(?KEEP_ALIVE, self(), keep_alive_expired).
-spec publish_leg_addition/1 :: (wh_json:json_object()) -> 'ok'.
Please sign in to comment.
Something went wrong with that request. Please try again.