Permalink
Browse files

PUB/SUB commands ready

  • Loading branch information...
1 parent c1504fd commit a3ef586914fc54e853caa677c98bc8d14db01cac Fernando 'Brujo' Benavides committed Oct 25, 2011
Showing with 174 additions and 124 deletions.
  1. +2 −0 README.md
  2. +144 −49 src/edis_command_runner.erl
  3. +26 −70 src/edis_pubsub.erl
  4. +2 −5 src/edis_pubsub_sup.erl
View
2 README.md
@@ -11,6 +11,8 @@ Just run `$ make run` and open connections with your favourite redis client.
- cross-db commands (i.e. _FLUSHALL_, _SELECT_, _MOVE_)
- non-db commands (i.e _AUTH_, _CONFIG *_, _SHUTDOWN_, _MONITOR_)
- pub/sub commands (i.e. _PUBLISH_, _SUBSCRIBE_, _UNSUBSCRIBE_, _PSUBSCRIBE_, _PUNSUBSCRIBE_)
+* _(P)UNSUBSCRIBE_ commands are not allowed outside _PUBSUB_ mode
+* _PUBLISH_ response is not precise: it's the amount of all clients subscribed to any channel and/or pattern, not just those that will handle the message. On the other hand it runs in _O(1)_ because it's asynchronous, it just dispatches the message.
### Missing Features
* Dynamic node configuration (i.e. the _SLAVEOF_ command is not implemented)
View
193 src/edis_command_runner.erl
@@ -16,13 +16,14 @@
-include("edis.hrl").
--record(state, {socket :: port(),
- db = edis_db:process(0) :: atom(),
- db_index = 0 :: non_neg_integer(),
- peerport :: pos_integer(),
- authenticated = false :: boolean(),
- multi_queue = undefined :: undefined | [{binary(), [binary()]}],
- watched_keys = [] :: [{binary(), undefined | non_neg_integer()}]}).
+-record(state, {socket :: port(),
+ db = edis_db:process(0) :: atom(),
+ db_index = 0 :: non_neg_integer(),
+ peerport :: pos_integer(),
+ authenticated = false :: boolean(),
+ multi_queue = undefined :: undefined | [{binary(), [binary()]}],
+ watched_keys = [] :: [{binary(), undefined | non_neg_integer()}],
+ subscriptions = undefined :: undefined | {gb_set(), gb_set()}}).
-opaque state() :: #state{}.
-export([start_link/1, stop/1, err/2, run/3]).
@@ -72,7 +73,7 @@ init(Socket) ->
handle_call(X, _From, State) -> {stop, {unexpected_request, X}, {unexpected_request, X}, State}.
%% @hidden
--spec handle_cast(stop | {err, binary()} | {run, binary(), [binary()]}, state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec handle_cast(stop | {err, binary()} | {run, binary(), [binary()]}, state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast({err, Message}, State) ->
@@ -84,9 +85,11 @@ handle_cast({run, Cmd, Args}, State) ->
args = Args},
Command = parse_command(OriginalCommand),
ok = edis_db_monitor:notify(OriginalCommand),
- case State#state.multi_queue of
- undefined -> run(Command, State);
- _InMulti -> queue(Command, State)
+ case {State#state.multi_queue, State#state.subscriptions} of
+ {undefined, undefined} -> run(Command, State);
+ {undefined, _InPubSub} -> pubsub(Command, State);
+ {_InMulti, undefined} -> queue(Command, State);
+ {_InMulti, _InPubSub} -> throw(invalid_context)
end
catch
_:timeout ->
@@ -101,6 +104,25 @@ handle_cast({run, Cmd, Args}, State) ->
%% @hidden
-spec handle_info(term(), state()) -> {noreply, state(), hibernate}.
+handle_info(#edis_message{} = Message, State = #state{subscriptions = undefined}) ->
+ ?WARN("Unexpected message: ~p~n", [Message]),
+ {noreply, State, hibernate};
+handle_info(#edis_message{} = Message, State) ->
+ {ChannelSet, PatternSet} = State#state.subscriptions,
+ case gb_sets:is_member(Message#edis_message.channel, ChannelSet) of
+ true ->
+ tcp_multi_bulk([<<"message">>, Message#edis_message.channel, Message#edis_message.message], State);
+ false ->
+ case gb_sets:fold(fun(_, true) -> true;
+ (Pattern, false) ->
+ re:run(Message#edis_message.channel, Pattern) /= nomatch
+ end, false, PatternSet) of
+ true ->
+ tcp_multi_bulk([<<"pmessage">>, Message#edis_message.channel, Message#edis_message.message], State);
+ false ->
+ {noreply, State, hibernate}
+ end
+ end;
handle_info(#edis_command{db = 0} = Command, State) ->
tcp_string(io_lib:format("~p ~s ~s", [Command#edis_command.timestamp,
Command#edis_command.cmd,
@@ -537,10 +559,12 @@ parse_command(C = #edis_command{cmd = <<"SHUTDOWN">>, args = []}) -> C#edis_comm
parse_command(#edis_command{cmd = <<"SHUTDOWN">>}) -> throw(bad_arg_num);
%% -- Pub/Sub --------------------------------------------------------------------------------------
parse_command(#edis_command{cmd = <<"PSUBSCRIBE">>, args = []}) -> throw(bad_arg_num);
-parse_command(C = #edis_command{cmd = <<"PSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(C = #edis_command{cmd = <<"PSUBSCRIBE">>, args = Patterns}) ->
+ C#edis_command{args = lists:map(fun edis_util:glob_to_re/1, Patterns), result_type=number,group=pubsub};
parse_command(C = #edis_command{cmd = <<"PUBLISH">>, args = [_Channel, _Message]}) -> C#edis_command{result_type=number,group=pubsub};
parse_command(#edis_command{cmd = <<"PUBLISH">>}) -> throw(bad_arg_num);
-parse_command(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
+parse_command(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>, args = Patterns}) ->
+ C#edis_command{args = lists:map(fun edis_util:glob_to_re/1, Patterns), result_type=number,group=pubsub};
parse_command(#edis_command{cmd = <<"SUBSCRIBE">>, args = []}) -> throw(bad_arg_num);
parse_command(C = #edis_command{cmd = <<"SUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
parse_command(C = #edis_command{cmd = <<"UNSUBSCRIBE">>}) -> C#edis_command{result_type=number,group=pubsub};
@@ -559,11 +583,11 @@ parse_command(#edis_command{cmd = <<"SLOWLOG">>}) -> throw(unsupported);
parse_command(#edis_command{cmd = <<"SLAVEOF">>}) -> throw(unsupported);
parse_command(_Command) -> throw(unknown_command).
--spec run(#edis_command{}, state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec run(#edis_command{}, state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
%% -- Commands that don't require authorization ----------------------------------------------------
run(#edis_command{cmd = <<"QUIT">>}, State) ->
case tcp_ok(State) of
- {noreply, NewState} -> {stop, normal, NewState};
+ {noreply, NewState, hibernate} -> {stop, normal, NewState};
Error -> Error
end;
run(#edis_command{cmd = <<"AUTH">>, args = [Password]}, State) ->
@@ -630,16 +654,17 @@ run(C = #edis_command{cmd = <<"WATCH">>, args = Keys}, State) ->
end, State#state.watched_keys, Keys),
tcp_ok(State#state{watched_keys = NewWatchedKeys});
%% -- Pub/Sub commands -----------------------------------------------------------------------------
-run(C = #edis_command{cmd = <<"PSUBSCRIBE">>, args = Patterns}, State) ->
- throw(unsupported);
+run(C = #edis_command{cmd = <<"PSUBSCRIBE">>}, State) ->
+ ok = edis_pubsub:add_sup_handler(),
+ pubsub(C, State#state{subscriptions = {gb_sets:empty(), gb_sets:empty()}});
run(#edis_command{cmd = <<"PUBLISH">>, args = [Channel, Message]}, State) ->
- tcp_number(edis_pubsub:publish(Channel, Message), State);
-run(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>, args = Patterns}, State) ->
- throw(unsupported);
+ ok = edis_pubsub:notify(#edis_message{channel = Channel, message = Message}),
+ tcp_number(edis_pubsub:count_handlers(), State);
+run(#edis_command{cmd = <<"PUNSUBSCRIBE">>}, _State) -> throw(out_of_pubsub);
run(C = #edis_command{cmd = <<"SUBSCRIBE">>}, State) ->
- throw(unsupported);
-run(C = #edis_command{cmd = <<"UNSUBSCRIBE">>}, State) ->
- throw(unsupported);
+ ok = edis_pubsub:add_sup_handler(),
+ pubsub(C, State#state{subscriptions = {gb_sets:empty(), gb_sets:empty()}});
+run(#edis_command{cmd = <<"UNSUBSCRIBE">>}, _State) -> throw(out_of_pubsub);
%% -- All the other commands -----------------------------------------------------------------------
run(C = #edis_command{result_type = ResType, timeout = Timeout}, State) ->
Res = case Timeout of
@@ -660,10 +685,10 @@ run(C = #edis_command{result_type = ResType, timeout = Timeout}, State) ->
tcp_zrange(Res, ShowScores, Limit, State)
end.
--spec queue(#edis_command{}, state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec queue(#edis_command{}, state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
queue(#edis_command{cmd = <<"QUIT">>}, State) -> %% User may quit even on MULTI block
case tcp_ok(State) of
- {noreply, NewState} -> {stop, normal, NewState};
+ {noreply, NewState, hibernate} -> {stop, normal, NewState};
Error -> Error
end;
queue(#edis_command{cmd = <<"MULTI">>}, _State) -> throw(nested);
@@ -705,69 +730,137 @@ queue(C = #edis_command{cmd = <<"EXEC">>}, State) ->
queue(C, State) ->
tcp_string("QUEUED", State#state{multi_queue = [C|State#state.multi_queue]}).
+-spec pubsub(#edis_command{}, state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
+pubsub(#edis_command{cmd = <<"QUIT">>}, State) -> %% User may quit even on PUBSUB block
+ case tcp_ok(State) of
+ {noreply, NewState, hibernate} -> {stop, normal, NewState};
+ Error -> Error
+ end;
+pubsub(#edis_command{cmd = <<"PSUBSCRIBE">>, args = Patterns}, State) ->
+ Subscriptons =
+ lists:foldl(
+ fun(Pattern, {AccChannelSet, AccPatternSet}) ->
+ NextPatternSet = gb_sets:add_element(Pattern, AccPatternSet),
+ tcp_multi_bulk([<<"psubscribe">>, Pattern,
+ gb_sets:size(AccChannelSet) + gb_sets:size(NextPatternSet)], State),
+ {AccChannelSet, NextPatternSet}
+ end, State#state.subscriptions, Patterns),
+ {noreply, State#state{subscriptions = Subscriptons}, hibernate};
+pubsub(C = #edis_command{cmd = <<"PUNSUBSCRIBE">>, args = []}, State) ->
+ {_ChannelSet, PatternSet} = State#state.subscriptions,
+ pubsub(C#edis_command{args = gb_sets:to_list(PatternSet)}, State);
+pubsub(#edis_command{cmd = <<"PUNSUBSCRIBE">>, args = Patterns}, State) ->
+ {ChannelSet, PatternSet} =
+ lists:foldl(
+ fun(Pattern, {AccChannelSet, AccPatternSet}) ->
+ NextPatternSet = gb_sets:del_element(Pattern, AccPatternSet),
+ tcp_multi_bulk([<<"punsubscribe">>, Pattern,
+ gb_sets:size(AccChannelSet) + gb_sets:size(NextPatternSet)], State),
+ {AccChannelSet, NextPatternSet}
+ end, State#state.subscriptions, Patterns),
+ case gb_sets:size(ChannelSet) + gb_sets:size(PatternSet) of
+ 0 ->
+ ok = edis_pubsub:delete_handler(),
+ {noreply, State#state{subscriptions = undefined}, hibernate};
+ _ ->
+ {noreply, State#state{subscriptions = {ChannelSet, PatternSet}}, hibernate}
+ end;
+pubsub(#edis_command{cmd = <<"SUBSCRIBE">>, args = Channels}, State) ->
+ Subscriptons =
+ lists:foldl(
+ fun(Channel, {AccChannelSet, AccPatternSet}) ->
+ NextChannelSet = gb_sets:add_element(Channel, AccChannelSet),
+ tcp_multi_bulk([<<"subscribe">>, Channel,
+ gb_sets:size(NextChannelSet) + gb_sets:size(AccPatternSet)], State),
+ {NextChannelSet, AccPatternSet}
+ end, State#state.subscriptions, Channels),
+ {noreply, State#state{subscriptions = Subscriptons}, hibernate};
+pubsub(C = #edis_command{cmd = <<"UNSUBSCRIBE">>, args = []}, State) ->
+ {ChannelSet, _PatternSet} = State#state.subscriptions,
+ pubsub(C#edis_command{args = gb_sets:to_list(ChannelSet)}, State);
+pubsub(#edis_command{cmd = <<"UNSUBSCRIBE">>, args = Channels}, State) ->
+ {ChannelSet, PatternSet} =
+ lists:foldl(
+ fun(Channel, {AccChannelSet, AccPatternSet}) ->
+ NextChannelSet = gb_sets:del_element(Channel, AccChannelSet),
+ tcp_multi_bulk([<<"unsubscribe">>, Channel,
+ gb_sets:size(NextChannelSet) + gb_sets:size(AccPatternSet)], State),
+ {NextChannelSet, AccPatternSet}
+ end, State#state.subscriptions, Channels),
+ case gb_sets:size(ChannelSet) + gb_sets:size(PatternSet) of
+ 0 ->
+ ok = edis_pubsub:delete_handler(),
+ {noreply, State#state{subscriptions = undefined}, hibernate};
+ _ ->
+ {noreply, State#state{subscriptions = {ChannelSet, PatternSet}}, hibernate}
+ end;
+pubsub(_C, _State) -> throw(not_in_pubsub).
+
%% @private
--spec tcp_multi_result([{edis:result_type() | error, term()} | {error, iodata()}], state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_multi_result([{edis:result_type() | error, term()} | {error, iodata()}], state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_multi_result(Results, State) ->
lists:foldl(
- fun({#edis_command{result_type = ok}, _}, {noreply, AccState}) -> tcp_ok(AccState);
- ({#edis_command{result_type = string}, Res}, {noreply, AccState}) -> tcp_string(Res, AccState);
- ({#edis_command{result_type = bulk}, Res}, {noreply, AccState}) -> tcp_bulk(Res, AccState);
- ({#edis_command{result_type = multi_bulk}, Res}, {noreply, AccState}) -> tcp_multi_bulk(Res, AccState);
- ({#edis_command{result_type = number}, Res}, {noreply, AccState}) -> tcp_number(Res, AccState);
- ({#edis_command{result_type = boolean}, Res}, {noreply, AccState}) -> tcp_boolean(Res, AccState);
- ({#edis_command{result_type = float}, Res}, {noreply, AccState}) -> tcp_float(Res, AccState);
- ({C = #edis_command{result_type = zrange}, Res}, {noreply, AccState}) ->
+ fun({#edis_command{result_type = ok}, _}, {noreply, AccState, hibernate}) -> tcp_ok(AccState);
+ ({#edis_command{result_type = string}, Res}, {noreply, AccState, hibernate}) -> tcp_string(Res, AccState);
+ ({#edis_command{result_type = bulk}, Res}, {noreply, AccState, hibernate}) -> tcp_bulk(Res, AccState);
+ ({#edis_command{result_type = multi_bulk}, Res}, {noreply, AccState, hibernate}) -> tcp_multi_bulk(Res, AccState);
+ ({#edis_command{result_type = number}, Res}, {noreply, AccState, hibernate}) -> tcp_number(Res, AccState);
+ ({#edis_command{result_type = boolean}, Res}, {noreply, AccState, hibernate}) -> tcp_boolean(Res, AccState);
+ ({#edis_command{result_type = float}, Res}, {noreply, AccState, hibernate}) -> tcp_float(Res, AccState);
+ ({C = #edis_command{result_type = zrange}, Res}, {noreply, AccState, hibernate}) ->
[_Key, _Min, _Max, ShowScores, Limit] = C#edis_command.args,
tcp_zrange(Res, ShowScores, Limit, AccState);
- ({error, Err}, {noreply, AccState}) -> tcp_err(Err, AccState);
+ ({error, Err}, {noreply, AccState, hibernate}) -> tcp_err(Err, AccState);
(_Result, Error) -> Error
end, tcp_send(["*", integer_to_list(erlang:length(Results))], State), Results).
%% @private
--spec tcp_boolean(boolean(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_boolean(boolean(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_boolean(true, State) -> tcp_number(1, State);
tcp_boolean(false, State) -> tcp_number(0, State).
%% @private
--spec tcp_sort(undefined | pos_integer() | [binary()], state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_sort(undefined | pos_integer() | [binary()], state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_sort(Number, State) when is_integer(Number) -> tcp_number(Number, State);
tcp_sort(Lines, State) -> tcp_multi_bulk(Lines, State).
%% @private
--spec tcp_multi_bulk(undefined | [binary()], state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_multi_bulk(undefined | [binary() | float() | integer()], state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_multi_bulk(undefined, State) ->
tcp_bulk(undefined, State);
tcp_multi_bulk(Lines, State) ->
lists:foldl(
- fun(Float, {noreply, AccState}) when is_float(Float) ->
+ fun(Float, {noreply, AccState, hibernate}) when is_float(Float) ->
tcp_float(Float, AccState);
- (Line, {noreply, AccState}) ->
+ (Integer, {noreply, AccState, hibernate}) when is_integer(Integer) ->
+ tcp_number(Integer, AccState);
+ (Line, {noreply, AccState, hibernate}) ->
tcp_bulk(Line, AccState);
(_Line, Error) ->
Error
end, tcp_send(["*", integer_to_list(erlang:length(Lines))], State), Lines).
%% @private
--spec tcp_bulk(undefined | iodata(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_bulk(undefined | iodata(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_bulk(undefined, State) ->
tcp_send("$-1", State);
tcp_bulk(<<>>, State) ->
tcp_send("$0\r\n", State);
tcp_bulk(Message, State) ->
case tcp_send(["$", integer_to_list(iolist_size(Message))], State) of
- {noreply, NewState} -> tcp_send(Message, NewState);
+ {noreply, NewState, hibernate} -> tcp_send(Message, NewState);
Error -> Error
end.
%% @private
--spec tcp_number(undefined | integer(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_number(undefined | integer(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_number(undefined, State) ->
tcp_bulk(undefined, State);
tcp_number(Number, State) ->
tcp_send([":", integer_to_list(Number)], State).
%% @private
--spec tcp_float(undefined | float(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_float(undefined | float(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_float(undefined, State) ->
tcp_bulk(undefined, State);
tcp_float(Float, State) ->
@@ -777,28 +870,28 @@ tcp_float(Float, State) ->
end.
%% @private
--spec tcp_err(binary(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_err(binary(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_err(Message, State) ->
tcp_send(["-ERR ", Message], State).
%% @private
--spec tcp_ok(state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_ok(state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_ok(State) ->
tcp_string("OK", State).
%% @private
--spec tcp_string(binary(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_string(binary(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_string(Message, State) ->
tcp_send(["+", Message], State).
%% @private
--spec tcp_send(iodata(), state()) -> {noreply, state()} | {stop, normal | {error, term()}, state()}.
+-spec tcp_send(iodata(), state()) -> {noreply, state(), hibernate} | {stop, normal | {error, term()}, state()}.
tcp_send(Message, State) ->
?CDEBUG(data, "~p << ~s~n", [State#state.peerport, Message]),
try gen_tcp:send(State#state.socket, [Message, "\r\n"]) of
ok ->
- {noreply, State};
+ {noreply, State, hibernate};
{error, closed} ->
?DEBUG("Connection closed~n", []),
{stop, normal, State};
@@ -935,6 +1028,8 @@ parse_error(Cmd, nested) -> <<Cmd/binary, " calls can not be nested">>;
parse_error(Cmd, out_of_multi) -> <<Cmd/binary, " without MULTI">>;
parse_error(Cmd, not_in_multi) -> <<Cmd/binary, " inside MULTI is not allowed">>;
parse_error(_Cmd, db_in_multi) -> <<"Transactions may include just one database">>;
+parse_error(Cmd, out_of_pubsub) -> <<Cmd/binary, " outside PUBSUB mode is not allowed">>;
+parse_error(_Cmd, not_in_pubsub) -> <<"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context">>;
parse_error(Cmd, unknown_command) -> <<"unknown command '", Cmd/binary, "'">>;
parse_error(_Cmd, no_such_key) -> <<"no such key">>;
parse_error(_Cmd, syntax) -> <<"syntax error">>;
View
96 src/edis_pubsub.erl
@@ -13,10 +13,8 @@
-behaviour(gen_event).
--export([subscribe_channel/1, subscribe_pattern/1,
- unsubscribe_channel/1, unsubscribe_pattern/1,
- publish/2]).
--export([start_link/1, init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+-export([start_link/0, add_sup_handler/0, delete_handler/0, count_handlers/0, notify/1]).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {client :: pid(),
type :: pattern | channel,
@@ -27,12 +25,12 @@
%% External functions
%% ====================================================================
%% @doc Starts the event manager
--spec start_link(edis_pubsub_channel | edis_pubsub_pattern) -> {ok, pid()} | {error, term()}.
-start_link(Name) ->
- gen_event:start_link({local, Name}).
+-spec start_link() -> {ok, pid()} | {error, term()}.
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
-%% @doc Subscribes client to channel.
-%% From this point on, all events on that channel will be notified to the client procedure using
+%% @doc Subscribes client.
+%% From this point on, all db operations will be notified to the client procedure using
%% Erlang messages in the form of #edis_message{}.
%% If the event handler later is deleted, the event manager sends a message {gen_event_EXIT,Handler,Reason} to the calling process. Reason is one of the following:
%%
@@ -41,79 +39,37 @@ start_link(Name) ->
%% {swapped,NewHandler,Pid}, if the process Pid has replaced the event handler with another event handler NewHandler using a call to swap_handler/3 or swap_sup_handler/3.
%% a term, if the event handler is removed due to an error. Which term depends on the error.
%% @end
--spec subscribe_channel(binary()) -> ok.
-subscribe_channel(Channel) ->
- Subscription = {channel, Channel, self()},
- gen_event:add_sup_handler(edis_pubsub_channel, {?MODULE, Subscription}, Subscription).
+-spec add_sup_handler() -> ok.
+add_sup_handler() ->
+ Self = self(),
+ gen_event:add_sup_handler(?MODULE, {?MODULE, Self}, Self).
-%% @doc Subscribes client to pattern.
-%% From this point on, all events on channels that match the pattern will be notified to the client procedure using
-%% Erlang messages in the form of #edis_message{}.
-%% If the event handler later is deleted, the event manager sends a message {gen_event_EXIT,Handler,Reason} to the calling process. Reason is one of the following:
-%%
-%% normal, if the event handler has been removed due to a call to delete_handler/3, or remove_handler has been returned by a callback function (see below).
-%% shutdown, if the event handler has been removed because the event manager is terminating.
-%% {swapped,NewHandler,Pid}, if the process Pid has replaced the event handler with another event handler NewHandler using a call to swap_handler/3 or swap_sup_handler/3.
-%% a term, if the event handler is removed due to an error. Which term depends on the error.
-%% @end
--spec subscribe_pattern(binary()) -> ok.
-subscribe_pattern(Pattern) ->
- Subscription = {pattern, Pattern, self()},
- gen_event:add_sup_handler(edis_pubsub_pattern, {?MODULE, Subscription}, Subscription).
-
-%% @doc Unsubscribes client from channel.
--spec unsubscribe_channel(binary()) -> ok.
-unsubscribe_channel(Channel) ->
- Subscription = {channel, Channel, self()},
- gen_event:delete_handler(edis_pubsub_channel, {?MODULE, Subscription}, normal).
+%% @doc Unsubscribes client.
+-spec delete_handler() -> ok.
+delete_handler() ->
+ gen_event:delete_handler(?MODULE, {?MODULE, self()}, normal).
-%% @doc Unsubscribes client from pattern.
--spec unsubscribe_pattern(binary()) -> ok.
-unsubscribe_pattern(Pattern) ->
- Subscription = {pattern, Pattern, self()},
- gen_event:delete_handler(edis_pubsub_pattern, {?MODULE, Subscription}, normal).
+%% @doc Returns the number of currently active handlers
+-spec count_handlers() -> non_neg_integer().
+count_handlers() ->
+ length(gen_event:which_handlers(?MODULE)).
%% @doc Notifies an event.
-%% Returns the number of clients that received the message.
--spec publish(binary(), binary()) -> non_neg_integer().
-publish(Channel, Message) ->
- ok = gen_event:notify(edis_pubsub_channel, #edis_message{channel = Channel, message = Message}),
- ok = gen_event:notify(edis_pubsub_pattern, #edis_message{channel = Channel, message = Message}),
- %%TODO: Optimize this
- Channels =
- erlang:length(
- [x || {channel, Ch, _} <- gen_event:which_handlers(edis_pubsub_channel), Ch =:= Channel]),
- Patterns =
- erlang:length(
- [x || {pattern, Re, _} <- gen_event:which_handlers(edis_pubsub_pattern), re:run(Channel, Re) =/= nomatch]),
- Channels + Patterns.
+-spec notify(#edis_message{}) -> ok.
+notify(Command) ->
+ gen_event:notify(?MODULE, Command).
%% ====================================================================
%% Server functions
%% ====================================================================
%% @hidden
--spec init({channel|pattern, binary(), pid()}) -> {ok, state()}.
-init({Type, Value, Client}) ->
- {ok, #state{client = Client,
- value = case Type of
- channel -> Value;
- pattern -> re:compile(Value)
- end,
- type = Type}}.
+-spec init(pid()) -> {ok, state()}.
+init(Client) -> {ok, #state{client = Client}}.
%% @hidden
-spec handle_event(term(), state()) -> {ok, state()}.
-handle_event(Message = #edis_message{channel = Channel}, State = #state{type = channel, value = Channel}) ->
- State#state.client ! Message,
- {ok, State};
-handle_event(Message = #edis_message{}, State = #state{type = pattern}) ->
- case re:run(Message#edis_message.channel, State#state.value) of
- nomatch -> {ok, State};
- _ ->
- State#state.client ! Message,
- {ok, State}
- end;
-handle_event(_Message, State) ->
+handle_event(Event, State = #state{client = Client}) ->
+ Client ! Event,
{ok, State}.
%% @hidden
View
7 src/edis_pubsub_sup.erl
@@ -36,8 +36,5 @@ reload() ->
-spec init([]) -> {ok, {{one_for_one, 5, 10}, [supervisor:child_spec()]}}.
init([]) ->
?INFO("Pub/Sub supervisor initialized~n", []),
- ChannelMgr = {edis_pubsub_channel, {edis_pubsub, start_link, [edis_pubsub_channel]},
- permanent, brutal_kill, worker, [edis_pubsub]},
- PatternMgr = {edis_pubsub_pattern, {edis_pubsub, start_link, [edis_pubsub_pattern]},
- permanent, brutal_kill, worker, [edis_pubsub]},
- {ok, {{one_for_one, 5, 1}, [ChannelMgr, PatternMgr]}}.
+ Mgr = {edis_pubsub, {edis_pubsub, start_link, []}, permanent, brutal_kill, worker, [edis_pubsub]},
+ {ok, {{one_for_one, 5, 1}, [Mgr]}}.

0 comments on commit a3ef586

Please sign in to comment.