Skip to content

Commit

Permalink
Updates to work with RabbitMQ 3.0, add missing dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Gavin M. Roy committed Nov 23, 2012
1 parent b602688 commit 907e5ee
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.mk
@@ -1 +1 @@
DEPS:=rabbitmq-erlang-client rabbitmq-management
DEPS:=rabbitmq-server rabbitmq-erlang-client rabbitmq-management
16 changes: 9 additions & 7 deletions src/rabbitmq_pulse_lib.erl
Expand Up @@ -46,7 +46,7 @@ add_binding(Exchange) ->
add_bindings(Exchanges) ->
[add_binding(Exchange) || Exchange <- Exchanges].

binding_exchange_and_routing_key([{source_name, Exchange},{source_kind,exchange},{destination_name,_},{destination_kind,queue},{routing_key,RoutingKey},{arguments,_}]) ->
binding_exchange_and_routing_key([{source_name, Exchange}, {source_kind,exchange}, {destination_name,_}, {destination_kind,queue}, {routing_key,RoutingKey}, {arguments,_}]) ->
{Exchange, RoutingKey}.

distinct_vhost_pairs(Exchanges) ->
Expand All @@ -55,21 +55,21 @@ distinct_vhost_pairs(Exchanges) ->
filter_bindings(Exchange, Bindings) ->
lists:flatten([RoutingKey || {BindingExchange, RoutingKey} <- remapped_bindings(Bindings), BindingExchange =:= Exchange]).

filter_pulse_exchange(Exchange={exchange,{resource, _, exchange, _}, 'x-pulse', _, _, _, _, _}) ->
filter_pulse_exchange(Exchange={exchange,{resource, _, exchange, _}, 'x-pulse', _, _, _, _, _, _}) ->
Exchange;

filter_pulse_exchange({exchange,{resource, _, exchange, _}, _, _, _, _, _, _}) ->
filter_pulse_exchange({exchange,{resource, _, exchange, _}, _, _, _, _, _, _, _}) ->
null.

find_connection(Exchange, Connections)->
[Connection] = [C || C <- Connections, has_exchange(C, Exchange)],
Connection.

flatten(A, B, Subitem) when is_list(Subitem) ->
lists:flatten([[A,B,C,D] || {C, D} <- Subitem]);
lists:flatten([[A, B, C, D] || {C, D} <- Subitem]);

flatten(A, B, Subitem) when is_number(Subitem) ->
lists:flatten([A,B,Subitem]).
lists:flatten([A, B, Subitem]).

get_all_exchanges() ->
VirtualHosts = get_virtual_hosts(),
Expand Down Expand Up @@ -151,6 +151,8 @@ get_interval(Args) ->
{_, integer, Value} ->
Value;
{_, float, Value} ->
Value;
{_, long, Value} ->
Value
end;
false ->
Expand Down Expand Up @@ -212,7 +214,7 @@ remapped_bindings(Bindings) ->
[binding_exchange_and_routing_key(B) || B <- Bindings].

remapped_exchange(Exchange) ->
{exchange,{resource, VirtualHost, exchange, Name}, 'x-pulse', _, _, _, Args, _} = Exchange,
{exchange,{resource, VirtualHost, exchange, Name}, 'x-pulse', _, _, _, Args, _, _} = Exchange,
Remapped = #rabbitmq_pulse_exchange{virtual_host=VirtualHost,
username=get_username(Args),
exchange=Name,
Expand Down Expand Up @@ -266,7 +268,7 @@ establish_connections(Exchanges) ->


open(VirtualHost, Username) ->
AdapterInfo = #adapter_info{name = <<"rabbitmq_pulse">>},
AdapterInfo = #amqp_adapter_info{name = <<"rabbitmq_pulse">>},
case amqp_connection:start(#amqp_params_direct{username = Username,
virtual_host = VirtualHost,
adapter_info = AdapterInfo}) of
Expand Down
7 changes: 3 additions & 4 deletions src/rabbitmq_pulse_worker.erl
Expand Up @@ -26,7 +26,6 @@ start_exchange_timers(Exchanges) ->
start_link() ->
gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).


init([]) ->
register(rabbitmq_pulse, self()),
Exchanges = rabbitmq_pulse_lib:pulse_exchanges(),
Expand All @@ -49,9 +48,9 @@ handle_cast({add_binding, Tx, X, B}, State) ->
rabbit_log:info("add_binding: ~p ~p ~p ~p ~n", [Tx, X, B, State]),
{noreply, State};

%handle_cast({create, #exchange{name = #resource{virtual_host=VirtualHost, name=Name}, arguments = Args}}, State) ->
% rabbitmq_pulse_lib:create_exchange(VirtualHost, Name, Args),
% {noreply, State};
handle_cast({create, #exchange{name = #resource{virtual_host=VirtualHost, name=Name}, arguments = Args}}, State) ->
rabbitmq_pulse_lib:create_exchange(VirtualHost, Name, Args),
{noreply, State};

handle_cast({delete, X, _Bs}, State) ->
rabbit_log:info("delete: ~p ~p ~p ~n", [X, _Bs, State]),
Expand Down

0 comments on commit 907e5ee

Please sign in to comment.