Browse files

PUBLISH ready

  • Loading branch information...
1 parent a8480d7 commit c1504fdd5ce84437749af87b6758655fbfc0cb9a Fernando 'Brujo' Benavides committed Oct 25, 2011
Showing with 209 additions and 1 deletion.
  1. +1 −0 README.md
  2. +3 −0 include/edis.hrl
  3. +25 −0 src/edis_command_runner.erl
  4. +134 −0 src/edis_pubsub.erl
  5. +43 −0 src/edis_pubsub_sup.erl
  6. +3 −1 src/edis_sup.erl
View
1 README.md
@@ -10,6 +10,7 @@ Just run `$ make run` and open connections with your favourite redis client.
* _MULTI_ doesn't support:
- cross-db commands (i.e. _FLUSHALL_, _SELECT_, _MOVE_)
- non-db commands (i.e _AUTH_, _CONFIG *_, _SHUTDOWN_, _MONITOR_)
+ - pub/sub commands (i.e. _PUBLISH_, _SUBSCRIBE_, _UNSUBSCRIBE_, _PSUBSCRIBE_, _PUNSUBSCRIBE_)
### Missing Features
* Dynamic node configuration (i.e. the _SLAVEOF_ command is not implemented)
View
3 include/edis.hrl
@@ -8,6 +8,9 @@
type = default :: alpha | default,
store_in :: undefined | binary()}).
+-record(edis_message, {channel :: binary(),
+ message :: binary()}).
+
-record(edis_command, {timestamp = edis_util:timestamp()
:: float(),
db :: non_neg_integer(),
View
25 src/edis_command_runner.erl
@@ -535,6 +535,15 @@ parse_command(C = #edis_command{cmd = <<"SAVE">>, args = []}) -> C#edis_command{
parse_command(#edis_command{cmd = <<"SAVE">>}) -> throw(bad_arg_num);
parse_command(C = #edis_command{cmd = <<"SHUTDOWN">>, args = []}) -> C#edis_command{result_type=ok,group=server};
parse_command(#edis_command{cmd = <<"SHUTDOWN">>}) -> throw(bad_arg_num);
+%% -- Pub/Sub --------------------------------------------------------------------------------------
+parse_command(#edis_command{cmd = <<"PSUBSCRIBE">>, args = []}) -> throw(bad_arg_num);
+parse_command(C = #edis_command{cmd = <<"PSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(C = #edis_command{cmd = <<"PUBLISH">>, args = [_Channel, _Message]}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(#edis_command{cmd = <<"PUBLISH">>}) -> throw(bad_arg_num);
+parse_command(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(#edis_command{cmd = <<"SUBSCRIBE">>, args = []}) -> throw(bad_arg_num);
+parse_command(C = #edis_command{cmd = <<"SUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(C = #edis_command{cmd = <<"UNSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
%% -- Transactions ---------------------------------------------------------------------------------
parse_command(C = #edis_command{cmd = <<"MULTI">>, args = []}) -> C#edis_command{result_type=ok,group=transaction};
parse_command(#edis_command{cmd = <<"MULTI">>}) -> throw(bad_arg_num);
@@ -620,6 +629,17 @@ run(C = #edis_command{cmd = <<"WATCH">>, args = Keys}, State) ->
end
end, State#state.watched_keys, Keys),
tcp_ok(State#state{watched_keys = NewWatchedKeys});
+%% -- Pub/Sub commands -----------------------------------------------------------------------------
+run(C = #edis_command{cmd = <<"PSUBSCRIBE">>, args = Patterns}, State) ->
+ throw(unsupported);
+run(#edis_command{cmd = <<"PUBLISH">>, args = [Channel, Message]}, State) ->
+ tcp_number(edis_pubsub:publish(Channel, Message), State);
+run(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>, args = Patterns}, State) ->
+ throw(unsupported);
+run(C = #edis_command{cmd = <<"SUBSCRIBE">>}, State) ->
+ throw(unsupported);
+run(C = #edis_command{cmd = <<"UNSUBSCRIBE">>}, State) ->
+ throw(unsupported);
%% -- All the other commands -----------------------------------------------------------------------
run(C = #edis_command{result_type = ResType, timeout = Timeout}, State) ->
Res = case Timeout of
@@ -655,6 +675,11 @@ queue(#edis_command{cmd = <<"SELECT">>}, _State) -> throw(db_in_multi);
queue(#edis_command{cmd = <<"FLUSHALL">>}, _State) -> throw(db_in_multi);
queue(#edis_command{cmd = <<"MOVE">>}, _State) -> throw(db_in_multi);
queue(#edis_command{cmd = <<"MONITOR">>}, _State) -> throw(not_in_multi);
+queue(#edis_command{cmd = <<"PUBLISH">>}, _State) -> throw(not_in_multi);
+queue(#edis_command{cmd = <<"SUBSCRIBE">>}, _State) -> throw(not_in_multi);
+queue(#edis_command{cmd = <<"UNSUBSCRIBE">>}, _State) -> throw(not_in_multi);
+queue(#edis_command{cmd = <<"PSUBSCRIBE">>}, _State) -> throw(not_in_multi);
+queue(#edis_command{cmd = <<"PUNSUBSCRIBE">>}, _State) -> throw(not_in_multi);
queue(#edis_command{cmd = <<"DISCARD">>}, State) ->
tcp_ok(State#state{multi_queue = undefined});
queue(C = #edis_command{cmd = <<"EXEC">>}, State) ->
View
134 src/edis_pubsub.erl
@@ -0,0 +1,134 @@
+%%%-------------------------------------------------------------------
+%%% @author Fernando Benavides <fernando.benavides@inakanetworks.com>
+%%% @author Chad DePue <chad@inakanetworks.com>
+%%% @copyright (C) 2011 InakaLabs SRL
+%%% @doc edis PubSub Monitor
+%%% @end
+%%%-------------------------------------------------------------------
+-module(edis_pubsub).
+-author('Fernando Benavides <fernando.benavides@inakanetworks.com>').
+-author('Chad DePue <chad@inakanetworks.com>').
+
+-include("edis.hrl").
+
+-behaviour(gen_event).
+
+-export([subscribe_channel/1, subscribe_pattern/1,
+ unsubscribe_channel/1, unsubscribe_pattern/1,
+ publish/2]).
+-export([start_link/1, init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+
+-record(state, {client :: pid(),
+ type :: pattern | channel,
+ value :: binary() | re:mp()}).
+-type state() :: #state{}.
+
+%% ====================================================================
+%% External functions
+%% ====================================================================
+%% @doc Starts the event manager
+-spec start_link(edis_pubsub_channel | edis_pubsub_pattern) -> {ok, pid()} | {error, term()}.
+start_link(Name) ->
+ gen_event:start_link({local, Name}).
+
+%% @doc Subscribes client to channel.
+%% From this point on, all events on that channel will be notified to the client procedure using
+%% Erlang messages in the form of #edis_message{}.
+%% If the event handler later is deleted, the event manager sends a message {gen_event_EXIT,Handler,Reason} to the calling process. Reason is one of the following:
+%%
+%% normal, if the event handler has been removed due to a call to delete_handler/3, or remove_handler has been returned by a callback function (see below).
+%% shutdown, if the event handler has been removed because the event manager is terminating.
+%% {swapped,NewHandler,Pid}, if the process Pid has replaced the event handler with another event handler NewHandler using a call to swap_handler/3 or swap_sup_handler/3.
+%% a term, if the event handler is removed due to an error. Which term depends on the error.
+%% @end
+-spec subscribe_channel(binary()) -> ok.
+subscribe_channel(Channel) ->
+ Subscription = {channel, Channel, self()},
+ gen_event:add_sup_handler(edis_pubsub_channel, {?MODULE, Subscription}, Subscription).
+
+%% @doc Subscribes client to pattern.
+%% From this point on, all events on channels that match the pattern will be notified to the client procedure using
+%% Erlang messages in the form of #edis_message{}.
+%% If the event handler later is deleted, the event manager sends a message {gen_event_EXIT,Handler,Reason} to the calling process. Reason is one of the following:
+%%
+%% normal, if the event handler has been removed due to a call to delete_handler/3, or remove_handler has been returned by a callback function (see below).
+%% shutdown, if the event handler has been removed because the event manager is terminating.
+%% {swapped,NewHandler,Pid}, if the process Pid has replaced the event handler with another event handler NewHandler using a call to swap_handler/3 or swap_sup_handler/3.
+%% a term, if the event handler is removed due to an error. Which term depends on the error.
+%% @end
+-spec subscribe_pattern(binary()) -> ok.
+subscribe_pattern(Pattern) ->
+ Subscription = {pattern, Pattern, self()},
+ gen_event:add_sup_handler(edis_pubsub_pattern, {?MODULE, Subscription}, Subscription).
+
+%% @doc Unsubscribes client from channel.
+-spec unsubscribe_channel(binary()) -> ok.
+unsubscribe_channel(Channel) ->
+ Subscription = {channel, Channel, self()},
+ gen_event:delete_handler(edis_pubsub_channel, {?MODULE, Subscription}, normal).
+
+%% @doc Unsubscribes client from pattern.
+-spec unsubscribe_pattern(binary()) -> ok.
+unsubscribe_pattern(Pattern) ->
+ Subscription = {pattern, Pattern, self()},
+ gen_event:delete_handler(edis_pubsub_pattern, {?MODULE, Subscription}, normal).
+
+%% @doc Notifies an event.
+%% Returns the number of clients that received the message.
+-spec publish(binary(), binary()) -> non_neg_integer().
+publish(Channel, Message) ->
+ ok = gen_event:notify(edis_pubsub_channel, #edis_message{channel = Channel, message = Message}),
+ ok = gen_event:notify(edis_pubsub_pattern, #edis_message{channel = Channel, message = Message}),
+ %%TODO: Optimize this
+ Channels =
+ erlang:length(
+ [x || {channel, Ch, _} <- gen_event:which_handlers(edis_pubsub_channel), Ch =:= Channel]),
+ Patterns =
+ erlang:length(
+ [x || {pattern, Re, _} <- gen_event:which_handlers(edis_pubsub_pattern), re:run(Channel, Re) =/= nomatch]),
+ Channels + Patterns.
+
+%% ====================================================================
+%% Server functions
+%% ====================================================================
+%% @hidden
+-spec init({channel|pattern, binary(), pid()}) -> {ok, state()}.
+init({Type, Value, Client}) ->
+ {ok, #state{client = Client,
+ value = case Type of
+ channel -> Value;
+ pattern -> re:compile(Value)
+ end,
+ type = Type}}.
+
+%% @hidden
+-spec handle_event(term(), state()) -> {ok, state()}.
+handle_event(Message = #edis_message{channel = Channel}, State = #state{type = channel, value = Channel}) ->
+ State#state.client ! Message,
+ {ok, State};
+handle_event(Message = #edis_message{}, State = #state{type = pattern}) ->
+ case re:run(Message#edis_message.channel, State#state.value) of
+ nomatch -> {ok, State};
+ _ ->
+ State#state.client ! Message,
+ {ok, State}
+ end;
+handle_event(_Message, State) ->
+ {ok, State}.
+
+%% @hidden
+-spec handle_call(term(), state()) -> {ok, ok, state()}.
+handle_call(_Request, State) -> {ok, ok, State}.
+%% @hidden
+-spec handle_info(term(), state()) -> {ok, state()}.
+handle_info(Info, State) ->
+ ?WARN("Unexpected Info:~n\t~p~n", [Info]),
+ {ok, State}.
+
+%% @hidden
+-spec terminate(term(), state()) -> ok.
+terminate(_Reason, _State) -> ok.
+
+%% @hidden
+-spec code_change(term(), state(), term()) -> {ok, state()}.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
View
43 src/edis_pubsub_sup.erl
@@ -0,0 +1,43 @@
+%%%-------------------------------------------------------------------
+%%% @author Fernando Benavides <fernando.benavides@inakanetworks.com>
+%%% @author Chad DePue <chad@inakanetworks.com>
+%%% @copyright (C) 2011 InakaLabs SRL
+%%% @doc Edis Pub/Sub supervisor
+%%% @end
+%%%-------------------------------------------------------------------
+-module(edis_pubsub_sup).
+-author('Fernando Benavides <fernando.benavides@inakanetworks.com>').
+-author('Chad DePue <chad@inakanetworks.com>').
+
+-include("edis.hrl").
+
+-behaviour(supervisor).
+
+-export([start_link/0, reload/0, init/1]).
+
+%% ====================================================================
+%% External functions
+%% ====================================================================
+%% @doc Starts the supervisor process
+-spec start_link() -> ignore | {error, term()} | {ok, pid()}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% @doc Reloads configuration. Restarts the managers
+-spec reload() -> ok.
+reload() ->
+ true = exit(erlang:whereis(?MODULE), kill),
+ ok.
+
+%% ====================================================================
+%% Server functions
+%% ====================================================================
+%% @hidden
+-spec init([]) -> {ok, {{one_for_one, 5, 10}, [supervisor:child_spec()]}}.
+init([]) ->
+ ?INFO("Pub/Sub supervisor initialized~n", []),
+ ChannelMgr = {edis_pubsub_channel, {edis_pubsub, start_link, [edis_pubsub_channel]},
+ permanent, brutal_kill, worker, [edis_pubsub]},
+ PatternMgr = {edis_pubsub_pattern, {edis_pubsub, start_link, [edis_pubsub_pattern]},
+ permanent, brutal_kill, worker, [edis_pubsub]},
+ {ok, {{one_for_one, 5, 1}, [ChannelMgr, PatternMgr]}}.
View
4 src/edis_sup.erl
@@ -33,4 +33,6 @@ init([]) ->
permanent, 1000, supervisor, [edis_client_sup]},
DbSup = {edis_db_sup, {edis_db_sup, start_link, []},
permanent, 1000, supervisor, [edis_db_sup]},
- {ok, {{one_for_one, 5, 10}, [DbSup, ClientSup, ListenerSup]}}.
+ PubSubSup = {edis_pubsub_sup, {edis_pubsub_sup, start_link, []},
+ permanent, 1000, supervisor, [edis_pubsub_sup]},
+ {ok, {{one_for_one, 5, 10}, [PubSubSup, DbSup, ClientSup, ListenerSup]}}.

0 comments on commit c1504fd

Please sign in to comment.