Skip to content

Commit

Permalink
CXXCBC-242: SDK Support for Native KV Range Scans (#419)
Browse files Browse the repository at this point in the history
* Added `prefix_scan` as top-level scan type
* Changed scan terms from byte vectors to strings
* Seed for sampling scan is randomly generated if not specified
* Removed `range_scan_cancelled` error code, returning `request_canceled` instead
* Removed sorting
* Added `concurrency` option to orchestrator to set the maximum allowed level of concurrency
* The constructor of `range_scan_orchestrator` takes the vbucket map instead of the number of vbuckets
* Implemented new concurrency approach for scanning vbuckets:
    * Streams start with the maximum level of concurrency. If a temporary failure occurs (i.e. receiving busy status from the server) the stream is retried a later point and the concurrency is reduced by 1
    * When a stream finishes successfully or with a benign error, another stream is initiated to take its place. If the temporary failure occurs, a new stream is not initiated which effectively reduces the concurrency (unless streams are not being executed concurrently in which case that continues)
    * The number of streams being run per node are kept track of and when a new stream should start, a vbucket on the least busy node is selected (initially random node is selected)
* Added `key_value_scan_timeout` to timeout defaults (7.5 seconds)
* Removed `batch_time_limit` from options, 90% of the timeout is used instead
* Timeouts are now on both range_scan_create and range_scan_continue. There's also a check before retrying a stream if the time since the first attempt exceeds the timeout
* The `next` methods of the scan result can return an error code in the case of a fatal error
* Added `cancel()` method to `scan_result` that can cancel all streams
* Errors on range scan continue or start are now separated into 'fatal' (which takes into account whether the scan is a sampling scan), 'retryable' or 'benign'

---------

Co-authored-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information
DemetrisChr and avsej committed Jun 28, 2023
1 parent 0227652 commit 9394a00
Show file tree
Hide file tree
Showing 14 changed files with 667 additions and 334 deletions.
73 changes: 51 additions & 22 deletions core/crud_component.cxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-2021 Couchbase, Inc.
* Copyright 2020-2023 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,8 @@

#include <tl/expected.hpp>

#include <random>

namespace couchbase::core
{
static std::pair<std::vector<std::byte>, std::error_code>
Expand All @@ -53,17 +55,40 @@ serialize_range_scan_create_options(const range_scan_create_options& options)
body["collection"] = fmt::format("{:x}", options.collection_id);
}

if (std::holds_alternative<range_scan>(options.scan_type)) {
const auto& range = std::get<range_scan>(options.scan_type);
if (std::holds_alternative<range_scan>(options.scan_type) || std::holds_alternative<prefix_scan>(options.scan_type)) {
const auto& range = (std::holds_alternative<range_scan>(options.scan_type))
? std::get<range_scan>(options.scan_type)
: std::get<prefix_scan>(options.scan_type).to_range_scan();

const auto& from = range.from.value_or(scan_term{ "" });
const auto& to = range.to.value_or(scan_term{ "\xf4\x8f\xfb\xfb" });

body["range"] = {
{ range.start_.exclusive ? "excl_start" : "start", base64::encode(range.start_.id) },
{ range.end_.exclusive ? "excl_end" : "end", base64::encode(range.end_.id) },
{ from.exclusive ? "excl_start" : "start", base64::encode(from.term) },
{ to.exclusive ? "excl_end" : "end", base64::encode(to.term) },
};
} else if (std::holds_alternative<sampling_scan>(options.scan_type)) {
const auto& sampling = std::get<sampling_scan>(options.scan_type);

// The limit in sampling scan is required to be greater than 0
if (sampling.limit <= 0) {
return { {}, errc::common::invalid_argument };
}

std::uint64_t seed{};
if (sampling.seed.has_value()) {
seed = sampling.seed.value();
} else {
// Generate random uint64 as seed
std::random_device rd;
std::mt19937_64 gen(rd());
std::uniform_int_distribution<std::uint64_t> dis;
seed = dis(gen);
}

body["sampling"] = {
{ "samples", sampling.limit },
{ "seed", sampling.seed.value_or(0) },
{ "seed", seed },
};
} else {
return { {}, errc::common::invalid_argument };
Expand All @@ -75,7 +100,7 @@ serialize_range_scan_create_options(const range_scan_create_options& options)
{ "vb_uuid", std::to_string(snapshot.vbucket_uuid) },
{ "seqno", snapshot.sequence_number },
{ "timeout_ms",
(options.timeout == std::chrono::milliseconds::zero()) ? timeout_defaults::range_scan_timeout.count()
(options.timeout == std::chrono::milliseconds::zero()) ? timeout_defaults::key_value_scan_timeout.count()
: options.timeout.count() },
};
if (snapshot.sequence_number_exists) {
Expand All @@ -98,7 +123,7 @@ parse_range_scan_keys(gsl::span<std::byte> data, range_scan_item_callback&& item
if (remaining.size() < key_length) {
return errc::network::protocol_error;
}
item_callback(range_scan_item{ { remaining.begin(), remaining.begin() + static_cast<std::ptrdiff_t>(key_length) } });
item_callback(range_scan_item{ { reinterpret_cast<const char*>(remaining.data()), key_length } });
if (remaining.size() == key_length) {
return {};
}
Expand Down Expand Up @@ -130,13 +155,13 @@ parse_range_scan_documents(gsl::span<std::byte> data, range_scan_item_callback&&
body.datatype = data[24];
data = gsl::make_span(data.data() + header_offset, data.size() - header_offset);

std::vector<std::byte> key{};
std::string key{};
{
auto [key_length, remaining] = utils::decode_unsigned_leb128<std::size_t>(data, core::utils::leb_128_no_throw{});
if (remaining.size() < key_length) {
return errc::network::protocol_error;
}
key = { remaining.begin(), remaining.begin() + static_cast<std::ptrdiff_t>(key_length) };
key = { reinterpret_cast<const char*>(remaining.data()), key_length };
data = gsl::make_span(remaining.data() + key_length, remaining.size() - key_length);
}

Expand Down Expand Up @@ -242,19 +267,10 @@ class crud_component_impl
if (error) {
return cb({}, error);
}
bool ids_only;
switch (response->extras_.size()) {
case 4:
ids_only = mcbp::big_endian::read_uint32(response->extras_, 0) == 0;
break;

case 0:
ids_only = options.ids_only; // support servers before MB-54267. TODO: remove after server GA
break;

default:
return cb({}, errc::network::protocol_error);
if (response->extras_.size() != 4) {
return cb({}, errc::network::protocol_error);
}
bool ids_only = mcbp::big_endian::read_uint32(response->extras_, 0) == 0;

if (auto ec = parse_range_scan_data(response->value_, std::move(item_cb), ids_only); ec) {
return cb({}, ec);
Expand All @@ -276,6 +292,19 @@ class crud_component_impl

req->persistent_ = true;
req->vbucket_ = vbucket_id;

if (options.timeout != std::chrono::milliseconds::zero()) {
auto timer = std::make_shared<asio::steady_timer>(io_);
timer->expires_after(options.timeout);
timer->async_wait([req](auto error) {
if (error == asio::error::operation_aborted) {
return;
}
req->cancel(couchbase::errc::common::unambiguous_timeout);
});
req->set_deadline(timer);
}

mcbp::buffer_writer buf{ scan_uuid.size() + sizeof(std::uint32_t) * 3 };
buf.write(scan_uuid);
buf.write_uint32(options.batch_item_limit);
Expand Down
6 changes: 2 additions & 4 deletions core/impl/key_value_error_category.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ struct key_value_error_category : std::error_category {
return "cannot_revive_living_document (131)";
case errc::key_value::xattr_no_access:
return "xattr_no_access (130)";
case errc::key_value::range_scan_cancelled:
return "range_scan_cancelled (132)";
case errc::key_value::range_scan_vb_uuid_not_equal:
return "range_scan_vb_uuid_not_equal (133)";
case errc::key_value::mutation_token_outdated:
return "mutation_token_outdated (133)";
case errc::key_value::range_scan_completed:
return "range_scan_completed (134)";
}
Expand Down
4 changes: 2 additions & 2 deletions core/protocol/status.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ map_status_code(protocol::client_opcode opcode, std::uint16_t status)
case key_value_status_code::opaque_no_match:

case key_value_status_code::range_scan_cancelled:
return errc::key_value::range_scan_cancelled;
return errc::common::request_canceled;

case key_value_status_code::range_scan_vb_uuid_not_equal:
return errc::key_value::range_scan_vb_uuid_not_equal;
return errc::key_value::mutation_token_outdated;

case key_value_status_code::unknown:
break;
Expand Down
30 changes: 3 additions & 27 deletions core/range_scan_options.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,10 @@

namespace couchbase::core
{
range_scan::range_scan(scan_term start, scan_term end)
: start_{ std::move(start) }
, end_{ std::move(end) }
{
}

range_scan::range_scan(std::string_view start, std::string_view end)
: start_{ utils::to_binary(start) }
, end_{ utils::to_binary(end) }
{
}

range_scan::range_scan(std::string_view start, bool exclusive_start, std::string_view end, bool exclusive_end)
: start_{ utils::to_binary(start), exclusive_start }
, end_{ utils::to_binary(end), exclusive_end }
{
}

range_scan::range_scan(std::vector<std::byte> start, std::vector<std::byte> end)
: start_{ std::move(start) }
, end_{ std::move(end) }
{
}

range_scan::range_scan(std::vector<std::byte> start, bool exclusive_start, std::vector<std::byte> end, bool exclusive_end)
: start_{ std::move(start), exclusive_start }
, end_{ std::move(end), exclusive_end }
auto
prefix_scan::to_range_scan() const -> range_scan
{
return { scan_term{ prefix, false }, scan_term{ prefix + "\xf4\x8f\xfb\xfb" } };
}

auto
Expand Down
30 changes: 13 additions & 17 deletions core/range_scan_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,24 @@
namespace couchbase::core
{
struct scan_term {
static constexpr std::byte minimum_marker{ 0x00 };
static constexpr std::byte maximum_marker{ 0xff };

std::vector<std::byte> id;
std::string term;
bool exclusive{ false };
};

struct range_scan {
range_scan() = default;
range_scan(scan_term start, scan_term end);
range_scan(std::string_view start, std::string_view end);
range_scan(std::string_view start, bool exclusive_start, std::string_view end, bool exclusive_end);
range_scan(std::vector<std::byte> start, std::vector<std::byte> end);
range_scan(std::vector<std::byte> start, bool exclusive_start, std::vector<std::byte> end, bool exclusive_end);

scan_term start_{ { scan_term::minimum_marker } };
scan_term end_{ { scan_term::maximum_marker } };
std::optional<scan_term> from{};
std::optional<scan_term> to{};
};

struct prefix_scan {
std::string prefix{};

[[nodiscard]] auto to_range_scan() const -> range_scan;
};

struct sampling_scan {
std::size_t limit{};
std::optional<std::uint32_t> seed{};
std::optional<std::uint64_t> seed{};
};

struct range_snapshot_requirements {
Expand All @@ -62,7 +58,7 @@ struct range_snapshot_requirements {
struct range_scan_create_options {
std::string scope_name{};
std::string collection_name{};
std::variant<std::monostate, range_scan, sampling_scan> scan_type{};
std::variant<std::monostate, range_scan, prefix_scan, sampling_scan> scan_type{};
std::chrono::milliseconds timeout{};
std::uint32_t collection_id{ 0 };
std::optional<range_snapshot_requirements> snapshot_requirements{};
Expand All @@ -88,10 +84,10 @@ struct range_scan_continue_options {

std::uint32_t batch_item_limit{ default_batch_item_limit };
std::uint32_t batch_byte_limit{ default_batch_byte_limit };
std::chrono::milliseconds timeout{};
std::chrono::milliseconds batch_time_limit{ default_batch_time_limit };
std::shared_ptr<couchbase::retry_strategy> retry_strategy{ nullptr };

bool ids_only{ false }; // support servers before MB-54267. TODO: remove after server GA
struct {
std::string user{};
} internal{};
Expand Down Expand Up @@ -126,7 +122,7 @@ struct range_scan_item_body {
};

struct range_scan_item {
std::vector<std::byte> key{};
std::string key{};
std::optional<range_scan_item_body> body{};
};

Expand Down

0 comments on commit 9394a00

Please sign in to comment.