Skip to content

Commit

Permalink
post route handling
Browse files Browse the repository at this point in the history
  • Loading branch information
James Aimonetti committed Oct 27, 2010
1 parent a7dde88 commit 11f3e51
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 46 deletions.
Binary file modified applications/trunkstore/ebin/ts_call_handler.beam
Binary file not shown.
Binary file modified applications/trunkstore/ebin/ts_responder.beam
Binary file not shown.
Binary file modified applications/trunkstore/ebin/ts_route.beam
Binary file not shown.
53 changes: 31 additions & 22 deletions applications/trunkstore/src/ts_call_handler.erl
Expand Up @@ -35,42 +35,51 @@
%%%-------------------------------------------------------------------
-module(ts_call_handler).

-export([start/4, init/4]).
-export([start/3, init/3, loop/3]).

-import(proplists, [get_value/2, get_value/3, delete/2, is_defined/2]).
-import(logger, [log/2, format_log/3]).

-import(props, [get_value/2, get_value/3]).
-import(logger, [format_log/3]).

-include("../include/amqp_client/include/amqp_client.hrl").

-spec(start/4 :: (CallID :: binary(), Flags :: tuple(), EvtQ :: binary(), CtlQ :: binary()) -> pid()).
start(CallID, Flags, EvtQ, CtlQ) ->
spawn(ts_call_handler, init, [CallID, Flags, EvtQ, CtlQ]).
-spec(start/3 :: (CallID :: binary(), Flags :: tuple(), AmqpHost :: string()) -> pid()).
start(CallID, Flags, AmqpHost) ->
spawn_link(ts_call_handler, init, [CallID, Flags, AmqpHost]).

-spec(init/4 :: (CaallID :: binary(), Flags :: tuple(), EvtQ :: binary(), CtlQ :: binary()) -> no_return()).
init(CallID, Flags, EvtQ, CtlQ) ->
{ok, Channel, Ticket} = amqp_manager:open_channel(self()),
consume_events(Channel, Ticket, EvtQ),
loop(CallID, Flags, {Channel, Ticket, CtlQ}).
-spec(init/3 :: (CallID :: binary(), Flags :: tuple(), AmqpHost :: string()) -> no_return()).
init(CallID, Flags, AmqpHost) ->
format_log(info, "TS_CALL(~p): Starting post handler for ~p...~n", [self(), CallID]),
consume_events(AmqpHost, CallID),
loop(CallID, Flags, {AmqpHost, <<>>}).

