Skip to content

Commit

Permalink
WHISTLE-1653: start/stop auto-compaction abilities
Browse files Browse the repository at this point in the history
Conflicts:
	lib/whistle_couch-1.0.0/src/couch_compactor_fsm.erl
WHISTLE-1441: update spec

WHISTLE-1441: update the log lines

WHISTLE-1441: if agent process has doesn't know its queue name yet, fetch it and replay the request

WHISTLE-1441: update the log lines

WHISTLE-1441: if agent process has doesn't know its queue name yet, fetch it and replay the request

WHISTLE-1441: dialyzer fix

WHISTLE-1441: remove the need to track stats in the callflow action

WHISTLE-1441: publish the member_call to the shared queue (instead of by the member_call routing key)

WHISTLE-1441: only consume from the shared queue on the targeted exchange

WHISTLE-1441: handle the member call in the manager first, to handle stats and putting the caller on hold, then publish directly to the shared queue

WHISTLE-1441: track the queue's MOH attribute, in case the queue process is asked to put the caller on hold without a MOH attribute
  • Loading branch information
James Aimonetti committed Sep 26, 2012
1 parent 38a8d7c commit fc54fd2
Show file tree
Hide file tree
Showing 9 changed files with 980 additions and 30 deletions.
898 changes: 898 additions & 0 deletions lib/whistle_couch-1.0.0/src/couch_compactor_fsm.erl

Large diffs are not rendered by default.

48 changes: 32 additions & 16 deletions whistle_apps/apps/acdc/src/acdc_agent.erl
Expand Up @@ -212,7 +212,9 @@ init([Supervisor, AgentJObj, Queues]) ->

