Skip to content

Commit

Permalink
WHISTLE-443: refactor ts_responder to use gen_listener behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
James Aimonetti committed Aug 26, 2011
1 parent 92c52fc commit 479410d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 116 deletions.
33 changes: 33 additions & 0 deletions whistle_apps/apps/trunkstore/src/route_req.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
%%%-------------------------------------------------------------------
%%% @author James Aimonetti <james@2600hz.org>
%%% @copyright (C) 2011, VoIP INC
%%% @doc
%%% Handle route requests off AMQP
%%% @end
%%% Created : 26 Aug 2011 by James Aimonetti <james@2600hz.org>
%%%-------------------------------------------------------------------
-module(route_req).

-export([init/0, handle_req/2]).

-include("ts.hrl").

init() ->
couch_mgr:db_create(?TS_DB),
?LOG_SYS("Ensured ~s is created", [?TS_DB]).

handle_req(ApiJObj, _Options) ->
true = wh_api:route_req_v(ApiJObj),
CallID = wh_json:get_value(<<"Call-ID">>, ApiJObj),
case {wh_json:get_value([<<"Custom-Channel-Vars">>, <<"Account-ID">>], ApiJObj), wh_json:get_value([<<"Custom-Channel-Vars">>, <<"Authorizing-ID">>], ApiJObj)} of
{AcctID, undefined} when is_binary(AcctID) ->
%% Coming from carrier (off-net)
?LOG_START(CallID, "Offnet call starting", []),
ts_offnet_sup:start_handler(<<"offnet-", CallID/binary>>, ApiJObj);
{AcctID, AuthID} when is_binary(AcctID) andalso is_binary(AuthID) ->
%% Coming from PBX (on-net); authed by Registrar or ts_auth
?LOG_START(CallID, "Onnet call starting", []),
ts_onnet_sup:start_handler(<<"onnet-", CallID/binary>>, ApiJObj);
{_AcctID, _AuthID} ->
?LOG("Error in routing: AcctID: ~s AuthID: ~s", [_AcctID, _AuthID])
end.
143 changes: 27 additions & 116 deletions whistle_apps/apps/trunkstore/src/ts_responder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@
%%%-------------------------------------------------------------------
-module(ts_responder).

-behaviour(gen_server).
-behaviour(gen_listener).

%% API
-export([start_link/0, start_responder/0, transfer_auth/0]).
-export([start_link/0, start_responder/0, stop/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, handle_event/2
,terminate/2, code_change/3]).

-include("ts.hrl").

-define(RESPONDERS, [
{route_req, [{<<"dialplan">>, <<"route_req">>}]}
]).
-define(BINDINGS, [
{routing, []}
]).

-define(SERVER, ?MODULE).
-define(ROUTE_QUEUE_NAME, <<"ts_responder.route.queue">>).
-define(AUTH_QUEUE_NAME, <<"ts_responder.auth.queue">>).
-define(ROUTE_QUEUE_OPTIONS, [{exclusive, false}]).
-define(ROUTE_CONSUME_OPTIONS, [{exclusive, false}]).

