Permalink
Browse files

Updated to RabbitMQ v2.5.0

  • Loading branch information...
1 parent ec906cb commit f438077dd5b95ee3d21cbeeb179406df75bb47cc @jbrisbin jbrisbin committed Jun 20, 2011
Showing with 12 additions and 6 deletions.
  1. +1 −1 rebar.config
  2. +11 −5 src/rabbit_exchange_type_riak.erl
View
@@ -1,6 +1,6 @@
{
deps, [
- {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", "HEAD"}},
+ {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.5.0"}}},
{riakc, ".*", {git, "https://github.com/basho/riak-erlang-client.git", "HEAD"}}
]
}.
@@ -11,7 +11,7 @@
-define(KEY, <<"X-Riak-Key">>).
-rabbit_boot_step({?MODULE,
- [{description, "Experimental Riak Exchange"},
+ [{description, "exchange type riak"},
{mfa, {rabbit_registry, register, [exchange, ?EXCHANGE_TYPE_BIN, ?MODULE]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).
@@ -21,6 +21,7 @@
route/2
]).
-export([
+ serialise_events/0,
validate/1,
create/2,
recover/2,
@@ -31,7 +32,10 @@
]).
description() ->
- [{name, ?EXCHANGE_TYPE_BIN}, {description, <<"Experimental Riak Exchange">>}].
+ [{name, ?EXCHANGE_TYPE_BIN}, {description, <<"exchange type Riak">>}].
+
+serialise_events() ->
+ false.
validate(X) ->
Exchange = exchange_type(X),
@@ -90,6 +94,7 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
Payload = lists:foldl(fun(Chunk, NewPayload) ->
<<Chunk/binary, NewPayload/binary>>
end, <<>>, PayloadRev),
+ io:format("payload: ~p~n", [Payload]),
lists:foldl(fun(Route, _) ->
% Look for bucket from headers or default to exchange name
@@ -104,7 +109,7 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
end,
% Insert or update everything
- % io:format("url: /~s/~s~n", [Bucket, Key]),
+ io:format("storing message to url: /~s/~s~n", [Bucket, Key]),
Obj0 = case riakc_pb_socket:get(Client, Bucket, Key) of
{ok, OldObj} -> riakc_obj:update_value(OldObj, Payload, binary_to_list(ContentType));
_ -> riakc_obj:new(Bucket, Key, Payload, binary_to_list(ContentType))
@@ -123,7 +128,8 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
end,
% Insert/Update data
- riakc_pb_socket:put(Client, Obj1)
+ _Result = riakc_pb_socket:put(Client, Obj1)
+ % io:format("result: ~p~n", [Result])
end, [], Routes);
_Err ->
%io:format("err: ~p~n", [Err]),
@@ -190,7 +196,7 @@ exchange_type(#exchange{ arguments=Args }) ->
case lists:keyfind(?TYPE, 1, Args) of
{?TYPE, _, Type} ->
case list_to_atom(binary_to_list(Type)) of
- riak_exchange ->
+ rabbit_exchange_type_riak ->
error_logger:error_report("Cannot base a Riak exchange on a Riak exchange. An infinite loop would occur."),
rabbit_exchange_type_topic;
Else -> Else

0 comments on commit f438077

Please sign in to comment.