Permalink
Browse files

asynchronous erlmc_recv, erlmc_pool_sup, reduced API

  • Loading branch information...
astro committed May 11, 2010
1 parent 41c8857 commit 64c47982b0b45d4e1f399e889a8bb6a300b6aa3f
Showing with 298 additions and 251 deletions.
  1. +10 −0 include/erlmc.hrl
  2. +118 −0 src/erlmc.erl
  3. +18 −194 src/erlmc_conn.erl
  4. +67 −0 src/erlmc_pool_sup.erl
  5. +85 −0 src/erlmc_recv.erl
  6. +0 −57 t/erlmc_t_001.t
View
@@ -18,4 +18,14 @@
-define(OP_Stat, 16#10).
-record(request, {op_code, data_type=16#00, reserved=16#00, opaque=16#00, cas=16#00, extras = <<>>, key = <<>>, value = <<>>}).
+
+%% Status types
+-define(STATUS_OK, 16#00).
+-define(STATUS_NOT_FOUND, 16#01).
+-define(STATUS_EXISTS, 16#02).
+-define(STATUS_TOO_LARGE, 16#03).
+-define(STATUS_INVALID_ARG, 16#04).
+-define(STATUS_NOT_STORED, 16#05).
+-define(STATUS_NON_NUMERIC, 16#06).
+
-record(response, {op_code, data_type, status, opaque, cas, extras, key, value, key_size, extras_size, body_size}).
View
@@ -0,0 +1,118 @@
+%% Copyright (c) 2010
+%% Stephan Maka <stephan@spaceboyz.net>
+%%
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%%
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+%%
+%% http://code.google.com/p/memcached/wiki/MemcacheBinaryProtocol
+%% @doc a binary protocol memcached client
+-module(erlmc).
+
+%% api callbacks
+-export([get/1, add/2, add/3, set/2, set/3,
+ replace/2, replace/3, delete/1, increment/4, decrement/4,
+ append/2, prepend/2]).
+
+-include("erlmc.hrl").
+
+-define(TIMEOUT, 60000).
+
+%%--------------------------------------------------------------------
+%%% API
+%%--------------------------------------------------------------------
+get(Key) ->
+ call_pool(#request{op_code=?OP_GetK, key=Key}).
+
+add(Key, Value) ->
+ add(Key, Value, 0).
+
+add(Key, Value, Expiration)
+ when is_binary(Value), is_integer(Expiration) ->
+ call_pool(#request{op_code = ?OP_Add,
+ extras = <<0:32, Expiration:32>>,
+ key = Key,
+ value = Value}).
+
+set(Key, Value) ->
+ set(Key, Value, 0).
+
+set(Key, Value, Expiration)
+ when is_binary(Value), is_integer(Expiration) ->
+ call_pool(#request{op_code = ?OP_Set,
+ extras = <<0:32, Expiration:32>>,
+ key = Key,
+ value = Value}).
+
+replace(Key, Value) ->
+ replace(Key, Value, 0).
+
+replace(Key, Value, Expiration)
+ when is_binary(Value), is_integer(Expiration) ->
+ call_pool(#request{op_code = ?OP_Replace,
+ extras = <<0:32, Expiration:32>>,
+ key = Key,
+ value = Value}).
+
+delete(Key) ->
+ call_pool(#request{op_code = ?OP_Delete,
+ key = Key}).
+
+increment(Key, Value, Initial, Expiration)
+ when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
+ call_pool(#request{op_code = ?OP_Increment,
+ extras = <<Value:64, Initial:64, Expiration:32>>,
+ key = Key}).
+
+decrement(Key, Value, Initial, Expiration)
+ when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
+ call_pool(#request{op_code=?OP_Decrement,
+ extras = <<Value:64, Initial:64, Expiration:32>>,
+ key = Key}).
+
+append(Key, Value) when is_binary(Value) ->
+ call_pool(#request{op_code = ?OP_Append,
+ key = Key,
+ value = Value}).
+
+prepend(Key, Value) when is_binary(Value) ->
+ call_pool(#request{op_code = ?OP_Prepend,
+ key = Key,
+ value = Value}).
+
+call_pool(Req) ->
+ {ok, Conn} = erlmc_pool_sup:next_conn(),
+ case gen_server:call(Conn, {req, Req}) of
+ {reply, #response{status = ?STATUS_OK, value = Value}} ->
+ {ok, Value};
+ {reply, #response{status = Status}} ->
+ {error, status_id(Status)};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+status_id(0) -> ok;
+status_id(1) -> not_found;
+status_id(2) -> exists;
+status_id(3) -> too_large;
+status_id(4) -> invalid_arg;
+status_id(5) -> not_stored;
+status_id(6) -> non_numeric;
+status_id(_) -> unknown.
View
@@ -33,6 +33,9 @@
-export([start_link/1, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
+-record(state, {socket,
+ requesters = queue:new()}).
+
%% API functions
start_link([Host, Port]) ->
gen_server:start_link(?MODULE, [Host, Port], []).
@@ -50,11 +53,16 @@ start_link([Host, Port]) ->
%% @hidden
%%--------------------------------------------------------------------
init([Host, Port]) ->
- case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of
+ I = self(),
+ case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of
{ok, Socket} ->
- {ok, Socket};
+ erlmc_recv:start_link(Socket,
+ fun(Reply) ->
+ gen_server:cast(I, {reply, Reply})
+ end),
+ {ok, #state{socket = Socket}};
Error ->
- exit(Error)
+ exit(Error)
end.
%%--------------------------------------------------------------------
@@ -67,146 +75,21 @@ init([Host, Port]) ->
%% Description: Handling call messages
%% @hidden
%%--------------------------------------------------------------------
-handle_call({get, Key}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_GetK, key=Key}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- #response{key=Key1, value=Value} when Key == Key1 ->
- {reply, Value, Socket};
- #response{} ->
- {reply, <<>>, Socket}
- end;
-
-handle_call({add, Key, Value, Expiration}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Add, extras = <<0:32, Expiration:32>>,
- key=Key, value=Value}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({set, Key, Value, Expiration}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Set, extras = <<0:32, Expiration:32>>,
- key=Key, value=Value}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({replace, Key, Value, Expiration}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Replace, extras = <<0:32, Expiration:32>>,
- key=Key, value=Value}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({delete, Key}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Delete, key=Key}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({increment, Key, Value, Initial, Expiration}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Increment,
- extras = <<Value:64, Initial:64, Expiration:32>>,
- key=Key}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({decrement, Key, Value, Initial, Expiration}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Decrement,
- extras = <<Value:64, Initial:64, Expiration:32>>,
- key=Key}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({append, Key, Value}, _From, Socket) ->
- case send_recv(Socket, #request{op_code=?OP_Append,
- key=Key,
- value=Value}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({prepend, Key, Value}, _From, Socket) ->
- case send_recv(Socket,
- #request{op_code=?OP_Prepend,
- key=Key, value=Value}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call(stats, _From, Socket) ->
- send(Socket, #request{op_code=?OP_Stat}),
- case collect_stats_from_socket(Socket) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Reply ->
- {reply, Reply, Socket}
- end;
-
-handle_call(flush, _From, Socket) ->
- case send_recv(Socket, #request{op_code=?OP_Flush}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call({flush, Expiration}, _From, Socket) ->
- case send_recv(Socket, #request{op_code=?OP_Flush, extras = <<Expiration:32>>}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call(quit, _From, Socket) ->
- send_recv(Socket, #request{op_code=?OP_Quit}),
- gen_tcp:close(Socket),
- {stop, shutdown, undefined};
+handle_call({req, Req}, From, #state{socket = Socket} = State) ->
+ send(Socket, Req),
+ {noreply, State#state{requesters = queue:in(From, State#state.requesters)}}.
-handle_call(version, _From, Socket) ->
- case send_recv(Socket, #request{op_code=?OP_Version}) of
- {error, Err} ->
- {stop, Err, {error, Err}, Socket};
- Resp ->
- {reply, Resp#response.value, Socket}
- end;
-
-handle_call(_, _From, Socket) -> {reply, {error, invalid_call}, Socket}.
-
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%% @hidden
%%--------------------------------------------------------------------
-handle_cast(_Message, State) -> {noreply, State}.
+handle_cast({reply, Reply}, #state{requesters = Requesters1} = State) ->
+ {{value, From}, Requesters2} = queue:out(Requesters1),
+ gen_server:reply(From, {reply, Reply}),
+ {noreply, State#state{requesters = Requesters2}}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
@@ -241,35 +124,10 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-collect_stats_from_socket(Socket) ->
- collect_stats_from_socket(Socket, []).
-
-collect_stats_from_socket(Socket, Acc) ->
- case recv(Socket) of
- {error, Err} ->
- {error, Err};
- #response{body_size=0} ->
- Acc;
- #response{key=Key, value=Value} ->
- collect_stats_from_socket(Socket, [{binary_to_atom(Key, utf8), binary_to_list(Value)}|Acc])
- end.
-
-send_recv(Socket, Request) ->
- ok = send(Socket, Request),
- recv(Socket).
-
send(Socket, Request) ->
Bin = encode_request(Request),
gen_tcp:send(Socket, Bin).
-recv(Socket) ->
- case recv_header(Socket) of
- {error, Err} ->
- {error, Err};
- HdrResp ->
- recv_body(Socket, HdrResp)
- end.
-
encode_request(Request) when is_record(Request, request) ->
Magic = 16#80,
Opcode = Request#request.op_code,
@@ -284,37 +142,3 @@ encode_request(Request) when is_record(Request, request) ->
CAS = Request#request.cas,
<<Magic:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Reserved:16, BodySize:32, Opaque:32, CAS:64, Body:BodySize/binary>>.
-recv_header(Socket) ->
- decode_response_header(recv_bytes(Socket, 24)).
-
-recv_body(Socket, #response{key_size = KeySize, extras_size = ExtrasSize, body_size = BodySize}=Resp) ->
- decode_response_body(recv_bytes(Socket, BodySize), ExtrasSize, KeySize, Resp).
-
-decode_response_header({error, Err}) -> {error, Err};
-decode_response_header(<<16#81:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Status:16, BodySize:32, Opaque:32, CAS:64>>) ->
- #response{
- op_code = Opcode,
- data_type = DataType,
- status = Status,
- opaque = Opaque,
- cas = CAS,
- key_size = KeySize,
- extras_size = ExtrasSize,
- body_size = BodySize
- }.
-
-decode_response_body({error, Err}, _, _, _) -> {error, Err};
-decode_response_body(Bin, ExtrasSize, KeySize, Resp) ->
- <<Extras:ExtrasSize/binary, Key:KeySize/binary, Value/binary>> = Bin,
- Resp#response{
- extras = Extras,
- key = Key,
- value = Value
- }.
-
-recv_bytes(_, 0) -> <<>>;
-recv_bytes(Socket, NumBytes) ->
- case gen_tcp:recv(Socket, NumBytes) of
- {ok, Bin} -> Bin;
- Err -> Err
- end.
Oops, something went wrong.

0 comments on commit 64c4798

Please sign in to comment.