Skip to content
This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

Commit

Permalink
added better support for cas and argument types, added flush_all
Browse files Browse the repository at this point in the history
  • Loading branch information
joewilliams committed Jan 21, 2009
1 parent c3ac087 commit d1b4a30
Showing 1 changed file with 117 additions and 57 deletions.
174 changes: 117 additions & 57 deletions src/merle.erl
Expand Up @@ -46,8 +46,9 @@


%% gen_server API %% gen_server API
-export([ -export([
start_link/2, stats/0, stats/1, version/0, get/1, delete/2, set/4, add/4, start_link/2, stats/0, stats/1, version/0, getkey/1, delete/2, set/4, add/4,
replace/4, cas/5, set/2, append/2, prepend/2, increment/2, decrement/2 replace/4, cas/5, set/2, flushall/0, flushall/1, verbosity/1, add/2, replace/2,
cas/3, getskey/1
]). ]).


%% gen_server callbacks %% gen_server callbacks
Expand All @@ -63,21 +64,54 @@ stats() ->
gen_server:call(?SERVER, {stats}). gen_server:call(?SERVER, {stats}).


%% @doc retrieve memcached stats based on args %% @doc retrieve memcached stats based on args
stats(Args) when is_atom(Args)->
stats(atom_to_list(Args));
stats(Args) -> stats(Args) ->
gen_server:call(?SERVER, {stats, {Args}}). gen_server:call(?SERVER, {stats, {Args}}).


%% @doc retrieve memcached version %% @doc retrieve memcached version
version() -> version() ->
gen_server:call(?SERVER, {version}). gen_server:call(?SERVER, {version}).


%% @doc set the verbosity level of the logging output
verbosity(Args) when is_integer(Args) ->
verbosity(integer_to_list(Args));
verbosity(Args)->
gen_server:call(?SERVER, {verbosity, {Args}}).

%% @doc invalidate all existing items immediately
flushall() ->
gen_server:call(?SERVER, {flushall}).

%% @doc invalidate all existing items based on the expire time argument
flushall(Args) when is_integer(Args) ->
flushall(integer_to_list(Args));
flushall(Args) ->
gen_server:call(?SERVER, {flushall, {Args}}).

%% @doc retrieve value based off of key %% @doc retrieve value based off of key
get(Key) -> getkey(Key) when is_atom(Key) ->
getkey(atom_to_list(Key));
getkey(Key) ->
case gen_server:call(?SERVER, {getkey,{Key}}) of case gen_server:call(?SERVER, {getkey,{Key}}) of
["END"] -> undefined; ["END"] -> undefined;
[X] -> X [X] -> X
end. end.


%% @doc retrieve value based off of key for use with cas
getskey(Key) when is_atom(Key) ->
getskey(atom_to_list(Key));
getskey(Key) ->
case gen_server:call(?SERVER, {getskey,{Key}}) of
["END"] -> undefined;
[X] -> X
end.

%% @doc delete a key and specify time %% @doc delete a key and specify time
delete(Key, Time) when is_atom(Key) ->
delete(atom_to_list(Key), Time);
delete(Key, Time) when is_integer(Time) ->
delete(Key, integer_to_list(Time));
delete(Key, Time) -> delete(Key, Time) ->
gen_server:call(?SERVER, {delete, {Key, Time}}). gen_server:call(?SERVER, {delete, {Key, Time}}).


Expand Down Expand Up @@ -105,9 +139,11 @@ delete(Key, Time) ->


%% @doc Store a key/value pair. %% @doc Store a key/value pair.
set(Key, Value) -> set(Key, Value) ->
Flag = random:uniform(65000), Flag = random:uniform(65535),
set(Key, Flag, "0", Value). set(Key, integer_to_list(Flag), "0", Value).


set(Key, Flag, ExpTime, Value) when is_atom(Key) ->
set(atom_to_list(Key), Flag, ExpTime, Value);
set(Key, Flag, ExpTime, Value) when is_integer(Flag) -> set(Key, Flag, ExpTime, Value) when is_integer(Flag) ->
set(Key, integer_to_list(Flag), ExpTime, Value); set(Key, integer_to_list(Flag), ExpTime, Value);
set(Key, Flag, ExpTime, Value) when is_integer(ExpTime) -> set(Key, Flag, ExpTime, Value) when is_integer(ExpTime) ->
Expand All @@ -119,33 +155,49 @@ set(Key, Flag, ExpTime, Value) ->
end. end.


%% @doc Store a key/value pair if it doesn't already exist. %% @doc Store a key/value pair if it doesn't already exist.
add(Key, Value) ->
Flag = random:uniform(65535),
add(Key, integer_to_list(Flag), "0", Value).

