Skip to content

Commit

Permalink
usurp monitor & call control log
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo committed Jul 28, 2018
1 parent 98d33a8 commit e8e0f24
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
27 changes: 20 additions & 7 deletions applications/ecallmgr/src/ecallmgr_call_control.erl
Expand Up @@ -97,6 +97,7 @@
,current_cmd_uuid :: kz_term:api_binary()
,channel :: kz_term:api_pid()
,options :: kz_term:proplist()
,event_uuids = [] :: kz_term:ne_binaries()
}).
-type state() :: #state{}.

Expand Down Expand Up @@ -318,8 +319,13 @@ handle_info({'usurp_control', FetchId, _JObj}, #state{fetch_id = FetchId} = Stat
handle_info({'usurp_control', _FetchId, _JObj}, State) ->
lager:debug("the call has been usurped by an external process"),
{'stop', 'normal', State};
handle_info({'force_queue_advance', CallId}, #state{call_id=CallId}=State) ->
handle_info({'force_queue_advance', CallId}, #state{call_id=CallId, current_cmd_uuid='undefined'}=State) ->
{'noreply', force_queue_advance(State)};
handle_info({'force_queue_advance', CallId}, #state{call_id=CallId
,current_cmd_uuid=EventUUID
,event_uuids=EventUUIDs
}=State) ->
{'noreply', force_queue_advance(State#state{event_uuids=[EventUUID | EventUUIDs]})};
handle_info({'force_queue_advance', _}, State) ->
{'noreply', State};
handle_info({'forward_queue', CallId}, #state{call_id=CallId}=State) ->
Expand Down Expand Up @@ -437,7 +443,7 @@ call_control_ready('true', #state{call_id=CallId
],
lager:debug("sending control usurp for ~s", [FetchId]),
kapi_call:publish_usurp_control(CallId, Usurp),
ecallmgr_usurp_monitor:register(CallId);
ecallmgr_usurp_monitor:register(CallId, FetchId);
call_control_ready('false', _) ->
lager:info("call is not in the channels cache, short lived call?"),
gen_listener:cast(self(), 'stop').
Expand Down Expand Up @@ -481,12 +487,13 @@ handle_channel_destroyed(#state{sanity_check_tref=SCTRef
-spec force_queue_advance(state()) -> state().
force_queue_advance(#state{call_id=CallId
,current_app=CurrApp
,current_cmd_uuid=CurrUUID
,command_q=CmdQ
,is_node_up=INU
,is_call_up=CallUp
}=State) ->
lager:debug("received control queue unconditional advance, skipping wait for command completion of '~s'"
,[CurrApp]
lager:debug("received control queue unconditional advance, skipping wait for command completion of '~s : ~s'"
,[CurrApp, CurrUUID]
),
case INU
andalso queue:out(CmdQ)
Expand Down Expand Up @@ -561,9 +568,15 @@ handle_execute_complete('undefined', _, _JObj, State) ->
handle_execute_complete(_, 'undefined', _JObj, State) ->
lager:debug_unsafe("call control received undefined : ~s", [kz_json:encode(_JObj, ['pretty'])]),
State;
handle_execute_complete(_AppName, _EventUUID, _JObj, #state{current_cmd_uuid='undefined'}=State) ->
lager:debug_unsafe("execute complete not handled : ~s:~s : ~s", [_AppName, _EventUUID, kz_json:encode(_JObj, ['pretty'])]),
State;
handle_execute_complete(_AppName, EventUUID, _JObj, #state{current_cmd_uuid='undefined'
,event_uuids=EventUUIDs
}=State) ->
case lists:member(EventUUID, EventUUIDs) of
'true' -> State#state{event_uuids=lists:delete(EventUUID, EventUUIDs)};
'false' ->
lager:debug("execute complete not handled : ~s:~s", [_AppName, EventUUID]),
State
end;
handle_execute_complete(AppName, EventUUID, _, #state{current_app=AppName
,current_cmd_uuid=EventUUID
}=State) ->
Expand Down
33 changes: 19 additions & 14 deletions applications/ecallmgr/src/ecallmgr_usurp_monitor.erl
Expand Up @@ -13,7 +13,7 @@
%% API
-export([start_link/0]).

-export([register/1, register/2]).
-export([register/2, register/3]).

%% gen_listener callbacks
-export([init/1
Expand All @@ -32,6 +32,7 @@
-type state() :: map().

-record(cache, {call_id :: kz_term:ne_binary()
,fetch_id :: kz_tern:ne_binary()
,pid :: pid()
}).
-type cache() :: #cache{}.
Expand Down Expand Up @@ -94,8 +95,9 @@ handle_call(_Request, _From, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast({register, CallId, Pid}, State) ->
{'noreply', handle_register(#cache{call_id=CallId, pid=Pid}, State)};
handle_cast({register, CallId, FetchId, Pid}, State) ->
kz_util:put_callid(CallId),
{'noreply', handle_register(#cache{call_id=CallId, fetch_id=FetchId, pid=Pid}, State)};
handle_cast(_, State) ->
{'noreply', State}.

Expand All @@ -119,12 +121,14 @@ handle_info(_Msg, State) ->
-spec handle_event(kz_json:object(), state()) -> gen_listener:handle_event_return().
handle_event(JObj, #{calls := Calls}) ->
kz_util:put_callid(JObj),
_ = case ets:lookup(Calls, kz_call_event:call_id(JObj)) of
[#cache{pid=Pid}] -> Pid ! {'usurp_control', kz_call_event:fetch_id(JObj), JObj};
_ -> 'ok'
end,
_ = handle_usurp(kz_call_event:call_id(JObj), kz_call_event:fetch_id(JObj), JObj, Calls),
'ignore'.

-spec handle_usurp(kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object(), ets:tid()) -> 'ok'.
handle_usurp(CallId, FetchId, JObj, Calls) ->
_ = [Pid ! {'usurp_control', FetchId, JObj} || #cache{pid=Pid} <- ets:lookup(Calls, CallId)],
'ok'.

%%------------------------------------------------------------------------------
%% @doc This function is called by a gen_listener when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
Expand All @@ -145,16 +149,17 @@ terminate(_Reason, _State) -> 'ok'.
code_change(_OldVsn, State, _Extra) ->
{'ok', State}.

-spec register(kz_term:ne_binary()) -> 'ok'.
register(CallId) ->
register(CallId, self()).
-spec register(kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
register(CallId, FetchId) ->
register(CallId, FetchId, self()).

-spec register(kz_term:ne_binary(), pid()) -> 'ok'.
register(CallId, Pid) ->
gen_listener:cast(?SERVER, {register, CallId, Pid}).
-spec register(kz_term:ne_binary(), kz_term:ne_binary(), pid()) -> 'ok'.
register(CallId, FetchId, Pid) ->
gen_listener:cast(?SERVER, {register, CallId, FetchId, Pid}).

-spec handle_register(cache(), state()) -> state().
handle_register(#cache{pid=Pid}=Cache, #{calls := Calls, pids := Pids} = State) ->
handle_register(#cache{call_id=CallId, fetch_id=FetchId, pid=Pid}=Cache, #{calls := Calls, pids := Pids} = State) ->
_ = handle_usurp(CallId, FetchId, kz_json:new(), Calls),
_ = ets:insert(Calls, Cache),
_ = ets:insert(Pids, Cache),
_ = erlang:monitor(process, Pid),
Expand Down

0 comments on commit e8e0f24

Please sign in to comment.