Permalink
Browse files

Dynamic addition and removal of bindings working

  • Loading branch information...
1 parent d6d18b7 commit 08dd8845ed071832a23ff1686185045c4d36e018 Gavin M. Roy committed Nov 24, 2012
Showing with 111 additions and 43 deletions.
  1. +1 −1 README.md
  2. +87 −29 src/rabbitmq_pulse_lib.erl
  3. +23 −13 src/rabbitmq_pulse_worker.erl
View
@@ -36,7 +36,7 @@ Todo
- Connections
- Channels
- Handle shutdown cleanly
-- Handle add_binding/remove_binding, create, delete properly
+- Handle create and delete exchange properly
Message Types
-------------
View
@@ -3,16 +3,28 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbitmq_pulse_worker.hrl").
--export([establish_connections/1,
+-export([add_binding/2,
+ create_exchange/4,
+ delete_exchange/4,
+ establish_connections/1,
process_interval/3,
pulse_exchanges/0,
- routing_key_match/2]).
+ remove_bindings/2]).
-define(PREFIX, "rabbitmq_pulse").
-define(DEFAULT_INTERVAL, 5000).
--define(IGNORE_KEYS, [applications, auth_mechanisms, erlang_version, exchange_types, processors, statistics_level, pid, owner_pid, exclusive_consumer_pid, slave_pids, synchronised_slave_pids]).
+-define(IGNORE_KEYS, [applications, auth_mechanisms, erlang_version, exchange_types, processors, statistics_level, pid,
+ owner_pid, exclusive_consumer_pid, slave_pids, synchronised_slave_pids]).
-define(OVERVIEW_BINDINGS, [<<"#">>, <<"overview">>]).
+-define(OVERVIEW_TYPE, <<"rabbitmq cluster overview">>).
+-define(NODE_TYPE, <<"rabbitmq node stats">>).
+-define(QUEUE_TYPE, <<"rabbitmq queue stats">>).
+
+-define(GRAPHITE_OVERVIEW_TYPE, <<"rabbitmq-pulse cluster overview graphite datapoint">>).
+-define(GRAPHITE_NODE_TYPE, <<"rabbitmq-pulse node graphite datapoint">>).
+-define(GRAPHITE_QUEUE_TYPE, <<"rabbitmq-pulse queue graphite datapoint">>).
+
%% General functions
convert_gregorian_to_julian(GregorianSeconds) ->
@@ -34,26 +46,42 @@ get_env(EnvVar, DefaultValue) ->
%% Functions for getting and filtering info from RabbitMQ configuration
-add_binding(Exchange) ->
+add_binding_to_exchange(Exchange, Source, RoutingKey) when Exchange#rabbitmq_pulse_exchange.exchange =:= Source ->
+ Bindings = sets:to_list(sets:from_list(lists:append(Exchange#rabbitmq_pulse_exchange.bindings, [RoutingKey]))),
#rabbitmq_pulse_exchange{virtual_host=Exchange#rabbitmq_pulse_exchange.virtual_host,
username=Exchange#rabbitmq_pulse_exchange.username,
exchange=Exchange#rabbitmq_pulse_exchange.exchange,
interval=Exchange#rabbitmq_pulse_exchange.interval,
format=Exchange#rabbitmq_pulse_exchange.format,
cluster=Exchange#rabbitmq_pulse_exchange.cluster,
- bindings=get_bindings(Exchange)}.
+ bindings=Bindings};
+
+add_binding_to_exchange(Exchange, _, _) ->
+ Exchange.
+
+add_binding(Exchanges, {binding, {resource, _, exchange, Source}, RoutingKey, {resource,_,queue, _},_}) ->
+ [add_binding_to_exchange(X, Source, RoutingKey) || X <- Exchanges].
-add_bindings(Exchanges) ->
- [add_binding(Exchange) || Exchange <- Exchanges].
+add_startup_bindings(Exchanges) ->
+ [get_exchange_bindings(Exchange) || Exchange <- Exchanges].
binding_exchange_and_routing_key([{source_name, Exchange}, {source_kind,exchange}, {destination_name,_}, {destination_kind,queue}, {routing_key,RoutingKey}, {arguments,_}]) ->
{Exchange, RoutingKey}.
+create_exchange(Connections, Exchanges, VirtualHost, Exchange) ->
+ rabbit_log:info('Create exchange: ~p~n~p~n', [VirtualHost, Exchange]),
+ {Connections, Exchanges}.
+
+delete_exchange(Connections, Exchanges, VirtualHost, Exchange) ->
+ rabbit_log:info('Delete exchange: ~p~n~p~n', [VirtualHost, Exchange]),
+ {Connections, Exchanges}.
+
distinct_vhost_pairs(Exchanges) ->
lists:usort([{X#rabbitmq_pulse_exchange.virtual_host, X#rabbitmq_pulse_exchange.username} || X <- Exchanges]).
filter_bindings(Exchange, Bindings) ->
- lists:flatten([RoutingKey || {BindingExchange, RoutingKey} <- remapped_bindings(Bindings), BindingExchange =:= Exchange]).
+ sets:to_list(sets:from_list(lists:flatten([RoutingKey || {BindingExchange, RoutingKey} <- remapped_bindings(Bindings),
+ BindingExchange =:= Exchange]))).
filter_pulse_exchange(Exchange={exchange,{resource, _, exchange, _}, 'x-pulse', _, _, _, _, _, _}) ->
Exchange;
@@ -75,7 +103,16 @@ get_all_exchanges() ->
VirtualHosts = get_virtual_hosts(),
get_exchanges(VirtualHosts).
-get_bindings(Exchange) ->
+get_exchange_bindings(Exchange) ->
+ #rabbitmq_pulse_exchange{virtual_host=Exchange#rabbitmq_pulse_exchange.virtual_host,
+ username=Exchange#rabbitmq_pulse_exchange.username,
+ exchange=Exchange#rabbitmq_pulse_exchange.exchange,
+ interval=Exchange#rabbitmq_pulse_exchange.interval,
+ format=Exchange#rabbitmq_pulse_exchange.format,
+ cluster=Exchange#rabbitmq_pulse_exchange.cluster,
+ bindings=get_bindings_from_rabbitmq(Exchange)}.
+
+get_bindings_from_rabbitmq(Exchange) ->
filter_bindings(Exchange#rabbitmq_pulse_exchange.exchange,
rabbit_binding:info_all(Exchange#rabbitmq_pulse_exchange.virtual_host)).
@@ -175,13 +212,13 @@ get_node_name(Node) ->
get_queues() ->
rabbit_mgmt_db:augment_queues([rabbit_mgmt_format:queue(Q) || Q <- rabbit_amqqueue:list()], full).
-get_node_routing_key_tuple(Value) ->
- list_to_tuple(lists:merge(["node"], string:tokens(Value, "@"))).
-
get_node_routing_key(Node) ->
Value = tuple_to_list(get_node_routing_key_tuple(get_node_name(Node))),
iolist_to_binary(string:join(Value, ".")).
+get_node_routing_key_tuple(Value) ->
+ list_to_tuple(lists:merge(["node"], string:tokens(Value, "@"))).
+
get_queue_routing_key(Vhost, Name) ->
iolist_to_binary(string:join(["queue", binary_to_list(Vhost), binary_to_list(Name)], ".")).
@@ -197,6 +234,9 @@ get_username(Args) ->
get_virtual_hosts() ->
rabbit_vhost:list().
+graphite__overview_key(Exchange, Key) ->
+ string:join([?PREFIX, "overview", Exchange#rabbitmq_pulse_exchange.cluster, Key], ".").
+
has_exchange(Connection, Exchange) ->
lists:member(Exchange, Connection#rabbitmq_pulse_connection.exchanges).
@@ -217,7 +257,7 @@ pair_up([]) ->
pulse_exchanges() ->
Exchanges = [remapped_exchange(X) || X <- get_all_exchanges(), filter_pulse_exchange(X) =/= null],
- add_bindings(Exchanges).
+ add_startup_bindings(Exchanges).
remapped_bindings(Bindings) ->
[binding_exchange_and_routing_key(B) || B <- Bindings].
@@ -236,6 +276,25 @@ remapped_exchange(Exchange) ->
remapped_queue(Q) ->
list_to_tuple([rabbitmq_queue_full |[proplists:get_value(X, Q) || X <- record_info(fields, rabbitmq_queue_full)]]).
+remove_binding_from_exchange(Exchange, Source, RoutingKey) when Exchange#rabbitmq_pulse_exchange.exchange =:= Source ->
+ #rabbitmq_pulse_exchange{virtual_host=Exchange#rabbitmq_pulse_exchange.virtual_host,
+ username=Exchange#rabbitmq_pulse_exchange.username,
+ exchange=Exchange#rabbitmq_pulse_exchange.exchange,
+ interval=Exchange#rabbitmq_pulse_exchange.interval,
+ format=Exchange#rabbitmq_pulse_exchange.format,
+ cluster=Exchange#rabbitmq_pulse_exchange.cluster,
+ bindings=[Binding || Binding <- Exchange#rabbitmq_pulse_exchange.bindings, Binding =/= RoutingKey]};
+
+remove_binding_from_exchange(Exchange, _, _) ->
+ Exchange.
+
+remove_binding(Exchanges, {binding, {resource, _, exchange, Source}, RoutingKey, {resource,_,queue, _},_}) ->
+ [remove_binding_from_exchange(X, Source, RoutingKey) || X <- Exchanges].
+
+remove_bindings(Exchanges, Bindings) ->
+ [Xs] = [remove_binding(Exchanges, Binding) || Binding <- Bindings],
+ Xs.
+
routing_key_match({BT, BN, BH}, {NT, NN, NH}) when BT =:= NT, BN =:= NN, BH =:= NH ->
true;
routing_key_match({BT, BN, BH}, {_NT, NN, NH}) when BT =:= "*", BN =:= NN, BH =:= NH ->
@@ -258,7 +317,7 @@ routing_key_match({BT}, {_NT, _NN, _NH}) when BT =:= "#" ->
true;
routing_key_match(BT, {_NT, _NN, _NH}) when BT =:= "#" ->
true;
-routing_key_match(Binding, Actual) ->
+routing_key_match(_, _) ->
false.
%% Connection and AMQP specific functions
@@ -278,8 +337,8 @@ add_exchange_to_connection(_, _) ->
null.
establish_connections(Exchanges) ->
- [add_exchange(open(VirtualHost, Username), Exchanges) || {VirtualHost, Username} <- distinct_vhost_pairs(Exchanges)].
-
+ [Connections] = [add_exchange(open(VirtualHost, Username), Exchanges) || {VirtualHost, Username} <- distinct_vhost_pairs(Exchanges)],
+ Connections.
open(VirtualHost, Username) ->
AdapterInfo = #amqp_adapter_info{name = <<"rabbitmq_pulse">>},
@@ -299,19 +358,18 @@ open(VirtualHost, Username) ->
E
end.
-publish_graphite_message(Channel, Exchange, Key, ValueString) ->
+publish_graphite_message(Channel, Exchange, Key, ValueString, Type) ->
Epoch = current_timestamp(),
[Timestamp] = io_lib:format("~p", [Epoch]),
[Value] = io_lib:format("~p", [ValueString]),
- FullKey = string:join([?PREFIX, "overview", Exchange#rabbitmq_pulse_exchange.cluster, Key], "."),
BasicPublish = #'basic.publish'{exchange=Exchange#rabbitmq_pulse_exchange.exchange,
- routing_key=list_to_bitstring(FullKey)},
+ routing_key=list_to_bitstring(Key)},
Properties = #'P_basic'{app_id = <<"rabbitmq-pulse">>,
content_type = <<"application/json">>,
delivery_mode = 1,
timestamp = Epoch,
- type = <<"rabbitmq-pulse cluster overview graphite datapoint">>},
- Message = list_to_bitstring(string:join([FullKey, Value, Timestamp], " ")),
+ type = Type},
+ Message = list_to_bitstring(string:join([Key, Value, Timestamp], " ")),
Content = #amqp_msg{props = Properties, payload = Message},
amqp_channel:call(Channel, BasicPublish, Content).
@@ -347,7 +405,7 @@ process_binding_overview(Exchange, Channel) ->
ok
end.
-process_interval(ExchangeName, Exchanges, [Connections]) ->
+process_interval(ExchangeName, Exchanges, Connections) ->
[Exchange] = [Exchange || Exchange <- Exchanges, Exchange#rabbitmq_pulse_exchange.exchange =:= ExchangeName],
Channel = get_channel(Exchange#rabbitmq_pulse_exchange.exchange, Connections),
process_binding_overview(Exchange, Channel),
@@ -359,7 +417,7 @@ process_node(Channel, Exchange, Node) when Exchange#rabbitmq_pulse_exchange.form
case should_publish_node_stats(Exchange, Node) of
true ->
{RoutingKey, Message} = node_stats(Node),
- publish_json_message(Channel, Exchange, RoutingKey, Message, <<"rabbitmq node stats">>),
+ publish_json_message(Channel, Exchange, RoutingKey, Message, ?NODE_TYPE),
ok;
false ->
ok
@@ -368,21 +426,21 @@ process_node(Channel, Exchange, Node) when Exchange#rabbitmq_pulse_exchange.form
process_overview(Exchange, Channel) when Exchange#rabbitmq_pulse_exchange.format =:= graphite ->
Overview = rabbit_mgmt_db:get_overview(),
Pairs = get_kvp(Overview),
- [publish_graphite_message(Channel, Exchange, Key, Value) || {Key, Value} <- Pairs];
+ [publish_graphite_message(Channel, Exchange, graphite__overview_key(Exchange, Key), Value, ?GRAPHITE_OVERVIEW_TYPE) || {Key, Value} <- Pairs],
+ ok;
process_overview(Exchange, Channel) when Exchange#rabbitmq_pulse_exchange.format =:= json ->
Overview = rabbit_mgmt_db:get_overview(),
- publish_json_message(Channel, Exchange, <<"overview">>, iolist_to_binary(mochijson2:encode(Overview)), <<"rabbitmq cluster overview">>).
+ publish_json_message(Channel, Exchange, <<"overview">>, iolist_to_binary(mochijson2:encode(Overview)), ?OVERVIEW_TYPE),
+ ok.
process_queue(Channel, Exchange, Q) when Exchange#rabbitmq_pulse_exchange.format =:= json ->
Queue = remapped_queue(Q),
case should_publish_queue_stats(Exchange, Queue) of
true ->
{RoutingKey, Message} = queue_stats(Queue#rabbitmq_queue_full.vhost, Queue#rabbitmq_queue_full.name, Q),
- publish_json_message(Channel, Exchange, RoutingKey, Message, <<"rabbitmq queue stats">>),
- ok;
- false ->
- rabbit_log:info("Skipped queue: ~p~n", [Queue])
+ publish_json_message(Channel, Exchange, RoutingKey, Message, ?QUEUE_TYPE),
+ ok
end.
should_publish_node_stats(Exchange, Node) ->
@@ -15,7 +15,8 @@ start_timer(Duration, Exchange) ->
timer:apply_after(Duration, ?MODULE, handle_interval, [Exchange]).
start_exchange_timer(Exchange) ->
- start_timer(Exchange#rabbitmq_pulse_exchange.interval, Exchange#rabbitmq_pulse_exchange.exchange).
+ start_timer(Exchange#rabbitmq_pulse_exchange.interval,
+ Exchange#rabbitmq_pulse_exchange.exchange).
start_exchange_timers(Exchanges) ->
[start_exchange_timer(Exchange) || Exchange <- Exchanges].
@@ -44,17 +45,24 @@ handle_cast({handle_interval, ExchangeName}, State) ->
start_exchange_timer(Exchange),
{noreply, State};
-handle_cast({add_binding, Tx, X, B}, State) ->
- rabbit_log:info("add_binding: ~p ~p ~p ~p ~n", [Tx, X, B, State]),
- {noreply, State};
+handle_cast({add_binding, none, _, Binding}, State) ->
+ Exchanges = rabbitmq_pulse_lib:add_binding(State#rabbitmq_pulse_state.exchanges, Binding),
+ NewState = #rabbitmq_pulse_state{connections=State#rabbitmq_pulse_state.connections,
+ exchanges=Exchanges},
+ {noreply, NewState};
handle_cast({create, #exchange{name = #resource{virtual_host=VirtualHost, name=Name}, arguments = Args}}, State) ->
- rabbitmq_pulse_lib:create_exchange(VirtualHost, Name, Args),
- {noreply, State};
-
-handle_cast({delete, X, _Bs}, State) ->
- rabbit_log:info("delete: ~p ~p ~p ~n", [X, _Bs, State]),
- {ok, State};
+ {Connections, Exchanges} = rabbitmq_pulse_lib:create_exchange(State#rabbitmq_pulse_state.connections,
+ State#rabbitmq_pulse_state.exchanges,
+ VirtualHost, Name, Args),
+ {noreply, #rabbitmq_pulse_state{connections=Connections, exchanges=Exchanges}};
+
+handle_cast({delete, Exchange, _Bs}, State) ->
+ {Connections, Exchanges} = rabbitmq_pulse_lib:delete_exchange(State#rabbitmq_pulse_state.connections,
+ State#rabbitmq_pulse_state.exchanges,
+ Exchange),
+ rabbit_log:info("Post delete:~n~p~n~p~n", [Connections, Exchanges]),
+ {noreply, #rabbitmq_pulse_state{connections=Connections, exchanges=Exchanges}};
handle_cast({policy_changed, _Tx, _X1, _X2}, State) ->
rabbit_log:info("policy_changed: ~p, ~p, ~p ~p ~n", [_Tx, _X1, _X2, State]),
@@ -64,9 +72,11 @@ handle_cast({recover, Exchange, Bs}, State) ->
rabbit_log:info("recover: ~p ~p ~n", [Exchange, Bs, State]),
{noreply, State};
-handle_cast({remove_bindings, Tx, X, Bs}, State) ->
- rabbit_log:info("remove_binding: ~p ~p ~p ~p ~n", [Tx, X, Bs, State]),
- {noreply, State};
+handle_cast({remove_bindings, none, _, Bindings}, State) ->
+ Exchanges = rabbitmq_pulse_lib:remove_bindings(State#rabbitmq_pulse_state.exchanges, Bindings),
+ NewState = #rabbitmq_pulse_state{connections=State#rabbitmq_pulse_state.connections,
+ exchanges=Exchanges},
+ {noreply, NewState};
handle_cast(_, State) ->
{noreply, State}.

0 comments on commit 08dd884

Please sign in to comment.