Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #10 from jdavisp3/pipeline

Support sending a pipeline of redis commands.
  • Loading branch information...
commit bfc9f1de46254100178a602a18a5845ddd7699a6 2 parents 39db581 + aefb647
@knutin knutin authored
View
1  include/eredis.hrl
@@ -5,6 +5,7 @@
-type return_value() :: binary() | [binary()].
+-type pipeline() :: [iolist()].
%% Continuation data is whatever data returned by any of the parse
%% functions. This is used to continue where we left off the next time
View
21 src/eredis.erl
@@ -16,7 +16,7 @@
-define(TIMEOUT, 5000).
-export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4,
- start_link/5, q/2, q/3]).
+ start_link/5, q/2, q/3, qp/2, qp/3]).
%% Exported for testing
-export([create_multibulk/1]).
@@ -70,6 +70,19 @@ q(Client, Command, Timeout) ->
call(Client, Command, Timeout).
+-spec qp(Client::pid(), Pipeline::pipeline()) ->
+ {ok, return_value()} | {error, Reason::binary()}.
+%% @doc: Executes the given pipeline (list of commands) in the
+%% specified connection. The commands must be valid Redis commands and
+%% may contain arbitrary data which will be converted to binaries. The
+%% values returned by each command in the pipeline are returned in a list.
+qp(Client, Pipeline) ->
+ pipeline(Client, Pipeline, ?TIMEOUT).
+
+qp(Client, Pipeline, Timeout) ->
+ pipeline(Client, Pipeline, Timeout).
+
+
%%
%% INTERNAL HELPERS
%%
@@ -78,6 +91,12 @@ call(Client, Command, Timeout) ->
Request = {request, create_multibulk(Command)},
gen_server:call(Client, Request, Timeout).
+pipeline(_Client, [], _Timeout) ->
+ [];
+pipeline(Client, Pipeline, Timeout) ->
+ Request = {pipeline, [create_multibulk(Command) || Command <- Pipeline]},
+ gen_server:call(Client, Request, Timeout).
+
-spec create_multibulk(Args::iolist()) -> Command::iolist().
%% @doc: Creates a multibulk command with all the correct size headers
create_multibulk(Args) ->
View
31 src/eredis_client.erl
@@ -16,6 +16,9 @@
%% the client at the front of the queue. If the parser does not have
%% enough data to parse the complete response, we will wait for more
%% data to arrive.
+%% * For pipeline commands, we include the number of responses we are
+%% waiting for in each element of the queue. Responses are queued until
+%% we have all the responses we need and then reply with all of them.
%%
-module(eredis_client).
-author('knut.nesheim@wooga.com').
@@ -86,6 +89,9 @@ init([Host, Port, Database, Password, ReconnectSleep]) ->
handle_call({request, Req}, From, State) ->
do_request(Req, From, State);
+handle_call({pipeline, Pipeline}, From, State) ->
+ do_pipeline(Pipeline, From, State);
+
handle_call(stop, _From, State) ->
{stop, normal, State};
@@ -151,7 +157,23 @@ do_request(_Req, _From, #state{socket = undefined} = State) ->
do_request(Req, From, State) ->
case gen_tcp:send(State#state.socket, Req) of
ok ->
- NewQueue = queue:in(From, State#state.queue),
+ NewQueue = queue:in({1, From}, State#state.queue),
+ {noreply, State#state{queue = NewQueue}};
+ {error, Reason} ->
+ {reply, {error, Reason}, State}
+ end.
+
+-spec do_pipeline(Pipeline::pipeline(), From::pid(), #state{}) ->
+ {noreply, #state{}} | {reply, Reply::any(), #state{}}.
+%% @doc: Sends the entire pipeline to redis. If we do not have a
+%% connection, returns error.
+do_pipeline(_Pipeline, _From, #state{socket = undefined} = State) ->
+ {reply, {error, no_connection}, State};
+
+do_pipeline(Pipeline, From, State) ->
+ case gen_tcp:send(State#state.socket, Pipeline) of
+ ok ->
+ NewQueue = queue:in({length(Pipeline), From, []}, State#state.queue),
{noreply, State#state{queue = NewQueue}};
{error, Reason} ->
{reply, {error, Reason}, State}
@@ -189,9 +211,14 @@ handle_response(Data, #state{parser_state = ParserState,
%% queue without this client.
reply(Value, Queue) ->
case queue:out(Queue) of
- {{value, From}, NewQueue} ->
+ {{value, {1, From}}, NewQueue} ->
gen_server:reply(From, Value),
NewQueue;
+ {{value, {1, From, Replies}}, NewQueue} ->
+ gen_server:reply(From, lists:reverse([Value | Replies])),
+ NewQueue;
+ {{value, {N, From, Replies}}, NewQueue} when N > 1 ->
+ queue:in({N - 1, From, [Value | Replies]}, NewQueue);
{empty, Queue} ->
%% Oops
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
View
24 test/eredis_tests.erl
@@ -52,6 +52,30 @@ exec_test() ->
?assertMatch({ok, _}, eredis:q(C, ["DEL", "k1", "k2"])).
+pipeline_test() ->
+ C = c(),
+
+ P1 = [["SET", a, "1"],
+ ["LPUSH", b, "3"],
+ ["LPUSH", b, "2"]],
+
+ ?assertEqual([{ok, <<"OK">>}, {ok, <<"1">>}, {ok, <<"2">>}],
+ eredis:qp(C, P1)),
+
+ P2 = [["MULTI"],
+ ["GET", a],
+ ["LRANGE", b, "0", "-1"],
+ ["EXEC"]],
+
+ ?assertEqual([{ok, <<"OK">>},
+ {ok, <<"QUEUED">>},
+ {ok, <<"QUEUED">>},
+ {ok, [<<"1">>, [<<"2">>, <<"3">>]]}],
+ eredis:qp(C, P2)),
+
+ ?assertMatch({ok, _}, eredis:q(C, ["DEL", a, b])).
+
+
c() ->
Res = eredis:start_link(),
?assertMatch({ok, _}, Res),
Please sign in to comment.
Something went wrong with that request. Please try again.