add(Key, Flag, ExpTime, Value) when is_atom(Key) ->
add(atom_to_list(Key), Flag, ExpTime, Value);
add(Key, Flag, ExpTime, Value) when is_integer(Flag) ->
add(Key, integer_to_list(Flag), ExpTime, Value);
add(Key, Flag, ExpTime, Value) when is_integer(ExpTime) ->
add(Key, Flag, integer_to_list(ExpTime), Value);
add(Key, Flag, ExpTime, Value) -> add(Key, Flag, ExpTime, Value) ->
gen_server:call(?SERVER, {add, {Key, Flag, ExpTime, Value}}). gen_server:call(?SERVER, {add, {Key, Flag, ExpTime, Value}}).


%% @doc Replace an existing key/value pair. %% @doc Replace an existing key/value pair.
replace(Key, Value) ->
Flag = random:uniform(65535),
replace(Key, integer_to_list(Flag), "0", Value).

replace(Key, Flag, ExpTime, Value) when is_atom(Key) ->
replace(atom_to_list(Key), Flag, ExpTime, Value);
replace(Key, Flag, ExpTime, Value) when is_integer(Flag) ->
replace(Key, integer_to_list(Flag), ExpTime, Value);
replace(Key, Flag, ExpTime, Value) when is_integer(ExpTime) ->
replace(Key, Flag, integer_to_list(ExpTime), Value);
replace(Key, Flag, ExpTime, Value) -> replace(Key, Flag, ExpTime, Value) ->
gen_server:call(?SERVER, {replace, {Key, Flag, ExpTime, Value}}). gen_server:call(?SERVER, {replace, {Key, Flag, ExpTime, Value}}).


%% @doc Store a key/value pair if possible. %% @doc Store a key/value pair if possible.
cas(Key, CasUniq, Value) ->
Flag = random:uniform(65535),
cas(Key, integer_to_list(Flag), "0", CasUniq, Value).

cas(Key, Flag, ExpTime, CasUniq, Value) when is_atom(Key) ->
cas(atom_to_list(Key), Flag, ExpTime, CasUniq, Value);
cas(Key, Flag, ExpTime, CasUniq, Value) when is_integer(Flag) ->
cas(Key, integer_to_list(Flag), ExpTime, CasUniq, Value);
cas(Key, Flag, ExpTime, CasUniq, Value) when is_integer(ExpTime) ->
cas(Key, Flag, integer_to_list(ExpTime), CasUniq, Value);
cas(Key, Flag, ExpTime, CasUniq, Value) when is_integer(CasUniq) ->
cas(Key, Flag, ExpTime, integer_to_list(CasUniq), Value);
cas(Key, Flag, ExpTime, CasUniq, Value) -> cas(Key, Flag, ExpTime, CasUniq, Value) ->
gen_server:call(?SERVER, {cas, {Key, Flag, ExpTime, CasUniq, Value}}). gen_server:call(?SERVER, {cas, {Key, Flag, ExpTime, CasUniq, Value}}).


%% @doc Append data to already existing value.
append(Key, Value) ->
gen_server:call(?SERVER, {append, {Key, Value}}).

%% @doc Prepend data to already existing value.
prepend(Key, Value) ->
gen_server:call(?SERVER, {prepend, {Key, Value}}).

%% @doc Increment already existing value.
increment(Key, Value) ->
gen_server:call(?SERVER, {increment, {Key, Value}}).

%% @doc Decrement already existing value.
decrement(Key, Value) ->
gen_server:call(?SERVER, {decrement, {Key, Value}}).

%% @private %% @private
start_link(Host, Port) -> start_link(Host, Port) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Host, Port], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [Host, Port], []).
Expand All @@ -167,10 +219,26 @@ handle_call({version}, _From, Socket) ->
Reply = send_generic_cmd(Socket, iolist_to_binary([<<"version">>])), Reply = send_generic_cmd(Socket, iolist_to_binary([<<"version">>])),
{reply, Reply, Socket}; {reply, Reply, Socket};


handle_call({verbosity, {Args}}, _From, Socket) ->
Reply = send_generic_cmd(Socket, iolist_to_binary([<<"verbosity ">>, Args])),
{reply, Reply, Socket};

handle_call({flushall}, _From, Socket) ->
Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all">>])),
{reply, Reply, Socket};

handle_call({flushall, {Args}}, _From, Socket) ->
Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all ">>, Args])),
{reply, Reply, Socket};

handle_call({getkey, {Key}}, _From, Socket) -> handle_call({getkey, {Key}}, _From, Socket) ->
Reply = send_get_cmd(Socket, iolist_to_binary([<<"get ">>, Key])), Reply = send_get_cmd(Socket, iolist_to_binary([<<"get ">>, Key])),
{reply, Reply, Socket}; {reply, Reply, Socket};


