Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'pubsub-2' into pubsub

  • Loading branch information...
commit f1401dcb8b1e5cd336af509497611471362485ac 2 parents 1289cda + 6dd05b9
Knut Nesheim authored
View
1  .gitignore
@@ -1,2 +1,3 @@
+tests
ebin/*
.eunit
View
4 README.md
@@ -2,8 +2,8 @@
Non-blocking Redis client with a focus on performance and robustness.
-It supports authentication, choosing a specific database, transactions
-and pipelining.
+It supports authentication, choosing a specific database, transactions,
+pipelining, and pubsub.
## Example
View
52 src/eredis.erl
@@ -16,7 +16,9 @@
-define(TIMEOUT, 5000).
-export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4,
- start_link/5, q/2, q/3, qp/2, qp/3]).
+ start_link/5, q/2, q/3, qp/2, qp/3,
+ controlling_process/1, controlling_process/2, controlling_process/3,
+ ack_message/1]).
%% Exported for testing
-export([create_multibulk/1]).
@@ -84,6 +86,54 @@ qp(Client, Pipeline, Timeout) ->
pipeline(Client, Pipeline, Timeout).
+-spec controlling_process(Client::pid()) -> ok.
+%% @doc: Make the calling process the controlling process. The
+%% controlling process received pubsub-related messages, of which
+%% there are three kinds. In each message, the pid refers to the
+%% eredis client process.
+%%
+%% {message, Channel::binary(), Message::binary(), pid()}
+%% This is sent for each pubsub message received by the client.
+%%
+%% {eredis_disconnected, pid()}
+%% This is sent when the eredis client is disconnected from redis.
+%%
+%% {eredis_connected, pid()}
+%% This is sent when the eredis client reconnects to redis after
+%% an existing connection was disconnected.
+%%
+%% Note that you must still issue SUBSCRIBE or PSUBSCRIBE redis
+%% commands to receive pubsub messages. Also, once you issue a
+%% SUBSCRIBE or PSUBSCRIBE command, that eredis client may only be
+%% used to add or remove pubsub subscriptions and to receive pubsub
+%% messages. That is how Redis pubsub works, it is not an artifact
+%% of eredis.
+%%
+%% Any message of the form {message, _, _, _} must be acknowledged
+%% before any subsequent message of the same form is sent. This
+%% prevents the controlling process from being overrun with redis
+%% pubsub messages. See ack_message/2 below.
+controlling_process(Client) ->
+ controlling_process(Client, self()).
+
+-spec controlling_process(Client::pid(), Pid::pid()) -> ok.
+%% @doc: Make the given process (pid) the controlling process.
+controlling_process(Client, Pid) ->
+ controlling_process(Client, Pid, ?TIMEOUT).
+
+%% @doc: Make the given process (pid) the controlling process subscriber
+%% with the given Timeout.
+controlling_process(Client, Pid, Timeout) ->
+ gen_server:call(Client, {controlling_process, Pid}, Timeout).
+
+
+-spec ack_message(Client::pid()) -> ok.
+%% @doc: acknowledge the receipt of a pubsub message. each pubsub
+%% message must be acknowledged before the next one is received
+ack_message(Client) ->
+ gen_server:cast(Client, {ack_message, self()}).
+
+
%%
%% INTERNAL HELPERS
%%
View
129 src/eredis_client.erl
@@ -7,7 +7,7 @@
%%
%% The client works like this:
%% * When starting up, we connect to Redis with the given connection
-%% information, or fail.
+%% information, or fail.
%% * Users calls us using gen_server:call, we send the request to Redis,
%% add the calling process at the end of the queue and reply with
%% noreply. We are then free to handle new requests and may reply to
@@ -19,13 +19,15 @@
%% * For pipeline commands, we include the number of responses we are
%% waiting for in each element of the queue. Responses are queued until
%% we have all the responses we need and then reply with all of them.
+%% * Redis pubsub messages are handled by sending Erlang messages to
+%% all registered subscribers.
%%
-module(eredis_client).
-author('knut.nesheim@wooga.com').
-behaviour(gen_server).
--include("eredis.hrl").
+-include("eredis_priv.hrl").
%% API
-export([start_link/5, stop/1, select_database/2]).
@@ -34,18 +36,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {
- host :: string() | undefined,
- port :: integer() | undefined,
- password :: binary() | undefined,
- database :: binary() | undefined,
- reconnect_sleep :: integer() | undefined,
-
- socket :: port() | undefined,
- parser_state :: #pstate{} | undefined,
- queue :: queue() | undefined
-}).
-
-define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, true}]).
%%
@@ -77,7 +67,8 @@ init([Host, Port, Database, Password, ReconnectSleep]) ->
reconnect_sleep = ReconnectSleep,
parser_state = eredis_parser:init(),
- queue = queue:new()},
+ queue = queue:new(),
+ msg_queue = queue:new()},
case connect(State) of
{ok, NewState} ->
@@ -92,6 +83,9 @@ handle_call({request, Req}, From, State) ->
handle_call({pipeline, Pipeline}, From, State) ->
do_pipeline(Pipeline, From, State);
+handle_call({controlling_process, Pid}, _From, State) ->
+ do_controlling_process(Pid, State);
+
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
@@ -99,13 +93,19 @@ handle_call(_Request, _From, State) ->
{reply, unknown_request, State}.
+handle_cast({ack_message, Pid},
+ #state{controlling_process={_, Pid}} = State) ->
+ {noreply, maybe_send_message(State#state{msg_state=ready})};
+handle_cast({ack_message, _}, State) ->
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
+
%% Receive data from socket, see handle_response/2
handle_info({tcp, _Socket, Bs}, State) ->
- inet:setopts(State#state.socket, [{active, once}]),
- {noreply, handle_response(Bs, State)};
+ NewState = State#state{conn_state=passive},
+ {noreply, handle_response(Bs, update_socket_state(NewState))};
%% Socket got closed, for example by Redis terminating idle
%% clients. Spawn of a new process which will try to reconnect and
@@ -113,6 +113,7 @@ handle_info({tcp, _Socket, Bs}, State) ->
%% an error message to all our clients.
handle_info({tcp_closed, _Socket}, State) ->
Self = self(),
+ send_to_controller({eredis_disconnected, Self}, State),
spawn(fun() -> reconnect_loop(Self, State) end),
%% Throw away the socket and the queue, as we will never get a
@@ -123,7 +124,15 @@ handle_info({tcp_closed, _Socket}, State) ->
%% Redis is ready to accept requests, the given Socket is a socket
%% already connected and authenticated.
handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
- {noreply, State#state{socket = Socket}};
+ send_to_controller({eredis_connected, self()}, State),
+ {noreply, State#state{socket = Socket, conn_state = active_once}};
+
+%% Our controlling process is down.
+handle_info({'DOWN', Ref, process, Pid, _Reason},
+ #state{controlling_process={Ref, Pid}} = State) ->
+ {stop, shutdown, State#state{controlling_process=undefined,
+ msg_state=ready,
+ msg_queue=queue:new()}};
%% eredis can be used in Poolboy, but it requires to support a simple API
%% that Poolboy uses to manage the connections.
@@ -158,7 +167,7 @@ do_request(Req, From, State) ->
case gen_tcp:send(State#state.socket, Req) of
ok ->
NewQueue = queue:in({1, From}, State#state.queue),
- {noreply, State#state{queue = NewQueue}};
+ {noreply, update_socket_state(State#state{queue = NewQueue})};
{error, Reason} ->
{reply, {error, Reason}, State}
end.
@@ -174,31 +183,38 @@ do_pipeline(Pipeline, From, State) ->
case gen_tcp:send(State#state.socket, Pipeline) of
ok ->
NewQueue = queue:in({length(Pipeline), From, []}, State#state.queue),
- {noreply, State#state{queue = NewQueue}};
+ {noreply, update_socket_state(State#state{queue = NewQueue})};
{error, Reason} ->
{reply, {error, Reason}, State}
end.
+-spec do_controlling_process(Pid::pid(), #state{}) -> {reply, Reply::{ok, reference()}, #state{}}.
+do_controlling_process(Pid, State) ->
+ case State#state.controlling_process of
+ undefined ->
+ ok;
+ {OldRef, _OldPid} ->
+ erlang:demonitor(OldRef)
+ end,
+ Ref = erlang:monitor(process, Pid),
+ {reply, ok, State#state{controlling_process={Ref, Pid}}}.
+
-spec handle_response(Data::binary(), State::#state{}) -> NewState::#state{}.
%% @doc: Handle the response coming from Redis. This includes parsing
%% and replying to the correct client, handling partial responses,
%% handling too much data and handling continuations.
-handle_response(Data, #state{parser_state = ParserState,
- queue = Queue} = State) ->
-
+handle_response(Data, #state{parser_state = ParserState} = State) ->
case eredis_parser:parse(ParserState, Data) of
%% Got complete response, return value to client
{ReturnCode, Value, NewParserState} ->
- NewQueue = reply({ReturnCode, Value}, Queue),
- State#state{parser_state = NewParserState,
- queue = NewQueue};
+ reply({ReturnCode, Value}, State#state{parser_state=NewParserState});
%% Got complete response, with extra data, reply to client and
%% recurse over the extra data
{ReturnCode, Value, Rest, NewParserState} ->
- NewQueue = reply({ReturnCode, Value}, Queue),
- handle_response(Rest, State#state{parser_state = NewParserState,
- queue = NewQueue});
+ NewState = reply({ReturnCode, Value},
+ State#state{parser_state=NewParserState}),
+ handle_response(Rest, NewState);
%% Parser needs more data, the parser state now contains the
%% continuation data and we will try calling parse again when
@@ -210,17 +226,24 @@ handle_response(Data, #state{parser_state = ParserState,
%% @doc: Sends a value to the first client in queue. Returns the new
%% queue without this client. If we are still waiting for parts of a
%% pipelined request, push the reply to the the head of the queue and
-%% wait for another reply from redis.
-reply(Value, Queue) ->
+%% wait for another reply from redis. Pubsub messages are not part of
+%% the normal reply stream and will instead be sent as Erlang messages
+%% to the controlling process (if any).
+reply({ok, [<<"message">>, Channel, Message]}, State) ->
+ Msg = {message, Channel, Message, self()},
+ MsgQueue = queue:in(Msg, State#state.msg_queue),
+ maybe_send_message(State#state{msg_queue=MsgQueue});
+reply(Value, #state{queue=Queue} = State) ->
case queue:out(Queue) of
{{value, {1, From}}, NewQueue} ->
gen_server:reply(From, Value),
- NewQueue;
+ State#state{queue=NewQueue};
{{value, {1, From, Replies}}, NewQueue} ->
gen_server:reply(From, lists:reverse([Value | Replies])),
- NewQueue;
+ State#state{queue=NewQueue};
{{value, {N, From, Replies}}, NewQueue} when N > 1 ->
- queue:in_r({N - 1, From, [Value | Replies]}, NewQueue);
+ State#state{queue=queue:in_r({N - 1, From, [Value | Replies]},
+ NewQueue)};
{empty, Queue} ->
%% Oops
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
@@ -294,3 +317,41 @@ reconnect_loop(Client, #state{reconnect_sleep=ReconnectSleep}=State) ->
timer:sleep(ReconnectSleep),
reconnect_loop(Client, State)
end.
+
+
+send_to_controller(_Msg, #state{controlling_process=undefined}) ->
+ ok;
+send_to_controller(Msg, #state{controlling_process={_Ref, Pid}}) ->
+ Pid ! Msg.
+
+
+maybe_send_message(#state{controlling_process=undefined} = State) ->
+ State#state{msg_queue=queue:new()};
+maybe_send_message(#state{msg_state=need_ack} = State) ->
+ State;
+maybe_send_message(State) ->
+ case queue:out(State#state.msg_queue) of
+ {empty, _Queue} ->
+ State;
+ {{value, Msg}, Queue} ->
+ send_to_controller(Msg, State),
+ State#state{msg_queue=Queue, msg_state=need_ack}
+ end.
+
+
+update_socket_state(#state{conn_state=active_once} = State) ->
+ State;
+update_socket_state(#state{controlling_process=undefined} = State) ->
+ inet:setopts(State#state.socket, [{active, once}]),
+ State#state{conn_state=active_once};
+update_socket_state(#state{msg_state=ready} = State) ->
+ inet:setopts(State#state.socket, [{active, once}]),
+ State#state{conn_state=active_once};
+update_socket_state(State) ->
+ case queue:is_empty(State#state.queue) of
+ true ->
+ State;
+ false ->
+ inet:setopts(State#state.socket, [{active, once}]),
+ State#state{conn_state=active_once}
+ end.
View
30 src/eredis_priv.hrl
@@ -0,0 +1,30 @@
+-include("eredis.hrl").
+
+-record(state, {
+ host :: string() | undefined,
+ port :: integer() | undefined,
+ password :: binary() | undefined,
+ database :: binary() | undefined,
+ reconnect_sleep :: integer() | undefined,
+
+ socket :: port() | undefined,
+ parser_state :: #pstate{} | undefined,
+ queue :: queue() | undefined,
+
+ % The process we send pubsub and connection state messages to.
+ controlling_process :: undefined | {reference(), pid()},
+
+ % This is the queue of messages to send to the controlling
+ % process.
+ msg_queue :: queue(),
+
+ % The msg_state keeps track of whether we are waiting
+ % for the controlling process to acknowledge the last
+ % message.
+ msg_state = ready :: ready | need_ack,
+
+ % The conn_state keeps track of whether we have set the
+ % tcp connection to {active, once} after receiving some
+ % tcp data.
+ conn_state = passive :: passive | active_once
+}).
View
104 test/eredis_tests.erl
@@ -1,6 +1,7 @@
-module(eredis_tests).
-include_lib("eunit/include/eunit.hrl").
+-include("../src/eredis_priv.hrl").
-import(eredis, [create_multibulk/1]).
@@ -121,3 +122,106 @@ multibulk_test_() ->
?_assertThrow({cannot_store_floats, 123.5},
list_to_binary(create_multibulk(['SET', foo, 123.5])))
].
+
+
+
+pubsub_test() ->
+ Pub = c(),
+ Sub = c(),
+ ok = eredis:controlling_process(Sub),
+ Res1 = eredis:q(Sub, ["SUBSCRIBE", chan]),
+ ?assertEqual({ok, [<<"subscribe">>, <<"chan">>, <<"1">>]}, Res1),
+ Res2 = eredis:q(Pub, ["PUBLISH", chan, msg]),
+ ?assertEqual({ok, <<"1">>}, Res2),
+ Msg = receive
+ {message, _, _, _} = InMsg ->
+ InMsg
+ end,
+ ?assertEqual({message, <<"chan">>, <<"msg">>, Sub}, Msg).
+
+
+pubsub_manage_subscribers_test() ->
+ Pub = c(),
+ Sub = c(),
+ unlink(Sub),
+ eredis:q(Sub, ["SUBSCRIBE", chan]),
+ #state{controlling_process=undefined} = get_state(Sub),
+ S1 = subscriber(Sub),
+ ok = eredis:controlling_process(Sub, S1),
+ #state{controlling_process={_, S1}} = get_state(Sub),
+ S2 = subscriber(Sub),
+ ok = eredis:controlling_process(Sub, S2),
+ #state{controlling_process={_, S2}} = get_state(Sub),
+ eredis:q(Pub, ["PUBLISH", chan, msg1]),
+ S1 ! stop,
+ ok = wait_for_stop(S1),
+ eredis:q(Pub, ["PUBLISH", chan, msg2]),
+ M2 = wait_for_msg(S2),
+ ?assertEqual(M2, {message, <<"chan">>, <<"msg1">>, Sub}),
+ M3 = wait_for_msg(S2),
+ ?assertEqual(M3, {message, <<"chan">>, <<"msg2">>, Sub}),
+ S2 ! stop,
+ ok = wait_for_stop(S2),
+ Ref = erlang:monitor(process, Sub),
+ receive {'DOWN', Ref, process, Sub, _} -> ok end.
+
+
+pubsub_connect_disconnect_messages_test() ->
+ Pub = c(),
+ Sub = c(),
+ eredis:q(Sub, ["SUBSCRIBE", chan]),
+ S = subscriber(Sub),
+ ok = eredis:controlling_process(Sub, S),
+ eredis:q(Pub, ["PUBLISH", chan, msg]),
+ wait_for_msg(S),
+ #state{socket=Sock} = get_state(Sub),
+ gen_tcp:close(Sock),
+ Sub ! {tcp_closed, Sock},
+ M1 = wait_for_msg(S),
+ ?assertEqual({eredis_disconnected, Sub}, M1),
+ M2 = wait_for_msg(S),
+ ?assertEqual({eredis_connected, Sub}, M2).
+
+
+subscriber(Client) ->
+ Test = self(),
+ Pid = spawn(fun () -> subscriber(Client, Test) end),
+ spawn(fun() ->
+ Ref = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ Test ! {stopped, Pid}
+ end
+ end),
+ Pid.
+
+subscriber(Client, Test) ->
+ receive
+ stop ->
+ ok;
+ Msg ->
+ Test ! {got_message, self(), Msg},
+ eredis:ack_message(Client),
+ subscriber(Client, Test)
+ end.
+
+wait_for_msg(Subscriber) ->
+ receive
+ {got_message, Subscriber, Msg} ->
+ Msg
+ end.
+
+wait_for_stop(Subscriber) ->
+ receive
+ {stopped, Subscriber} ->
+ ok
+ end.
+
+get_state(Pid)
+ when is_pid(Pid) ->
+ {status, _, _, [_, _, _, _, State]} = sys:get_status(Pid),
+ get_state(State);
+get_state([{data, [{"State", State}]} | _]) ->
+ State;
+get_state([_|Rest]) ->
+ get_state(Rest).
Please sign in to comment.
Something went wrong with that request. Please try again.