Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

added --key-prefix feature on kumo-gateway

  • Loading branch information...
commit a224108c2e330a0581f981a5cd33fbb7ab35dda1 1 parent 78879b5
@frsyuki frsyuki authored
View
3  src/gate/cloudy.cc
@@ -273,7 +273,6 @@ void request_getx(void* user,
gate::req_get req;
req.keylen = keylen;
req.key = key;
- req.hash = kumo::gate::stdhash(req.key, req.keylen);
req.user = static_cast<void*>(e);
req.callback = &response_getx;
req.life = life;
@@ -304,7 +303,6 @@ void request_set(void* user,
req.key = key;
req.vallen = vallen;
req.val = val;
- req.hash = kumo::gate::stdhash(req.key, req.keylen);
req.user = static_cast<void*>(e);
req.callback = &response_set;
req.life = life;
@@ -332,7 +330,6 @@ void request_delete(void* user,
gate::req_delete req;
req.key = key;
req.keylen = keylen;
- req.hash = kumo::gate::stdhash(req.key, req.keylen);
req.user = static_cast<void*>(e);
req.callback = &response_delete;
req.life = life;
View
21 src/gate/interface.h
@@ -49,6 +49,7 @@ struct res_get {
int error;
const char* key;
uint32_t keylen;
+
uint64_t hash;
char* val;
@@ -59,11 +60,13 @@ struct res_get {
typedef void (*callback_get)(void* user, res_get& res, auto_zone z);
struct req_get {
- req_get() { }
+ req_get() : has_user_hash(false) { }
const char* key;
uint32_t keylen;
- uint64_t hash;
+
+ bool has_user_hash;
+ uint64_t user_hash;
shared_zone life;
callback_get callback;
@@ -87,6 +90,7 @@ struct res_set {
const char* key;
uint32_t keylen;
+
uint64_t hash;
const char* val;
@@ -99,11 +103,13 @@ struct res_set {
typedef void (*callback_set)(void* user, res_set& res, auto_zone z);
struct req_set {
- req_set() : operation(OP_SET) { }
+ req_set() : has_user_hash(false), operation(OP_SET) { }
const char* key;
uint32_t keylen;
- uint64_t hash;
+
+ bool has_user_hash;
+ uint64_t user_hash;
const char* val;
uint32_t vallen;
@@ -124,6 +130,7 @@ struct res_delete {
const char* key;
uint32_t keylen;
+
uint64_t hash;
bool deleted;
@@ -132,11 +139,13 @@ struct res_delete {
typedef void (*callback_delete)(void* user, res_delete& res, auto_zone z);
struct req_delete {
- req_delete() : async(false) { }
+ req_delete() : has_user_hash(false), async(false) { }
const char* key;
uint32_t keylen;
- uint64_t hash;
+
+ bool has_user_hash;
+ uint64_t user_hash;
bool async;
View
3  src/gate/memcache_binary.cc
@@ -559,7 +559,6 @@ void handler::request_getx(memproto_header* h,
gate::req_get req;
req.keylen = keylen;
req.key = key;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
req.callback = &handler::response_getx;
req.life = life;
@@ -625,7 +624,6 @@ void handler::request_set(memproto_header* h,
req.key = key;
req.vallen = vallen;
req.val = val;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
req.callback = &handler::response_set;
req.life = life;
@@ -658,7 +656,6 @@ void handler::request_delete(memproto_header* h,
gate::req_delete req;
req.key = key;
req.keylen = keylen;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
req.callback = &handler::response_delete;
req.life = life;
View
4 src/gate/memcache_text.cc
@@ -429,7 +429,6 @@ int request_get_single(void* user,
gate::req_get req;
req.keylen = key_len;
req.key = key;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
req.callback = &response_get;
req.life = life;
@@ -483,7 +482,6 @@ int request_get_multi(void* user,
req.keylen = r->key_len[i];
req.key = r->key[i];
req.user = reinterpret_cast<void*>(me[i]);
- req.hash = gate::stdhash(req.key, req.keylen);
req.callback = &response_get_multi;
req.life = life;
@@ -563,7 +561,6 @@ int request_set_impl(void* user,
req.key = r->key;
req.vallen = r->data_len;
req.val = r->data;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
req.life = life;
if(r->noreply) {
@@ -635,7 +632,6 @@ int request_delete(void* user,
gate::req_delete req;
req.key = r->key;
req.keylen = r->key_len;
- req.hash = gate::stdhash(req.key, req.keylen);
req.user = reinterpret_cast<void*>(e);
if(r->noreply) {
req.async = true;
View
4 src/logic/gateway/framework.h
@@ -82,6 +82,8 @@ class resource {
unsigned short m_error_count;
+ std::string m_cfg_key_prefix;
+
public:
// mod_store.cc
void incr_error_renew_count();
@@ -114,6 +116,8 @@ class resource {
RESOURCE_CONST_ACCESSOR(unsigned short, cfg_renew_threshold);
+ RESOURCE_CONST_ACCESSOR(std::string, cfg_key_prefix);
+
private:
resource();
resource(const resource&);
View
1  src/logic/gateway/init.h
@@ -57,6 +57,7 @@ resource::resource(const Config& cfg) :
m_cfg_set_retry_num(cfg.set_retry_num),
m_cfg_delete_retry_num(cfg.delete_retry_num),
m_cfg_renew_threshold(cfg.renew_threshold),
+ m_cfg_key_prefix(cfg.key_prefix),
m_error_count(0)
{ }
View
6 src/logic/gateway/main.cc
@@ -62,6 +62,8 @@ struct arg_t : rpc_args {
bool mc_save_flag;
bool mc_save_exptime;
+ std::string key_prefix;
+
virtual void convert()
{
rpc_args::convert();
@@ -119,6 +121,8 @@ struct arg_t : rpc_args {
type::boolean(&async_replicate_set));
on("-Ad", "--async-replicate-delete",
type::boolean(&async_replicate_delete));
+ on("-k", "--key-prefix",
+ type::string(&key_prefix, ""));
parse(argc, argv);
}
@@ -158,6 +162,8 @@ struct arg_t : rpc_args {
"--delete-retry delete retry limit\n"
" -rn <number="<<renew_threshold<<"> "
"--renew-threshold hash space renew threshold\n"
+ " -k <string> "
+ "--key-prefix add prefix to keys automatically\n"
;
rpc_args::show_usage();
}
View
80 src/logic/gateway/mod_store.cc
@@ -79,6 +79,44 @@ framework::shared_session resource::server_for(uint64_t h, unsigned int offset)
return net->get_session(addr);
}
+template <typename ReqType>
+static msgtype::DBKey dbkey_with_prefix(const ReqType& req, shared_zone& life) {
+ const std::string prefix(share->cfg_key_prefix());
+ uint64_t hash;
+ if(prefix.empty()) {
+ if(req.has_user_hash) {
+ hash = req.user_hash;
+ } else {
+ hash = gate::stdhash(req.key, req.keylen);
+ }
+ return msgtype::DBKey(req.key, req.keylen, hash);
+
+ } else {
+ size_t klen = prefix.size()+req.keylen;
+ char* kp = (char*)life->malloc(klen);
+ memcpy(kp, prefix.data(), prefix.size());
+ memcpy(kp+prefix.size(), req.key, req.keylen);
+ if(req.has_user_hash) {
+ hash = req.user_hash;
+ } else {
+ hash = gate::stdhash(kp, klen);
+ }
+ return msgtype::DBKey(kp, klen, hash);
+ }
+}
+
+template <typename ResType>
+static void dbkey_remove_prefix(ResType* ret, const msgtype::DBKey& key) {
+ const std::string prefix(share->cfg_key_prefix());
+ if(prefix.empty()) {
+ ret->key = key.data();
+ ret->keylen = key.size();
+ } else {
+ ret->key = key.data()+prefix.size();
+ ret->keylen = key.size()-prefix.size();
+ }
+}
+
void resource::incr_error_renew_count()
{
@@ -122,7 +160,7 @@ try {
shared_zone life(req.life);
if(!life) { life.reset(new msgpack::zone()); }
- msgtype::DBKey key(req.key, req.keylen, req.hash);
+ msgtype::DBKey key = dbkey_with_prefix(req, life);
msgtype::DBValue cached_val_buf;
if(net->mod_cache.get(key, &cached_val_buf, life.get())) {
@@ -137,7 +175,7 @@ try {
BIND_RESPONSE(mod_store_t, GetIfModified, retry,
req.callback, req.user, cached_val) );
- retry->call(share->server_for<resource::HS_READ>(req.hash), life, 10);
+ retry->call(share->server_for<resource::HS_READ>(key.hash()), life, 10);
} else {
rpc::retry<server::mod_store_t::Get>* retry =
@@ -149,7 +187,7 @@ try {
BIND_RESPONSE(mod_store_t, Get, retry,
req.callback, req.user) );
- retry->call(share->server_for<resource::HS_READ>(req.hash), life, 10);
+ retry->call(share->server_for<resource::HS_READ>(key.hash()), life, 10);
}
}
SUBMIT_CATCH(_get);
@@ -181,17 +219,19 @@ try {
break;
}
+ msgtype::DBKey key = dbkey_with_prefix(req, life);
+
uint16_t meta = 0;
rpc::retry<server::mod_store_t::Set>* retry =
life->allocate< rpc::retry<server::mod_store_t::Set> >(
server::mod_store_t::Set(op,
- msgtype::DBKey(req.key, req.keylen, req.hash),
+ key,
msgtype::DBValue(req.val, req.vallen, meta, clocktime))
);
retry->set_callback(
BIND_RESPONSE(mod_store_t, Set, retry, req.callback, req.user) );
- retry->call(share->server_for<resource::HS_WRITE>(req.hash), life, 10);
+ retry->call(share->server_for<resource::HS_WRITE>(key.hash()), life, 10);
}
SUBMIT_CATCH(_set);
@@ -201,18 +241,20 @@ try {
shared_zone life(req.life);
if(!life) { life.reset(new msgpack::zone()); }
+ msgtype::DBKey key = dbkey_with_prefix(req, life);
+
rpc::retry<server::mod_store_t::Delete>* retry =
life->allocate< rpc::retry<server::mod_store_t::Delete> >(
server::mod_store_t::Delete(
(share->cfg_async_replicate_delete() || req.async) ?
static_cast<server::store_flags>(server::store_flags_async()) :
static_cast<server::store_flags>(server::store_flags_none()),
- msgtype::DBKey(req.key, req.keylen, req.hash))
+ key)
);
retry->set_callback(
BIND_RESPONSE(mod_store_t, Delete, retry, req.callback, req.user) );
- retry->call(share->server_for<resource::HS_WRITE>(req.hash), life, 10);
+ retry->call(share->server_for<resource::HS_WRITE>(key.hash()), life, 10);
}
SUBMIT_CATCH(_delete);
@@ -281,8 +323,7 @@ try {
if(err.is_nil()) {
gate::res_get ret;
ret.error = 0;
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
if(res.is_nil()) {
ret.val = NULL;
@@ -317,8 +358,7 @@ try {
}
gate::res_get ret;
ret.error = 1; // ERROR
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.val = NULL;
ret.vallen = 0;
@@ -345,8 +385,7 @@ try {
if(err.is_nil()) {
gate::res_get ret;
ret.error = 0;
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
if(res.is_nil()) {
ret.val = NULL;
@@ -387,8 +426,7 @@ try {
}
gate::res_get ret;
ret.error = 1; // ERROR
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.val = NULL;
ret.vallen = 0;
@@ -414,8 +452,7 @@ try {
if(!res.is_nil()) {
gate::res_set ret;
ret.error = 0;
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.val = val.data();
ret.vallen = val.size();
@@ -444,8 +481,7 @@ try {
}
gate::res_set ret;
ret.error = 1; // ERROR
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.val = val.data();
ret.vallen = val.size();
@@ -472,8 +508,7 @@ try {
bool st = res.as<bool>();
gate::res_delete ret;
ret.error = 0;
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.deleted = st;
try { (*callback)(user, ret, z); } catch (...) { }
@@ -493,8 +528,7 @@ try {
}
gate::res_delete ret;
ret.error = 1; // ERROR
- ret.key = key.data();
- ret.keylen = key.size();
+ dbkey_remove_prefix(&ret, key);
ret.hash = key.hash();
ret.deleted = false;
try { (*callback)(user, ret, z); } catch (...) { }
Please sign in to comment.
Something went wrong with that request. Please try again.