-record(state, {
is_amqp_up = false :: boolean()
Expand All @@ -52,7 +60,16 @@ start_link() ->

start_responder() ->
?LOG("Starting responder"),
gen_server:start_link(?MODULE, [], []).
gen_listener:start_link(?MODULE, [{responders, ?RESPONDERS}
,{bindings, ?BINDINGS}
,{queue_name, ?ROUTE_QUEUE_NAME}
,{queue_options, ?ROUTE_QUEUE_OPTIONS}
,{consume_options, ?ROUTE_CONSUME_OPTIONS}
,{basic_qos, 1}
], []).

stop(Srv) ->
gen_listener:stop(Srv).

%%%===================================================================
%%% gen_server callbacks
Expand All @@ -70,10 +87,7 @@ start_responder() ->
%% @end
%%--------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
couch_mgr:db_create(?TS_DB),
?LOG_SYS("Ensured ~s is created", [?TS_DB]),
{ok, #state{}, 0}.
{ok, #state{}}.

%%--------------------------------------------------------------------
%% @private
Expand Down Expand Up @@ -115,50 +129,13 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------

%% receive resource requests from Apps
handle_info({_, #amqp_msg{props = Props, payload = Payload}}, State) ->
{_Pid, _Ref} = spawn_monitor(fun() -> handle_req(Props#'P_basic'.content_type, Payload) end),
{noreply, State, hibernate};

handle_info({'DOWN', _, process, _Pid, _Reason}, State) ->
?LOG_SYS("handle_req for ~p down: ~p", [_Pid, _Reason]),
{noreply, State, hibernate};

handle_info(timeout, #state{is_amqp_up=false}=S) ->
?LOG_SYS("starting amqp"),
{ok, CQ} = start_amqp(),
?LOG_SYS("Starting up responder with AMQP Queue: ~s", [CQ]),
{noreply, S#state{is_amqp_up=is_binary(CQ)}, 1000};

handle_info(timeout, #state{is_amqp_up=true}=S) ->
{noreply, S};

handle_info({amqp_host_down, H}, S) ->
?LOG_SYS("AMQP Host(~s) down", [H]),
{noreply, S#state{is_amqp_up=false}, 1000};

handle_info({amqp_lost_channel, no_connection}, S) ->
?LOG_SYS("AMQP channel lost due to no connection"),
{noreply, S#state{is_amqp_up=false}, 1000};

handle_info(Req, #state{is_amqp_up=false}=S) ->
case start_amqp() of
{ok, _} ->
handle_info(Req, S#state{is_amqp_up=true});
{error, _} ->
?LOG_SYS("Dropping request, AMQP down: ~p~n", [Req]),
{noreply, S}
end;

handle_info(#'basic.consume_ok'{}, S) ->
{noreply, S, hibernate};

%% catch all so we don't lose state
handle_info(_Unhandled, State) ->
?LOG_SYS("Unknown message: ~p~n", [_Unhandled]),
{noreply, State, 1000}.

handle_event(JObj, _State) ->
{reply, JObj}.

%%--------------------------------------------------------------------
%% @private
%% @doc
Expand Down Expand Up @@ -188,72 +165,6 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec(handle_req/2 :: (ContentType :: binary(), Payload :: binary()) -> no_return()).
handle_req(<<"application/json">>, Payload) ->
JObj = mochijson2:decode(binary_to_list(Payload)),

put(callid, wh_json:get_value(<<"Call-ID">>, JObj, wh_json:get_value(<<"Msg-ID">>, JObj, <<"0000000000">>))),
?LOG_START("Received ~s", [Payload]),

process_req(get_msg_type(JObj), JObj);
handle_req(_ContentType, _Payload) ->
?LOG_SYS("Received payload with unknown content type: ~p -> ~s", [_ContentType, _Payload]).

-spec get_msg_type/1 :: (JObj) -> {binary(), binary()} when
JObj :: json_object().
get_msg_type(JObj) ->
{ wh_json:get_value(<<"Event-Category">>, JObj), wh_json:get_value(<<"Event-Name">>, JObj) }.

-spec process_req/2 :: (MsgType, ApiJObj) -> no_return() when
MsgType :: {binary(), binary()},
ApiJObj :: json_object().
process_req({<<"dialplan">>,<<"route_req">>}, ApiJObj) ->
try
true = wh_api:route_req_v(ApiJObj),
CallID = wh_json:get_value(<<"Call-ID">>, ApiJObj),
case {wh_json:get_value([<<"Custom-Channel-Vars">>, <<"Account-ID">>], ApiJObj), wh_json:get_value([<<"Custom-Channel-Vars">>, <<"Authorizing-ID">>], ApiJObj)} of
{AcctID, undefined} when is_binary(AcctID) ->
%% Coming from carrier (off-net)
?LOG_START(CallID, "Offnet call starting", []),
ts_offnet_sup:start_handler(<<"offnet-", CallID/binary>>, ApiJObj);
{AcctID, AuthID} when is_binary(AcctID) andalso is_binary(AuthID) ->
%% Coming from PBX (on-net); authed by Registrar or ts_auth
?LOG_START(CallID, "Onnet call starting", []),
ts_onnet_sup:start_handler(<<"onnet-", CallID/binary>>, ApiJObj);
{_AcctID, _AuthID} ->
?LOG("Error in routing: AcctID: ~s AuthID: ~s", [_AcctID, _AuthID])
end
catch
A:{error,B} ->
?LOG_END("Route request exception: ~p:~p", [A, B]),
?LOG_SYS("Stacktrace: ~p", [erlang:get_stacktrace()])
end.

-spec start_amqp/0 :: () -> {ok, binary()} | {error, amqp_error}.
start_amqp() ->
ReqQueue = amqp_util:new_queue(?ROUTE_QUEUE_NAME, [{exclusive, false}]),

try
amqp_util:basic_qos(1), %% control egress of messages from the queue, only send one at time (load balances)

?LOG_SYS("QOS=1 set"),

%% Bind the queue to an exchange
_ = amqp_util:bind_q_to_callmgr(ReqQueue, ?KEY_ROUTE_REQ),

?LOG_SYS("Bound queue"),

%% Register a consumer to listen to the queue
amqp_util:basic_consume(ReqQueue, [{exclusive, false}]),

?LOG_SYS("Consuming"),

{ok, ReqQueue}
catch
_A:_B ->
?LOG_SYS("Error starting AMQP: ~p: ~p", [_A, _B]),
{error, amqp_error}
end.

-spec transfer_auth/0 :: () -> ok.
transfer_auth() ->
Expand Down

0 comments on commit 479410d

Please sign in to comment.