From 69989e5353416535f9f7c288bf0e8661d20ebd96 Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Tue, 11 Jul 2023 10:49:58 +0100 Subject: [PATCH 1/6] RCBC-391: SDK Support for Native KV Range Scans --- examples/range_scan.rb | 56 +++++ ext/couchbase | 2 +- ext/couchbase.cxx | 294 +++++++++++++++++++++- lib/couchbase/collection.rb | 14 ++ lib/couchbase/collection_options.rb | 81 ++++++ lib/couchbase/errors.rb | 5 + lib/couchbase/key_value_scan.rb | 117 +++++++++ lib/couchbase/options.rb | 71 ++++++ test/scan_test.rb | 377 ++++++++++++++++++++++++++++ test/test_helper.rb | 6 +- 10 files changed, 1018 insertions(+), 5 deletions(-) create mode 100644 examples/range_scan.rb create mode 100644 lib/couchbase/key_value_scan.rb create mode 100644 test/scan_test.rb diff --git a/examples/range_scan.rb b/examples/range_scan.rb new file mode 100644 index 00000000..c9f0dc6e --- /dev/null +++ b/examples/range_scan.rb @@ -0,0 +1,56 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'couchbase' + +include Couchbase # rubocop:disable Style/MixinUsage for brevity + +options = Cluster::ClusterOptions.new +options.authenticate("Administrator", "password") +cluster = Cluster.connect("couchbase://localhost", options) + +bucket = cluster.bucket("travel-sample") +collection = bucket.scope("inventory").collection("airline") + +options = Options::Scan.new(ids_only: false) + +puts "Range Scan (from 'airline_1' to 'airline_11')" + +scan_type = RangeScan.new( + from: ScanTerm.new("airline_1"), + to: ScanTerm.new("airline_11", exclusive: true) +) +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end + +puts "\nPrefix Scan (with prefix 'airline_8')" + +scan_type = PrefixScan.new("airline_8") +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end + +puts "\nSampling Scan (with limit 5)" + +scan_type = SamplingScan.new(5) +res = collection.scan(scan_type, options) + +res.each do |item| + puts " (#{item.id}) #{item.content['icao']}: #{item.content['name']}" +end diff --git a/ext/couchbase b/ext/couchbase index b8274271..a46a5e1e 160000 --- a/ext/couchbase +++ b/ext/couchbase @@ -1 +1 @@ -Subproject commit b82742710f6daaaf6e4902da9925dc5b5ba1cd5f +Subproject commit a46a5e1ecf49d2717b9f8fd8aa6ad28d83693d45 diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index 2c5b138d..68fb6373 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -33,6 +33,8 @@ #include #include + +#include #include #include #include @@ -45,6 +47,10 @@ #include #include +#include +#include +#include + #include #include #include @@ -435,7 +441,7 @@ cb_backend_close(cb_backend_data* backend) static void cb_Backend_mark(void* /* ptr */) { - /* no embeded ruby objects -- no mark */ + /* no embedded ruby objects -- no mark */ } static void @@ -495,6 +501,8 @@ cb_backend_to_cluster(VALUE self) return backend->cluster; } +static VALUE eCouchbaseError; + static VALUE eAmbiguousTimeout; static VALUE eAuthenticationFailure; static VALUE eBucketExists; @@ -534,6 +542,7 @@ static VALUE eInvalidArgument; static VALUE eJobQueueFull; static VALUE eLinkNotFound; static VALUE eLinkExists; +static VALUE eMutationTokenOutdated; static VALUE eNumberTooBig; static VALUE eParsingFailure; static VALUE ePathExists; @@ -586,7 +595,7 @@ static void init_exceptions(VALUE mCouchbase) { VALUE mError = rb_define_module_under(mCouchbase, "Error"); - VALUE eCouchbaseError = rb_define_class_under(mError, "CouchbaseError", rb_eStandardError); + eCouchbaseError = rb_define_class_under(mError, "CouchbaseError", rb_eStandardError); VALUE eTimeout = rb_define_class_under(mError, "Timeout", eCouchbaseError); @@ -629,6 +638,7 @@ init_exceptions(VALUE mCouchbase) eJobQueueFull = rb_define_class_under(mError, "JobQueueFull", eCouchbaseError); eLinkNotFound = rb_define_class_under(mError, "LinkNotFound", eCouchbaseError); eLinkExists = rb_define_class_under(mError, "LinkExists", eCouchbaseError); + eMutationTokenOutdated = rb_define_class_under(mError, "MutationTokenOutdated", eCouchbaseError); eNumberTooBig = rb_define_class_under(mError, "NumberTooBig", eCouchbaseError); eParsingFailure = rb_define_class_under(mError, "ParsingFailure", eCouchbaseError); ePathExists = rb_define_class_under(mError, "PathExists", eCouchbaseError); @@ -784,6 +794,9 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e case couchbase::errc::key_value::durable_write_re_commit_in_progress: return rb_exc_new_cstr(eDurableWriteReCommitInProgress, what.c_str()); + case couchbase::errc::key_value::mutation_token_outdated: + return rb_exc_new_cstr(eMutationTokenOutdated, what.c_str()); + case couchbase::errc::key_value::path_not_found: return rb_exc_new_cstr(ePathNotFound, what.c_str()); @@ -3661,6 +3674,280 @@ cb_Backend_document_mutate_in(VALUE self, VALUE bucket, VALUE scope, VALUE colle return Qnil; } +struct cb_core_scan_result_data { + std::unique_ptr scan_result{}; +}; + +static void +cb_CoreScanResult_mark(void* ptr) +{ + /* No embedded Ruby objects */ +} + +static void +cb_CoreScanResult_free(void* ptr) +{ + auto* data = static_cast(ptr); + if (data->scan_result != nullptr && !data->scan_result->is_cancelled()) { + data->scan_result->cancel(); + } + data->scan_result.reset(); + ruby_xfree(data); +} + +static const rb_data_type_t cb_core_scan_result_type { + .wrap_struct_name = "Couchbase/Backend/CoreScanResult", + .function = { + .dmark = cb_CoreScanResult_mark, + .dfree = cb_CoreScanResult_free, + }, + .data = nullptr, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY + .flags = RUBY_TYPED_FREE_IMMEDIATELY, +#endif +}; + +static VALUE +cb_CoreScanResult_allocate(VALUE klass) +{ + cb_core_scan_result_data* data = nullptr; + VALUE obj = TypedData_Make_Struct(klass, cb_core_scan_result_data, &cb_core_scan_result_type, data); + return obj; +} + +static VALUE +cb_CoreScanResult_is_cancelled(VALUE self) +{ + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + auto resp = data->scan_result->is_cancelled(); + if (resp) { + return Qtrue; + } else { + return Qfalse; + } +} + +static VALUE +cb_CoreScanResult_cancel(VALUE self) +{ + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + data->scan_result->cancel(); + return Qnil; +} + +static VALUE +cb_CoreScanResult_next_item(VALUE self) +{ + try { + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); + auto barrier = std::make_shared>>(); + // auto resp = data->scan_result->next(); + auto f = barrier->get_future(); + data->scan_result->next([barrier](couchbase::core::range_scan_item item, std::error_code ec) { + if (ec) { + return barrier->set_value(tl::unexpected(ec)); + } else { + return barrier->set_value(item); + } + }); + auto resp = cb_wait_for_future(f); + if (!resp.has_value()) { + // If the error code is range_scan_completed return nil without raising an exception (nil signifies that there + // are no more items) + if (resp.error() != couchbase::errc::key_value::range_scan_completed) { + cb_throw_error_code(resp.error(), "unable to fetch next scan item"); + } + // Release ownership of scan_result unique pointer + return Qnil; + } + auto item = resp.value(); + VALUE res = rb_hash_new(); + rb_hash_aset(res, rb_id2sym(rb_intern("id")), cb_str_new(item.key)); + if (item.body.has_value()) { + auto body = item.body.value(); + rb_hash_aset(res, rb_id2sym(rb_intern("id")), cb_str_new(item.key)); + rb_hash_aset(res, rb_id2sym(rb_intern("encoded")), cb_str_new(body.value)); + rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(body.cas)); + rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(body.flags)); + rb_hash_aset(res, rb_id2sym(rb_intern("expiry")), UINT2NUM(body.expiry)); + rb_hash_aset(res, rb_id2sym(rb_intern("id_only")), Qfalse); + } else { + rb_hash_aset(res, rb_id2sym(rb_intern("id_only")), Qtrue); + } + return res; + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + +static VALUE +cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE collection, VALUE scan_type, VALUE options) +{ + const auto& cluster = cb_backend_to_cluster(self); + + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); + Check_Type(scan_type, T_HASH); + if (!NIL_P(options)) { + Check_Type(options, T_HASH); + } + + try { + couchbase::core::range_scan_orchestrator_options orchestrator_options{}; + cb_extract_timeout(orchestrator_options, options); + cb_extract_option_bool(orchestrator_options.ids_only, options, "ids_only"); + cb_extract_option_number(orchestrator_options.batch_item_limit, options, "batch_item_limit"); + cb_extract_option_number(orchestrator_options.batch_byte_limit, options, "batch_byte_limit"); + cb_extract_option_number(orchestrator_options.concurrency, options, "concurrency"); + + // Extracting the mutation state + if (VALUE mutation_state = rb_hash_aref(options, rb_id2sym(rb_intern("mutation_state"))); !NIL_P(mutation_state)) { + cb_check_type(mutation_state, T_ARRAY); + auto state_size = static_cast(RARRAY_LEN(mutation_state)); + + if (state_size > 0) { + auto core_mut_state = couchbase::core::mutation_state{}; + core_mut_state.tokens.reserve(state_size); + for (std::size_t i = 0; i < state_size; ++i) { + VALUE token = rb_ary_entry(mutation_state, static_cast(i)); + cb_check_type(token, T_HASH); + VALUE bucket_name = rb_hash_aref(token, rb_id2sym(rb_intern("bucket_name"))); + cb_check_type(bucket_name, T_STRING); + VALUE partition_id = rb_hash_aref(token, rb_id2sym(rb_intern("partition_id"))); + cb_check_type(partition_id, T_FIXNUM); + VALUE partition_uuid = rb_hash_aref(token, rb_id2sym(rb_intern("partition_uuid"))); + switch (TYPE(partition_uuid)) { + case T_FIXNUM: + case T_BIGNUM: + break; + default: + rb_raise(rb_eArgError, "partition_uuid must be an Integer"); + } + VALUE sequence_number = rb_hash_aref(token, rb_id2sym(rb_intern("sequence_number"))); + switch (TYPE(sequence_number)) { + case T_FIXNUM: + case T_BIGNUM: + break; + default: + rb_raise(rb_eArgError, "sequence_number must be an Integer"); + } + core_mut_state.tokens.emplace_back(NUM2ULL(partition_uuid), + NUM2ULL(sequence_number), + gsl::narrow_cast(NUM2UINT(partition_id)), + cb_string_new(bucket_name)); + } + + orchestrator_options.consistent_with = core_mut_state; + } + } + + auto bucket_name = cb_string_new(bucket); + auto scope_name = cb_string_new(scope); + auto collection_name = cb_string_new(collection); + + // Getting the operation agent + auto agent_group = couchbase::core::agent_group(cluster->io_context(), couchbase::core::agent_group_config{ { cluster } }); + agent_group.open_bucket(bucket_name); + auto agent = agent_group.get_agent(bucket_name); + if (!agent.has_value()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get operation agent"); + return Qnil; + } + + // Getting the vbucket map + auto barrier = + std::make_shared>>(); + auto f = barrier->get_future(); + cluster->with_bucket_configuration( + bucket_name, [barrier](std::error_code ec, const couchbase::core::topology::configuration& config) mutable { + if (ec) { + return barrier->set_value(tl::unexpected(ec)); + } + if (!config.vbmap || config.vbmap->empty()) { + return barrier->set_value(tl::unexpected(couchbase::errc::common::feature_not_available)); + } + barrier->set_value(config.vbmap.value()); + }); + auto vbucket_map = cb_wait_for_future(f); + if (!vbucket_map.has_value()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get vbucket map"); + return Qnil; + } + + // Constructing the scan type + std::variant + core_scan_type{}; + ID scan_type_id = rb_sym2id(rb_hash_aref(scan_type, rb_id2sym(rb_intern("scan_type")))); + if (scan_type_id == rb_intern("range")) { + auto range_scan = couchbase::core::range_scan{}; + + VALUE from_hash = rb_hash_aref(scan_type, rb_id2sym(rb_intern("from"))); + VALUE to_hash = rb_hash_aref(scan_type, rb_id2sym(rb_intern("to"))); + + if (!NIL_P(from_hash)) { + Check_Type(from_hash, T_HASH); + range_scan.from = couchbase::core::scan_term{}; + cb_extract_option_string(range_scan.from->term, from_hash, "term"); + cb_extract_option_bool(range_scan.from->exclusive, from_hash, "exclusive"); + } + if (!NIL_P(to_hash)) { + Check_Type(to_hash, T_HASH); + range_scan.to = couchbase::core::scan_term{}; + cb_extract_option_string(range_scan.to->term, to_hash, "term"); + cb_extract_option_bool(range_scan.to->exclusive, to_hash, "exclusive"); + } + core_scan_type = range_scan; + } else if (scan_type_id == rb_intern("prefix")) { + auto prefix_scan = couchbase::core::prefix_scan{}; + cb_extract_option_string(prefix_scan.prefix, scan_type, "prefix"); + core_scan_type = prefix_scan; + } else if (scan_type_id == rb_intern("sampling")) { + auto sampling_scan = couchbase::core::sampling_scan{}; + cb_extract_option_number(sampling_scan.limit, scan_type, "limit"); + cb_extract_option_number(sampling_scan.seed, scan_type, "seed"); + core_scan_type = sampling_scan; + } else { + rb_raise(eInvalidArgument, "Invalid scan operation type"); + } + + auto orchestrator = couchbase::core::range_scan_orchestrator( + cluster->io_context(), agent.value(), vbucket_map.value(), scope_name, collection_name, core_scan_type, orchestrator_options); + + // Start the scan + auto resp = orchestrator.scan(); + if (!resp.has_value()) { + cb_throw_error_code(resp.error(), "unable to start scan"); + } + + // Wrap core scan_result inside Ruby ScanResult + // Creating a Ruby CoreScanResult object *after* checking that no error occurred during orchestrator.scan() + VALUE cCoreScanResult = rb_define_class_under(rb_define_module("Couchbase"), "CoreScanResult", rb_cObject); + rb_define_alloc_func(cCoreScanResult, cb_CoreScanResult_allocate); + rb_define_method(cCoreScanResult, "next_item", VALUE_FUNC(cb_CoreScanResult_next_item), 0); + rb_define_method(cCoreScanResult, "cancelled?", VALUE_FUNC(cb_CoreScanResult_is_cancelled), 0); + rb_define_method(cCoreScanResult, "cancel", VALUE_FUNC(cb_CoreScanResult_cancel), 0); + VALUE core_scan_result_obj = rb_class_new_instance(0, NULL, cCoreScanResult); + rb_ivar_set(core_scan_result_obj, rb_intern("@backend"), self); + cb_core_scan_result_data* data = nullptr; + TypedData_Get_Struct(core_scan_result_obj, cb_core_scan_result_data, &cb_core_scan_result_type, data); + data->scan_result = std::make_unique(resp.value()); + return core_scan_result_obj; + + } catch (const std::system_error& se) { + rb_exc_raise(cb_map_error_code(se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); + } catch (const ruby_exception& e) { + rb_exc_raise(e.exception_object()); + } + return Qnil; +} + static int cb_for_each_named_param(VALUE key, VALUE value, VALUE arg) { @@ -8581,7 +8868,7 @@ cb_Backend_form_encode(VALUE self, VALUE data) static void init_backend(VALUE mCouchbase) { - VALUE cBackend = rb_define_class_under(mCouchbase, "Backend", rb_cBasicObject); + VALUE cBackend = rb_define_class_under(mCouchbase, "Backend", rb_cObject); rb_define_alloc_func(cBackend, cb_Backend_allocate); rb_define_method(cBackend, "open", VALUE_FUNC(cb_Backend_open), 3); rb_define_method(cBackend, "close", VALUE_FUNC(cb_Backend_close), 0); @@ -8606,6 +8893,7 @@ init_backend(VALUE mCouchbase) rb_define_method(cBackend, "document_remove_multi", VALUE_FUNC(cb_Backend_document_remove_multi), 5); rb_define_method(cBackend, "document_lookup_in", VALUE_FUNC(cb_Backend_document_lookup_in), 6); rb_define_method(cBackend, "document_mutate_in", VALUE_FUNC(cb_Backend_document_mutate_in), 6); + rb_define_method(cBackend, "document_scan_create", VALUE_FUNC(cb_Backend_document_scan_create), 5); rb_define_method(cBackend, "document_query", VALUE_FUNC(cb_Backend_document_query), 2); rb_define_method(cBackend, "document_touch", VALUE_FUNC(cb_Backend_document_touch), 6); rb_define_method(cBackend, "document_exists", VALUE_FUNC(cb_Backend_document_exists), 5); diff --git a/lib/couchbase/collection.rb b/lib/couchbase/collection.rb index 05c04930..4f1d87d6 100644 --- a/lib/couchbase/collection.rb +++ b/lib/couchbase/collection.rb @@ -15,6 +15,7 @@ require "couchbase/errors" require "couchbase/collection_options" require "couchbase/binary_collection" +require "couchbase/key_value_scan" module Couchbase # Provides access to all collection APIs @@ -535,6 +536,19 @@ def mutate_in(id, specs, options = Options::MutateIn::DEFAULT) end end + # Performs a key-value scan + # + # @param [RangeScan, PrefixScan, SamplingScan] scan_type + # @param [Options::Scan] options + def scan(scan_type, options = Options::Scan::DEFAULT) + ScanResults.new( + core_scan_result: @backend.document_scan_create( + @bucket_name, @scope_name, @name, scan_type.to_backend, options.to_backend + ), + transcoder: options.transcoder + ) + end + private def extract_mutation_token(resp) diff --git a/lib/couchbase/collection_options.rb b/lib/couchbase/collection_options.rb index e66a7481..701f40d5 100644 --- a/lib/couchbase/collection_options.rb +++ b/lib/couchbase/collection_options.rb @@ -15,6 +15,9 @@ require "rubygems/deprecate" require "couchbase/json_transcoder" +require "couchbase/raw_string_transcoder" +require "couchbase/raw_json_transcoder" +require "couchbase/raw_binary_transcoder" require "couchbase/subdoc" require "couchbase/mutation_state" @@ -296,5 +299,83 @@ def initialize yield self if block_given? end end + + class ScanResult + # @return [String] identifier of the document + attr_accessor :id + + # @return [Boolean] whether only ids are returned from this scan + attr_accessor :id_only + + # @return [Integer, nil] holds the CAS value of the fetched document + attr_accessor :cas + + # @return [Integer, nil] the expiration if fetched and present + attr_accessor :expiry + + # @return [JsonTranscoder, RawBinaryTranscoder, RawJsonTranscoder, RawStringTranscoder, #decode] The default + # transcoder which should be used + attr_accessor :transcoder + + def initialize(id:, id_only:, cas: nil, expiry: nil, encoded: nil, flags: nil, transcoder: JsonTranscoder.new) + @id = id + @id_only = id_only + @cas = cas + @expiry = expiry + @encoded = encoded + @flags = flags + @transcoder = transcoder + + yield self if block_given? + end + + # Decodes the content of the document using given (or default transcoder) + # + # @param [JsonTranscoder, RawJsonTranscoder, RawBinaryTranscoder, RawStringTranscoder] transcoder custom transcoder + # + # @return [Object, nil] + def content(transcoder = self.transcoder) + return nil if @encoded.nil? + + transcoder ? transcoder.decode(@encoded, @flags) : @encoded + end + end + + class ScanResults + include Enumerable + + def initialize(core_scan_result:, transcoder:) + @core_scan_result = core_scan_result + @transcoder = transcoder + end + + def each + return enum_for(:each) unless block_given? + + loop do + resp = @core_scan_result.next_item + + break if resp.nil? + + if resp[:id_only] + yield ScanResult.new( + id: resp[:id], + id_only: resp[:id_only], + transcoder: @transcoder + ) + else + yield ScanResult.new( + id: resp[:id], + id_only: resp[:id_only], + cas: resp[:cas], + expiry: resp[:expiry], + encoded: resp[:encoded], + flags: resp[:flags], + transcoder: @transcoder + ) + end + end + end + end end end diff --git a/lib/couchbase/errors.rb b/lib/couchbase/errors.rb index 644ea8d1..52fdc807 100644 --- a/lib/couchbase/errors.rb +++ b/lib/couchbase/errors.rb @@ -183,6 +183,11 @@ class DurableWriteInProgress < CouchbaseError class DurableWriteReCommitInProgress < CouchbaseError end + # Happens when consistency requirements are specified but the partition uuid of the requirements do not align + # with the server + class MutationTokenOutdated < CouchbaseError + end + # Subdocument exception thrown when a path does not exist in the document. The exact meaning of path existence # depends on the operation and inputs. class PathNotFound < CouchbaseError diff --git a/lib/couchbase/key_value_scan.rb b/lib/couchbase/key_value_scan.rb new file mode 100644 index 00000000..f8930bfa --- /dev/null +++ b/lib/couchbase/key_value_scan.rb @@ -0,0 +1,117 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Couchbase + # A scan term used to specify the bounds of a range scan + class ScanTerm + attr_accessor :term # @return [ScanTerm] + attr_accessor :exclusive # @return [Boolean] + + # Creates an instance of a ScanTerm + # + # @param [String] term the key pattern of this term + # @param [Boolean] exclusive specifies if this term is excluded while scanning, the bounds are included by default + def initialize(term, exclusive: false) + @term = term + @exclusive = exclusive + end + + # @api private + def to_backend + { + term: @term, + exclusive: @exclusive, + } + end + end + + # A range scan performs a scan on a range of keys + class RangeScan + attr_accessor :from # @return [ScanTerm, nil] + attr_accessor :to # @return [ScanTerm, nil] + + # Creates an instance of a RangeScan scan type + # + # @param [ScanTerm, String, nil] from the lower bound of the range, if set + # @param [ScanTerm, String, nil] to the upper bound of the range, if set + def initialize(from: nil, to: nil) + @from = + if from.nil? || from.instance_of?(ScanTerm) + from + else + ScanTerm(from) + end + @to = + if to.nil? || to.instance_of?(ScanTerm) + to + else + ScanTerm(to) + end + end + + # @api private + def to_backend + { + scan_type: :range, + from: @from&.to_backend, + to: @to&.to_backend, + } + end + end + + # A prefix scan performs a scan that includes all documents whose keys start with the given prefix + class PrefixScan + attr_accessor :prefix # @return [String] + + # Creates an instance of a PrefixScan scan type + # + # @param [String, nil] prefix the prefix all document keys should start with + def initialize(prefix) + @prefix = prefix + end + + # @api private + def to_backend + { + scan_type: :prefix, + prefix: @prefix, + } + end + end + + # A sampling scan performs a scan that randomly selects documents up to a configured limit + class SamplingScan + attr_accessor :limit # @return [Integer] + attr_accessor :seed # @return [Integer, nil] + + # Creates an instance of a SamplingScan scan type + # + # @param [Integer] limit the maximum number of documents the sampling scan can return + # @param [Integer, nil] seed the seed used for the random number generator that selects the documents. If not set, + # a seed is generated at random + def initialize(limit, seed = nil) + @limit = limit + @seed = seed + end + + # @api private + def to_backend + { + scan_type: :sampling, + limit: @limit, + seed: @seed, + } + end + end +end diff --git a/lib/couchbase/options.rb b/lib/couchbase/options.rb index 8c9f94a5..6de94fee 100644 --- a/lib/couchbase/options.rb +++ b/lib/couchbase/options.rb @@ -1026,6 +1026,77 @@ def to_backend DEFAULT = LookupIn.new.freeze end + # Options for {Collection#scan} + class Scan < Base + attr_accessor :ids_only # @return [Boolean] + attr_accessor :transcoder # @return [JsonTranscoder, #decode(String)] + attr_accessor :mutation_state # @return [MutationState, nil] + attr_accessor :batch_byte_limit # @return [Integer, nil] + attr_accessor :batch_item_limit # @return [Integer, nil] + attr_accessor :concurrency # @return [Integer, nil] + + # Creates an instance of options for {Collection#scan} + # + # @param [Boolean] ids_only if set to true, the content of the documents is not included in the results + # @param [JsonTranscoder, #decode(String)] transcoder used for decoding + # @param [MutationState, nil] mutation_state sets the mutation tokens this scan should be consistent with + # @param [Integer, nil] batch_byte_limit allows to limit the maximum amount of bytes that are sent from the server + # to the client on each partition batch, defaults to 15,000 + # @param [Integer, nil] batch_item_limit allows to limit the maximum amount of items that are sent from the server + # to the client on each partition batch, defaults to 50 + # @param [Integer, nil] concurrency specifies the maximum number of partitions that can be scanned concurrently, + # defaults to 1 + # + # @param [Integer, #in_milliseconds, nil] timeout + # @param [Proc, nil] retry_strategy the custom retry strategy, if set + # @param [Hash, nil] client_context the client context data, if set + # @param [Span, nil] parent_span if set holds the parent span, that should be used for this request + # + # @yieldparam [LookupIn] self + def initialize(ids_only: false, + transcoder: JsonTranscoder.new, + mutation_state: nil, + batch_byte_limit: nil, + batch_item_limit: nil, + concurrency: nil, + timeout: nil, + retry_strategy: nil, + client_context: nil, + parent_span: nil) + super(timeout: timeout, retry_strategy: retry_strategy, client_context: client_context, parent_span: parent_span) + @ids_only = ids_only + @transcoder = transcoder + @mutation_state = mutation_state + @batch_byte_limit = batch_byte_limit + @batch_item_limit = batch_item_limit + @concurrency = concurrency + yield self if block_given? + end + + # Sets the mutation tokens this query should be consistent with + # + # @note overrides consistency level set by {#scan_consistency=} + # + # @param [MutationState] mutation_state the mutation state containing the mutation tokens + def consistent_with(mutation_state) + @mutation_state = mutation_state + end + + # @api private + def to_backend + { + timeout: Utils::Time.extract_duration(@timeout), + ids_only: @ids_only, + mutation_state: @mutation_state.to_a, + batch_byte_limit: @batch_byte_limit, + batch_item_limit: @batch_item_limit, + concurrency: @concurrency, + } + end + + DEFAULT = Scan.new.freeze + end + # Options for {BinaryCollection#append} class Append < Base attr_accessor :cas # @return [Integer] diff --git a/test/scan_test.rb b/test/scan_test.rb new file mode 100644 index 00000000..b37968e1 --- /dev/null +++ b/test/scan_test.rb @@ -0,0 +1,377 @@ +# Copyright 2023. Couchbase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require_relative "test_helper" + +module Couchbase + class ScanTest < Minitest::Test + include TestUtilities + + BATCH_BYTE_LIMIT_VALUES = [0, 1, 25, 100].freeze + BATCH_ITEM_LIMIT_VALUES = [0, 1, 25, 100].freeze + CONCURRENCY_VALUES = [2, 4, 8, 32, 128].freeze + + def setup + skip("#{name}: CAVES does not support range scan") if use_caves? + skip("#{name}: Server does not support range scan ") unless env.server_version.supports_range_scan? + + connect + @bucket = @cluster.bucket(env.bucket) + @collection = @bucket.default_collection + @shared_prefix = "scan-test" + @test_ids = Set.new + + 100.times do |i| + s = i.to_s.rjust(2, "0") + id = "#{@shared_prefix}-#{s}" + @collection.upsert(id, {num: i}) + @test_ids << id + end + end + + def teardown + @test_ids.each do |id| + @collection.remove(id) + end + end + + def validate_scan(scan_result, expected_ids, ids_only: false) + items = [] + scan_result.each do |item| + items << item + + assert_equal(ids_only, item.id_only) + end + test_ids_returned = items.to_set(&:id) & @test_ids + + assert_equal(expected_ids.to_set, test_ids_returned) + assert_equal(expected_ids.size, test_ids_returned.size) + + return if ids_only + + items.each do |item| + if test_ids_returned.include? item.id + refute_equal(0, item.cas) + assert_equal(item.id, "#{@shared_prefix}-#{item.content['num'].to_s.rjust(2, '0')}") + end + end + end + + def validate_sampling_scan(scan_result, limit, ids_only: false) + items = [] + scan_result.each do |item| + items << item + + assert_equal(ids_only, item.id_only) + end + assert(items.size <= limit) + + return if ids_only + + test_ids_returned = items.to_set(&:id) & @test_ids + items.each do |item| + if test_ids_returned.include? item.id + refute_equal(0, item.cas) + assert_equal(item.id, "#{@shared_prefix}-#{item.content['num'].to_s.rjust(2, '0')}") + end + end + end + + def test_simple_prefix_scan + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1")) + validate_scan(scan_result, expected_ids) + end + + def test_simple_range_scan + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_simple_sampling_scan + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit)) + validate_sampling_scan(scan_result, limit) + end + + def test_range_scan_exclusive_from + expected_ids = (1..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-29") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_exclusive_to + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..8).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: false), + to: ScanTerm.new("#{@shared_prefix}-29", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_both_exclusive + expected_ids = (1..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..8).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-29", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_default_from + expected_ids = (0..9).map { |i| "#{@shared_prefix}-0#{i}" } + scan_result = @collection.scan( + RangeScan.new( + to: ScanTerm.new("#{@shared_prefix}-09") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_default_to + expected_ids = (0..9).map { |i| "#{@shared_prefix}-9#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-90") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_both_default + scan_result = @collection.scan(RangeScan.new) + validate_scan(scan_result, @test_ids) + end + + def test_range_scan_ids_only + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(ids_only: true) + ) + validate_scan(scan_result, expected_ids, ids_only: true) + end + + def test_range_scan_explicitly_with_content + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(ids_only: false) + ) + validate_scan(scan_result, expected_ids) + end + + def test_prefix_scan_ids_only + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(ids_only: true)) + validate_scan(scan_result, expected_ids, ids_only: true) + end + + def test_sampling_scan_ids_only + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(ids_only: true)) + validate_sampling_scan(scan_result, limit, ids_only: true) + end + + def test_sampling_scan_with_seed + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit, 42), Options::Scan.new(ids_only: true)) + validate_sampling_scan(scan_result, limit, ids_only: true) + end + + def test_range_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_byte_limit: b) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(batch_byte_limit: b)) + validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_batch_byte_limit + BATCH_BYTE_LIMIT_VALUES.each do |b| + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(batch_byte_limit: b)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(concurrency: c) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(concurrency: c)) + validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_concurrency + skip("Skipped until CXXCBC-345 is resolved") + + CONCURRENCY_VALUES.each do |c| + limit = 20 + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(concurrency: c)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_batch_item_limit + BATCH_ITEM_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_item_limit: b) + ) + validate_scan(scan_result, expected_ids) + end + end + + def test_prefix_scan_batch_item_limit + BATCH_ITEM_LIMIT_VALUES.each do |b| + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + scan_result = @collection.scan(PrefixScan.new("#{@shared_prefix}-1"), Options::Scan.new(batch_item_limit: b)) + validate_scan(scan_result, expected_ids) + end + end + + def test_sampling_scan_batch_item_limit + limit = 11 + BATCH_ITEM_LIMIT_VALUES.each do |b| + scan_result = @collection.scan(SamplingScan.new(limit), Options::Scan.new(batch_item_limit: b)) + validate_sampling_scan(scan_result, limit) + end + end + + def test_range_scan_multiple_options + expected_ids = (0..9).map { |i| "#{@shared_prefix}-1#{i}" } + (0..9).map { |i| "#{@shared_prefix}-2#{i}" } + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-29") + ), + Options::Scan.new(batch_byte_limit: 100, batch_item_limit: 20, ids_only: false) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_collection_does_not_exist + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + scan_result = collection.scan(RangeScan.new) + assert_raises(Error::CollectionNotFound) do + validate_scan(scan_result, []) + end + end + + def test_range_scan_same_from_to + expected_ids = ["#{@shared_prefix}-10"] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10"), + to: ScanTerm.new("#{@shared_prefix}-10") + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_same_from_to_exclusive + expected_ids = [] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-10", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-10", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_range_scan_inverted_bounds + expected_ids = [] + scan_result = @collection.scan( + RangeScan.new( + from: ScanTerm.new("#{@shared_prefix}-20", exclusive: true), + to: ScanTerm.new("#{@shared_prefix}-10", exclusive: true) + ) + ) + validate_scan(scan_result, expected_ids) + end + + def test_sampling_scan_non_positive_limit + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + assert_raises(Error::InvalidArgument) do + collection.scan(SamplingScan.new(0)) + end + end + + def test_range_scan_zero_concurrency + collection = @bucket.scope("_default").collection(uniq_id(:nonexistent)) + assert_raises(Error::InvalidArgument) do + collection.scan(RangeScan.new, Options::Scan.new(concurrency: 0)) + end + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index a9f2e4f8..279692ec 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -82,6 +82,10 @@ def supports_preserve_expiry? def is_rcbc_408_applicable? @version < Gem::Version.create("7.0.0") end + + def supports_range_scan? + @version >= Gem::Version.create("7.5.0") + end end require "couchbase" @@ -178,7 +182,7 @@ def time_travel(duration) def uniq_id(name) parent = caller_locations&.first prefix = "#{File.basename(parent&.path, '.rb')}_#{parent&.lineno}" - "#{prefix}_#{name}_#{Time.now.to_f.to_s.reverse}" + "#{prefix}_#{name}_#{Time.now.to_f.to_s.reverse}".sub(".", "-") end def load_raw_test_dataset(dataset) From 5ab75899a73a737992415803397994e67ad308fd Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Wed, 19 Jul 2023 20:22:39 +0100 Subject: [PATCH 2/6] Remove commented line of code --- ext/couchbase.cxx | 1 - 1 file changed, 1 deletion(-) diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index 68fb6373..40780eb1 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -3744,7 +3744,6 @@ cb_CoreScanResult_next_item(VALUE self) cb_core_scan_result_data* data = nullptr; TypedData_Get_Struct(self, cb_core_scan_result_data, &cb_core_scan_result_type, data); auto barrier = std::make_shared>>(); - // auto resp = data->scan_result->next(); auto f = barrier->get_future(); data->scan_result->next([barrier](couchbase::core::range_scan_item item, std::error_code ec) { if (ec) { From 47d58e2a757f0f1a59a828bdf25394d06946ed02 Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Thu, 20 Jul 2023 14:44:30 +0100 Subject: [PATCH 3/6] Map range_scan_completed to BackendError --- ext/couchbase.cxx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index 40780eb1..c30b5300 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -847,6 +847,10 @@ cb_map_error_code(std::error_code ec, const std::string& message, bool include_e case couchbase::errc::key_value::cannot_revive_living_document: return rb_exc_new_cstr(eCannotReviveLivingDocument, what.c_str()); + + case couchbase::errc::key_value::range_scan_completed: + // Should not be exposed to the Ruby SDK, map it to a BackendError + return rb_exc_new_cstr(eBackendError, what.c_str()); } } else if (ec.category() == couchbase::core::impl::query_category()) { switch (static_cast(ec.value())) { From c2957fedd9f4584d59da714a7443cc2548d92bf8 Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Fri, 21 Jul 2023 09:25:47 +0100 Subject: [PATCH 4/6] Add examples to scan() API doc & mark as uncommitted --- lib/couchbase/collection.rb | 25 ++++++++++++++++++++++--- lib/couchbase/key_value_scan.rb | 8 ++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/lib/couchbase/collection.rb b/lib/couchbase/collection.rb index 4f1d87d6..18c9d2de 100644 --- a/lib/couchbase/collection.rb +++ b/lib/couchbase/collection.rb @@ -536,10 +536,29 @@ def mutate_in(id, specs, options = Options::MutateIn::DEFAULT) end end - # Performs a key-value scan + # Performs a key-value scan operation on the collection # - # @param [RangeScan, PrefixScan, SamplingScan] scan_type - # @param [Options::Scan] options + # @api uncommitted + # + # @param [RangeScan, PrefixScan, SamplingScan] scan_type the type of the scan + # @param [Options::Scan] options request customization + # + # @example Get a sample of up to 5 documents from the collection and store their IDs in an array + # result = collection.scan(SamplingScan.new(5), Options::Scan.new(ids_only: true)) + # ids = result.map { |item| item.id } + # + # @example Get all documents whose ID starts with 'customer_1' and output their content + # result = collection.scan(PrefixScan.new("customer_1")) + # result.each { |item| puts item.content } + # + # @example Get all documents with ID between 'customer_1' and 'customer_2', excluding 'customer_2' and output their content + # result = collection.scan(RangeScan.new( + # from: ScanTerm.new("customer_1"), + # to: ScanTerm.new("customer_2", exclusive: true) + # )) + # result.each { |item| puts item.content } + # + # @return [ScanResults] def scan(scan_type, options = Options::Scan::DEFAULT) ScanResults.new( core_scan_result: @backend.document_scan_create( diff --git a/lib/couchbase/key_value_scan.rb b/lib/couchbase/key_value_scan.rb index f8930bfa..d0d9d483 100644 --- a/lib/couchbase/key_value_scan.rb +++ b/lib/couchbase/key_value_scan.rb @@ -20,6 +20,8 @@ class ScanTerm # Creates an instance of a ScanTerm # + # @api uncommitted + # # @param [String] term the key pattern of this term # @param [Boolean] exclusive specifies if this term is excluded while scanning, the bounds are included by default def initialize(term, exclusive: false) @@ -43,6 +45,8 @@ class RangeScan # Creates an instance of a RangeScan scan type # + # @api uncommitted + # # @param [ScanTerm, String, nil] from the lower bound of the range, if set # @param [ScanTerm, String, nil] to the upper bound of the range, if set def initialize(from: nil, to: nil) @@ -76,6 +80,8 @@ class PrefixScan # Creates an instance of a PrefixScan scan type # + # @api uncommitted + # # @param [String, nil] prefix the prefix all document keys should start with def initialize(prefix) @prefix = prefix @@ -97,6 +103,8 @@ class SamplingScan # Creates an instance of a SamplingScan scan type # + # @api uncommitted + # # @param [Integer] limit the maximum number of documents the sampling scan can return # @param [Integer, nil] seed the seed used for the random number generator that selects the documents. If not set, # a seed is generated at random From 42bf583a1b0a7e593bd589821f3e601055f27124 Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Fri, 21 Jul 2023 12:17:24 +0100 Subject: [PATCH 5/6] Check that bucket supports range scan --- ext/couchbase | 2 +- ext/couchbase.cxx | 20 +++++++++++++------- test/scan_test.rb | 20 +++++++++++++++++++- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/ext/couchbase b/ext/couchbase index a46a5e1e..7d0faf5d 160000 --- a/ext/couchbase +++ b/ext/couchbase @@ -1 +1 @@ -Subproject commit a46a5e1ecf49d2717b9f8fd8aa6ad28d83693d45 +Subproject commit 7d0faf5d931c4693e4d344407af3f853fe5b11d4 diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index c30b5300..26144570 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -3866,20 +3866,26 @@ cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE col // Getting the vbucket map auto barrier = - std::make_shared>>(); + std::make_shared>>(); auto f = barrier->get_future(); cluster->with_bucket_configuration( bucket_name, [barrier](std::error_code ec, const couchbase::core::topology::configuration& config) mutable { if (ec) { return barrier->set_value(tl::unexpected(ec)); } - if (!config.vbmap || config.vbmap->empty()) { - return barrier->set_value(tl::unexpected(couchbase::errc::common::feature_not_available)); - } - barrier->set_value(config.vbmap.value()); + barrier->set_value(config); }); - auto vbucket_map = cb_wait_for_future(f); - if (!vbucket_map.has_value()) { + auto config = cb_wait_for_future(f); + if (!config.has_value()) { + rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get bucket configuration"); + return Qnil; + } + if (!config->supports_range_scan()) { + rb_raise(eFeatureNotAvailable, "Server does not support key-value scan operations"); + return Qnil; + } + auto vbucket_map = config->vbmap; + if (!vbucket_map || vbucket_map->empty()) { rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get vbucket map"); return Qnil; } diff --git a/test/scan_test.rb b/test/scan_test.rb index b37968e1..e2801ce7 100644 --- a/test/scan_test.rb +++ b/test/scan_test.rb @@ -24,7 +24,7 @@ class ScanTest < Minitest::Test def setup skip("#{name}: CAVES does not support range scan") if use_caves? - skip("#{name}: Server does not support range scan ") unless env.server_version.supports_range_scan? + skip("#{name}: Server does not support range scan (#{env.server_version})") unless env.server_version.supports_range_scan? connect @bucket = @cluster.bucket(env.bucket) @@ -374,4 +374,22 @@ def test_range_scan_zero_concurrency end end end + + class ScanNotSupportedTest < MiniTest::Test + include TestUtilities + + def setup + skip("#{name}: Server supports range scan (#{env.server_version})") if env.server_version.supports_range_scan? + + connect + @bucket = @cluster.bucket(env.bucket) + @collection = @bucket.default_collection + end + + def test_range_scan_feature_not_available + assert_raises(Error::FeatureNotAvailable) do + @collection.scan(RangeScan.new) + end + end + end end From a70e9f2719f236c8daaa20ca86b59530bc2046cf Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Fri, 21 Jul 2023 13:23:05 +0100 Subject: [PATCH 6/6] Fix clang-format errors --- ext/couchbase.cxx | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/ext/couchbase.cxx b/ext/couchbase.cxx index 26144570..c48398d1 100644 --- a/ext/couchbase.cxx +++ b/ext/couchbase.cxx @@ -3865,22 +3865,21 @@ cb_Backend_document_scan_create(VALUE self, VALUE bucket, VALUE scope, VALUE col } // Getting the vbucket map - auto barrier = - std::make_shared>>(); + auto barrier = std::make_shared>>(); auto f = barrier->get_future(); - cluster->with_bucket_configuration( - bucket_name, [barrier](std::error_code ec, const couchbase::core::topology::configuration& config) mutable { - if (ec) { - return barrier->set_value(tl::unexpected(ec)); - } - barrier->set_value(config); - }); + cluster->with_bucket_configuration(bucket_name, + [barrier](std::error_code ec, const couchbase::core::topology::configuration& config) mutable { + if (ec) { + return barrier->set_value(tl::unexpected(ec)); + } + barrier->set_value(config); + }); auto config = cb_wait_for_future(f); if (!config.has_value()) { rb_raise(eCouchbaseError, "Cannot perform scan operation. Unable to get bucket configuration"); return Qnil; } - if (!config->supports_range_scan()) { + if (!config->supports_range_scan()) { rb_raise(eFeatureNotAvailable, "Server does not support key-value scan operations"); return Qnil; }