-spec(loop/3 :: (CallID :: binary(), Flags :: tuple(), Amqp :: tuple(Channel :: pid(), Ticket :: integer(), CtlQ :: binary())) -> no_return()).
loop(CallID, Flags, {Channel, Ticket, CtlQ}=Amqp) ->
-spec(loop/3 :: (CallID :: binary(), Flags :: tuple(), Amqp :: tuple(Host :: string(), CtlQ :: binary())) -> no_return()).
loop(CallID, Flags, {Host, _CtlQ}=Amqp) ->
receive
{ctl_queue, CallID, CtlQ1} ->
?MODULE:loop(CallID, Flags, {Host, CtlQ1});
{shutdown, CallID} ->
format_log(info, "TS_CALL(~p): Recv shutdown...~n", [self()]);
{_, #amqp_msg{props = _Props, payload = Payload}} ->
format_log(info, "TS_CALL(~p): Evt recv:~n~s~n", [self(), Payload]),
{struct, Prop} = mochijson2:decode(binary_to_list(Payload)),

case get_value(<<"Event-Name">>, Prop) of
<<"CHANNEL_DESTROY">> ->
format_log(info, "TS_CALL(~p): ChanDestroy Done~n", [self()]),
done;
format_log(info, "TS_CALL(~p): ChanDestroy recv, shutting down...~n", [self()]),
ts_responder:rm_post_handler(CallID);
<<"CHANNEL_HANGUP">> ->
format_log(info, "TS_CALL(~p): ChanHangup recv, shutting down...~n", [self()]),
ts_responder:rm_post_handler(CallID);
_EvtName ->
format_log(info, "TS_CALL(~p): Evt: ~p AppMsg: ~p~n", [self(), _EvtName, get_value(<<"Application-Response">>, Prop)])
format_log(info, "TS_CALL(~p): Evt: ~p AppMsg: ~p~n", [self(), _EvtName, get_value(<<"Application-Response">>, Prop)]),
?MODULE:loop(CallID, Flags, Amqp)
end;
_Other ->
format_log(info, "TS_CALL(~p): Received Other: ~p~n", [self(), _Other])
end,
loop(CallID, Flags, Amqp).
format_log(info, "TS_CALL(~p): Received Other: ~p~n", [self(), _Other]),
?MODULE:loop(CallID, Flags, Amqp)
end.

-spec(consume_events/3 :: (Channel :: pid(), Ticket :: integer(), EvtQ :: binary()) -> no_return()).
consume_events(Channel, Ticket, EvtQ) ->
#'basic.consume_ok'{} = amqp_channel:subscribe(Channel, amqp_util:basic_consume(Ticket, EvtQ), self()).
-spec(consume_events/2 :: (Host :: string(), CallID :: binary()) -> no_return()).
consume_events(Host, CallID) ->
amqp_util:callevt_consume(Host, CallID).
59 changes: 50 additions & 9 deletions applications/trunkstore/src/ts_responder.erl
Expand Up @@ -16,7 +16,7 @@
-behaviour(gen_server).

%% API
-export([start_link/0, set_couch_host/1, set_amqp_host/1]).
-export([start_link/0, set_couch_host/1, set_amqp_host/1, add_post_handler/2, rm_post_handler/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -33,6 +33,7 @@
,couch_host = "" :: string()
,broad_q = <<>> :: binary()
,tar_q = <<>> :: binary()
,post_handlers = [] :: list(tuple(binary(), pid())) %% [ {CallID, PostHandlerPid} ]
}).

%%%===================================================================
Expand All @@ -55,6 +56,12 @@ set_couch_host(CHost) ->
set_amqp_host(AHost) ->
gen_server:call(?SERVER, {set_amqp_host, AHost}, infinity).

add_post_handler(CallID, Pid) ->
gen_server:call(?SERVER, {add_post_handler, CallID, Pid}, infinity).

rm_post_handler(CallID) ->
gen_server:call(?SERVER, {rm_post_handler, CallID}, infinity).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -87,6 +94,18 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({rm_post_handler, CallID}, _From, #state{post_handlers=Posts}=State) ->
format_log(info, "TS_RESPONDER(~p): Remove handler for ~p~n", [self(), CallID]),
case lists:keyfind(CallID, 1, Posts) of
false ->
{reply, {error, no_handler, CallID}, State};
{CallID, Pid} ->
Pid ! shutdown,
{reply, ok, State#state{post_handlers=lists:keydelete(CallID, 1, Posts)}}
end;
handle_call({add_post_handler, CallID, Pid}, _From, #state{post_handlers=Posts}=State) ->
format_log(info, "TS_RESPONDER(~p): Add handler(~p) for ~p~n", [self(), Pid, CallID]),
{reply, ok, State#state{post_handlers=[{CallID, Pid} | Posts]}};
handle_call({set_couch_host, CHost}, _From, #state{couch_host=OldCHost}=State) ->
format_log(info, "TS_RESPONDER(~p): Updating couch host from ~p to ~p~n", [self(), OldCHost, CHost]),
ts_couch:set_host(CHost),
Expand Down Expand Up @@ -132,8 +151,8 @@ handle_cast(_Msg, State) ->
%% @end
%%--------------------------------------------------------------------
handle_info({'EXIT', _Pid, Reason}, State) ->
format_log(error, "TS_RESPONDER(~p): Received EXIT(~p) from ~p, stopping...~n", [self(), Reason, _Pid]),
{stop, Reason, State};
format_log(error, "TS_RESPONDER(~p): Received EXIT(~p) from ~p...~n", [self(), Reason, _Pid]),
{noreply, Reason, State};
%% receive resource requests from Apps
handle_info({_, #amqp_msg{props = Props, payload = Payload}}, State) ->
spawn(fun() -> handle_req(Props#'P_basic'.content_type, Payload, State) end),
Expand Down Expand Up @@ -200,23 +219,45 @@ process_req({<<"directory">>, <<"auth_req">>}, Prop, State) ->
{error, _Msg} ->
format_log(error, "TS_RESPONDER.auth(~p) ERROR: ~p~n", [self(), _Msg])
end
end;
end,
State;
process_req({<<"dialplan">>,<<"route_req">>}, Prop, #state{tar_q=TQ}=State) ->
case whistle_api:route_req_v(Prop) andalso ts_route:handle_req(Prop, TQ) of
false ->
format_log(error, "TS_RESPONDER.route(~p): Failed to validate route_req~n", [self()]);
{ok, JSON} ->
{ok, JSON, Flags} ->
RespQ = get_value(<<"Server-ID">>, Prop),
send_resp(JSON, RespQ, State);
send_resp(JSON, RespQ, State),
start_post_handler(Prop, Flags, State);
{error, _Msg} ->
format_log(error, "TS_RESPONDER.route(~p) ERROR: ~p~n", [self(), _Msg])
end;
end,
State;
%% What to do with post route processing?
process_req({<<"dialplan">>,<<"route_win">>}, Prop, State) ->
3;
process_req({<<"dialplan">>,<<"route_win">>}, Prop, #state{post_handlers=Posts}) ->
spawn(fun() ->
%% extract ctl queue for call, and send to the post_handler process associated with the call-id
case lists:keyfind(get_value(<<"Call-ID">>, Prop), 1, Posts) of
false ->
format_log(error, "TS_RESPONDER(~p): Unknown post handler for winning api msg~n~p~n", [self(), Prop]);
{CallID, Pid} ->
case erlang:is_process_alive(Pid) of
true ->
Pid ! {ctl_queue, CallID, get_value(<<"Control-Queue">>, Prop)};
false ->
?MODULE:rm_post_handler(CallID)
end
end
end);
process_req(_MsgType, _Prop, _State) ->
io:format("Unhandled Msg ~p~nJSON: ~p~n", [_MsgType, _Prop]).

%% Prop - RouteReq API Proplist
start_post_handler(Prop, Flags, #state{amqp_host=AmqpHost}) ->
CallID = get_value(<<"Call-ID">>, Prop),
Pid = ts_call_handler:start(CallID, Flags, AmqpHost),
?MODULE:add_post_handler(CallID, Pid).

-spec(send_resp/3 :: (JSON :: iolist(), RespQ :: binary(), tuple()) -> no_return()).
send_resp(JSON, RespQ, #state{amqp_host=AHost}) ->
format_log(info, "TS_RESPONDER(~p): Sending to ~p~n", [self(), RespQ]),
Expand Down
35 changes: 20 additions & 15 deletions applications/trunkstore/src/ts_route.erl
Expand Up @@ -22,7 +22,7 @@
%%%===================================================================
%%% API
%%%===================================================================
-spec(handle_req/2 :: (ApiProp :: proplist(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
-spec(handle_req/2 :: (ApiProp :: proplist(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
handle_req(ApiProp, ServerID) ->
format_log(info, "TS_ROUTE(~p): Handling Route Request~n", [self()]),
case get_value(<<"Custom-Channel-Vars">>, ApiProp) of
Expand All @@ -42,7 +42,7 @@ handle_req(ApiProp, ServerID) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec(inbound_handler/2 :: (ApiProp :: list(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
-spec(inbound_handler/2 :: (ApiProp :: list(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
inbound_handler(ApiProp, ServerID) ->
format_log(info, "TS_ROUTE(~p): Inbound handler started...~n", [self()]),
[ToUser, _ToDomain] = binary:split(get_value(<<"To">>, ApiProp), <<"@">>),
Expand All @@ -54,7 +54,7 @@ inbound_handler(ApiProp, ServerID) ->
Error
end.

-spec(outbound_handler/2 :: (ApiProp :: list(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
-spec(outbound_handler/2 :: (ApiProp :: list(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
outbound_handler(ApiProp, ServerID) ->
format_log(info, "TS_ROUTE(~p): Outbound handler starting...~n", [self()]),
Did = ts_util:to_e164(get_value(<<"Caller-ID-Number">>, ApiProp, <<>>)),
Expand Down Expand Up @@ -86,38 +86,38 @@ lookup_did(Did) ->
{error, "Unexpected error in outbound_handler"}
end.

-spec(process_routing/3 :: (Flags :: tuple(), ApiProp :: proplist(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
-spec(process_routing/3 :: (Flags :: tuple(), ApiProp :: proplist(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
process_routing(Flags, ApiProp, ServerID) ->
case ts_credit:check(Flags) of
{ok, Flags1} ->
%% call may proceed
find_route(Flags1, ApiProp, ServerID);
{error, Error} ->
format_log(error, "TS_ROUTE(~p): Credit Error ~p~n", [self(), Error]),
response(503, ApiProp, ServerID)
response(503, ApiProp, Flags, ServerID)
end.

-spec(find_route/3 :: (Flags :: tuple(), ApiProp :: proplist(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
-spec(find_route/3 :: (Flags :: tuple(), ApiProp :: proplist(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
find_route(Flags, ApiProp, ServerID) ->
case Flags#route_flags.direction =:= <<"outbound">> of
false ->
%% handle inbound routing
case inbound_route(Flags) of
{ok, Routes} ->
format_log(info, "TS_ROUTE(~p): Generated Inbound Routes~n~p~n", [self(), Routes]),
response(Routes, ApiProp, ServerID);
response(Routes, ApiProp, Flags, ServerID);
{error, Error} ->
format_log(error, "TS_ROUTE(~p): Inbound Routing Error ~p~n", [self(), Error]),
response(404, ApiProp, ServerID)
response(404, ApiProp, Flags, ServerID)
end;
true ->
case ts_carrier:route(Flags) of
{ok, Routes} ->
format_log(info, "TS_ROUTE(~p): Generated Outbound Routes~n~p~n", [self(), Routes]),
response(Routes, ApiProp, ServerID);
response(Routes, ApiProp, Flags, ServerID);
{error, Error} ->
format_log(error, "TS_ROUTE(~p): Outbound Routing Error ~p~n", [self(), Error]),
response(404, ApiProp, ServerID)
response(404, ApiProp, Flags, ServerID)
end
end.

Expand All @@ -140,7 +140,7 @@ inbound_route(Flags) ->
format_log(error, "TS_ROUTE(~p): Failed to validate Route ~p~n", [self(), Route]),
{error, "Inbound route validation failed"}
end.

-spec(inbound_features/1 :: (Flags :: tuple()) -> tuple()).
inbound_features(Flags) ->
Features = [],
Expand All @@ -150,7 +150,7 @@ inbound_features(Flags) ->
outbound_features(Flags) ->
Features = [ts_e911, ts_t38],
fold_features(Features, Flags).

-spec(fold_features/2 :: (Features :: list(atom()), Flags :: tuple()) -> tuple()).
fold_features(Features, Flags) ->
lists:foldl(fun(Mod, Flags0) ->
Expand Down Expand Up @@ -192,12 +192,17 @@ set_flags(DidProp, ApiProp) ->
%,codecs = [] :: list()
}.

-spec(response/3 :: (Routes :: proplist() | integer(), Prop :: proplist(), ServerID :: binary()) -> {ok, iolist()} | {error, string()}).
response(Routes, Prop, ServerID) ->
-spec(response/4 :: (Routes :: proplist() | integer(), Prop :: proplist(), Flags :: tuple(), ServerID :: binary()) -> tuple(ok, iolist(), tuple()) | tuple(error, string())).
response(Routes, Prop, Flags, ServerID) ->
Prop1 = [ {<<"Msg-ID">>, get_value(<<"Msg-ID">>, Prop)}
| whistle_api:default_headers(ServerID, <<"dialplan">>, <<"route_resp">>, <<"ts_route">>, <<"0.1">>) ],
Data = specific_response(Routes) ++ Prop1,
whistle_api:route_resp(Data).
case whistle_api:route_resp(Data) of
{ok, JSON} ->
{ok, JSON, Flags};
{error, _E}=Error ->
Error
end.

-spec(specific_response/1 :: (CodeOrRoutes :: integer() | proplist()) -> proplist()).
specific_response(404) ->
Expand Down

0 comments on commit 11f3e51

Please sign in to comment.