Permalink
Browse files

WHISTLE-1441: stop super, listener, and fsm as appropriate

WHISTLE-1441: add the queue id to the current call data returned by the agent FSM

WHISTLE-1441: bind the shared queue queue as non-exclusive

WHISTLE-1441: formatting

WHISTLE-1441: fix startup config params

WHISTLE-1441: generic queue

WHISTLE-1441: check endpoints against registrations

WHISTLE-1441: dialyzer fix

WHISTLE-1441: load endpoints in the FSM

WHISTLE-1441: don't load endpoints in the agent listener; receive them from the agent FSM

WHISTLE-1666: formatting

WHISTLE-1441: stop super, listener, and fsm as appropriate
  • Loading branch information...
1 parent 644b6e1 commit 1f17cee4e72a78f19d256ac9223552d4b206883b @jamesaimonetti jamesaimonetti committed Oct 11, 2012
@@ -295,7 +295,7 @@ init([Broker]) ->
%% @end
%%--------------------------------------------------------------------
handle_call(teardown_channels, _, #state{broker_name=Name}=State) ->
- clear_channels(Name),
+ _ = clear_channels(Name),
{reply, ok, State};
handle_call(use_federation, _, #state{broker=Broker}=State) ->
@@ -552,7 +552,8 @@ exchange_declare(#'exchange.declare'{type=Type}=ED, true) ->
ED1.
clear_channels(Name) ->
- [clear_channel(C) || #wh_amqp_channel{}=C <- ets:tab2list(Name)].
+ [clear_channel(C) || #wh_amqp_channel{}=C <- ets:tab2list(Name)],
+ ok.
clear_channel(#wh_amqp_channel{channel=ChPid
,channel_ref=ChRef
,consumer_ref=ConRef
@@ -14,7 +14,7 @@
-export([start_link/2
,member_connect_resp/2
,member_connect_retry/2
- ,bridge_to_member/2
+ ,bridge_to_member/3
,monitor_call/2
,member_connect_accepted/1
,channel_hungup/2
@@ -27,6 +27,7 @@
,add_acdc_queue/2
,rm_acdc_queue/2
,get_recording_doc_id/1
+ ,stop/1
]).
%% gen_server callbacks
@@ -50,7 +51,6 @@
,acct_id :: ne_binary()
,fsm_pid :: pid()
,agent_queues :: [ne_binary(),...] | []
- ,endpoints :: wh_json:json_objects()
,last_connect :: wh_now() % last connection
,last_attempt :: wh_now() % last attempt to connect
,my_id :: ne_binary()
@@ -141,6 +141,9 @@ start_link(Supervisor, AgentJObj) ->
)
end.
+stop(Srv) ->
+ gen_listener:cast(Srv, {stop_agent}).
+
-spec member_connect_resp/2 :: (pid(), wh_json:json_object()) -> 'ok'.
member_connect_resp(Srv, ReqJObj) ->
gen_listener:cast(Srv, {member_connect_resp, ReqJObj}).
@@ -151,8 +154,8 @@ member_connect_retry(Srv, WinJObj) ->
member_connect_accepted(Srv) ->
gen_listener:cast(Srv, member_connect_accepted).
-bridge_to_member(Srv, WinJObj) ->
- gen_listener:cast(Srv, {bridge_to_member, WinJObj}).
+bridge_to_member(Srv, WinJObj, EPs) ->
+ gen_listener:cast(Srv, {bridge_to_member, WinJObj, EPs}).
monitor_call(Srv, MonitorJObj) ->
gen_listener:cast(Srv, {monitor_call, MonitorJObj}).
@@ -207,11 +210,11 @@ init([Supervisor, AgentJObj, Queues]) ->
AgentId = wh_json:get_value(<<"_id">>, AgentJObj),
put(callid, AgentId),
- gen_listener:cast(self(), {load_endpoints, Supervisor}),
-
Self = self(),
AcctId = wh_json:get_value(<<"pvt_account_id">>, AgentJObj),
+ gen_listener:cast(self(), {start_fsm, Supervisor}),
+
_ = spawn(fun() ->
put(amqp_publish_as, Self),
@@ -266,11 +269,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
+handle_cast({stop_agent}, #state{supervisor=Supervisor}=State) ->
+ acdc_agent_sup:stop(Supervisor),
+ {noreply, State};
handle_cast({start_fsm, Supervisor}, #state{acct_id=AcctId
,agent_id=AgentId
}=State) ->
{ok, FSMPid} = acdc_agent_sup:start_fsm(Supervisor, AcctId, AgentId),
- link(FSMPid),
+
lager:debug("started FSM at ~p", [FSMPid]),
gen_listener:cast(self(), bind_to_member_reqs),
@@ -347,35 +353,6 @@ handle_cast(member_connect_accepted, #state{msg_queue_id=AmqpQueue
send_member_connect_accepted(AmqpQueue, call_id(Call), AcctId, AgentId, MyId),
{noreply, State};
-handle_cast({load_endpoints, Supervisor}, #state{acct_db=AcctDb
- ,agent_id=AgentId
- ,acct_id=AcctId
- ,record_calls=ShouldRecord
- }=State) ->
- lager:debug("loading agent endpoints"),
- Call = whapps_call:set_account_id(AcctId
- ,whapps_call:set_account_db(AcctDb
- ,whapps_call:new()
- )
- ),
- case catch acdc_util:get_endpoints(Call, AgentId) of
- [] ->
- lager:debug("no endpoints"),
- _ = acdc_agent_sup:stop(Supervisor),
- {noreply, State};
- [_|_]=EPs ->
- lager:debug("endpoints: ~p", [EPs]),
- gen_listener:cast(self(), {start_fsm, Supervisor}),
-
- {noreply, State#state{endpoints=EPs
- ,record_calls=record_endpoints(EPs, ShouldRecord)
- }};
- {'EXIT', _E} ->
- lager:debug("failed to load endpoints: ~p", [_E]),
- _ = acdc_agent_sup:stop(Supervisor),
- {noreply, State}
- end;
-
handle_cast({member_connect_resp, _}=Msg, #state{my_q = Q}=State) when
Q =:= undefined orelse Q =:= <<>> ->
fetch_my_queue(),
@@ -414,20 +391,24 @@ handle_cast({member_connect_retry, WinJObj}, #state{my_id=MyId}=State) ->
send_member_connect_retry(WinJObj, MyId),
{noreply, State};
-handle_cast({bridge_to_member, WinJObj}, #state{endpoints=EPs
- ,fsm_pid=FSM
- }=State) ->
+handle_cast({bridge_to_member, WinJObj, EPs}, #state{fsm_pid=FSM
+ ,record_calls=RecordCall
+ }=State) ->
lager:debug("bridging to agent endpoints: ~p", [EPs]),
Call = whapps_call:from_json(wh_json:get_value(<<"Call">>, WinJObj)),
RingTimeout = wh_json:get_value(<<"Ring-Timeout">>, WinJObj),
lager:debug("ring agent for ~p", [RingTimeout]),
+ ShouldRecord = should_record_endpoints(EPs, RecordCall),
+
acdc_util:bind_to_call_events(Call),
_P = spawn(fun() -> maybe_connect_to_agent(FSM, EPs, Call, RingTimeout) end),
lager:debug("waiting on successful bridge now: connecting in ~p", [_P]),
- {noreply, State#state{call=Call}};
+ {noreply, State#state{call=Call
+ ,record_calls=ShouldRecord
+ }};
handle_cast({monitor_call, MonitorJObj}, State) ->
Call = whapps_call:set_call_id(wh_json:get_value(<<"Call-ID">>, MonitorJObj), whapps_call:new()),
@@ -495,12 +476,6 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_info({'EXIT', P, _R}, #state{fsm_pid=P, supervisor=Supervisor}=State) ->
- lager:debug("our FSM(~p) died: ~p", [P, _R]),
-
- gen_listener:cast(self(), {start_fsm, Supervisor}),
-
- {noreply, State#state{fsm_pid=undefined}};
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{noreply, State}.
@@ -528,7 +503,8 @@ handle_event(_JObj, #state{fsm_pid=FSM}) ->
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{supervisor=Supervisor}) ->
+ acdc_agent_sup:stop(Supervisor),
lager:debug("agent process going down: ~p", [_Reason]).
%%--------------------------------------------------------------------
@@ -706,9 +682,9 @@ fetch_my_queue() ->
_ = spawn(fun() -> gen_listener:cast(Self, {queue_name, gen_listener:queue_name(Self)}) end),
ok.
--spec record_endpoints/2 :: (wh_json:json_objects(), boolean()) -> boolean().
-record_endpoints(_EPs, true) -> true;
-record_endpoints(EPs, false) ->
+-spec should_record_endpoints/2 :: (wh_json:json_objects(), boolean()) -> boolean().
+should_record_endpoints(_EPs, true) -> true;
+should_record_endpoints(EPs, false) ->
lists:any(fun(EP) ->
wh_json:is_true(<<"record_calls">>, EP, false)
end, EPs).
@@ -79,6 +79,7 @@
,caller_exit_key = <<"#">> :: ne_binary()
,agent_call_id :: ne_binary()
,next_status :: ne_binary()
+ ,endpoints = [] :: wh_json:json_objects()
}).
%%%===================================================================
@@ -246,7 +247,9 @@ init([AcctId, AgentId, AgentProc]) ->
gen_fsm:send_event(self(), send_sync_event),
acdc_stats:agent_active(AcctId, AgentId),
-
+
+ gen_fsm:send_all_state_event(self(), load_endpoints),
+
{ok, sync, #state{acct_id=AcctId
,acct_db=wh_util:format_account_id(AcctId, encoded)
,agent_id=AgentId
@@ -344,13 +347,15 @@ ready({sync_req, JObj}, #state{agent_proc=Srv}=State) ->
acdc_agent:send_sync_resp(Srv, ready, JObj),
{next_state, ready, State};
-ready({member_connect_win, JObj}, #state{agent_proc=Srv}=State) ->
+ready({member_connect_win, JObj}, #state{agent_proc=Srv
+ ,endpoints=[_|_]=EPs
+ }=State) ->
Call = whapps_call:from_json(wh_json:get_value(<<"Call">>, JObj)),
CallId = whapps_call:call_id(Call),
lager:debug("we won us a member: ~s", [CallId]),
- acdc_agent:bridge_to_member(Srv, JObj),
+ acdc_agent:bridge_to_member(Srv, JObj, EPs),
WrapupTimer = wh_json:get_integer_value(<<"Wrapup-Timeout">>, JObj, 0),
CallerExitKey = wh_json:get_value(<<"Caller-Exit-Key">>, JObj, <<"#">>),
@@ -734,6 +739,32 @@ paused(current_call, _, State) ->
handle_event({refresh, AgentJObj}, StateName, State) ->
lager:debug("refresh agent config: ~p", [AgentJObj]),
{next_state, StateName, State};
+handle_event(load_endpoints, StateName, #state{acct_db=AcctDb
+ ,agent_id=AgentId
+ ,acct_id=AcctId
+ ,agent_proc=Srv
+ }=State) ->
+ Setters = [fun(C) -> whapps_call:set_account_id(AcctId, C) end
+ ,fun(C) -> whapps_call:set_account_db(AcctDb, C) end
+ ],
+
+ Call = lists:foldl(fun(F, C) -> F(C) end
+ ,whapps_call:new(), Setters
+ ),
+ case catch acdc_util:get_endpoints(Call, AgentId) of
+ [] ->
+ lager:debug("no endpoints, going down"),
+ acdc_agent:stop(Srv),
+ {stop, normal, State};
+ [_|_]=EPs ->
+ lager:debug("endpoints: ~p", [EPs]),
+
+ {next_state, StateName, State#state{endpoints=EPs}};
+ {'EXIT', _E} ->
+ lager:debug("failed to load endpoints: ~p", [_E]),
+ acdc_agent:stop(Srv),
+ {stop, normal, State}
+ end;
handle_event(_Event, StateName, State) ->
lager:debug("unhandled event in state ~s: ~p", [StateName, _Event]),
{next_state, StateName, State}.
@@ -789,8 +820,10 @@ handle_info(_Info, StateName, State) ->
%%--------------------------------------------------------------------
terminate(_Reason, _StateName, #state{acct_id=AcctId
,agent_id=AgentId
+ ,agent_proc=Srv
}) ->
acdc_stats:agent_inactive(AcctId, AgentId),
+ acdc_agent:stop(Srv),
lager:debug("acdc agent fsm terminating while in ~s: ~p", [_StateName, _Reason]).
%%--------------------------------------------------------------------
@@ -77,7 +77,7 @@ handle_member_call(JObj, _Props) ->
acdc_stats:call_waiting(AcctId, QueueId, whapps_call:call_id(Call)),
case acdc_queues_sup:find_queue_supervisor(AcctId, QueueId) of
- P when is_pid(P) ->
+ P when is_pid(P) ->
acdc_queue:put_member_on_hold(acdc_queue_sup:queue(P), Call);
undefined ->
whapps_call_command:answer(Call),
@@ -34,6 +34,7 @@
,{exclusive, false}
]}
,{basic_qos, 1}
+ ,{queue_options, [{exclusive, false}]}
]).
-define(SHARED_QUEUE_BINDINGS(AcctId, QueueId), [{self, []}]).
@@ -280,15 +280,11 @@ agent_ready(AcctId, AgentId) ->
-define(BINDINGS, []).
-define(RESPONDERS, []).
--define(QUEUE_NAME, <<"acdc.stats">>).
--define(CONSUME_OPTIONS, []).
start_link() ->
gen_listener:start_link({local, ?MODULE}
,?MODULE
,[{bindings, ?BINDINGS}
,{responders, ?RESPONDERS}
- ,{queue_name, ?QUEUE_NAME}
- ,{consume_options, ?CONSUME_OPTIONS}
],
[]).
@@ -40,18 +40,36 @@ get_endpoints(Call, ?NE_BINARY = AgentId) ->
get_endpoints(_Call, {ok, []}) -> [];
get_endpoints(_Call, {error, _E}) -> [];
get_endpoints(Call, {ok, Devices}) ->
+ {ok, AcctDoc} = couch_mgr:open_cache_doc(whapps_call:account_db(Call), whapps_call:account_id(Call)),
+ AcctRealm = wh_json:get_value(<<"realm">>, AcctDoc),
+
EPDocs = [EPDoc
|| Device <- Devices,
(EPDoc = get_endpoint(Call, wh_json:get_value(<<"id">>, Device))) =/= undefined,
- wh_json:is_true(<<"enabled">>, EPDoc, false)
+ wh_json:is_true(<<"enabled">>, EPDoc, false),
+ is_endpoint_registered(EPDoc, AcctRealm)
],
+
lists:foldl(fun(EPDoc, Acc) ->
case cf_endpoint:build(EPDoc, Call) of
{ok, EP} -> EP ++ Acc;
{error, _} -> Acc
end
end, [], EPDocs).
+is_endpoint_registered(EPDoc, AcctRealm) ->
+ Query = [{<<"Realm">>, AcctRealm}
+ ,{<<"Username">>, wh_json:get_value([<<"sip">>, <<"username">>], EPDoc)}
+ ,{<<"Fields">>, [<<"Contact">>]}
+ ],
+ case whapps_util:amqp_pool_request(Query
+ ,fun wapi_registration:publish_query_req/1
+ ,fun wapi_registration:query_resp_v/1
+ ) of
+ {ok, _Resp} -> true;
+ {error, _E} -> false
+ end.
+
-spec get_endpoint/2 :: (whapps_call:call(), ne_binary()) -> wh_json:json_object() | 'undefined'.
get_endpoint(Call, ?NE_BINARY = EndpointId) ->
case couch_mgr:open_doc(whapps_call:account_db(Call), EndpointId) of
@@ -350,10 +350,10 @@ fetch_all_queue_stats(Context, history) ->
From = calendar:datetime_to_gregorian_seconds({Today, {0,0,0}}),
crossbar_doc:load_view(<<"acdc_stats/stats_per_queue_by_time">>
- ,[{startkey, [wh_util:current_tstamp(), <<"\ufff0">>]}
- ,{endkey, [From, <<>>]}
- ,descending
- ]
+ ,[{startkey, [wh_util:current_tstamp(), <<"\ufff0">>]}
+ ,{endkey, [From, <<>>]}
+ ,descending
+ ]
,Context
,fun normalize_queue_results/2
);

0 comments on commit 1f17cee

Please sign in to comment.