Skip to content

Commit

Permalink
KAZOO-545: delete the queue on close
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anderson committed Mar 12, 2013
1 parent 2166910 commit 450efbe
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
1 change: 0 additions & 1 deletion lib/whistle_amqp-1.0.0/src/amqp_util.hrl
Expand Up @@ -131,7 +131,6 @@
,available = false :: boolean()
,prechannels = [] :: [{reference(), pid()},...] | [] | '_'
,exchanges = [] :: wh_exchanges() | '_'
,weight
}).
-type wh_amqp_connection() :: #wh_amqp_connection{}.
-type wh_amqp_connections() :: [wh_amqp_connection(),...] | [].
30 changes: 19 additions & 11 deletions lib/whistle_amqp-1.0.0/src/wh_amqp_channel.erl
Expand Up @@ -72,10 +72,18 @@ close(#wh_amqp_channel{channel=Pid
lager:debug("canceled consumer ~s via channel ~p", [CTag, Pid]),
catch amqp_channel:call(Pid, #'basic.cancel'{consumer_tag=CTag}),
close(Channel#wh_amqp_channel{commands=Commands});
close(#wh_amqp_channel{channel=Pid
,commands=[#'queue.declare'{queue=Queue}
|Commands
]
}=Channel) when is_pid(Pid) ->
lager:debug("removed queue ~s via channel ~p", [Queue, Pid]),
catch amqp_channel:call(Pid, #'queue.delete'{queue=Queue}),
close(Channel#wh_amqp_channel{commands=Commands});
close(#wh_amqp_channel{commands=[_|Commands]}=Channel) ->
close(Channel#wh_amqp_channel{commands=Commands});
close(#wh_amqp_channel{channel=Pid}=Channel) when is_pid(Pid) ->
lager:debug("closed channel ~p", [Pid]),
close(#wh_amqp_channel{channel=Pid, uri=URI}=Channel) when is_pid(Pid) ->
lager:debug("closed channel ~p on ~s", [Pid, URI]),
C = wh_amqp_channels:demonitor_channel(Channel),
catch amqp_channel:close(Pid),
C;
Expand Down Expand Up @@ -106,24 +114,24 @@ publish(#'basic.publish'{exchange=_Exchange, routing_key=_RK}=BasicPub
publish(BasicPub, AmqpMsg#'amqp_msg'{props=Props#'P_basic'{timestamp=Now}});
publish(#'basic.publish'{exchange=_Exchange, routing_key=_RK}=BasicPub, AmqpMsg) ->
case wh_amqp_channels:get_channel() of
#wh_amqp_channel{channel=Pid} when is_pid(Pid) ->
#wh_amqp_channel{channel=Pid, uri=URI} when is_pid(Pid) ->
amqp_channel:call(Pid, BasicPub, AmqpMsg),
lager:debug("published to ~s exchange (routing key ~s) via ~p", [_Exchange, _RK, Pid]);
#wh_amqp_channel{} ->
lager:debug("published to ~s(~s) exchange (routing key ~s) via ~p", [_Exchange, URI, _RK, Pid]);
#wh_amqp_channel{uri=URI} ->
wh_amqp_channels:reconnect(),
timer:sleep(100),
retry_publish(BasicPub, AmqpMsg),
lager:debug("dropping payload to ~s exchange (routing key ~s): ~s", [_Exchange, _RK, AmqpMsg#'amqp_msg'.payload])
lager:debug("dropping payload to ~s(~s) exchange (routing key ~s): ~s", [_Exchange, URI, _RK, AmqpMsg#'amqp_msg'.payload])
end.

-spec retry_publish(#'basic.publish'{}, #'amqp_msg'{}) -> 'ok'.
retry_publish(#'basic.publish'{exchange=_Exchange, routing_key=_RK}=BasicPub, AmqpMsg) ->
case wh_amqp_channels:get_channel() of
#wh_amqp_channel{channel=Pid} when is_pid(Pid) ->
#wh_amqp_channel{channel=Pid, uri=URI} when is_pid(Pid) ->
amqp_channel:call(Pid, BasicPub, AmqpMsg),
lager:debug("published to ~s exchange (routing key ~s) via ~p", [_Exchange, _RK, Pid]);
#wh_amqp_channel{} ->
lager:debug("dropping payload to ~s exchange (routing key ~s): ~s", [_Exchange, _RK, AmqpMsg#'amqp_msg'.payload])
lager:debug("published to ~s(~s) exchange (routing key ~s) via ~p", [_Exchange, URI, _RK, Pid]);
#wh_amqp_channel{uri=URI} ->
lager:debug("dropping payload to ~s(~s) exchange (routing key ~s): ~s", [_Exchange, URI, _RK, AmqpMsg#'amqp_msg'.payload])
end.

-spec command(wh_amqp_command()) -> command_ret().
Expand Down Expand Up @@ -238,7 +246,7 @@ open(#wh_amqp_channel{consumer=Consumer, reconnecting=Reconnecting}=Channel
})
end
],
lager:debug("create channel ~p for ~p", [Pid, Channel#wh_amqp_channel.consumer]),
lager:debug("create channel ~p for ~p on ~s", [Pid, Channel#wh_amqp_channel.consumer, URI]),
lists:foldl(fun(F, C) -> F(C) end, Channel, Routines);
{error, _R} ->
close(Channel)
Expand Down
25 changes: 7 additions & 18 deletions lib/whistle_amqp-1.0.0/src/wh_amqp_connections.erl
Expand Up @@ -36,9 +36,7 @@
,code_change/3
]).

-record(state, {weight=0
,exchanges=dict:new()
}).
-record(state, {exchanges=dict:new()}).

-define(TAB, ?MODULE).
-define(ENSURE_TIME, 5000).
Expand Down Expand Up @@ -71,8 +69,7 @@ add(URI) ->
{ok, Params} ->
new(#wh_amqp_connection{uri=URI
,manager=wh_util:to_atom(URI, true)
,params=Params
,weight=gen_server:call(?MODULE, assign_weight)})
,params=Params})
end.

-spec new(wh_amqp_connection()) -> wh_amqp_connection() | {'error', _}.
Expand Down Expand Up @@ -166,8 +163,11 @@ update_exchanges(URI, Exchanges) ->
gen_server:cast(?MODULE, {update_exchanges, wh_util:to_binary(URI), Exchanges}).

-spec connected(wh_amqp_connection()) -> wh_amqp_connection().
connected(#wh_amqp_connection{connection=Pid, weight=Weight}=Connection) when is_pid(Pid) ->
CurrentWeight = current_weight(),
connected(#wh_amqp_connection{connection=Pid, uri=Weight}=Connection) when is_pid(Pid) ->
CurrentWeight = case current() of
{ok, #wh_amqp_connection{uri=URI}} -> URI;
{error, no_available_connection} -> 0
end,
C = gen_server:call(?MODULE, {connected, Connection}),
_ = case Weight > CurrentWeight of
'true' -> gen_server:cast(?MODULE, {force_reconnect, Connection});
Expand Down Expand Up @@ -225,8 +225,6 @@ init([]) ->
%%--------------------------------------------------------------------
handle_call(exchanges, _, #state{exchanges=Exchanges}=State) ->
{reply, [V || {_, V} <- dict:to_list(Exchanges)], State};
handle_call(assign_weight, _, #state{weight=Weight}=State) ->
{reply, Weight, State#state{weight=Weight+1}};
handle_call({new, #wh_amqp_connection{uri=URI}=Connection}, _, State) ->
case ets:insert_new(?TAB, Connection) of
true -> {reply, Connection, State, ?ENSURE_TIME};
Expand Down Expand Up @@ -332,15 +330,6 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec current_weight() -> integer().
current_weight() ->
case current() of
{ok, #wh_amqp_connection{weight=Weight}} ->
Weight;
{'error', _} ->
-1
end.

-spec declare_exchange(wh_amqp_connections(), #'exchange.declare'{}) -> ok.
declare_exchange([], _) -> ok;
declare_exchange([#wh_amqp_connection{control_channel=Pid
Expand Down

0 comments on commit 450efbe

Please sign in to comment.