Skip to content

Commit

Permalink
CCBC-616: Allow storedur operation (Storage+Endure)
Browse files Browse the repository at this point in the history
This allows storage and durability operations to be wrapped into a
single command (from an application perspective). This new functionality
is available in the form of lcb_storedure3 (with its own command,
scheduler, and response callback).

Change-Id: I74869bd79292f419b4d88e9b2b43b77ea973b899
Reviewed-on: http://review.couchbase.org/51798
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Subhashni Balakrishnan <b.subhashni@gmail.com>
  • Loading branch information
mnunberg authored and Mark Nunberg committed Jun 15, 2015
1 parent ef543ac commit d764c26
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 87 deletions.
49 changes: 49 additions & 0 deletions include/libcouchbase/api3.h
Expand Up @@ -237,6 +237,7 @@ typedef enum {
LCB_CALLBACK_HTTP, /**< lcb_http3() */
LCB_CALLBACK_CBFLUSH, /**< lcb_cbflush3() */
LCB_CALLBACK_OBSEQNO, /**< For lcb_observe_synctoken3() */
LCB_CALLBACK_STOREDUR, /** <for lcb_storedur3() */
LCB_CALLBACK__MAX /* Number of callbacks */
} lcb_CALLBACKTYPE;

Expand Down Expand Up @@ -676,6 +677,7 @@ typedef struct {
LIBCOUCHBASE_API
lcb_error_t
lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd);

/**@}*/

/**@name Removing Items
Expand Down Expand Up @@ -1080,6 +1082,53 @@ LIBCOUCHBASE_API
lcb_MULTICMD_CTX *
lcb_endure3_ctxnew(lcb_t instance,
const lcb_durability_opts_t *options, lcb_error_t *err);

/**
* Command structure for lcb_storedur3()
* This is much like @ref lcb_CMDSTORE, but also includes durability options.
*/
typedef struct {
LCB_CMD_BASE;
lcb_VALBUF value;
lcb_U32 flags;
lcb_datatype_t datatype;
lcb_storage_t operation;

/**Number of nodes to persist to. If negative, will be capped at the maximum
* allowable for the current cluster*/
char persist_to;

/**Number of nodes to replicate to. If negative, will be capped at the maximum
* allowable for the current cluster */
char replicate_to;
} lcb_CMDSTOREDUR;

/**
* Response structure for `LCB_CALLBACK_STOREDUR.
*/
typedef struct {
LCB_RESP_BASE
/** Internal durability response structure. This should never be NULL */
const lcb_RESPENDURE *dur_resp;

/**If the ::rc field is not @ref LCB_SUCCESS, this field indicates
* what failed. If this field is nonzero, then the store operation failed,
* but the durability checking failed. If this field is zero then the
* actual storage operation failed. */
int store_ok;
} lcb_RESPSTOREDUR;

/**
* @volatile
*
* Schedule a storage operation with subsequent durability checking.
*
* This is a compound of a logical lcb_store3() followed by an
* lcb_endure3_ctxnew() upon success.
*/
LIBCOUCHBASE_API
lcb_error_t
lcb_storedur3(lcb_t instance, const void *cookie, const lcb_CMDSTOREDUR *cmd);
/**@}*/

/**@name Check the memcached server versions
Expand Down
6 changes: 5 additions & 1 deletion src/handler.c
Expand Up @@ -431,7 +431,11 @@ H_store(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
}
handle_synctoken(root, response, request, &resp.synctoken);
TRACE_STORE_END(response, &resp);
INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_STORE);
if (request->flags & MCREQ_F_REQEXT) {
request->u_rdata.exdata->procs->handler(pipeline, request, immerr, &resp);
} else {
INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_STORE);
}
}

static void
Expand Down
32 changes: 27 additions & 5 deletions src/operations/durability.c
Expand Up @@ -196,6 +196,8 @@ lcbdur_ent_getinfo(lcb_DURITEM *item, int srvix)
void lcbdur_ent_finish(lcb_DURITEM *ent)
{
lcb_RESPCALLBACK callback;
lcb_t instance;

if (ent->done) {
return;
}
Expand All @@ -205,10 +207,24 @@ void lcbdur_ent_finish(lcb_DURITEM *ent)

/** Invoke the callback now :) */
ent->result.cookie = (void *)ent->parent->cookie;

callback = lcb_find_callback(ent->parent->instance, LCB_CALLBACK_ENDURE);
callback(ent->parent->instance, LCB_CALLBACK_ENDURE,
(lcb_RESPBASE*)&ent->result);
instance = ent->parent->instance;

if (ent->parent->is_durstore) {
lcb_RESPSTOREDUR resp = { 0 };
resp.key = ent->result.key;
resp.nkey = ent->result.nkey;
resp.rc = ent->result.rc;
resp.cas = ent->reqcas;
resp.cookie = ent->result.cookie;
resp.store_ok = 1;
resp.dur_resp = &ent->result;

callback = lcb_find_callback(instance, LCB_CALLBACK_STOREDUR);
callback(instance, LCB_CALLBACK_STOREDUR, (lcb_RESPBASE*)&resp);
} else {
callback = lcb_find_callback(instance, LCB_CALLBACK_ENDURE);
callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPBASE*)&ent->result);
}

if (ent->parent->nremaining == 0) {
lcbdur_unref(ent->parent);
Expand All @@ -230,7 +246,6 @@ void lcbdur_reqs_done(lcb_DURSET *dset)
lcbdur_unref(dset);
}


/**
* Schedules a single sweep of observe requests.
* The `initial` parameter determines if this is a retry or if this is the
Expand Down Expand Up @@ -383,6 +398,13 @@ dset_ctx_fail(lcb_MULTICMD_CTX *mctx)
lcbdur_destroy(dset);
}

void lcbdurctx_set_durstore(lcb_MULTICMD_CTX *mctx, int enabled)
{

lcb_DURSET *dset = CTX_FROM_MULTI(mctx);
dset->is_durstore = enabled;
}

static lcb_U8
get_poll_meth(lcb_t instance, const lcb_DURABILITYOPTSv0 *options)
{
Expand Down
5 changes: 5 additions & 0 deletions src/operations/durability_internal.h
Expand Up @@ -85,6 +85,7 @@ typedef struct lcb_DURSET_st {
unsigned refcnt; /**< Reference count */
unsigned next_state; /**< Internal state */
lcb_error_t lasterr;
int is_durstore; /** Whether the callback should be DURSTORE */
lcb_string kvbufs; /**< Backing storage for key buffers */
const void *cookie; /**< User cookie */
hrtime_t ns_timeout; /**< Timestamp of next timeout */
Expand All @@ -107,6 +108,10 @@ void
lcbdur_update_seqno(lcb_t instance, lcb_DURSET *dset,
const lcb_RESPOBSEQNO *resp);

/** Indicate that this durability command context is for an original storage op */
void
lcbdurctx_set_durstore(lcb_MULTICMD_CTX *ctx, int enabled);

lcb_MULTICMD_CTX *
lcb_observe_ctx_dur_new(lcb_t instance);

Expand Down

0 comments on commit d764c26

Please sign in to comment.