Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

make set and delete no reply by default

  • Loading branch information...
commit 67a3678b5f4c41321040efb206fe496728ba6610 1 parent 9370e3c
@echou authored
View
93 c_src/memcached_drv.cpp
@@ -16,6 +16,8 @@ using namespace std;
#define CMD_SET 3
#define CMD_DELETE 4
#define CMD_MGET2 5
+#define CMD_SET_NOREPLY 13
+#define CMD_DELETE_NOREPLY 14
class Cache
{
@@ -25,27 +27,15 @@ class Cache
operator memcached_st*() { return mc; }
- bool setServers(const string& servers)
+ bool setServers(int binary, const string& servers)
{
-
- memcached_server_st* s = memcached_servers_parse(servers.c_str());
- printf("servers %s, %p\r\n", servers.c_str(), s);
- if (!s) return false;
-
- memcached_servers_reset(mc);
- int ret = memcached_server_push(mc, s);
- memcached_server_list_free(s);
-
- if (ret == MEMCACHED_SUCCESS)
- {
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
- memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
- }
-
- return ret == 0;
-
+ memcached_free(mc);
+ mc = memcached(servers.c_str(), servers.length());
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, binary);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
+ memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
+ return true;
}
private:
@@ -86,10 +76,16 @@ class Driver
doMGet(seq, vec);
break;
case CMD_SET:
- doSet('s', seq, vec);
+ doSet(seq, vec, true);
+ break;
+ case CMD_SET_NOREPLY:
+ doSet(seq, vec, false);
break;
case CMD_DELETE:
- doDelete(seq, vec);
+ doDelete(seq, vec, true);
+ break;
+ case CMD_DELETE_NOREPLY:
+ doDelete(seq, vec, false);
break;
case CMD_MGET2:
doMGet2(seq, vec);
@@ -145,7 +141,8 @@ class Driver
ErlDrvSSizeT doSetServers(char* buf, int len, char** rbuf, int rlen)
{
- m_cache.setServers(buf);
+ int binary = (int)(*(char*)buf);
+ m_cache.setServers(binary, buf+1);
return 0;
}
@@ -170,12 +167,15 @@ class Driver
char* value = memcached_get(m_cache, (const char*)key, klen, &vlen, &flags, &rc);
if (rc == MEMCACHED_SUCCESS)
{
- ok(td, true);
+ td.open_tuple();
+ td.add_atom((char*)"ok");
td.open_tuple();
td.add_buf(value, vlen);
td.add_uint(flags);
td.close_tuple();
+
+ td.close_tuple();
}
else if (rc == MEMCACHED_NOTFOUND)
{
@@ -330,35 +330,49 @@ class Driver
for(size_t i=0; i<results.size(); i++)
memcached_result_free(results[i]);
}
- void doSet(char type, uint32_t seq, IOVec& vec)
+
+ void doSet(uint32_t seq, IOVec& vec, bool wantReply)
{
TermData td = createReply(seq);
+ // <<Seq:32, Op:8, KeyLen:32, VLen:32, Flags:32, Expires:32, Key/binary, Value/binary>>
+ char op;
- // <<KeyLen:32, Key/binary, ValueLen:32, Value/binary, Flags:32, Expires:32>>
size_t klen, vlen;
uint32_t flags, expires;
char *key, *value;
- if (!(vec.get(klen) &&
+ if (!(vec.get(op) &&
+ vec.get(klen) &&
vec.get(vlen) &&
vec.get(flags) &&
vec.get(expires) &&
vec.get(key, klen) &&
vec.get(value, vlen)))
{
- send(badarg(td));
+ if (wantReply)
+ send(badarg(td));
return;
}
memcached_return rc;
- rc = memcached_set(m_cache, (const char*)key, klen, value, vlen, expires, flags);
- if (rc == MEMCACHED_SUCCESS)
- send(ok(td, false));
+ if (op == 's')
+ rc = memcached_set(m_cache, (const char*)key, klen, value, vlen, expires, flags);
+ else if (op == 'a')
+ rc = memcached_add(m_cache, (const char*)key, klen, value, vlen, expires, flags);
+ else if (op == 'r')
+ rc = memcached_replace(m_cache, (const char*)key, klen, value, vlen, expires, flags);
else
- send(error(td, rc));
+ rc = memcached_set(m_cache, (const char*)key, klen, value, vlen, expires, flags);
+
+ if (wantReply) {
+ if (rc == MEMCACHED_SUCCESS)
+ send(ok(td, false));
+ else
+ send(error(td, rc));
+ }
}
- void doDelete(uint32_t seq, IOVec& vec)
+ void doDelete(uint32_t seq, IOVec& vec, bool wantReply)
{
TermData td = createReply(seq);
@@ -371,16 +385,19 @@ class Driver
vec.get(klen) &&
vec.get(key, klen)))
{
- send(badarg(td));
+ if (wantReply)
+ send(badarg(td));
return;
}
memcached_return rc;
rc = memcached_delete(m_cache, (const char*)key, klen, expires);
- if (rc == MEMCACHED_SUCCESS)
- send(ok(td, false));
- else
- send(error(td, rc));
+ if (wantReply) {
+ if (rc == MEMCACHED_SUCCESS)
+ send(ok(td, false));
+ else
+ send(error(td, rc));
+ }
}
private:
View
2  src/mcache_memcached_drv_sup.erl
@@ -17,7 +17,7 @@ init([]) ->
Name = proplists:get_value(name, Pool, generic),
IsBinary = proplists:get_value(binary, Pool, true),
Servers = proplists:get_value(servers, Pool, []),
- S1 = string:join([Addr ++ ":" ++ integer_to_list(Weight)||{Addr, Weight}<-Servers], ","),
+ S1 = string:join(["--server=" ++ Addr++"/?"++integer_to_list(Weight)||{Addr, Weight}<-Servers], " "),
Options = [{binary, IsBinary}, {servers, S1}],
{{memcached_drv, Name},
{memcached_drv, start_link, [Name, Options]},
View
8 src/memcached_drv.erl
@@ -24,6 +24,8 @@
-define(CMD_SET, 3).
-define(CMD_DELETE, 4).
-define(CMD_MGET2, 5).
+-define(CMD_SET_NOREPLY,13).
+-define(CMD_DELETE_NOREPLY,14).
-define(RECV_TIMEOUT, 1000).
@@ -49,7 +51,7 @@ init([PoolName, Options]=_Args) ->
end,
IsBinary = proplists:get_value(binary, Options, true),
Servers = proplists:get_value(servers, Options, []),
- io:format("~p:init(~p) ~p, ~s~n", [?MODULE, _Args, bool(IsBinary), Servers]),
+ %io:format("~p:init(~p) ~p, ~s~n", [?MODULE, _Args, bool(IsBinary), Servers]),
catch ets:new(?DRV_TABLE, [set, public, named_table]),
lists:foreach(fun(I) ->
Port = open_port({spawn_driver, ?DRV_NAME}, []),
@@ -203,11 +205,11 @@ set(Pool, Seq, Op, Key, Value, Flags, Expires) ->
replace -> $r;
_ -> $s
end,
- erlang:port_command(Port, [<<?CMD_SET, Seq:32, Op1:8, KLen:32, VLen:32, Flags:32, Expires:32>>, K, V]).
+ erlang:port_command(Port, [<<?CMD_SET_NOREPLY, Seq:32, Op1:8, KLen:32, VLen:32, Flags:32, Expires:32>>, K, V]).
delete(Pool, Seq, Key) ->
Port = get_driver_port(Pool),
K = to_binary(Key),
KLen = byte_size(K),
- erlang:port_command(Port, [<<?CMD_DELETE, Seq:32, KLen:32>>, K]).
+ erlang:port_command(Port, [<<?CMD_DELETE_NOREPLY, Seq:32, KLen:32>>, K]).

0 comments on commit 67a3678

Please sign in to comment.
Something went wrong with that request. Please try again.