Skip to content

Commit

Permalink
durability: Refactor option validation
Browse files Browse the repository at this point in the history
This refactors option validation and exposes validation as a standalone
function. This also ensures behavior is correct when there are some
non-data nodes.

Change-Id: Ifcd59d140e17d621bfba79d6916cab112ebe814f
Reviewed-on: http://review.couchbase.org/51797
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Subhashni Balakrishnan <b.subhashni@gmail.com>
  • Loading branch information
mnunberg authored and Mark Nunberg committed Jun 15, 2015
1 parent 8fe4c1b commit ef543ac
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 24 deletions.
31 changes: 31 additions & 0 deletions include/libcouchbase/couchbase.h
Expand Up @@ -2190,6 +2190,37 @@ typedef void (*lcb_durability_callback)(lcb_t instance,
LIBCOUCHBASE_API
lcb_durability_callback lcb_set_durability_callback(lcb_t,
lcb_durability_callback);

#define LCB_DURABILITY_VALIDATE_CAPMAX 1<<1

/**
* @committed
*
* Validate durability settings.
*
* This function will validate (and optionally modify) the settings. This is
* helpful to ensure the durability options are valid _before_ issuing a command
*
* @param instance the instance
* @param[in,out] persist_to The desired number of servers to persist to.
* If the `CAPMAX` flag is set, on output this will contain the effective
* number of servers the item can be persisted to
* @param[in,out] replicate_to The desired number of servers to replicate to.
* If the `CAPMAX` flag is set, on output this will contain the effective
* number of servers the item can be replicated to
* @param options Options to use for validating. The only recognized option
* is currently `LCB_DURABILITY_VALIDATE_CAPMAX` which has the same semantics
* as lcb_DURABILITYOPTSv0::cap_max.
* @return LCB_SUCCESS on success
* @return LCB_DURABILITY_ETOOMANY if the requirements exceed the number of
* servers currently available, and `CAPMAX` was not specifie
* @return LCB_EINVAL if both `persist_to` and `replicate_to` are 0.
*/
LIBCOUCHBASE_API
lcb_error_t
lcb_durability_validate(lcb_t instance,
lcb_U16 *persist_to, lcb_U16 *replicate_to, int options);

/**@}*/

/******************************************************************************
Expand Down
1 change: 1 addition & 0 deletions include/libcouchbase/vbucket.h
Expand Up @@ -136,6 +136,7 @@ typedef struct {


#define LCBVB_NSERVERS(cfg) (cfg)->nsrv
#define LCBVB_NDATASERVERS(cfg) (cfg)->ndatasrv
#define LCBVB_NREPLICAS(cfg) (cfg)->nrepl
#define LCBVB_DISTTYPE(cfg) (cfg)->dtype
#define LCBVB_GET_SERVER(conf, ix) ((conf)->servers + ix)
Expand Down
1 change: 1 addition & 0 deletions src/internal.h
Expand Up @@ -115,6 +115,7 @@ struct lcb_st {

#define LCBT_VBCONFIG(instance) (instance)->cmdq.config
#define LCBT_NSERVERS(instance) (instance)->cmdq.npipelines
#define LCBT_NDATASERVERS(instance) LCBVB_NDATASERVERS(LCBT_VBCONFIG(instance))
#define LCBT_NREPLICAS(instance) LCBVB_NREPLICAS(LCBT_VBCONFIG(instance))
#define LCBT_GET_SERVER(instance, ix) (mc_SERVER *)(instance)->cmdq.pipelines[ix]
#define LCBT_SETTING(instance, name) (instance)->settings->name
Expand Down
53 changes: 29 additions & 24 deletions src/operations/durability.c
Expand Up @@ -258,47 +258,48 @@ static void poll_once(lcb_DURSET *dset)

#define DUR_MIN(a,b) (a) < (b) ? (a) : (b)

/**
* Ensure that the user-specified criteria is possible; i.e. we have enough
* servers and replicas. If the user requested capping, we do that here too.
*/
static int verify_critera(lcb_t instance, lcb_DURSET *dset)
LIBCOUCHBASE_API
lcb_error_t
lcb_durability_validate(lcb_t instance,
lcb_U16 *persist_to, lcb_U16 *replicate_to, int options)
{
lcb_DURABILITYOPTSv0 *opts = &dset->opts;

int replica_max = DUR_MIN(
LCBT_NREPLICAS(instance),
LCBT_NSERVERS(instance)-1
);

LCBT_NREPLICAS(instance),
LCBT_NDATASERVERS(instance)-1);
int persist_max = replica_max + 1;

if (*persist_to == 0 && *replicate_to == 0) {
/* Empty values! */
return LCB_EINVAL;
}

/* persist_max is always one more than replica_max */
if ((int)opts->persist_to > persist_max) {
if (opts->cap_max) {
opts->persist_to = persist_max;
if ((int)*persist_to > persist_max) {
if (options & LCB_DURABILITY_VALIDATE_CAPMAX) {
*persist_to = persist_max;
} else {
return -1;
return LCB_DURABILITY_ETOOMANY;
}
}

if (opts->replicate_to == 0) {
return 0;
if (*replicate_to == 0) {
return LCB_SUCCESS;
}

if (replica_max < 0) {
replica_max = 0;
}

/* now, we need at least as many nodes as we have replicas */
if ((int)opts->replicate_to > replica_max) {
if (opts->cap_max) {
opts->replicate_to = replica_max;
if ((int)*replicate_to > replica_max) {
if (options & LCB_DURABILITY_VALIDATE_CAPMAX) {
*replicate_to = replica_max;
} else {
return -1;
return LCB_DURABILITY_ETOOMANY;
}
}
return LCB_SUCCESS;

return 0;
}

#define CTX_FROM_MULTI(mcmd) (void *) ((((char *) (mcmd))) - offsetof(lcb_DURSET, mctx))
Expand Down Expand Up @@ -459,11 +460,15 @@ lcb_endure3_ctxnew(lcb_t instance, const lcb_durability_opts_t *options,
DSET_OPTFLD(dset, interval) = LCBT_SETTING(instance, durability_interval);
}

if (-1 == verify_critera(instance, dset)) {
*errp = lcb_durability_validate(instance,
&dset->opts.persist_to, &dset->opts.replicate_to,
dset->opts.cap_max ? LCB_DURABILITY_VALIDATE_CAPMAX : 0);

if (*errp != LCB_SUCCESS) {
free(dset);
*errp = LCB_DURABILITY_ETOOMANY;
return NULL;
}

dset->timer = io->timer.create(io->p);
lcb_string_init(&dset->kvbufs);
return &dset->mctx;
Expand Down
53 changes: 53 additions & 0 deletions tests/iotests/t_durability.cc
Expand Up @@ -894,3 +894,56 @@ TEST_F(DurabilityUnitTest, testExternalSynctoken)
// TODO: How to actually run this?
ASSERT_EQ(LCB_SUCCESS, dop.resp.rc);
}

TEST_F(DurabilityUnitTest, testOptionValidation)
{
HandleWrap hw;
lcb_t instance;
lcb_U16 persist = 0, replicate = 0;
int options;
lcb_error_t rc;

createConnection(hw, instance);

// Validate simple mode
persist = -1;
replicate = -1;
rc = lcb_durability_validate(instance, &persist, &replicate,
LCB_DURABILITY_VALIDATE_CAPMAX);

ASSERT_EQ(LCB_SUCCESS, rc);
ASSERT_TRUE(persist > replicate);

lcbvb_CONFIG *vbc;
rc = lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &vbc);
ASSERT_EQ(LCB_SUCCESS, rc);

int replica_max = min(LCBVB_NREPLICAS(vbc), LCBVB_NDATASERVERS(vbc)-1);
int persist_max = replica_max + 1;

ASSERT_EQ(replica_max, replicate);
ASSERT_EQ(persist_max, persist);

persist = 0;
replicate = 0;
rc = lcb_durability_validate(instance, &persist, &replicate, 0);
ASSERT_EQ(LCB_EINVAL, rc);

persist = -1;
replicate = -1;
rc = lcb_durability_validate(instance, &persist, &replicate, 0);
ASSERT_EQ(LCB_DURABILITY_ETOOMANY, rc);

persist = persist_max;
replicate = replica_max;
rc = lcb_durability_validate(instance, &persist, &replicate, 0);
ASSERT_EQ(LCB_SUCCESS, rc);
ASSERT_EQ(persist_max, persist);
ASSERT_EQ(replica_max, replicate);

rc = lcb_durability_validate(instance, &persist, &replicate,
LCB_DURABILITY_VALIDATE_CAPMAX);
ASSERT_EQ(LCB_SUCCESS, rc);
ASSERT_EQ(persist_max, persist);
ASSERT_EQ(replica_max, replicate);
}

0 comments on commit ef543ac

Please sign in to comment.