Skip to content

Commit

Permalink
WHISTLE-42: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anderson committed Mar 26, 2012
1 parent 7901b52 commit 93fb0ba
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 286 deletions.
3 changes: 3 additions & 0 deletions whistle_apps/apps/acdc/src/acdc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@
-include_lib("whistle/include/wh_log.hrl").
-include_lib("whistle/include/wh_databases.hrl").

-define(APP_NAME, <<"acdc">>).
-define(APP_VERSION, <<"0.1.0">>).

-define(ACDC_HRL, true).
-endif.
411 changes: 200 additions & 211 deletions whistle_apps/apps/acdc/src/acdc_agent.erl

Large diffs are not rendered by default.

84 changes: 36 additions & 48 deletions whistle_apps/apps/acdc/src/acdc_agent_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,77 +5,65 @@
%%% @end
%%% @contributors
%%% James Aimonetti
%%% Karl Anderson
%%%-------------------------------------------------------------------
-module(acdc_agent_pool).

-export([init/0, find_agent/2, update_agent/2]).
-export([init/0]).
-export([new_member/2]).
-export([update_agent/2]).

-include("acdc.hrl").

init() ->
lager:debug("finding all agents and starting workers"),
[add_agents(AcctDb) || AcctDb <- whapps_util:get_all_accounts()].
[add_agents(AcctDb) || AcctDb <- whapps_util:get_all_accounts()],
acdc_agents:reload_agents().

update_agent(JObj, _Prop) ->
update_agent(JObj, _Prop) ->
wh_util:put_callid(JObj),
lager:debug("recv agent update for: ~p", [wh_json:get_value(<<"doc">>, JObj)]).

-spec find_agent/2 :: (wh_json:json_object(), wh_proplist()) -> any().
find_agent(JObj, _Prop) ->
-spec new_member/2 :: (wh_json:json_object(), wh_proplist()) -> any().
new_member(JObj, _Prop) ->
wh_util:put_callid(JObj),

Call = whapps_call:from_json(wh_json:get_value(<<"Call">>, JObj)),
QueueId = wh_json:get_value(<<"Queue-ID">>, JObj),

lager:debug("caller in queue ~s", [QueueId]),
find_queue(Call, QueueId, wh_json:get_value(<<"Server-ID">>, JObj)).

find_agent(Call, whapps_call:account_db(Call), QueueId).

-spec find_agent/3 :: (whapps_call:call(), ne_binary(), ne_binary()) -> any().
find_agent(Call, AcctDb, QueueId) ->
{ok, Queue} = acdc_util:find_queue(AcctDb, QueueId),
find_agent(Call, AcctDb, QueueId, wh_json:get_integer_value(<<"connection_timeout">>, Queue, 300) * 1000).

-spec find_agent/4 :: (whapps_call:call(), ne_binary(), ne_binary(), pos_integer()) -> any().
find_agent(Call, AcctDb, QueueId, CallerTimeout) ->
Start = erlang:now(),

lager:debug("finding agent for ~s/~s in ~b timeout", [AcctDb, QueueId, CallerTimeout]),

case poolboy:checkout(?MODULE, true, CallerTimeout) of
Agent when is_pid(Agent) ->
lager:debug("checking with agent ~p", [Agent]),
case acdc_agent:maybe_handle_call(Agent, Call, AcctDb, QueueId, CallerTimeout) of
false ->
lager:debug("agent isn't handling the call"),
poolboy:checkin(?MODULE, Agent),
timer:sleep(100),
find_agent(Call, AcctDb, QueueId, CallerTimeout - (timer:now_diff(erlang:now(), Start) div 1000));
down ->
lager:debug("agent thinks the call is down, we're done"),
poolboy:checkin(?MODULE, Agent);
true ->
lager:debug("agent handled the call"),
poolboy:checkin(?MODULE, Agent)
end;
_Other ->
lager:debug("checked out ~p instead of agent", [_Other]),
find_agent(Call, AcctDb, QueueId, CallerTimeout - (timer:now_diff(erlang:now(), Start) div 1000))
-spec find_queue/3 :: (whapps_call:call(), ne_binary(), ne_binary()) -> 'ok' | {'error', term()}.
find_queue(Call, QueueId, ServerId) ->
AccountDb = whapps_call:account_db(Call),
case acdc_util:find_queue(AccountDb, QueueId) of
{error, _Reason} ->
lager:debug("unable to find ACD queue ~s/~s: ~p", [AccountDb, QueueId, _Reason]),
CallId = whapps_call:is_call(Call) andalso whapps_call:call_id(Call),
Result = [{<<"Call-ID">>, CallId}
,{<<"Result">>, <<"FAULT">>}
| wh_api:default_headers(?APP_NAME, ?APP_VERSION)
],
wapi_queue:publish_result(ServerId, Result);
{ok, Queue} ->
ConnectionTimeout = wh_json:get_integer_value(<<"connection_timeout">>, Queue, 300) * 1000,
acdc_agent:maybe_handle_call(Call, Queue, ServerId, ConnectionTimeout)
end.

add_agents(AcctDb) ->
case couch_mgr:get_results(AcctDb, <<"agents/crossbar_listing">>, []) of
-spec add_agents/1 :: (ne_binary()) -> 'ok'.
add_agents(AccountDb) ->
case couch_mgr:get_results(AccountDb, <<"agents/crossbar_listing">>, []) of
{ok, []} ->
lager:debug("no agents in ~s", [AcctDb]);
lager:debug("no agents in ~s", [AccountDb]);
{ok, As} ->
lager:debug("found agents for ~s", [AcctDb]),
[start_worker(AcctDb, A) || A <- As];
lager:debug("found agents for ~s", [AccountDb]),
[start_worker(AccountDb, A) || A <- As];
{error, _E} ->
lager:debug("error finding agents in ~s", [AcctDb])
lager:debug("error finding agents in ~s", [AccountDb])
end.

