Permalink
Browse files

add another mget implementation.

  • Loading branch information...
1 parent a39ca2e commit fc4860ad6451da0a55956ef52e65eaf4ded1a7e8 @echou committed Dec 20, 2009
@@ -5,6 +5,8 @@
#include "termdata.hpp"
+#include <iostream>
+
using namespace std;
#define CMD_SET_SERVERS 0
@@ -13,6 +15,7 @@ using namespace std;
#define CMD_MGET 2
#define CMD_SET 3
#define CMD_DELETE 4
+#define CMD_MGET2 5
class Cache
{
@@ -85,6 +88,9 @@ class Driver
case CMD_DELETE:
doDelete(seq, vec);
break;
+ case CMD_MGET2:
+ doMGet2(seq, vec);
+ break;
}
}
}
@@ -239,6 +245,89 @@ class Driver
for(int i=0; i<free_list.size(); i++) memcached_result_free(free_list[i]);
}
+ void doMGet2(uint32_t seq, IOVec& vec)
+ {
+ TermData td = createReply(seq);
+
+ // <<Count:32, KeyLen:32, Key/binary, ...>>
+ int num_keys;
+ if (!vec.get(num_keys) || num_keys <= 0 || num_keys>2000)
+ goto L_badarg;
+
+ char * keys[num_keys];
+ size_t lengths[num_keys];
+
+ for(int i=0;i<num_keys;i++)
+ {
+ char nul;
+ if (!(vec.get(lengths[i]) && vec.get(keys[i], lengths[i]+1))) // trailing zero is included
+ goto L_badarg;
+ //printf("key #%d = %s\r\n", i, keys[i]);
+ }
+
+ goto L_arg_ok;
+
+ L_badarg:
+ send(badarg(td));
+ return;
+
+ L_arg_ok:
+ memcached_return rc;
+ rc = memcached_mget(m_cache, (const char**)keys, lengths, num_keys);
+
+ if (rc != MEMCACHED_SUCCESS)
+ {
+ send(error(td, rc));
+ return;
+ }
+
+ vector<memcached_result_st*> results;
+
+ // [ {Key, Value, Flag}, ... ]
+ memcached_result_st *result;
+ while ( (result = memcached_fetch_result(m_cache, NULL, &rc)) )
+ {
+ results.push_back(result);
+ //std::cout << "result: " << std::string(result->key, result->key_length) << "\r\n";
+ }
+
+ ok(td, true);
+ td.open_list();
+
+ int ri = 0;
+ for(int i=0; i<num_keys; i++)
+ {
+ memcached_result_st* r = NULL;
+ for(int j=ri; j<results.size(); j++)
+ {
+ if (lengths[i] == results[j]->key_length &&
+ memcmp(keys[i], results[j]->key, lengths[i]) == 0)
+ {
+ r = results[j];
+ //ri = j+1;
+ break;
+ }
+ }
+ if (r)
+ {
+ td.open_tuple();
+ //td.add_buf(result->key, result->key_length);
+ td.add_buf(memcached_string_value(&(r->value)), memcached_string_length(&(r->value)));
+ td.add_uint(r->flags);
+ td.close_tuple();
+ }
+ else
+ {
+ td.add_atom("undefined");
+ }
+ }
+
+ td.close_list();
+ send(td);
+
+ for(int i=0; i<results.size(); i++)
+ memcached_result_free(results[i]);
+ }
void doSet(char type, uint32_t seq, IOVec& vec)
{
TermData td = createReply(seq);
View
@@ -5,7 +5,9 @@
-module(mcache2).
-author('echou327@gmail.com').
--export([get/2, mget/2, set/5, delete/3]).
+-export([get/2, mget/2, mget2/2, set/5, delete/3]).
+
+-define(DICT, dict).
get(Class, Key) ->
{Pool, _Expiry} = mcache_expires:expire(Class),
@@ -14,19 +16,31 @@ get(Class, Key) ->
mget(Class, [_|_]=Keys) ->
{Pool, _Expiry} = mcache_expires:expire(Class),
- KeyDict = lists:foldl(fun(K, D) ->
+ {RealKeys, KeyDict} = lists:foldl(fun(K, {RK,D}) ->
K1 = mcache_util:map_key(Class, K),
- dict:store(K1, K, D)
- end, dict:new(), Keys),
+ {[K1|RK], ?DICT:store(K1, K, D)}
+ end, {[], ?DICT:new()}, Keys),
- {mc_async, 0, {ok, Values}} = memcached_drv:mget(Pool, 0, dict:fetch_keys(KeyDict)),
+ {mc_async, 0, {ok, Values}} = memcached_drv:mget(Pool, 0, RealKeys),
lists:foldl(fun({Key, Val, Flag}, Acc) ->
- case dict:find(Key, KeyDict) of
+ case ?DICT:find(Key, KeyDict) of
error -> Acc;
{ok, K} -> [{K, mcache_util:decode_value({Val, Flag})}|Acc]
end
end, [], Values).
+decode_values([], [], L) ->
+ lists:reverse(L);
+decode_values([K|Ks], [undefined|Vs], L) ->
+ decode_values(Ks, Vs, L);
+decode_values([K|Ks], [V|Vs], L) ->
+ decode_values(Ks, Vs, [{K, mcache_util:decode_value(V)}|L]).
+
+mget2(Class, [_|_]=Keys) ->
+ {Pool, _Expiry} = mcache_expires:expire(Class),
+ {mc_async, 0, {ok, Values}} = memcached_drv:mget2(Pool, 0, Class, Keys),
+ decode_values(Keys, Values, []).
+
set(Class, Key, Value, Format, Expiry) ->
{Pool, DefaultExpiry} = mcache_expires:expire(Class),
{Value1, Flags} = mcache_util:encode_value(Value, Format),
@@ -32,6 +32,11 @@ test_mget(P, R, M) ->
end,
stresstest:start("test_mget", P, R, F).
+test_mget2(P, R, M) ->
+ F = fun(I) ->
+ M:mget(mcache.test, [<<"key:", V:32>> || V <- lists:seq(I, I + 20)])
+ end,
+ stresstest:start("test_mget", P, R, F).
gen_continuum(Mode) ->
Servers = [
@@ -9,8 +9,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
-export([set_servers/2]).
--export([get/3, mget/3, set/6, delete/4]).
--export([ab_get/3, ab_mget/3, ab_set/6, ab_delete/4]).
+-export([get/3, mget/3, mget2/4, set/6, delete/4]).
+-export([ab_get/3, ab_mget/3, ab_mget2/4, ab_set/6, ab_delete/4]).
-record(state, {pool, servers}).
@@ -24,6 +24,7 @@
-define(CMD_MGET, 2).
-define(CMD_SET, 3).
-define(CMD_DELETE, 4).
+-define(CMD_MGET2, 5).
-define(RECV_TIMEOUT, 1000).
@@ -178,6 +179,22 @@ mget(Pool, Seq, Keys) ->
ab_mget(Pool, Seq, Keys),
do_receive(?RECV_TIMEOUT).
+ab_mget2(Pool, Seq, Class, Keys) ->
+ Port = get_driver_port(Pool),
+ NumKeys = length(Keys),
+ Class1 = to_binary(Class),
+ CLen = byte_size(Class1),
+ Data = lists:foldl(fun(K, A) ->
+ K1 = to_binary(K),
+ KLen = byte_size(K1) + CLen + 1,
+ [<<KLen:32, Class1/binary, ":", K1/binary, 0>>|A]
+ end, [<<?CMD_MGET2, Seq:32, NumKeys:32>>], Keys),
+ erlang:port_command(Port, lists:reverse(Data)).
+
+mget2(Pool, Seq, Class, Keys) ->
+ ab_mget2(Pool, Seq, Class, Keys),
+ do_receive(?RECV_TIMEOUT).
+
ab_set(Pool, Seq, Key, Value, Flags, Expires) ->
Port = get_driver_port(Pool),
K = to_binary(Key),

0 comments on commit fc4860a

Please sign in to comment.