Skip to content

Commit

Permalink
CCBC-1384: allow deferring get_replica operation
Browse files Browse the repository at this point in the history
Change-Id: I8ee161307849bced8ae3be8266b15bf84ac6e8e5
Reviewed-on: http://review.couchbase.org/c/libcouchbase/+/154323
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
avsej committed May 28, 2021
1 parent 40ad81d commit ee9f024
Show file tree
Hide file tree
Showing 4 changed files with 585 additions and 483 deletions.
1 change: 1 addition & 0 deletions cmake/source_files.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ SET(LCB_CORE_CXXSRC
src/operations/durability.cc
src/operations/exists.cc
src/operations/get.cc
src/operations/get_replica.cc
src/operations/observe-seqno.cc
src/operations/observe.cc
src/operations/ping.cc
Expand Down
227 changes: 133 additions & 94 deletions src/capi/cmd_get_replica.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,141 @@

#include <cstddef>
#include <cstdint>
#include <chrono>

#include "key_value_error_context.hh"
/**
* @private
*/
enum class get_replica_mode {
any,
all,
select,
};

/**
* @private
*/
struct lcb_CMDGETREPLICA_ {
lcb_STATUS mode(get_replica_mode mode)
{
mode_ = mode;
return LCB_SUCCESS;
}

get_replica_mode mode() const
{
return mode_;
}

lcb_STATUS select_index(int replica_index)
{
mode_ = get_replica_mode::select;
select_index_ = replica_index;
return LCB_SUCCESS;
}

int selected_replica_index() const {
return select_index_;
}

lcb_STATUS key(std::string key)
{
key_ = std::move(key);
return LCB_SUCCESS;
}

lcb_STATUS collection(lcb::collection_qualifier collection)
{
collection_ = std::move(collection);
return LCB_SUCCESS;
}

lcb_STATUS parent_span(lcbtrace_SPAN *parent_span)
{
parent_span_ = parent_span;
return LCB_SUCCESS;
}

lcb_STATUS timeout_in_milliseconds(std::uint32_t timeout)
{
timeout_ = std::chrono::milliseconds(timeout);
return LCB_SUCCESS;
}

lcb_STATUS timeout_in_microseconds(std::uint32_t timeout)
{
timeout_ = std::chrono::microseconds(timeout);
return LCB_SUCCESS;
}

lcb_STATUS start_time_in_nanoseconds(std::uint64_t val)
{
start_time_ = std::chrono::nanoseconds(val);
return LCB_SUCCESS;
}

std::uint64_t start_time_or_default_in_nanoseconds(std::uint64_t default_val) const
{
if (start_time_ == std::chrono::nanoseconds::zero()) {
return default_val;
}
return start_time_.count();
}

const lcb::collection_qualifier &collection() const
{
return collection_;
}

lcb::collection_qualifier &collection()
{
return collection_;
}

const std::string &key() const
{
return key_;
}

std::uint64_t timeout_or_default_in_nanoseconds(std::uint64_t default_timeout) const
{
if (timeout_ > std::chrono::microseconds::zero()) {
return std::chrono::duration_cast<std::chrono::nanoseconds>(timeout_).count();
}
return default_timeout;
}

std::uint32_t timeout_in_microseconds() const
{
return static_cast<std::uint32_t>(timeout_.count());
}

lcbtrace_SPAN *parent_span() const
{
return parent_span_;
}

void cookie(void *cookie)
{
cookie_ = cookie;
}

void *cookie()
{
return cookie_;
}

private:
lcb::collection_qualifier collection_{};
std::chrono::microseconds timeout_{0};
std::chrono::nanoseconds start_time_{0};
lcbtrace_SPAN *parent_span_{nullptr};
void *cookie_{nullptr};
std::string key_{};
get_replica_mode mode_{get_replica_mode::any};
int select_index_{0};
};

struct lcb_RESPGETREPLICA_ {
lcb_KEY_VALUE_ERROR_CONTEXT ctx{};
Expand All @@ -39,98 +172,4 @@ struct lcb_RESPGETREPLICA_ {
std::uint32_t itmflags; /**< User-defined flags for the item */
};

/**@brief Select get-replica mode
* @see lcb_rget3_cmd_t */
enum lcb_replica_t {
/**Query all the replicas sequentially, retrieving the first successful
* response */
LCB_REPLICA_FIRST = 0x00,

/**Query all the replicas concurrently, retrieving all the responses*/
LCB_REPLICA_ALL = 0x01,

/**Query the specific replica specified by the
* lcb_rget3_cmd_t#index field */
LCB_REPLICA_SELECT = 0x02
};

/**
* @brief Command for requesting an item from a replica
* @note The `options.exptime` and `options.cas` fields are ignored for this
* command.
*
* This structure is similar to @ref lcb_RESPGET with the addition of an
* `index` and `strategy` field which allow you to control and select how
* many replicas are queried.
*
* @see lcb_rget3(), lcb_RESPGET
*/
struct lcb_CMDGETREPLICA_ {
/**Common flags for the command. These modify the command itself. Currently
the lower 16 bits of this field are reserved, and the higher 16 bits are
used for individual commands.*/
std::uint32_t cmdflags;

/**Specify the expiration time. This is either an absolute Unix time stamp
or a relative offset from now, in seconds. If the value of this number
is greater than the value of thirty days in seconds, then it is a Unix
timestamp.
This field is used in mutation operations (lcb_store3()) to indicate
the lifetime of the item. It is used in lcb_get3() with the lcb_CMDGET::lock
option to indicate the lock expiration itself. */
std::uint32_t exptime;

/**The known CAS of the item. This is passed to mutation to commands to
ensure the item is only changed if the server-side CAS value matches the
one specified here. For other operations (such as lcb_CMDENDURE) this
is used to ensure that the item has been persisted/replicated to a number
of servers with the value specified here. */
std::uint64_t cas;

/**< Collection ID */
std::uint32_t cid;
const char *scope;
size_t nscope;
const char *collection;
size_t ncollection;
/**The key for the document itself. This should be set via LCB_CMD_SET_KEY() */
lcb_KEYBUF key;

/** Operation timeout (in microseconds). When zero, the library will use default value. */
std::uint32_t timeout;
/** Parent tracing span */
lcbtrace_SPAN *pspan;

/**
* Strategy for selecting a replica. The default is ::LCB_REPLICA_FIRST
* which results in the client trying each replica in sequence until a
* successful reply is found, and returned in the callback.
*
* ::LCB_REPLICA_FIRST evaluates to 0.
*
* Other options include:
* <ul>
* <li>::LCB_REPLICA_ALL - queries all replicas concurrently and dispatches
* a callback for each reply</li>
* <li>::LCB_REPLICA_SELECT - queries a specific replica indicated in the
* #index field</li>
* </ul>
*
* @note When ::LCB_REPLICA_ALL is selected, the callback will be invoked
* multiple times, one for each replica. The final callback will have the
* ::LCB_RESP_F_FINAL bit set in the lcb_RESPBASE::rflags field. The final
* response will also contain the response from the last replica to
* respond.
*/
lcb_replica_t strategy;

/**
* Valid only when #strategy is ::LCB_REPLICA_SELECT, specifies the replica
* index number to query. This should be no more than `nreplicas-1`
* where `nreplicas` is the number of replicas the bucket is configured with.
*/
int index;
};

#endif // LIBCOUCHBASE_CAPI_GET_REPLICA_HH
Loading

0 comments on commit ee9f024

Please sign in to comment.