handle_call({getskey, {Key}}, _From, Socket) ->
Reply = send_gets_cmd(Socket, iolist_to_binary([<<"gets ">>, Key])),
{reply, [Reply], Socket};

handle_call({delete, {Key, Time}}, _From, Socket) -> handle_call({delete, {Key, Time}}, _From, Socket) ->
Reply = send_generic_cmd( Reply = send_generic_cmd(
Socket, Socket,
Expand Down Expand Up @@ -226,40 +294,6 @@ handle_call({cas, {Key, Flag, ExpTime, CasUniq, Value}}, _From, Socket) ->
]), ]),
Bin Bin
), ),
{reply, Reply, Socket};

handle_call({append, {Key, Value}}, _From, Socket) ->
Bin = term_to_binary(Value),
Bytes = integer_to_list(size(Bin)),
Reply = send_storage_cmd(
Socket,
iolist_to_binary([<<"append ">>, Key, <<" 0 0 ">>, Bytes]),
Bin
),
{reply, Reply, Socket};

handle_call({prepend, {Key, Value}}, _From, Socket) ->
Bin = term_to_binary(Value),
Bytes = integer_to_list(size(Bin)),
Reply = send_storage_cmd(
Socket,
iolist_to_binary([<<"prepend ">>, Key, <<" 0 0 ">>, Bytes]),
Bin
),
{reply, Reply, Socket};

handle_call({increment, {Key, Value}}, _From, Socket) ->
Bin = term_to_binary(Value),
Reply = send_generic_cmd(
Socket,
iolist_to_binary([<<"incr ">>, Key, Bin])),
{reply, Reply, Socket};

handle_call({decrement, {Key, Value}}, _From, Socket) ->
Bin = term_to_binary(Value),
Reply = send_generic_cmd(
Socket,
iolist_to_binary([<<"decr ">>, Key, Bin])),
{reply, Reply, Socket}. {reply, Reply, Socket}.


%% @private %% @private
Expand Down Expand Up @@ -296,7 +330,14 @@ send_storage_cmd(Socket, Cmd, Value) ->
%% @doc send_get_cmd/2 function for retreival commands %% @doc send_get_cmd/2 function for retreival commands
send_get_cmd(Socket, Cmd) -> send_get_cmd(Socket, Cmd) ->
gen_tcp:send(Socket, <<Cmd/binary, "\r\n">>), gen_tcp:send(Socket, <<Cmd/binary, "\r\n">>),
Reply = recv_complex_reply(Socket), Reply = recv_complex_get_reply(Socket),
Reply.

%% @private
%% @doc send_gets_cmd/2 function for cas retreival commands
send_gets_cmd(Socket, Cmd) ->
gen_tcp:send(Socket, <<Cmd/binary, "\r\n">>),
Reply = recv_complex_gets_reply(Socket),
Reply. Reply.


%% @private %% @private
Expand All @@ -313,7 +354,7 @@ recv_simple_reply() ->


%% @private %% @private
%% @doc receive function for respones containing VALUEs %% @doc receive function for respones containing VALUEs
recv_complex_reply(Socket) -> recv_complex_get_reply(Socket) ->
receive receive
%% For receiving get responses where the key does not exist %% For receiving get responses where the key does not exist
{tcp, Socket, <<"END\r\n">>} -> ["END"]; {tcp, Socket, <<"END\r\n">>} -> ["END"];
Expand All @@ -330,6 +371,25 @@ recv_complex_reply(Socket) ->
after ?TIMEOUT -> timeout after ?TIMEOUT -> timeout
end. end.


%% @private
%% @doc receive function for cas respones containing VALUEs
recv_complex_gets_reply(Socket) ->
receive
%% For receiving get responses where the key does not exist
{tcp, Socket, <<"END\r\n">>} -> ["END"];
%% For receiving get responses containing data
{tcp, Socket, Data} ->
%% Reply format <<"VALUE SOMEKEY FLAG BYTES\r\nSOMEVALUE\r\nEND\r\n">>
Parse = io_lib:fread("~s ~s ~u ~u ~u\r\n", binary_to_list(Data)),
{ok,[_,_,_,Bytes,CasUniq], ListBin} = Parse,
Bin = list_to_binary(ListBin),
Reply = get_data(Socket, Bin, Bytes, length(ListBin)),
[CasUniq, Reply];
{error, closed} ->
connection_closed
after ?TIMEOUT -> timeout
end.

%% @private %% @private
%% @doc recieve loop to get all data %% @doc recieve loop to get all data
get_data(Socket, Bin, Bytes, Len) when Len < Bytes + 7-> get_data(Socket, Bin, Bytes, Len) when Len < Bytes + 7->
Expand Down

0 comments on commit d1b4a30

Please sign in to comment.