Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

MB-3798 - moxi option for ketama/weighted/modula item distributions

When using libmemcached, start moxi with an extra -Z key=value
configuration option...

  moxi -Z mcs_opts=distribution:ketama
  moxi -Z mcs_opts=distribution:ketama-weighted
  moxi -Z mcs_opts=distribution:modula

In this commit, moxi stays with distribution:ketama as its default and
a later debate can change that.

Change-Id: I36d3df3a2ba79c9d793a5e1e1a31d0d24ba48450
Reviewed-on: http://review.membase.org/6138
Tested-by: Steve Yen <steve.yen@gmail.com>
Reviewed-by: Matt Ingenthron <matt@northscale.com>
  • Loading branch information...
commit 167ac24ff289e043442ae1aab749f0f473b9fda6 1 parent 1f5d894
@steveyen steveyen authored
Showing with 51 additions and 14 deletions.
  1. +9 −5 cproxy.c
  2. +1 −0  cproxy.h
  3. +7 −0 cproxy_config.c
  4. +32 −8 mcs.c
  5. +2 −1  mcs.h
View
14 cproxy.c
@@ -48,7 +48,8 @@ bool downstream_connect_init(downstream *d, mcs_server_st *msst,
int init_mcs_st(mcs_st *mst, char *config,
const char *default_usr,
- const char *default_pwd);
+ const char *default_pwd,
+ const char *opts);
bool cproxy_on_connect_downstream_conn(conn *c);
@@ -1255,7 +1256,8 @@ downstream *cproxy_create_downstream(char *config,
behavior_pool->base.pwd :
NULL;
- int nconns = init_mcs_st(&d->mst, d->config, usr, pwd);
+ int nconns = init_mcs_st(&d->mst, d->config, usr, pwd,
+ behavior_pool->base.mcs_opts);
if (nconns > 0) {
d->downstream_conns = (conn **)
calloc(nconns, sizeof(conn *));
@@ -1277,12 +1279,13 @@ downstream *cproxy_create_downstream(char *config,
int init_mcs_st(mcs_st *mst, char *config,
const char *default_usr,
- const char *default_pwd) {
+ const char *default_pwd,
+ const char *opts) {
assert(mst);
assert(config);
if (mcs_create(mst, config,
- default_usr, default_pwd) != NULL) {
+ default_usr, default_pwd, opts) != NULL) {
return mcs_server_count(mst);
} else {
if (settings.verbose > 1) {
@@ -1323,7 +1326,8 @@ bool cproxy_check_downstream_config(downstream *d) {
mcs_st next;
- int n = init_mcs_st(&next, d->ptd->config, usr, pwd);
+ int n = init_mcs_st(&next, d->ptd->config, usr, pwd,
+ d->ptd->behavior_pool.base.mcs_opts);
if (n > 0) {
if (mcs_stable_update(&d->mst, &next)) {
if (settings.verbose > 2) {
View
1  cproxy.h
@@ -119,6 +119,7 @@ struct proxy_behavior {
struct timeval connect_timeout; // PL: Fields of 0 mean no timeout.
struct timeval auth_timeout; // PL: Fields of 0 mean no timeout.
bool time_stats; // IL: Capture timing stats.
+ char mcs_opts[80]; // PL: Extra options for mcs initialization.
uint32_t connect_max_errors; // IL: Pause when too many connect() errs.
uint32_t connect_retry_interval; // IL: Time in millisecs before retrying
View
7 cproxy_config.c
@@ -68,6 +68,7 @@ proxy_behavior behavior_default_g = {
.tv_usec = 100000
},
.time_stats = false,
+ .mcs_opts = {0},
.connect_max_errors = 5, // In zstored, 10.
.connect_retry_interval = 30000, // In zstored, 30000.
.front_cache_max = 200,
@@ -685,6 +686,11 @@ void cproxy_parse_behavior_key_val(char *key,
} else if (wordeq(key, "time_stats")) {
ok = safe_strtoul(val, &x);
behavior->time_stats = x;
+ } else if (wordeq(key, "mcs_opts")) {
+ if (strlen(val) < sizeof(behavior->mcs_opts)) {
+ strcpy(behavior->mcs_opts, val);
+ ok = true;
+ }
} else if (wordeq(key, "connect_max_errors")) {
ok = safe_strtoul(val, &behavior->connect_max_errors);
} else if (wordeq(key, "connect_retry_interval")) {
@@ -873,6 +879,7 @@ void cproxy_dump_behavior_ex(proxy_behavior *b, char *prefix, int level,
(b->auth_timeout.tv_sec * 1000 +
b->auth_timeout.tv_usec / 1000));
vdump("time_stats", "%d", b->time_stats);
+ vdump("mcs_opts", "%s", b->mcs_opts);
vdump("connect_max_errors", "%u", b->connect_max_errors);
vdump("connect_retry_interval", "%u", b->connect_retry_interval);
vdump("front_cache_max", "%u", b->front_cache_max);
View
40 mcs.c
@@ -9,6 +9,7 @@
#include <fcntl.h>
#include <assert.h>
#include "memcached.h"
+#include "cproxy.h"
#include "mcs.h"
#include "log.h"
@@ -20,7 +21,8 @@
//
mcs_st *lvb_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd);
+ const char *default_pwd,
+ const char *opts);
void lvb_free_data(mcs_st *ptr);
bool lvb_stable_update(mcs_st *curr_version, mcs_st *next_version);
uint32_t lvb_key_hash(mcs_st *ptr, const char *key, size_t key_length,
@@ -32,7 +34,8 @@ void lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
//
mcs_st *lmc_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd);
+ const char *default_pwd,
+ const char *opts);
void lmc_free_data(mcs_st *ptr);
uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length,
int *vbucket);
@@ -41,13 +44,14 @@ uint32_t lmc_key_hash(mcs_st *ptr, const char *key, size_t key_length,
mcs_st *mcs_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd) {
+ const char *default_pwd,
+ const char *opts) {
#ifdef MOXI_USE_LIBVBUCKET
if (config[0] == '{') {
if (settings.verbose > 2) {
moxi_log_write("mcs_create using libvbucket\n");
}
- return lvb_create(ptr, config, default_usr, default_pwd);
+ return lvb_create(ptr, config, default_usr, default_pwd, opts);
}
#endif
#ifdef MOXI_USE_LIBMEMCACHED
@@ -55,7 +59,7 @@ mcs_st *mcs_create(mcs_st *ptr, const char *config,
if (settings.verbose > 2) {
moxi_log_write("mcs_create using libmemcached\n");
}
- return lmc_create(ptr, config, default_usr, default_pwd);
+ return lmc_create(ptr, config, default_usr, default_pwd, opts);
}
#endif
moxi_log_write("ERROR: unconfigured hash library\n");
@@ -142,7 +146,10 @@ void mcs_server_invalid_vbucket(mcs_st *ptr, int server_index,
mcs_st *lvb_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd) {
+ const char *default_pwd,
+ const char *opts) {
+ (void) opts;
+
assert(ptr);
memset(ptr, 0, sizeof(*ptr));
ptr->kind = MCS_KIND_LIBVBUCKET;
@@ -295,15 +302,32 @@ void lvb_server_invalid_vbucket(mcs_st *ptr, int server_index,
mcs_st *lmc_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd) {
+ const char *default_pwd,
+ const char *opts) {
assert(ptr);
memset(ptr, 0, sizeof(*ptr));
ptr->kind = MCS_KIND_LIBMEMCACHED;
memcached_st *mst = memcached_create(NULL);
if (mst != NULL) {
+ memcached_behavior_t b = MEMCACHED_BEHAVIOR_KETAMA;
+ uint64_t v = 1;
+
+ if (opts != NULL) {
+ if (strstr(opts, "distribution:ketama-weighted") != NULL) {
+ b = MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED;
+ v = 1;
+ } else if (strstr(opts, "distribution:ketama") != NULL) {
+ b = MEMCACHED_BEHAVIOR_KETAMA;
+ v = 1;
+ } else if (strstr(opts, "distribution:modula") != NULL) {
+ b = MEMCACHED_BEHAVIOR_KETAMA;
+ v = 0;
+ }
+ }
+
+ memcached_behavior_set(mst, b, v);
memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_NO_BLOCK, 1);
- memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_KETAMA, 1);
memcached_behavior_set(mst, MEMCACHED_BEHAVIOR_TCP_NODELAY, 1);
memcached_server_st *mservers;
View
3  mcs.h
@@ -54,7 +54,8 @@ typedef struct {
mcs_st *mcs_create(mcs_st *ptr, const char *config,
const char *default_usr,
- const char *default_pwd);
+ const char *default_pwd,
+ const char *opts);
void mcs_free(mcs_st *ptr);
Please sign in to comment.
Something went wrong with that request. Please try again.