Permalink
Browse files

Merge branch 'WHISTLE-1650'

  • Loading branch information...
2 parents fe9429c + 4825f09 commit 55ad66fa5395671591bf0296fd0456bf20436ae5 @jamesaimonetti jamesaimonetti committed Oct 3, 2012
@@ -244,8 +244,8 @@ rm_binding(Srv, Binding, Props) ->
-spec init/1 :: ([atom() | wh_proplist(),...]) -> {'ok', #state{}}.
init([Module, Params, InitArgs]) ->
process_flag(trap_exit, true),
- put(callid, ?LOG_SYSTEM_ID),
- lager:debug("starting new gen_listener proc: ~s", [wh_util:to_binary(Module)]),
+ put(callid, Module),
+ lager:debug("starting new gen_listener proc"),
{ModState, TimeoutRef} =
case erlang:function_exported(Module, init, 1) andalso Module:init(InitArgs) of
{ok, MS} -> {MS, undefined};
@@ -259,7 +259,10 @@ init([Module, Params, InitArgs]) ->
gen_server:cast(self(), {init_amqp, Params, Responders, Bindings}),
- {ok, #state{module=Module, module_state=ModState, module_timeout_ref=TimeoutRef}}.
+ {ok, #state{module=Module
+ ,module_state=ModState
+ ,module_timeout_ref=TimeoutRef
+ }}.
-type gen_l_handle_call_ret() :: {'reply', term(), #state{}, gen_server_timeout()} |
{'noreply', #state{}, gen_server_timeout()} |
@@ -304,8 +307,6 @@ handle_call(Request, From, #state{module=Module, module_state=ModState, module_t
handle_cast({init_amqp, Params, Responders, Bindings}, State) ->
case start_amqp(Params) of
{error, _E} ->
- lager:debug("failed to init AMQP: ~p", [_E]),
-
_R = erlang:send_after(?TIMEOUT_RETRY_CONN, self(), {amqp_channel_event, initial_conn_failed}),
_ = [add_responder(self(), Mod, Evts) || {Mod, Evts} <- Responders],
@@ -318,7 +319,6 @@ handle_cast({init_amqp, Params, Responders, Bindings}, State) ->
,is_consuming=false
}};
{ok, Q} ->
- lager:debug("AMQP queue created: ~s", [Q]),
_ = erlang:send_after(?TIMEOUT_RETRY_CONN, self(), is_consuming),
_ = [add_responder(self(), Mod, Evts) || {Mod, Evts} <- Responders],
_ = [create_binding(wh_util:to_binary(Type), BindProps, Q) || {Type, BindProps} <- Bindings],
@@ -375,24 +375,20 @@ handle_cast({add_binding, _, _}=AddBinding, #state{is_consuming=false}=State) ->
handle_cast({add_binding, Binding, Props}, #state{queue=Q, bindings=Bs}=State) ->
case lists:keyfind(Binding, 1, Bs) of
false ->
- lager:debug("creating new binding: ~s", [Binding]),
create_binding(Binding, Props, Q),
{noreply, State#state{bindings=[{Binding, Props}|Bs]}};
{_, P} ->
case Props =:= P of
true ->
- lager:debug("binding ~s exists", [Binding]),
{noreply, State};
false ->
- lager:debug("adding binding ~s with new props: ~p", [Binding, Props]),
create_binding(Binding, Props, Q),
{noreply, State#state{bindings=[{Binding, Props}|Bs]}}
end
end;
handle_cast({rm_binding, Binding, Props}, #state{queue=Q, bindings=Bs}=State) ->
KeepBs = lists:filter(fun({B, P}) when B =:= Binding, P =:= Props ->
- lager:debug("removing binding ~s (~p)", [B, P]),
remove_binding(B, P, Q),
false;
(_) -> true
@@ -451,7 +447,6 @@ handle_info({amqp_channel_event, restarted}, #state{params=Params
}=State) ->
case start_amqp(Params) of
{ok, Q} ->
- lager:debug("lost our channel, but its back up; rebinding"),
_ = [add_binding(self(), Type, BindProps)
|| {Type, BindProps} <- Bindings
],
@@ -461,12 +456,10 @@ handle_info({amqp_channel_event, restarted}, #state{params=Params
_ = erlang:send_after(?TIMEOUT_RETRY_CONN, self(), is_consuming),
{noreply, State#state{queue=Q, is_consuming=false, bindings=[], other_queues=[]}, hibernate};
{error, _R} ->
- lager:alert("failed to rebind after channel restart: ~p", [_R]),
_Ref = erlang:send_after(?START_TIMEOUT, self(), {'$maybe_connect_amqp', ?START_TIMEOUT}),
{noreply, State#state{queue = <<>>, is_consuming=false}, hibernate}
end;
handle_info({amqp_channel_event, _Reason}, State) ->
- lager:alert("notified AMQP channel died: ~p", [_Reason]),
_Ref = erlang:send_after(?START_TIMEOUT, self(), {'$maybe_connect_amqp', ?START_TIMEOUT}),
{noreply, State#state{queue = <<>>, is_consuming=false}, hibernate};
@@ -476,7 +469,6 @@ handle_info({'$maybe_connect_amqp', Timeout}, #state{bindings=Bindings
}=State) ->
case start_amqp(Params) of
{ok, Q} ->
- lager:info("reconnected to AMQP channel, rebinding"),
_ = [add_binding(self(), Type, BindProps)
|| {Type, BindProps} <- Bindings
],
@@ -492,13 +484,9 @@ handle_info({'$maybe_connect_amqp', Timeout}, #state{bindings=Bindings
end;
handle_info(#'basic.consume_ok'{}, S) ->
- lager:debug("consuming from our queue"),
{noreply, S#state{is_consuming=true}};
-handle_info(is_consuming, #state{is_consuming=false
- ,queue=Q
- }=State) ->
- lager:debug("huh, we're not consuming. Queue: ~p", [Q]),
+handle_info(is_consuming, #state{is_consuming=false}=State) ->
_Ref = erlang:send_after(?START_TIMEOUT, self(), {'$maybe_connect_amqp', ?START_TIMEOUT}),
{noreply, State};
@@ -548,7 +536,6 @@ terminate(Reason, #state{module=Module
}) ->
_ = (catch Module:terminate(Reason, ModState)),
lists:foreach(fun({B, P}) ->
- lager:debug("terminating binding ~s (~p)", [B, P]),
(catch remove_binding(B, P, Q))
end, Bs),
lager:debug("~s terminated cleanly, going down", [Module]).
@@ -3,7 +3,7 @@
%%% @doc
%%% Karls Hackity Hack....
%%% We want to block during startup until we have a AMQP connection
-%%% but due to the way wh_amqp_mgr is structured we cant block in
+%%% but due to the way wh_amqp_mgr is structured we cant block in
%%% init there. So this module will bootstrap wh_amqp_mgr
%%% and block until a connection becomes available, after that it
%%% removes itself....
@@ -64,12 +64,15 @@ init([]) ->
Init = get_config(),
UseFederation = props:get_value(use_federation, Init, false),
URIs = case props:get_value(amqp_uri, Init, ?DEFAULT_AMQP_URI) of
- U when is_list(U) -> [U];
- URI -> [URI]
+ URI = "amqp://"++_ -> [URI];
+ URI = "amqps://"++_ -> [URI];
+ URI when is_list(URI) -> URI
end,
- _ = [gen_server:cast(wh_amqp_mgr, {add_broker, URI, UseFederation}) || URI <- URIs],
- lager:info("waiting for AMQP connection...", []),
+ _ = [gen_server:cast(wh_amqp_mgr, {add_broker, Uri, UseFederation}) || Uri <- URIs],
+ lager:info("waiting for first AMQP connection...", []),
wh_amqp_mgr:wait_for_available_host(),
+ lager:debug("host is available"),
+ lager:debug("connection to use: ~p", [wh_amqp_mgr:get_connection()]),
{ok, #state{}, 100}.
%%--------------------------------------------------------------------
@@ -116,6 +119,7 @@ handle_info(timeout, State) ->
_ = wh_amqp_sup:stop_bootstrap(),
{noreply, State};
handle_info(_Info, State) ->
+ lager:debug("unhandled message: ~p", [_Info]),
{noreply, State}.
%%--------------------------------------------------------------------
@@ -146,12 +150,10 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
--spec get_config/0 :: () -> proplist().
+-spec get_config/0 :: () -> wh_proplist().
get_config() ->
case file:consult(?STARTUP_FILE) of
- {ok, Prop} ->
- lager:info("loaded amqp manager configuration from '~s'", [?STARTUP_FILE]),
- Prop;
+ {ok, Prop} -> Prop;
E ->
lager:debug("unable to load amqp manager configuration from '~s': ~p", [?STARTUP_FILE, E]),
[]
@@ -10,12 +10,14 @@
-include("amqp_util.hrl").
--export([new/0]).
--export([name/1]).
--export([uri/1, set_uri/2]).
--export([use_federation/1, set_use_federation/2]).
--export([params/1]).
--export([is_available/1, set_is_available/2]).
+-export([new/0
+ ,name/1
+ ,host/1
+ ,uri/1, set_uri/2
+ ,use_federation/1, set_use_federation/2
+ ,params/1
+ ,is_available/1, set_is_available/2
+ ]).
-record(amqp_broker, {uri :: 'undefined' | string()
,params :: 'undefined' | #'amqp_params_direct'{} | #'amqp_params_network'{}
@@ -27,16 +29,13 @@
-export_type([broker/0]).
-spec new/0 :: () -> broker().
-new() ->
- #amqp_broker{}.
+new() -> #amqp_broker{}.
-spec name/1 :: (broker()) -> 'undefined' | atom().
-name(#amqp_broker{uri=URI}) ->
- wh_util:to_atom(URI, true).
+name(#amqp_broker{uri=URI}) -> wh_util:to_atom(URI, true).
-spec uri/1 :: (broker()) -> 'undefined' | string().
-uri(#amqp_broker{uri=URI}) ->
- URI.
+uri(#amqp_broker{uri=URI}) -> URI.
-spec set_uri/2 :: (atom() | string() | ne_binary(), broker()) -> broker().
set_uri(URI, Broker) ->
@@ -45,20 +44,24 @@ set_uri(URI, Broker) ->
Broker#amqp_broker{uri=U, params=Params}.
-spec use_federation/1 :: (broker()) -> boolean().
-use_federation(#amqp_broker{use_federation=UseFederation}) ->
- UseFederation.
+use_federation(#amqp_broker{use_federation=UseFederation}) -> UseFederation.
-spec set_use_federation/2 :: (atom() | string() | ne_binary(), broker()) -> broker().
set_use_federation(UseFederation, Broker) ->
Broker#amqp_broker{use_federation=wh_util:is_true(UseFederation)}.
-spec params/1 :: (broker()) -> 'undefined' | #'amqp_params_direct'{} | #'amqp_params_network'{}.
-params(#amqp_broker{params=Params}) ->
- Params.
-
+params(#amqp_broker{params=Params}) -> Params.
+
+-spec host/1 :: (broker() | #amqp_params_direct{} | #amqp_params_network{}) -> 'undefined' | ne_binary().
+host(#amqp_broker{} = Broker) -> host(params(Broker));
+host(#amqp_params_direct{node=undefined}) -> undefined;
+host(#amqp_params_direct{node=H}) -> wh_util:to_binary(H);
+host(#amqp_params_network{host=undefined}) -> undefined;
+host(#amqp_params_network{host=H}) -> wh_util:to_binary(H).
+
-spec is_available/1 :: (broker()) -> boolean().
-is_available(#amqp_broker{is_available=IsAvailable}) ->
- IsAvailable.
+is_available(#amqp_broker{is_available=IsAvailable}) -> IsAvailable.
-spec set_is_available/2 :: (atom() | string() | ne_binary(), broker()) -> broker().
set_is_available(IsAvailable, Broker) ->
@@ -11,14 +11,17 @@
-behaviour(gen_server).
--export([start_link/1]).
--export([publish/3]).
--export([consume/2]).
--export([misc_req/2]).
--export([my_channel/1]).
--export([update_my_tag/2]).
--export([use_federation/1]).
--export([stop/1]).
+-export([start_link/1
+ ,publish/3
+ ,consume/2
+ ,misc_req/2
+ ,my_channel/1
+ ,update_my_tag/2
+ ,use_federation/1
+ ,stop/1
+ ,teardown_channels/1
+ ]).
+
-export([init/1
,handle_call/3
,handle_cast/2
@@ -67,6 +70,9 @@ start_link(Broker) ->
Name = wh_amqp_broker:name(Broker),
gen_server:start_link({local, Name}, ?MODULE, [Broker], []).
+teardown_channels(Broker) ->
+ gen_server:call(wh_amqp_broker:name(Broker), teardown_channels).
+
-spec publish/3 :: (atom(), #'basic.publish'{}, ne_binary() | iolist()) -> 'ok' | {'error', _}.
publish(Srv, #'basic.publish'{exchange=_Exchange, routing_key=_RK}=BasicPub, AmqpMsg) ->
FindChannel = [fun(Pid) when is_pid(Pid) -> my_channel(Srv, Pid, false);
@@ -82,7 +88,7 @@ publish(Srv, #'basic.publish'{exchange=_Exchange, routing_key=_RK}=BasicPub, Amq
case lists:foldl(fun(F, C) -> F(C) end, get(amqp_publish_as), FindChannel) of
{error, _}=E -> E;
{ok, Channel} ->
- lager:debug("publish to broker ~s, exchange '~s' with routing key '~s' via channel ~p", [Srv, _Exchange, _RK, Channel]),
+ lager:debug("publish: broker '~s' exchange '~s' routing key '~s' channel '~p'", [Srv, _Exchange, _RK, Channel]),
amqp_channel:call(Channel, BasicPub, AmqpMsg),
ok
end.
@@ -106,7 +112,7 @@ consume(Srv, #'queue.bind'{exchange=_Exchange, routing_key=_RK, queue=_Q}=QueueB
case my_channel(Srv) of
{error, _}=E -> E;
{ok, Channel, _} ->
- lager:debug("bind '~s' to exchange '~s' with routing key '~s' on broker ~s", [_Q, _Exchange, _RK, Srv]),
+ lager:debug("bind: broker '~s' exchange '~s' routing key '~s' queue '~s'", [Srv, _Exchange, _RK, _Q]),
case amqp_channel:call(Channel, QueueBind) of
#'queue.bind_ok'{} -> ok;
{error, _}=E -> E;
@@ -215,7 +221,8 @@ misc_channel(Srv) ->
[] -> {error, not_found}
end.
--spec create_channel/2 :: (atom(), pid()) -> {'ok', pid()} | {'error', _}.
+-spec create_channel/2 :: (atom(), pid()) -> {'ok', pid()} |
+ {'error', _}.
create_channel(Srv, Consumer) ->
case gen_server:call(Srv, {get_connection}) of
{error, _}=E -> E;
@@ -264,9 +271,12 @@ stop(Srv) ->
%%--------------------------------------------------------------------
init([Broker]) ->
process_flag(trap_exit, true),
- put(callid, ?LOG_SYSTEM_ID),
+
self() ! {connect, ?START_TIMEOUT},
Name = wh_amqp_broker:name(Broker),
+
+ _ = put(callid, wh_amqp_broker:host(Broker)),
+
_ = ets:new(Name, [set, protected, named_table, {keypos, #wh_amqp_channel.consumer}]),
{ok, #state{broker=Broker, broker_name=Name}}.
@@ -284,6 +294,10 @@ init([Broker]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
+handle_call(teardown_channels, _, #state{broker_name=Name}=State) ->
+ clear_channels(Name),
+ {reply, ok, State};
+
handle_call(use_federation, _, #state{broker=Broker}=State) ->
%% If we are diconnected dont pay attention to requests
{reply, wh_amqp_broker:use_federation(Broker), State};
@@ -493,16 +507,15 @@ start_channel(Connection, Srv) when is_pid(Connection) ->
lager:debug("unabled to start new channel: no_connection", []),
{error, no_connection};
E ->
- lager:debug("unabled to start new channel: ~p", [E]),
+ lager:debug("unable to start new channel: ~p", [E]),
E
end.
-spec notify_consumers/2 :: ({'amqp_channel_event', atom()}, atom()) -> 'ok'.
notify_consumers(Msg, Name) ->
- ets:foldl(fun(#wh_amqp_channel{consumer = Consumer}, ok) when is_pid(Consumer) ->
- Consumer ! Msg,
- ok;
- (_, ok) -> ok
+ ets:foldl(fun(#wh_amqp_channel{consumer = Consumer}, _) when is_pid(Consumer) ->
+ Consumer ! Msg, ok;
+ (_, _) -> ok
end, ok, Name).
-spec try_to_subscribe/3 :: (atom(), pid(), #'basic.consume'{}) -> 'ok' | {'error', term()}.
@@ -537,3 +550,16 @@ exchange_declare(#'exchange.declare'{type=Type}=ED, true) ->
]
},
ED1.
+
+clear_channels(Name) ->
+ [clear_channel(C) || #wh_amqp_channel{}=C <- ets:tab2list(Name)].
+clear_channel(#wh_amqp_channel{channel=ChPid
+ ,channel_ref=ChRef
+ ,consumer_ref=ConRef
+ ,consumer=Consumer
+ }) ->
+ is_reference(ChRef) andalso erlang:demonitor(ChRef, [flush]),
+ is_reference(ConRef) andalso erlang:demonitor(ConRef, [flush]),
+ erlang:is_process_alive(ChPid) andalso amqp_channel:close(ChPid),
+ is_pid(Consumer) andalso (Consumer ! {amqp_channel_event, closing}).
+
Oops, something went wrong.

0 comments on commit 55ad66f

Please sign in to comment.