From d764c26bd0a36a09c9464df9fd02294b622e4966 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 8 Jun 2015 10:08:00 -0700 Subject: [PATCH] CCBC-616: Allow storedur operation (Storage+Endure) This allows storage and durability operations to be wrapped into a single command (from an application perspective). This new functionality is available in the form of lcb_storedure3 (with its own command, scheduler, and response callback). Change-Id: I74869bd79292f419b4d88e9b2b43b77ea973b899 Reviewed-on: http://review.couchbase.org/51798 Tested-by: buildbot Reviewed-by: Dave Rigby Reviewed-by: Subhashni Balakrishnan --- include/libcouchbase/api3.h | 49 +++++++ src/handler.c | 6 +- src/operations/durability.c | 32 ++++- src/operations/durability_internal.h | 5 + src/operations/store.c | 186 ++++++++++++++++++++++++--- tests/iotests/t_durability.cc | 110 ++++++++++++++++ tools/cbc.cc | 101 ++++++--------- 7 files changed, 402 insertions(+), 87 deletions(-) diff --git a/include/libcouchbase/api3.h b/include/libcouchbase/api3.h index f863efd53..8bbf6e957 100644 --- a/include/libcouchbase/api3.h +++ b/include/libcouchbase/api3.h @@ -237,6 +237,7 @@ typedef enum { LCB_CALLBACK_HTTP, /**< lcb_http3() */ LCB_CALLBACK_CBFLUSH, /**< lcb_cbflush3() */ LCB_CALLBACK_OBSEQNO, /**< For lcb_observe_synctoken3() */ + LCB_CALLBACK_STOREDUR, /** flags & MCREQ_F_REQEXT) { + request->u_rdata.exdata->procs->handler(pipeline, request, immerr, &resp); + } else { + INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_STORE); + } } static void diff --git a/src/operations/durability.c b/src/operations/durability.c index 79a575c9d..fb8898fd9 100644 --- a/src/operations/durability.c +++ b/src/operations/durability.c @@ -196,6 +196,8 @@ lcbdur_ent_getinfo(lcb_DURITEM *item, int srvix) void lcbdur_ent_finish(lcb_DURITEM *ent) { lcb_RESPCALLBACK callback; + lcb_t instance; + if (ent->done) { return; } @@ -205,10 +207,24 @@ void lcbdur_ent_finish(lcb_DURITEM *ent) /** Invoke the callback now :) */ ent->result.cookie = (void *)ent->parent->cookie; - - callback = lcb_find_callback(ent->parent->instance, LCB_CALLBACK_ENDURE); - callback(ent->parent->instance, LCB_CALLBACK_ENDURE, - (lcb_RESPBASE*)&ent->result); + instance = ent->parent->instance; + + if (ent->parent->is_durstore) { + lcb_RESPSTOREDUR resp = { 0 }; + resp.key = ent->result.key; + resp.nkey = ent->result.nkey; + resp.rc = ent->result.rc; + resp.cas = ent->reqcas; + resp.cookie = ent->result.cookie; + resp.store_ok = 1; + resp.dur_resp = &ent->result; + + callback = lcb_find_callback(instance, LCB_CALLBACK_STOREDUR); + callback(instance, LCB_CALLBACK_STOREDUR, (lcb_RESPBASE*)&resp); + } else { + callback = lcb_find_callback(instance, LCB_CALLBACK_ENDURE); + callback(instance, LCB_CALLBACK_ENDURE, (lcb_RESPBASE*)&ent->result); + } if (ent->parent->nremaining == 0) { lcbdur_unref(ent->parent); @@ -230,7 +246,6 @@ void lcbdur_reqs_done(lcb_DURSET *dset) lcbdur_unref(dset); } - /** * Schedules a single sweep of observe requests. * The `initial` parameter determines if this is a retry or if this is the @@ -383,6 +398,13 @@ dset_ctx_fail(lcb_MULTICMD_CTX *mctx) lcbdur_destroy(dset); } +void lcbdurctx_set_durstore(lcb_MULTICMD_CTX *mctx, int enabled) +{ + + lcb_DURSET *dset = CTX_FROM_MULTI(mctx); + dset->is_durstore = enabled; +} + static lcb_U8 get_poll_meth(lcb_t instance, const lcb_DURABILITYOPTSv0 *options) { diff --git a/src/operations/durability_internal.h b/src/operations/durability_internal.h index 3f2f57fc0..16a4834c5 100644 --- a/src/operations/durability_internal.h +++ b/src/operations/durability_internal.h @@ -85,6 +85,7 @@ typedef struct lcb_DURSET_st { unsigned refcnt; /**< Reference count */ unsigned next_state; /**< Internal state */ lcb_error_t lasterr; + int is_durstore; /** Whether the callback should be DURSTORE */ lcb_string kvbufs; /**< Backing storage for key buffers */ const void *cookie; /**< User cookie */ hrtime_t ns_timeout; /**< Timestamp of next timeout */ @@ -107,6 +108,10 @@ void lcbdur_update_seqno(lcb_t instance, lcb_DURSET *dset, const lcb_RESPOBSEQNO *resp); +/** Indicate that this durability command context is for an original storage op */ +void +lcbdurctx_set_durstore(lcb_MULTICMD_CTX *ctx, int enabled); + lcb_MULTICMD_CTX * lcb_observe_ctx_dur_new(lcb_t instance); diff --git a/src/operations/store.c b/src/operations/store.c index 65d5d6be1..10c5ee62c 100644 --- a/src/operations/store.c +++ b/src/operations/store.c @@ -17,6 +17,96 @@ #include "internal.h" #include "mc/compress.h" #include "trace.h" +#include "durability_internal.h" + +typedef struct { + mc_REQDATAEX base; + lcb_t instance; + lcb_U16 persist_to; + lcb_U16 replicate_to; +} DURSTORECTX; + +/** Observe stuff */ +static void +handle_dur_storecb(mc_PIPELINE *pl, mc_PACKET *pkt, + lcb_error_t err, const void *arg) +{ + lcb_RESPCALLBACK cb; + lcb_RESPSTOREDUR resp = { 0 }; + lcb_CMDENDURE dcmd = { 0 }; + DURSTORECTX *dctx = (DURSTORECTX *)pkt->u_rdata.exdata; + lcb_MULTICMD_CTX *mctx; + lcb_durability_opts_t opts = { 0 }; + const lcb_RESPSTORE *sresp = (const lcb_RESPSTORE *)arg; + + if (err != LCB_SUCCESS) { + goto GT_BAIL; + } + if (sresp->rc != LCB_SUCCESS) { + err = sresp->rc; + goto GT_BAIL; + } + + resp.store_ok = 1; + LCB_CMD_SET_KEY(&dcmd, sresp->key, sresp->nkey); + dcmd.cas = sresp->cas; + + if (LCB_SYNCTOKEN_ISVALID(&sresp->synctoken)) { + dcmd.synctoken = &sresp->synctoken; + } + + /* Set the options.. */ + opts.v.v0.persist_to = dctx->persist_to; + opts.v.v0.replicate_to = dctx->replicate_to; + + mctx = lcb_endure3_ctxnew(dctx->instance, &opts, &err); + if (mctx == NULL) { + goto GT_BAIL; + } + + lcbdurctx_set_durstore(mctx, 1); + err = mctx->addcmd(mctx, (lcb_CMDBASE*)&dcmd); + if (err != LCB_SUCCESS) { + mctx->fail(mctx); + goto GT_BAIL; + } + lcb_sched_enter(dctx->instance); + err = mctx->done(mctx, sresp->cookie); + lcb_sched_leave(dctx->instance); + + if (err == LCB_SUCCESS) { + /* Everything OK? */ + free(dctx); + return; + } + + GT_BAIL: + { + lcb_RESPENDURE dresp = { 0 }; + resp.key = sresp->key; + resp.nkey = sresp->nkey; + resp.cookie = sresp->cookie; + resp.rc = err; + resp.dur_resp = &dresp; + cb = lcb_find_callback(dctx->instance, LCB_CALLBACK_STOREDUR); + cb(dctx->instance, LCB_CALLBACK_STOREDUR, (const lcb_RESPBASE*)&resp); + free(dctx); + } + + (void)pl; +} + +static void +handle_dur_schedfail(mc_PACKET *pkt) +{ + DURSTORECTX *dctx = (void *)pkt->u_rdata.exdata; + free(dctx); +} + +mc_REQDATAPROCS storedur_procs = { + handle_dur_storecb, + handle_dur_schedfail +}; static lcb_size_t get_value_size(mc_PACKET *packet) @@ -56,7 +146,7 @@ get_esize_and_opcode( static int can_compress(lcb_t instance, const mc_PIPELINE *pipeline, - const lcb_CMDSTORE *cmd) + const lcb_VALBUF *vbuf, lcb_datatype_t datatype) { mc_SERVER *server = (mc_SERVER *)pipeline; int compressopts = LCBT_SETTING(instance, compressopts); @@ -65,7 +155,7 @@ can_compress(lcb_t instance, const mc_PIPELINE *pipeline, return 0; } - if (cmd->value.vtype != LCB_KV_COPY) { + if (vbuf->vtype != LCB_KV_COPY) { return 0; } if ((compressopts & LCB_COMPRESS_OUT) == 0) { @@ -74,15 +164,15 @@ can_compress(lcb_t instance, const mc_PIPELINE *pipeline, if (server->compsupport == 0 && (compressopts & LCB_COMPRESS_FORCE) == 0) { return 0; } - if (cmd->datatype & LCB_VALUE_F_SNAPPYCOMP) { + if (datatype & LCB_VALUE_F_SNAPPYCOMP) { return 0; } return 1; } -LIBCOUCHBASE_API -lcb_error_t -lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd) +static lcb_error_t +do_store3(lcb_t instance, const void *cookie, + const lcb_CMDBASE *cmd, int is_durstore) { mc_PIPELINE *pipeline; mc_PACKET *packet; @@ -92,23 +182,42 @@ lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd) int should_compress = 0; lcb_error_t err; + lcb_storage_t operation; + lcb_U32 flags; + const lcb_VALBUF *vbuf; + lcb_datatype_t datatype; + protocol_binary_request_set scmd; protocol_binary_request_header *hdr = &scmd.message.header; + if (!is_durstore) { + const lcb_CMDSTORE *simple_cmd = (const lcb_CMDSTORE *)cmd; + operation = simple_cmd->operation; + flags = simple_cmd->flags; + vbuf = &simple_cmd->value; + datatype = simple_cmd->datatype; + } else { + const lcb_CMDSTOREDUR *durcmd = (const lcb_CMDSTOREDUR *)cmd; + operation = durcmd->operation; + flags = durcmd->flags; + vbuf = &durcmd->value; + datatype = durcmd->datatype; + } + if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) { return LCB_EMPTY_KEY; } err = get_esize_and_opcode( - cmd->operation, &hdr->request.opcode, &hdr->request.extlen); + operation, &hdr->request.opcode, &hdr->request.extlen); if (err != LCB_SUCCESS) { return err; } - switch (cmd->operation) { + switch (operation) { case LCB_APPEND: case LCB_PREPEND: - if (cmd->exptime || cmd->flags) { + if (cmd->exptime || flags) { return LCB_OPTIONS_CONFLICT; } break; @@ -130,31 +239,60 @@ lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd) return err; } - should_compress = can_compress(instance, pipeline, cmd); + should_compress = can_compress(instance, pipeline, vbuf, datatype); if (should_compress) { - int rv = mcreq_compress_value(pipeline, packet, &cmd->value.u_buf.contig); + int rv = mcreq_compress_value(pipeline, packet, &vbuf->u_buf.contig); if (rv != 0) { mcreq_release_packet(pipeline, packet); return LCB_CLIENT_ENOMEM; } } else { - mcreq_reserve_value(pipeline, packet, &cmd->value); + mcreq_reserve_value(pipeline, packet, vbuf); + } + + if (is_durstore) { + int duropts = 0; + lcb_U16 persist_u , replicate_u; + const lcb_CMDSTOREDUR *dcmd = (const lcb_CMDSTOREDUR *)cmd; + DURSTORECTX *dctx = calloc(1, sizeof(*dctx)); + + persist_u = dcmd->persist_to; + replicate_u = dcmd->replicate_to; + if (dcmd->replicate_to == -1 || dcmd->persist_to == -1) { + duropts = LCB_DURABILITY_VALIDATE_CAPMAX; + } + + err = lcb_durability_validate(instance, &persist_u, &replicate_u, duropts); + if (err != LCB_SUCCESS) { + mcreq_wipe_packet(pipeline, packet); + mcreq_release_packet(pipeline, packet); + return err; + } + + dctx->instance = instance; + dctx->persist_to = persist_u; + dctx->replicate_to = replicate_u; + packet->u_rdata.exdata = &dctx->base; + packet->flags |= MCREQ_F_REQEXT; + + dctx->base.cookie = cookie; + dctx->base.procs = &storedur_procs; } - rdata = &packet->u_rdata.reqdata; + rdata = MCREQ_PKT_RDATA(packet); rdata->cookie = cookie; rdata->start = gethrtime(); scmd.message.body.expiration = htonl(cmd->exptime); - scmd.message.body.flags = htonl(cmd->flags); + scmd.message.body.flags = htonl(flags); hdr->request.magic = PROTOCOL_BINARY_REQ; hdr->request.cas = cmd->cas; hdr->request.datatype = PROTOCOL_BINARY_RAW_BYTES; - if (should_compress || (cmd->datatype & LCB_VALUE_F_SNAPPYCOMP)) { + if (should_compress || (datatype & LCB_VALUE_F_SNAPPYCOMP)) { hdr->request.datatype |= PROTOCOL_BINARY_DATATYPE_COMPRESSED; } - if (cmd->datatype & LCB_VALUE_F_JSON) { + if (datatype & LCB_VALUE_F_JSON) { hdr->request.datatype |= PROTOCOL_BINARY_DATATYPE_JSON; } @@ -165,10 +303,24 @@ lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd) memcpy(SPAN_BUFFER(&packet->kh_span), scmd.bytes, hsize); mcreq_sched_add(pipeline, packet); - TRACE_STORE_BEGIN(hdr, cmd); + TRACE_STORE_BEGIN(hdr, (lcb_CMDSTORE* )cmd); return LCB_SUCCESS; } +LIBCOUCHBASE_API +lcb_error_t +lcb_store3(lcb_t instance, const void *cookie, const lcb_CMDSTORE *cmd) +{ + return do_store3(instance, cookie, (const lcb_CMDBASE*)cmd, 0); +} + +LIBCOUCHBASE_API +lcb_error_t +lcb_storedur3(lcb_t instance, const void *cookie, const lcb_CMDSTOREDUR *cmd) +{ + return do_store3(instance, cookie, (const lcb_CMDBASE*)cmd, 1); +} + LIBCOUCHBASE_API lcb_error_t lcb_store(lcb_t instance, const void *cookie, lcb_size_t num, diff --git a/tests/iotests/t_durability.cc b/tests/iotests/t_durability.cc index ca0a56b5b..3958840ab 100644 --- a/tests/iotests/t_durability.cc +++ b/tests/iotests/t_durability.cc @@ -5,6 +5,8 @@ using namespace std; +#define LOGARGS(instance, lvl) \ + instance->settings, "tests-dur", LCB_LOG_##lvl, __FILE__, __LINE__ #define SECS_USECS(f) ((f) * 1000000) static bool supportsSynctokens(lcb_t instance) @@ -947,3 +949,111 @@ TEST_F(DurabilityUnitTest, testOptionValidation) ASSERT_EQ(persist_max, persist); ASSERT_EQ(replica_max, replicate); } + +extern "C" { +static void durstoreCallback(lcb_t, int, const lcb_RESPBASE *rb) +{ + const lcb_RESPSTOREDUR *resp = reinterpret_cast(rb); + lcb_RESPSTOREDUR *rout = reinterpret_cast(rb->cookie); + lcb_RESPENDURE *dur_resp = const_cast(rout->dur_resp); + + ASSERT_FALSE(resp->dur_resp == NULL); + + *rout = *resp; + *dur_resp = *resp->dur_resp; + rout->dur_resp = dur_resp; +} +} + +TEST_F(DurabilityUnitTest, testDurStore) +{ + HandleWrap hw; + lcb_t instance; + lcb_durability_opts_t options = { 0 }; + createConnection(hw, instance); + lcb_install_callback3(instance, LCB_CALLBACK_STOREDUR, durstoreCallback); + + std::string key("durStore"); + std::string value("value"); + + lcb_error_t rc; + lcb_RESPSTOREDUR resp = { 0 }; + lcb_RESPENDURE dur_resp = { 0 }; + + resp.dur_resp = &dur_resp; + + lcb_CMDSTOREDUR cmd = { 0 }; + LCB_CMD_SET_KEY(&cmd, key.c_str(), key.size()); + LCB_CMD_SET_VALUE(&cmd, value.c_str(), value.size()); + defaultOptions(instance, options); + cmd.operation = LCB_SET; + cmd.persist_to = options.v.v0.persist_to; + cmd.replicate_to = options.v.v0.replicate_to; + + lcb_sched_enter(instance); + resp.rc = LCB_ERROR; + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_SUCCESS, rc); + lcb_sched_leave(instance); + lcb_wait(instance); + + ASSERT_EQ(LCB_SUCCESS, resp.rc); + ASSERT_NE(0, resp.store_ok); + ASSERT_TRUE(options.v.v0.persist_to <= resp.dur_resp->npersisted); + ASSERT_TRUE(options.v.v0.replicate_to <= resp.dur_resp->nreplicated); + + lcb_sched_enter(instance); + // Try with bad criteria.. + cmd.persist_to = 100; + cmd.replicate_to = 100; + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_DURABILITY_ETOOMANY, rc); + + // Try with no persist/replicate options + cmd.persist_to = 0; + cmd.replicate_to = 0; + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_EINVAL, rc); + lcb_sched_fail(instance); + + // CAP_MAX should be applied here + cmd.persist_to = -1; + cmd.replicate_to = -1; + lcb_sched_enter(instance); + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_SUCCESS, rc); + lcb_sched_leave(instance); + lcb_wait(instance); + ASSERT_EQ(LCB_SUCCESS, resp.rc); + ASSERT_TRUE(options.v.v0.persist_to <= resp.dur_resp->npersisted); + ASSERT_TRUE(options.v.v0.replicate_to <= resp.dur_resp->nreplicated); + + // Use bad CAS. we should have a clear indicator that storage failed + cmd.cas = -1; + lcb_sched_enter(instance); + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_SUCCESS, rc); + lcb_sched_leave(instance); + lcb_wait(instance); + ASSERT_EQ(LCB_KEY_EEXISTS, resp.rc); + ASSERT_EQ(0, resp.store_ok); + + // Make storage succeed, but let durability fail. + // TODO: Add Mock-specific command to disable persistence/replication + lcb_U32 ustmo = 1; // 1 microsecond + rc = lcb_cntl(instance, LCB_CNTL_SET, LCB_CNTL_DURABILITY_TIMEOUT, &ustmo); + ASSERT_EQ(LCB_SUCCESS, rc); + + // Reset CAS from previous command + cmd.cas = 0; + lcb_sched_enter(instance); + rc = lcb_storedur3(instance, &resp, &cmd); + ASSERT_EQ(LCB_SUCCESS, rc); + lcb_sched_leave(instance); + lcb_wait(instance); + if (resp.rc == LCB_ETIMEDOUT) { + ASSERT_NE(0, resp.store_ok); + } else { + lcb_log(LOGARGS(instance, WARN), "Test skipped because mock is too fast(!)"); + } +} diff --git a/tools/cbc.cc b/tools/cbc.cc index ce2e3ae72..2c9b96c28 100644 --- a/tools/cbc.cc +++ b/tools/cbc.cc @@ -28,9 +28,12 @@ string getRespKey(const lcb_RESPBASE* resp) } static void -printKeyError(string& key, lcb_error_t err) +printKeyError(string& key, lcb_error_t err, const char *additional = NULL) { fprintf(stderr, "%-20s %s (0x%x)\n", key.c_str(), lcb_strerror(NULL, err), err); + if (additional) { + fprintf(stderr, "%-20s%s\n", "", additional); + } } static void @@ -67,17 +70,34 @@ get_callback(lcb_t, lcb_CALLBACKTYPE, const lcb_RESPGET *resp) } static void -store_callback(lcb_t, lcb_CALLBACKTYPE cbtype, const lcb_RESPSTORE *resp) +store_callback(lcb_t, lcb_CALLBACKTYPE cbtype, const lcb_RESPBASE *resp) { string key = getRespKey((const lcb_RESPBASE*)resp); - if (resp->rc == LCB_SUCCESS) { - printKeyCasStatus(key, cbtype, (const lcb_RESPBASE *)resp, "Stored."); - if (resp->cookie != NULL) { - map& items = *(map*)resp->cookie; - items[key] = resp->cas; + + if (cbtype == LCB_CALLBACK_STOREDUR) { + // Storage with durability + const lcb_RESPSTOREDUR *dresp = + reinterpret_cast(resp); + char buf[4096]; + if (resp->rc == LCB_SUCCESS) { + sprintf(buf, "Stored. Persisted(%u). Replicated(%u)", + dresp->dur_resp->npersisted, dresp->dur_resp->nreplicated); + printKeyCasStatus(key, cbtype, resp, buf); + } else { + if (dresp->store_ok) { + sprintf(buf, "Store OK, but durability failed. Persisted(%u). Replicated(%u)", + dresp->dur_resp->npersisted, dresp->dur_resp->nreplicated); + } else { + sprintf(buf, "%s", "Store failed"); + } + printKeyError(key, resp->rc, buf); } } else { - printKeyError(key, resp->rc); + if (resp->rc == LCB_SUCCESS) { + printKeyCasStatus(key, cbtype, resp, "Stored."); + } else { + printKeyError(key, resp->rc); + } } } @@ -96,14 +116,6 @@ common_callback(lcb_t, int type, const lcb_RESPBASE *resp) case LCB_CALLBACK_REMOVE: printKeyCasStatus(key, type, resp, "Deleted."); break; - case LCB_CALLBACK_ENDURE: { - const lcb_RESPENDURE *er = (const lcb_RESPENDURE*)resp; - char s_tmp[4006]; - sprintf(s_tmp, "Persisted(%u). Replicated(%u).", - er->npersisted, er->nreplicated); - printKeyCasStatus(key, type, resp, s_tmp); - break; - } default: abort(); // didn't request it } @@ -405,46 +417,6 @@ GetHandler::run() lcb_wait(instance); } -static void -endureItems(lcb_t instance, const map items, - int persist_to, int replicate_to) -{ - lcb_install_callback3(instance, LCB_CALLBACK_ENDURE, common_callback); - lcb_durability_opts_t options = { 0 }; - if (persist_to < 0 || replicate_to < 0) { - options.v.v0.cap_max = 1; - } - options.v.v0.persist_to = persist_to; - options.v.v0.replicate_to = replicate_to; - lcb_error_t err; - - lcb_MULTICMD_CTX *mctx = lcb_endure3_ctxnew(instance, &options, &err); - if (mctx == NULL) { - throw err; - } - - map::const_iterator iter; - for (iter = items.begin(); iter != items.end(); ++iter) { - lcb_CMDENDURE cmd = { 0 }; - LCB_KREQ_SIMPLE(&cmd.key, iter->first.c_str(), iter->first.size()); - cmd.cas = iter->second; - err = mctx->addcmd(mctx, (lcb_CMDBASE *)&cmd); - if (err != LCB_SUCCESS) { - throw err; - } - } - - lcb_sched_enter(instance); - err = mctx->done(mctx, NULL); - if (err == LCB_SUCCESS) { - lcb_sched_leave(instance); - } else { - lcb_sched_fail(instance); - throw err; - } - lcb_wait(instance); -} - void SetHandler::addOptions() { @@ -465,8 +437,8 @@ void SetHandler::storeItem(const string& key, const char *value, size_t nvalue) { lcb_error_t err; - lcb_CMDSTORE cmd = { 0 }; - LCB_KREQ_SIMPLE(&cmd.key, key.c_str(), key.size()); + lcb_CMDSTOREDUR cmd = { 0 }; + LCB_CMD_SET_KEY(&cmd, key.c_str(), key.size()); cmd.value.vtype = LCB_KV_COPY; cmd.value.u_buf.contig.bytes = value; cmd.value.u_buf.contig.nbytes = nvalue; @@ -485,9 +457,12 @@ SetHandler::storeItem(const string& key, const char *value, size_t nvalue) } else { cmd.operation = LCB_SET; } - err = lcb_store3(instance, &items, &cmd); - if (err != LCB_SUCCESS) { - throw err; + if (o_persist.passed() || o_replicate.passed()) { + cmd.persist_to = o_persist.result(); + cmd.replicate_to = o_replicate.result(); + err = lcb_storedur3(instance, NULL, &cmd); + } else { + err = lcb_store3(instance, NULL, reinterpret_cast(&cmd)); } } @@ -508,6 +483,7 @@ SetHandler::run() { Handler::run(); lcb_install_callback3(instance, LCB_CALLBACK_STORE, (lcb_RESPCALLBACK)store_callback); + lcb_install_callback3(instance, LCB_CALLBACK_STOREDUR, (lcb_RESPCALLBACK)store_callback); const vector& keys = parser.getRestArgs(); lcb_sched_enter(instance); @@ -537,9 +513,6 @@ SetHandler::run() lcb_sched_leave(instance); lcb_wait(instance); - if (items.empty() == false && (o_persist.passed() || o_replicate.passed())) { - endureItems(instance, items, o_persist.result(), o_replicate.result()); - } } void