start_worker(AcctDb, Agent) ->
-spec start_worker/2 :: (ne_binary(), wh_json:json_object()) -> 'ok'.
start_worker(AccountDb, Agent) ->
AgentId = wh_json:get_value(<<"id">>, Agent),
AgentInfo = wh_json:get_value(<<"value">>, Agent),
Queues = wh_json:get_value([<<"value">>, <<"queues">>], Agent, []),
lager:debug("adding agent worker ~s", [AgentId]),
poolboy:add_worker(?MODULE, fun(Worker) -> {ok, Worker} end, [AcctDb, AgentId, AgentInfo]).
acdc_agent_sup:new(AccountDb, AgentId, Queues).
69 changes: 69 additions & 0 deletions whistle_apps/apps/acdc/src/acdc_agent_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2012, VoIP INC
%%% @doc
%%%
%%% @end
%%% @contributors
%%% James Aimonetti
%%% Karl Anderson
%%%-------------------------------------------------------------------
-module(acdc_agent_sup).

-behaviour(supervisor).

-include_lib("whistle/include/wh_types.hrl").

-export([start_link/0]).
-export([new/3]).
-export([workers/0]).
-export([init/1]).

%% ===================================================================
%% API functions
%% ===================================================================

%%--------------------------------------------------------------------
%% @public
%% @doc
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link/0 :: () -> startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec new/3 :: (ne_binary(), ne_binary(), wh_json:json_object()) -> sup_startchild_ret().
new(AccountDb, AgentId, Queues) ->
Agent = {AgentId
,{acdc_agent, start_link, [AccountDb, AgentId, Queues]}
,permanent, 5000, worker
,[acdc_agent]
},
supervisor:start_child(?MODULE, Agent).

-spec workers/0 :: () -> [pid(),...] | [].
workers() ->
[ Pid || {_, Pid, worker, [_]} <- supervisor:which_children(?MODULE)].

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

%%--------------------------------------------------------------------
%% @public
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
%% @end
%%--------------------------------------------------------------------
-spec init([]) -> sup_init_ret().
init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 5,
MaxSecondsBetweenRestarts = 5,

SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},

{ok, {SupFlags, []}}.
152 changes: 152 additions & 0 deletions whistle_apps/apps/acdc/src/acdc_agents.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2012, VoIP INC
%%% @doc
%%%
%%% @end
%%% @contributors
%%% James Aimonetti
%%% Karl Anderson
%%%-------------------------------------------------------------------
-module(acdc_agents).

-behaviour(gen_server).

-export([start_link/0]).
-export([next_agent/0]).
-export([reload_agents/0]).

-export([init/1
,handle_call/3
,handle_cast/2
,handle_info/2
,terminate/2
,code_change/3
]).

-include("acdc.hrl").

-define(SERVER, ?MODULE).

%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

-spec reload_agents/0 :: () -> 'ok'.
reload_agents() ->
gen_server:cast(?SERVER, reload_agents).

-spec next_agent/0 :: () -> {'ok', pid()} | {'error', 'no_agents'}.
next_agent() ->
gen_server:call(?SERVER, next_agent).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
put(callid, ?LOG_SYSTEM_ID),
lager:debug("acdc agents starting"),
{ok, []}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call(next_agent, _From, []) ->
{reply, {error, no_agents}, []};
handle_call(next_agent, _From, [Agent|Agents]) ->
%% this is currently just round-robin but eventually there could
%% be lists for each queue and the pids are drawn via a strategy
%% maybe....
{reply, {ok, Agent}, lists:reverse([Agent|lists:reverse(Agents)])};
handle_call(_Request, _From, State) ->
{reply, {error, not_implemented}, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(reload_agents, _) ->
lager:debug("reloading list of agent workers"),
{noreply, acdc_agent_sup:workers()};
handle_cast(_, State) ->
{noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
lager:debug("acdc agents terminating: ~p", [_Reason]).

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
8 changes: 3 additions & 5 deletions whistle_apps/apps/acdc/src/acdc_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
]).
-define(RESPONDERS, [
%% New caller in the call queue
{{acdc_agent_pool, find_agent}, [{<<"queue">>, <<"new_member">>}]}
{{acdc_agent_pool, new_member}, [{<<"queue">>, <<"new_member">>}]}
%% User doc updated
,{{acdc_agent_pool, update_agent}, [{<<"configuration">>, <<"*">>}]}
]).
Expand All @@ -48,8 +48,7 @@
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_listener:start_link(?MODULE, [
{bindings, ?BINDINGS}
gen_listener:start_link(?MODULE, [{bindings, ?BINDINGS}
,{responders, ?RESPONDERS}
,{queue_name, ?QUEUE_NAME} % optional to include
,{queue_options, ?QUEUE_OPTIONS} % optional to include
Expand Down Expand Up @@ -119,7 +118,6 @@ handle_cast(_, State) ->
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{noreply, State}.

%%--------------------------------------------------------------------
Expand All @@ -145,7 +143,7 @@ handle_event(_JObj, _State) ->
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
lager:debug("listener terminating: ~p", [_Reason]).
lager:debug("acdc listner terminating: ~p", [_Reason]).

%%--------------------------------------------------------------------
%% @private
Expand Down
Loading

0 comments on commit 93fb0ba

Please sign in to comment.