_ = spawn(fun() ->
put(amqp_publish_as, Self),

gen_listener:cast(Self, {queue_name, gen_listener:queue_name(Self)}),

Prop = [{<<"Account-ID">>, AcctId}
,{<<"Agent-ID">>, AgentId}
| wh_api:default_headers(?APP_NAME, ?APP_VERSION)
Expand Down Expand Up @@ -261,10 +263,9 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast({start_fsm, Supervisor}, #state{
acct_id=AcctId
,agent_id=AgentId
}=State) ->
handle_cast({start_fsm, Supervisor}, #state{acct_id=AcctId
,agent_id=AgentId
}=State) ->
{ok, FSMPid} = acdc_agent_sup:start_fsm(Supervisor, AcctId, AgentId),
link(FSMPid),
lager:debug("started FSM at ~p", [FSMPid]),
Expand All @@ -273,6 +274,7 @@ handle_cast({start_fsm, Supervisor}, #state{

{noreply, State#state{fsm_pid=FSMPid}};
handle_cast({queue_name, Q}, State) ->
lager:debug("my queue: ~s", [Q]),
{noreply, State#state{my_q=Q}};

handle_cast({queue_login, Q}, #state{agent_queues=Qs
Expand Down Expand Up @@ -317,6 +319,9 @@ handle_cast({channel_hungup, CallId}, #state{call=Call}=State) ->
,msg_queue_id=undefined
,acdc_queue_id=undefined
}};
undefined ->
lager:debug("undefined call id for channel_hungup, ignoring"),
{noreply, State};
_ ->
lager:debug("other channel ~s hungup", [CallId]),
acdc_util:unbind_from_call_events(CallId),
Expand All @@ -333,11 +338,10 @@ handle_cast(member_connect_accepted, #state{msg_queue_id=AmqpQueue
send_member_connect_accepted(AmqpQueue, call_id(Call), AcctId, AgentId, MyId),
{noreply, State};

handle_cast({load_endpoints, Supervisor}, #state{
acct_db=AcctDb
,agent_id=AgentId
,acct_id=AcctId
}=State) ->
handle_cast({load_endpoints, Supervisor}, #state{acct_db=AcctDb
,agent_id=AgentId
,acct_id=AcctId
}=State) ->
lager:debug("loading agent endpoints"),
Call = whapps_call:set_account_id(AcctId
,whapps_call:set_account_db(AcctDb
Expand All @@ -359,13 +363,16 @@ handle_cast({load_endpoints, Supervisor}, #state{
{noreply, State}
end;

handle_cast({member_connect_resp, ReqJObj}, #state{
agent_id=AgentId
,last_connect=LastConn
,agent_queues=Qs
,my_id=MyId
,my_q=MyQ
}=State) ->
handle_cast({member_connect_resp, _}=Msg, #state{my_q = <<>>}=State) ->
fetch_my_queue(),
gen_listener:cast(self(), Msg),
{noreply, State};
handle_cast({member_connect_resp, ReqJObj}, #state{agent_id=AgentId
,last_connect=LastConn
,agent_queues=Qs
,my_id=MyId
,my_q=MyQ
}=State) ->
ACDcQueue = wh_json:get_value(<<"Queue-ID">>, ReqJObj),
case is_valid_queue(ACDcQueue, Qs) of
false ->
Expand Down Expand Up @@ -425,6 +432,10 @@ handle_cast({join_agent, ACallId}, #state{call=Call}=State) ->
whapps_call_command:pickup(ACallId, <<"now">>, Call),
{noreply, State};

handle_cast({send_sync_req}=Msg, #state{my_q = <<>>}=State) ->
fetch_my_queue(),
gen_listener:cast(self(), Msg),
{noreply, State};
handle_cast({send_sync_req}, #state{my_id=MyId
,my_q=MyQ
,acct_id=AcctId
Expand Down Expand Up @@ -666,3 +677,8 @@ logout_from_queue(AcctId, Q) ->
,{queue_id, Q}
,{account_id, AcctId}
]).

fetch_my_queue() ->
Self = self(),
_ = spawn(gen_listener,cast, [Self, {queue_name, gen_listener:queue_name(Self)}]),
ok.
4 changes: 2 additions & 2 deletions whistle_apps/apps/acdc/src/acdc_agent_fsm.erl
Expand Up @@ -427,7 +427,7 @@ ringing({originate_failed, timeout}, #state{agent_proc=Srv
,member_call_queue_id=QueueId
,member_call_id=CallId
}=State) ->
lager:debug("originate timed out"),
lager:debug("originate timed out, clearing call"),
acdc_agent:member_connect_retry(Srv, CallId),

acdc_stats:call_missed(AcctId, QueueId, AgentId, CallId),
Expand Down Expand Up @@ -479,7 +479,7 @@ ringing({channel_hungup, CallId}
,agent_call_id=AgentCallId
}=State
) ->
lager:debug("member channel has gone down, stop agent call"),
lager:debug("member channel (~s) has gone down, stop agent call", [CallId]),
acdc_agent:channel_hungup(Srv, AgentCallId),

acdc_stats:call_abandoned(AcctId, QueueId, CallId, ?ABANDON_HANGUP),
Expand Down
8 changes: 7 additions & 1 deletion whistle_apps/apps/acdc/src/acdc_queue.erl
Expand Up @@ -35,7 +35,7 @@
]).

%% Call Manipulation
-export([put_member_on_hold/3]).
-export([put_member_on_hold/2, put_member_on_hold/3]).

%% gen_server callbacks
-export([init/1
Expand All @@ -53,6 +53,7 @@
queue_id :: ne_binary()
,queue_db :: ne_binary()
,acct_id :: ne_binary()
,moh :: ne_binary()

%% PIDs of the gang
,supervisor :: pid()
Expand Down Expand Up @@ -162,6 +163,8 @@ send_sync_req(Srv, Type) ->
config(Srv) ->
gen_listener:call(Srv, config).

put_member_on_hold(Srv, Call) ->
gen_listener:cast(Srv, {put_member_on_hold, Call}).
put_member_on_hold(Srv, Call, MOH) ->
gen_listener:cast(Srv, {put_member_on_hold, Call, MOH}).

Expand Down Expand Up @@ -198,6 +201,7 @@ init([Supervisor, QueueJObj]) ->
,queue_db = wh_json:get_value(<<"pvt_account_db">>, QueueJObj)
,acct_id = wh_json:get_value(<<"pvt_account_id">>, QueueJObj)
,my_id = acdc_util:proc_id()
,moh = wh_json:get_value(<<"moh">>, QueueJObj)
}}.

ask_for_queue_name() ->
Expand Down Expand Up @@ -406,6 +410,8 @@ handle_cast({send_sync_req, Type}, #state{my_q=MyQ
send_sync_req(MyQ, MyId, AcctId, QueueId, Type),
{noreply, State};

handle_cast({put_member_on_hold, Call}, #state{moh=MOH}=State) ->
handle_cast({put_member_on_hold, Call, MOH}, State);
handle_cast({put_member_on_hold, Call, MOH}, State) ->
whapps_call_command:answer(Call),
whapps_call_command:hold(MOH, Call),
Expand Down
2 changes: 1 addition & 1 deletion whistle_apps/apps/acdc/src/acdc_queue_handler.erl
Expand Up @@ -79,7 +79,7 @@ handle_stats_req(AcctId, QueueId, ServerId, MsgId) ->

-spec build_stats_resp/4 :: (api_binary(), api_binary(), api_binary(), [pid()] | []) -> any().
-spec build_stats_resp/7 :: (api_binary(), api_binary(), api_binary(), [pid()] | []
,wh_proplist(), wh_proplist(), wh_proplist()
,wh_json:json_object(), wh_json:json_object(), wh_json:json_object()
) -> any().
build_stats_resp(AcctId, RespQ, MsgId, Ps) ->
build_stats_resp(AcctId, RespQ, MsgId, Ps
Expand Down
26 changes: 25 additions & 1 deletion whistle_apps/apps/acdc/src/acdc_queue_manager.erl
Expand Up @@ -15,6 +15,7 @@

%% API
-export([start_link/0
,handle_member_call/2
]).

%% gen_server callbacks
Expand All @@ -30,8 +31,9 @@
-define(SERVER, ?MODULE).

-define(BINDINGS, [{conf, [{doc_type, <<"queue">>}]}
,{acdc_queue, [{restrict_to, [stats_req]}
,{acdc_queue, [{restrict_to, [stats_req, member_call]}
,{account_id, <<"*">>}
,{queue_id, <<"*">>}
]}
]).
-define(RESPONDERS, [{{acdc_queue_handler, handle_config_change}
Expand All @@ -40,6 +42,9 @@
,{{acdc_queue_handler, handle_stats_req}
,[{<<"queue">>, <<"stats_req">>}]
}
,{{acdc_queue_manager, handle_member_call}
,[{<<"member">>, <<"call">>}]
}
]).

%%%===================================================================
Expand All @@ -61,6 +66,25 @@ start_link() ->
,[]
).

handle_member_call(JObj, _Props) ->
Call = whapps_call:from_json(wh_json:get_value(<<"Call">>, JObj)),

AcctId = wh_json:get_value(<<"Account-ID">>, JObj),
QueueId = wh_json:get_value(<<"Queue-ID">>, JObj),

lager:debug("member call for ~s: ~s", [AcctId, QueueId]),

acdc_stats:call_waiting(AcctId, QueueId, whapps_call:call_id(Call)),

case acdc_queues_sup:find_queue_supervisor(AcctId, QueueId) of
P when is_pid(P) ->
acdc_queue:put_member_on_hold(acdc_queue_sup:queue(P), Call);
undefined ->
whapps_call_command:answer(Call),
whapps_call_command:hold(Call)
end,
wapi_acdc_queue:publish_shared_member_call(AcctId, QueueId, JObj).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down
8 changes: 3 additions & 5 deletions whistle_apps/apps/acdc/src/acdc_queue_shared.erl
Expand Up @@ -35,11 +35,9 @@
]}
,{basic_qos, 1}
]).
-define(SHARED_QUEUE_BINDINGS(AcctId, QueueId), [{acdc_queue, [{account_id, AcctId}
,{queue_id, QueueId}
,{restrict_to, [member_call]}
]}
]).

-define(SHARED_QUEUE_BINDINGS(AcctId, QueueId), [{self, []}]).

-define(RESPONDERS, [{{acdc_queue_handler, handle_member_call}
,[{<<"member">>, <<"call">>}]
}
Expand Down
12 changes: 12 additions & 0 deletions whistle_apps/apps/acdc/src/wapi_acdc_queue.erl
Expand Up @@ -31,6 +31,7 @@
]).

-export([publish_member_call/1, publish_member_call/2
,publish_shared_member_call/1, publish_shared_member_call/3, publish_shared_member_call/4
,publish_member_call_failure/2, publish_member_call_failure/3
,publish_member_call_success/2, publish_member_call_success/3
,publish_member_connect_req/1, publish_member_connect_req/2
Expand Down Expand Up @@ -668,6 +669,17 @@ publish_member_call(API, ContentType) ->
{ok, Payload} = wh_api:prepare_api_payload(API, ?MEMBER_CALL_VALUES, fun member_call/1),
amqp_util:callmgr_publish(Payload, ContentType, member_call_routing_key(API)).

publish_shared_member_call(JObj) ->
publish_shared_member_call(wh_json:get_value(<<"Account-ID">>, JObj)
,wh_json:get_value(<<"Queue-ID">>, JObj)
,JObj
).
publish_shared_member_call(AcctId, QueueId, JObj) ->
publish_shared_member_call(AcctId, QueueId, JObj, ?DEFAULT_CONTENT_TYPE).
publish_shared_member_call(AcctId, QueueId, API, ContentType) ->
{ok, Payload} = wh_api:prepare_api_payload(API, ?MEMBER_CALL_VALUES, fun member_call/1),
amqp_util:targeted_publish(shared_queue_name(AcctId, QueueId), Payload, ContentType).

-spec publish_member_call_failure/2 :: (ne_binary(), api_terms()) -> 'ok'.
-spec publish_member_call_failure/3 :: (ne_binary(), api_terms(), ne_binary()) -> 'ok'.
publish_member_call_failure(Q, JObj) ->
Expand Down
4 changes: 0 additions & 4 deletions whistle_apps/apps/callflow/src/module/cf_acdc_member.erl
Expand Up @@ -57,10 +57,6 @@ maybe_enter_queue(Call, MemberCall, QueueId, MaxWait, false) ->
lager:debug("asking for an agent, waiting up to ~p ms", [MaxWait]),

cf_exe:send_amqp(Call, MemberCall, fun wapi_acdc_queue:publish_member_call/1),
acdc_stats:call_waiting(whapps_call:account_id(Call)
,QueueId
,whapps_call:call_id(Call)
),
wait_for_bridge(Call, MaxWait).

-spec wait_for_bridge/2 :: (whapps_call:call(), max_wait()) -> 'ok'.
Expand Down

0 comments on commit fc54fd2

Please sign in to comment.