Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

1. add get and delete command

2. use IOVec to provide same function for output and outputv
  • Loading branch information...
commit 692caa8d5703befa27f02e97da6c943b51323493 1 parent 4af6e91
@echou authored
View
243 lib/mcache/c_src/memcached_drv.cpp
@@ -8,9 +8,11 @@
using namespace std;
#define CMD_SET_SERVERS 0
-#define CMD_SET 1
-#define CMD_MGET 2
-#define CMD_MGET_BY_CLASS 3
+
+#define CMD_GET 1
+#define CMD_MGET 2
+#define CMD_SET 3
+#define CMD_DELETE 4
class Cache
{
@@ -27,19 +29,19 @@ class Cache
int ret = memcached_server_push(mc, s);
memcached_server_list_free(s);
+
+ if (ret == MEMCACHED_SUCCESS)
+ {
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
+ }
+
return ret == 0;
}
- void initBehaviors()
- {
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_SORT_HOSTS, 1);
- }
-
private:
memcached_st * mc;
};
@@ -54,115 +56,178 @@ class Driver
int control(unsigned int command, char *buf, int len, char **rbuf, int rlen)
{
- switch(command) {
- case CMD_SET_SERVERS:
- return doSetServers(buf, len, rbuf, rlen);
- default:
- return 0;
+ switch(command)
+ {
+ case CMD_SET_SERVERS:
+ return doSetServers(buf, len, rbuf, rlen);
}
+ return 0;
}
- void output(char *buf, int len)
+ void output(IOVec& vec)
{
- int cmd = *buf;
- uint32_t seq = ntohl(*(uint32_t*)(buf+1));
- switch(cmd) {
- case CMD_SET:
- doSet(seq, buf+5, len-5);
+ char cmd;
+ uint32_t seq;
+
+ if (vec.get(cmd) && vec.get(seq))
+ {
+ switch(cmd)
+ {
+ case CMD_GET:
+ doGet(seq, vec);
break;
case CMD_MGET:
- doMGet(seq, buf+5, len-5);
+ doMGet(seq, vec);
+ break;
+ case CMD_SET:
+ doSet('s', seq, vec);
break;
+ case CMD_DELETE:
+ doDelete(seq, vec);
+ break;
+ }
}
}
private:
- TermData createReply(uint32_t seq, char* atom)
+ // some TermData helpers
+ TermData createReply(uint32_t seq)
{
TermData td;
td.open_tuple();
td.add_atom("mc_async");
td.add_uint(seq);
+ return td;
+ }
- td.open_tuple();
- td.add_atom(atom);
+ TermData& ok(TermData& td, bool in_tuple = true)
+ {
+ if (in_tuple) td.open_tuple();
+ td.add_atom("ok");
return td;
}
- void sendError(uint32_t seq, memcached_return rc, bool to_caller)
+ TermData& error(TermData& td, memcached_return rc)
{
- TermData td = createReply(seq, "error");
const char * errmsg = memcached_strerror(m_cache, rc);
+ td.open_tuple();
+ td.add_atom("error");
td.add_buf((char*)errmsg, strlen(errmsg));
- td.output(m_port, to_caller);
+ td.close_tuple();
+ return td;
+ }
+
+ TermData& badarg(TermData& td)
+ {
+ td.open_tuple();
+ td.add_atom("error");
+ td.add_atom("badarg");
+ td.close_tuple();
+ return td;
+ }
+
+ void send(TermData& td)
+ {
+ td.output(m_port, driver_caller(m_port));
}
+ // command handlers
int doSetServers(char* buf, int len, char** rbuf, int rlen)
{
m_cache.setServers(buf);
- m_cache.initBehaviors();
return 0;
}
- void doSet(uint32_t seq, char* buf, int len)
+ void doGet(uint32_t seq, IOVec& vec)
{
- // <<KeyLen:32, Key/binary, ValueLen:32, Value/binary, Flags:32, Expires:32>>
- char *p = buf;
- size_t klen = ntohl(*(int*)p); p+=4;
- char *key = p; p += klen;
- size_t vlen = ntohl(*(int*)p); p+=4;
- char *value = p; p += vlen;
- uint32_t flags = ntohl(*(int*)p); p+=4;
- uint32_t expires = ntohl(*(int*)p);
+ TermData td = createReply(seq);
+
+ size_t klen;
+ char * key;
+
+ // <<KeyLen:32, Key/binary>>
+ if (!(vec.get(klen) && vec.get(key, klen)))
+ {
+ send(badarg(td));
+ return;
+ }
+
+ size_t vlen;
+ uint32_t flags;
memcached_return rc;
- rc = memcached_set(m_cache, (const char*)key, klen, value, vlen, expires, flags);
+ char* value = memcached_get(m_cache, (const char*)key, klen, &vlen, &flags, &rc);
if (rc == MEMCACHED_SUCCESS)
{
- TermData td = createReply(seq, "ok");
- td.output(m_port, true);
+ ok(td, true);
+
+ td.open_tuple();
+ td.add_buf(value, vlen);
+ td.add_uint(flags);
+ td.close_tuple();
+ }
+ else if (rc == MEMCACHED_NOTFOUND)
+ {
+ ok(td, true);
+ td.add_atom("undefined");
}
else
{
- sendError(seq, rc, true);
+ error(td, rc);
}
+
+ send(td);
+ if (value) free(value);
+
}
- void doMGet(uint32_t seq, char* buf, int len)
+ void doMGet(uint32_t seq, IOVec& vec)
{
+ TermData td = createReply(seq);
+
// <<Count:32, KeyLen:32, Key/binary, ...>>
- char *p = buf;
- int num_keys = ntohl(*(int*)p); p += 4;
+ int num_keys;
+ if (!vec.get(num_keys) || num_keys <= 0 || num_keys>2000)
+ goto L_badarg;
+
char * keys[num_keys];
size_t lengths[num_keys];
for(int i=0;i<num_keys;i++)
{
- lengths[i] = ntohl(*(int*)p); p+=4;
- keys[i] = p; p += lengths[i];
+ if (!(vec.get(lengths[i]) && vec.get(keys[i], lengths[i])))
+ goto L_badarg;
}
+ goto L_arg_ok;
+
+ L_badarg:
+ send(badarg(td));
+ return;
+
+ L_arg_ok:
memcached_return rc;
rc = memcached_mget(m_cache, (const char**)keys, lengths, num_keys);
if (rc != MEMCACHED_SUCCESS)
{
- sendError(seq, rc, true);
+ send(error(td, rc));
return;
}
- TermData td = createReply(seq, "ok");
+ ok(td, true);
+ td.open_list();
vector<memcached_result_st*> free_list;
// [ {Key, Value, Flag}, ... ]
- td.open_list();
memcached_result_st *result;
while ( (result = memcached_fetch_result(m_cache, NULL, &rc)) )
{
free_list.push_back(result);
+
td.open_tuple();
td.add_buf(result->key, result->key_length);
td.add_buf(memcached_string_value(&(result->value)), memcached_string_length(&(result->value)));
@@ -170,11 +235,63 @@ class Driver
td.close_tuple();
}
td.close_list();
- td.output(m_port, true);
-
+ send(td);
for(int i=0; i<free_list.size(); i++) memcached_result_free(free_list[i]);
}
+ void doSet(char type, uint32_t seq, IOVec& vec)
+ {
+ TermData td = createReply(seq);
+
+ // <<KeyLen:32, Key/binary, ValueLen:32, Value/binary, Flags:32, Expires:32>>
+ size_t klen, vlen;
+ uint32_t flags, expires;
+ char *key, *value;
+
+ if (!(vec.get(klen) &&
+ vec.get(vlen) &&
+ vec.get(flags) &&
+ vec.get(expires) &&
+ vec.get(key, klen) &&
+ vec.get(value, vlen)))
+ {
+ send(badarg(td));
+ return;
+ }
+
+ memcached_return rc;
+ rc = memcached_set(m_cache, (const char*)key, klen, value, vlen, expires, flags);
+ if (rc == MEMCACHED_SUCCESS)
+ send(ok(td, false));
+ else
+ send(error(td, rc));
+ }
+
+ void doDelete(uint32_t seq, IOVec& vec)
+ {
+ TermData td = createReply(seq);
+
+ // <<Expires:32, KeyLen:32, Key/binary>>
+ uint32_t expires;
+ size_t klen;
+ char *key;
+
+ if (!(vec.get(expires) &&
+ vec.get(klen) &&
+ vec.get(key, klen)))
+ {
+ send(badarg(td));
+ return;
+ }
+
+ memcached_return rc;
+ rc = memcached_delete(m_cache, (const char*)key, klen, expires);
+ if (rc == MEMCACHED_SUCCESS)
+ send(ok(td, false));
+ else
+ send(error(td, rc));
+ }
+
private:
ErlDrvPort m_port;
Cache m_cache;
@@ -202,15 +319,27 @@ static int driverControl(ErlDrvData drv_data, unsigned int command, char *buf, i
static void driverOutput(ErlDrvData drv_data, char* buf, int len)
{
- ((Driver*)drv_data)->output(buf, len);
+ IOVec vec(buf, len);
+ ((Driver*)drv_data)->output(vec);
}
+static void driverOutputv(ErlDrvData drv_data, ErlIOVec* ev)
+{
+ /*
+ printf("outputv=%p, sys_iov=%p, n=%d\r\n", ev, ev->iov, ev->vsize);
+ for(int i=0; i<ev->vsize; i++)
+ printf(" iov[%d]=(%p, %d)\r\n", i, ev->iov[i].iov_base, ev->iov[i].iov_len);
+ */
+
+ IOVec vec(ev->iov+1, ev->vsize-1); //TODO: the first iov seems to be (nil, 0)
+ ((Driver*)drv_data)->output(vec);
+}
ErlDrvEntry driver_entry = {
NULL, /* F_PTR init, N/A */
driverStart, /* L_PTR start, called when port is opened */
driverStop, /* F_PTR stop, called when port is closed */
- driverOutput, /* F_PTR output, called when erlang has sent */
+ NULL, //driverOutput, /* F_PTR output, called when erlang has sent */
NULL, /* F_PTR ready_input, called when input descriptor ready */
NULL, /* F_PTR ready_output, called when output descriptor ready */
"memcached_drv", /* char *driver_name, the argument to open_port */
@@ -218,7 +347,7 @@ ErlDrvEntry driver_entry = {
NULL, /* handle */
driverControl, /* F_PTR control, port_command callback */
NULL, /* F_PTR timeout, reserved */
- NULL, /* F_PTR outputv, reserved */
+ driverOutputv, /* F_PTR outputv, reserved */
NULL,
NULL,
NULL,
View
152 lib/mcache/c_src/termdata.hpp
@@ -4,7 +4,6 @@
#include <erl_driver.h>
#include <ei.h>
-#include <list>
#include <vector>
#include <stack>
@@ -13,45 +12,39 @@ using namespace std;
class TermData
{
public:
-
typedef ErlDrvTermData Item;
void add_atom(char* atom)
{
- spec.push_back(ERL_DRV_ATOM);
- spec.push_back(driver_mk_atom(atom));
- if (!stk.empty()) stk.top().first++;
+ inc_counter();
+ add(ERL_DRV_ATOM, driver_mk_atom(atom));
}
void add_uint(uint32_t uint)
{
- spec.push_back(ERL_DRV_UINT);
- spec.push_back((Item)uint);
- if (!stk.empty()) stk.top().first++;
+ inc_counter();
+ add(ERL_DRV_UINT, (Item)uint);
}
void add_buf(char* buf, size_t size, bool copy=false)
{
- if (not copy) {
- spec.push_back(ERL_DRV_BUF2BINARY);
- spec.push_back((Item)buf);
- spec.push_back((Item)size);
+ inc_counter();
+
+ if (!copy)
+ {
+ add(ERL_DRV_BUF2BINARY, (Item)buf, (Item)size);
}
- else {
+ else
+ {
ErlDrvBinary * bin = driver_alloc_binary(size);
memcpy(bin->orig_bytes, buf, size);
- spec.push_back(ERL_DRV_BINARY);
- spec.push_back((Item)bin);
- spec.push_back((Item)size);
- spec.push_back((Item)0);
+ add(ERL_DRV_BINARY, (Item)bin, (Item)size, (Item)0);
}
-
- if (!stk.empty()) stk.top().first++;
}
void open_tuple()
{
- if (!stk.empty()) stk.top().first++;
+ inc_counter();
stk.push(make_pair(0, 't'));
}
@@ -59,14 +52,13 @@ class TermData
{
if (check && (stk.empty() || stk.top().second != 't'))
return;
- spec.push_back(ERL_DRV_TUPLE);
- spec.push_back(stk.top().first);
+ add(ERL_DRV_TUPLE, (Item)stk.top().first);
stk.pop();
}
void open_list()
{
- if (!stk.empty()) stk.top().first++;
+ inc_counter();
stk.push(make_pair(0, 'l'));
}
@@ -74,9 +66,7 @@ class TermData
{
if (check && (stk.empty() || stk.top().second != 'l'))
return;
- spec.push_back(ERL_DRV_NIL);
- spec.push_back(ERL_DRV_LIST);
- spec.push_back(stk.top().first+1);
+ add(ERL_DRV_NIL, ERL_DRV_LIST, stk.top().first+1);
stk.pop();
}
@@ -92,44 +82,118 @@ class TermData
case 't':
close_tuple(false);
break;
+ default:
+ return;
}
}
}
- int output(ErlDrvPort port)
+ int output(ErlDrvPort port, Item to=0)
{
flush();
if (!spec.empty())
- return driver_output_term(port, &(spec.begin()[0]), spec.size());
+ {
+ if (to)
+ return driver_send_term(port, to, &(spec.begin()[0]), spec.size());
+ else
+ return driver_output_term(port, &(spec.begin()[0]), spec.size());
+ }
return 0;
}
- int output(ErlDrvPort port, Item to)
+private:
+ inline void inc_counter() { if (!stk.empty()) stk.top().first++; }
+ inline void add(Item a) { spec.push_back(a); }
+ inline void add(Item a, Item b) { spec.push_back(a); spec.push_back(b); }
+ inline void add(Item a, Item b, Item c) { spec.push_back(a); spec.push_back(b); spec.push_back(c); }
+ inline void add(Item a, Item b, Item c, Item d) { spec.push_back(a); spec.push_back(b); spec.push_back(c); spec.push_back(d); }
+private:
+ vector<Item> spec;
+ stack<pair<int, char> > stk;
+};
+
+class IOVec
+{
+public:
+ IOVec(SysIOVec* vec, size_t vlen): m_vec(vec), m_vlen(vlen), vidx(0)
+ {
+ cptr = m_vec[vidx].iov_base;
+ walk(0); // skip the first (nil, 0) vectors
+ }
+
+ IOVec(char* buf, size_t len): m_vec(&m_default_vec), m_vlen(1), vidx(0)
{
- flush();
- if (!spec.empty())
- return driver_send_term(port, to, &(spec.begin()[0]), spec.size());
- return 0;
+ m_default_vec.iov_base = buf;
+ m_default_vec.iov_len = len;
+ cptr = buf;
}
- int output(ErlDrvPort port, bool to_caller)
+ bool get(char& c)
{
- flush();
- if (!spec.empty())
+ char *p = walk(1);
+ if (p) c = *p;
+ return p!=NULL;
+ }
+
+ bool get(short& s)
+ {
+ char *p = walk(2);
+ if (p) s = ntohs(*(short*)p);
+ return p != NULL;
+ }
+
+ bool get(int& i)
+ {
+ char *p = walk(4);
+ if (p) i = ntohl(*(int*)p);
+ return p != NULL;
+ }
+
+ bool get(unsigned int& i)
+ {
+ char *p = walk(4);
+ if (p) i = (unsigned int)ntohl(*(int*)p);
+ return p != NULL;
+ }
+
+ bool get(char*& buf, size_t count) // the buffer cannot cross IOVec border.
+ {
+ buf = walk(count);
+ return buf != NULL;
+ }
+
+private:
+ char* walk(size_t count)
+ {
+ if (vidx == m_vlen) // reach the end of vectors.
+ return NULL;
+
+ char * ret = NULL;
+ size_t left = m_vec[vidx].iov_len - (cptr - m_vec[vidx].iov_base);
+ if (left == count) // walk to next vec
{
- if (to_caller)
- return driver_send_term(port, driver_caller(port), &(spec.begin()[0]), spec.size());
- else
- return driver_output_term(port, &(spec.begin()[0]), spec.size());
+ ret = cptr;
+ vidx++;
+ while (vidx < m_vlen && m_vec[vidx].iov_len == 0) vidx++;
+ if (vidx < m_vlen)
+ cptr = m_vec[vidx].iov_base;
+ } else if (left > count) { // stay at current vector
+ ret = cptr;
+ cptr += count;
}
- return 0;
+ return ret;
}
private:
- vector<Item> spec;
- stack<pair<int, char> > stk;
-};
+ SysIOVec m_default_vec; // emulate one buffer (giving buf ptr and len)
+ SysIOVec* m_vec;
+ size_t m_vlen;
+
+
+ char * cptr; // current pointer in one vec.
+ size_t vidx; // vector index
+};
#endif
View
7 lib/mcache/src/mcache2.erl
@@ -5,7 +5,7 @@
-module(mcache2).
-author('echou327@gmail.com').
--export([mget/2, set/5, set/4]).
+-export([get/2, mget/2, set/5, set/4]).
-define(SEP, ":").
-define(MGET_TIMEOUT, 500).
@@ -14,6 +14,11 @@
-define(FMT_JSON, 102).
-define(FMT_INT, 103).
+get(Class, Key) ->
+ {Pool, _Expiry} = mcache_expires:expire(Class),
+ {mc_async, 0, {ok, Value}} = memcached_drv:get(Pool, 0, {Class, Key}),
+ decode_value(Value).
+
mget(Class, [_|_]=Keys) ->
{Pool, _Expiry} = mcache_expires:expire(Class),
RealKeys = lists:map(fun(K) ->
View
105 lib/mcache/src/memcached_drv.erl
@@ -7,8 +7,10 @@
% API
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+
-export([set_servers/2]).
--export([mget/2, mget/3, set/5, set/6]).
+-export([get/3, mget/3, set/6, delete/4]).
+-export([ab_get/3, ab_mget/3, ab_set/6, ab_delete/4]).
-record(state, {pool, servers}).
@@ -18,9 +20,12 @@
-define(DRV_TAG, memcached_drv).
-define(CMD_SET_SERVERS, 0).
--define(CMD_SET, 1).
+-define(CMD_GET, 1).
-define(CMD_MGET, 2).
--define(CMD_MGET_BY_CLASS, 3).
+-define(CMD_SET, 3).
+-define(CMD_DELETE, 4).
+
+-define(RECV_TIMEOUT, 1000).
start_link(PoolName, Servers) ->
gen_server:start_link(?MODULE, [PoolName, Servers], []).
@@ -112,23 +117,30 @@ get_driver_port(Pool) ->
[{_, Port} | _] = ets:lookup(?DRV_TABLE, {?DRV_TAG, Pool, PortIndex}),
Port.
-% API
+do_receive(Timeout) ->
+ receive
+ {mc_async, _, _} = Msg -> Msg
+ after Timeout ->
+ {error, timeout}
+ end.
+
control(Pool, Cmd, Data) ->
Port = get_driver_port(Pool),
erlang:port_control(Port, Cmd, Data).
-send_command(Port, Command, Timeout) ->
- port_command(Port, Command),
- receive
- Data ->
- Data
- after Timeout->
- {error, timeout}
- end.
-
-send_command(Port, Command) ->
- send_command(Port, Command, 100).
+to_binary(B) when is_binary(B) ->
+ B;
+to_binary(L) when is_list(L) ->
+ iolist_to_binary(L);
+to_binary(A) when is_atom(A) ->
+ list_to_binary(atom_to_list(A));
+to_binary(N) when is_integer(N) ->
+ list_to_binary(integer_to_list(N));
+to_binary({Class,Key}) ->
+ C = to_binary(Class),
+ K = to_binary(Key),
+ <<C/binary, ":", K/binary>>.
%%%%%%%
@@ -141,41 +153,50 @@ set_servers(Pool, Servers) ->
false
end.
-mget(Pool, Keys) -> mget(Pool, 0, Keys).
-mget(Pool, Seq, [_|_]=Keys) ->
+ab_get(Pool, Seq, Key) ->
Port = get_driver_port(Pool),
- NumKeys = length(Keys),
- Data = lists:foldl(fun(K,A) ->
- KLen = iolist_size(K),
- [K, <<KLen:32>>|A]
- end, [<<Seq:32, NumKeys:32>>], Keys),
- Data1 = lists:reverse(Data),
- send_command(Port, [?CMD_MGET|Data1]).
-
-class_to_iolist(Class) when is_atom(Class) ->
- atom_to_list(Class);
-class_to_iolist(Class) when is_list(Class); is_binary(Class) ->
- Class.
-
-mget_by_class(Pool, Seq, Class, [_|_]=Keys) ->
+ K = to_binary(Key),
+ KLen = byte_size(K),
+ erlang:port_command(Port, [<<?CMD_GET, Seq:32, KLen:32>>, K]).
+
+get(Pool, Seq, Key) ->
+ ab_get(Pool, Seq, Key),
+ do_receive(?RECV_TIMEOUT).
+
+ab_mget(Pool, Seq, [_|_]=Keys) ->
Port = get_driver_port(Pool),
NumKeys = length(Keys),
- Class1 = class_to_iolist(Class),
- ClassLen = iolist_size(Class1),
Data = lists:foldl(fun(K,A) ->
- KLen = iolist_size(K),
- [K, <<KLen:32>>|A]
- end, [<<Seq:32, NumKeys:32, ClassLen:32>>, Class1], Keys),
- Data1 = lists:reverse(Data),
- send_command(Port, [?CMD_MGET_BY_CLASS|Data1]).
+ K1 = to_binary(K),
+ KLen = byte_size(K1),
+ [K1, <<KLen:32>>|A]
+ end, [<<?CMD_MGET, Seq:32, NumKeys:32>>], Keys),
+ erlang:port_command(Port, lists:reverse(Data)).
-set(Pool, Key, Value, Flags, Expires) -> set(Pool, 0, Key, Value, Flags, Expires).
+mget(Pool, Seq, Keys) ->
+ ab_mget(Pool, Seq, Keys),
+ do_receive(?RECV_TIMEOUT).
+
+ab_set(Pool, Seq, Key, Value, Flags, Expires) ->
+ Port = get_driver_port(Pool),
+ K = to_binary(Key),
+ V = to_binary(Value),
+ KLen = byte_size(K),
+ VLen = byte_size(V),
+ erlang:port_command(Port, [<<?CMD_SET, Seq:32, KLen:32, VLen:32, Flags:32, Expires:32>>, K, V]).
set(Pool, Seq, Key, Value, Flags, Expires) ->
+ ab_set(Pool, Seq, Key, Value, Flags, Expires),
+ do_receive(?RECV_TIMEOUT).
+
+ab_delete(Pool, Seq, Key, Expires) ->
Port = get_driver_port(Pool),
- KLen = iolist_size(Key),
- VLen = iolist_size(Value),
- send_command(Port, [?CMD_SET, <<Seq:32, KLen:32>>, Key, <<VLen:32>>, Value, <<Flags:32, Expires:32>>]).
+ K = to_binary(Key),
+ KLen = byte_size(K),
+ erlang:port_command(Port, [<<?CMD_DELETE, Seq:32, Expires:32, KLen:32>>, K]).
+delete(Pool, Seq, Key, Expires) ->
+ ab_delete(Pool, Seq, Key, Expires),
+ do_receive(?RECV_TIMEOUT).
Please sign in to comment.
Something went wrong with that request. Please try again.