Skip to content

Commit

Permalink
The only way to manage channels is to use the asynchronous eredis_sub…
Browse files Browse the repository at this point in the history
…:subscribe/2 and eredis_sub:unsubscribe/2.
  • Loading branch information
knutin committed Jan 19, 2012
1 parent 5f7418f commit 111a702
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 50 deletions.
18 changes: 10 additions & 8 deletions src/eredis_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,32 @@
%% Specified in http://www.erlang.org/doc/man/gen_server.html#call-3
-define(TIMEOUT, 5000).

-export([start_link/1, start_link/4, start_link/7, stop/1, receiver/1, sub_test/0,
-export([start_link/0, start_link/1, start_link/3, start_link/6, stop/1,
controlling_process/1, controlling_process/2, controlling_process/3,
ack_message/1, subscribe/2, unsubscribe/2, channels/1]).

-export([receiver/1, sub_test/0, pub_test/0]).

%%
%% PUBLIC API
%%

start_link(Host, Port, Password, Channels) ->
start_link(Host, Port, Password, Channels, 100, infinity, drop).
start_link() ->
start_link([]).

start_link(Host, Port, Password, Channels, ReconnectSleep,
start_link(Host, Port, Password) ->
start_link(Host, Port, Password, 100, infinity, drop).

start_link(Host, Port, Password, ReconnectSleep,
MaxQueueSize, QueueBehaviour)
when is_list(Host) andalso
is_integer(Port) andalso
is_list(Password) andalso
is_list(Channels) andalso
is_integer(ReconnectSleep) andalso
(is_integer(MaxQueueSize) orelse MaxQueueSize =:= infinity) andalso
(QueueBehaviour =:= drop orelse QueueBehaviour =:= exit) ->

eredis_sub_client:start_link(Host, Port, Password, Channels, ReconnectSleep,
eredis_sub_client:start_link(Host, Port, Password, ReconnectSleep,
MaxQueueSize, QueueBehaviour).


Expand All @@ -41,12 +44,11 @@ start_link(Host, Port, Password, Channels, ReconnectSleep,
start_link(Args) ->
Host = proplists:get_value(host, Args, "127.0.0.1"),
Port = proplists:get_value(port, Args, 6379),
Database = proplists:get_value(database, Args, 0),
Password = proplists:get_value(password, Args, ""),
ReconnectSleep = proplists:get_value(reconnect_sleep, Args, 100),
MaxQueueSize = proplists:get_value(max_queue_size, Args, infinity),
QueueBehaviour = proplists:get_value(queue_behaviour, Args, drop),
start_link(Host, Port, Database, Password, ReconnectSleep,
start_link(Host, Port, Password, ReconnectSleep,
MaxQueueSize, QueueBehaviour).

stop(Pid) ->
Expand Down
50 changes: 8 additions & 42 deletions src/eredis_sub_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
%%
%% This client implements a subscriber to a Redis pubsub channel. It
%% is implemented in the same way as eredis_client, except channel
%% messages are streamed to the controlling process. eredis will only
%% receive the next message on the socket when the current message has
%% been acked.
%% messages are streamed to the controlling process. Messages are
%% queued and delivered when the client acknowledges receipt.
%%
%% There is one consuming process per eredis_sub_client.
-module(eredis_sub_client).
Expand All @@ -17,7 +16,7 @@


%% API
-export([start_link/7, stop/1]).
-export([start_link/6, stop/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -30,13 +29,12 @@
-spec start_link(Host::list(),
Port::integer(),
Password::string(),
Channels::[channel()],
ReconnectSleep::integer(),
MaxQueueSize::integer(),
QueueBehaviour::drop | exit) ->
{ok, Pid::pid()} | {error, Reason::term()}.
start_link(Host, Port, Password, Channels, ReconnectSleep, MaxQueueSize, QueueBehaviour) ->
Args = [Host, Port, Password, Channels, ReconnectSleep, MaxQueueSize, QueueBehaviour],
start_link(Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour) ->
Args = [Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour],
gen_server:start_link(?MODULE, Args, []).


Expand All @@ -47,12 +45,12 @@ stop(Pid) ->
%% gen_server callbacks
%%====================================================================

init([Host, Port, Password, Channels, ReconnectSleep, MaxQueueSize, QueueBehaviour]) ->
init([Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour]) ->
State = #state{host = Host,
port = Port,
password = list_to_binary(Password),
reconnect_sleep = ReconnectSleep,
channels = Channels,
channels = [],
parser_state = eredis_parser:init(),
msg_queue = queue:new(),
max_queue_size = MaxQueueSize,
Expand Down Expand Up @@ -248,46 +246,14 @@ connect(State) ->
inet:setopts(Socket, [{active, false}]),
case authenticate(Socket, State#state.password) of
ok ->
case do_subscribe(Socket, State#state.channels) of
ok ->
{ok, State#state{socket = Socket}};
{error, Reason} ->
{error, {channel_subscribe_error, Reason}}
end;
{ok, State#state{socket = Socket}};
{error, Reason} ->
{error, {authentication_error, Reason}}
end;
{error, Reason} ->
{error, {connection_error, Reason}}
end.

do_subscribe(_Socket, []) ->
{error, no_channels};
do_subscribe(Socket, Channels) ->
Command = eredis:create_multibulk(["SUBSCRIBE" | Channels]),
ok = gen_tcp:send(Socket, Command),

case gen_tcp:recv(Socket, 0) of
{ok, Data} ->
case parse_subscribe_result(Data, eredis_parser:init(), []) of
{ok, Channels} ->
ok;
Other ->
{error, {unexpected_data, Other}}
end;
{error, Reason} ->
{error, Reason}
end.

parse_subscribe_result(Data, ParserState, Acc) ->
case eredis_parser:parse(ParserState, Data) of
{ok, [<<"subscribe">>, Chan, _], _} ->
{ok, lists:reverse([Chan | Acc])};
{ok, [<<"subscribe">>, Chan, _], Rest, NewParserState} ->
parse_subscribe_result(Rest, NewParserState, [Chan | Acc])
end.



authenticate(_Socket, <<>>) ->
ok;
Expand Down

0 comments on commit 111a702

Please sign in to comment.