From a47806b6d55247b09c5b5b07989c7c051bf11a76 Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Wed, 19 Jul 2023 12:50:46 +0100 Subject: [PATCH 1/2] Close all item channels when scan is cancelled --- core/range_scan_orchestrator.cxx | 12 +++--- test/test_integration_range_scan.cxx | 62 ++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/core/range_scan_orchestrator.cxx b/core/range_scan_orchestrator.cxx index bf6d6bb5..51b5dd65 100644 --- a/core/range_scan_orchestrator.cxx +++ b/core/range_scan_orchestrator.cxx @@ -154,8 +154,8 @@ class range_scan_stream : public std::enable_shared_from_this agent_.range_scan_cancel(uuid(), vbucket_id_, {}, [](auto /* res */, auto /* ec */) {}); } - items_.close(); items_.cancel(); + items_.close(); bool fatal{}; if (ec == errc::key_value::document_not_found || ec == errc::common::authentication_failure || @@ -199,7 +199,11 @@ class range_scan_stream : public std::enable_shared_from_this void cancel() { - should_cancel_ = true; + if (!should_cancel_) { + should_cancel_ = true; + items_.cancel(); + items_.close(); + } } template @@ -443,9 +447,7 @@ class range_scan_orchestrator_impl { cancelled_ = true; for (const auto& [vbucket_id, stream] : streams_) { - if (stream->is_running()) { - stream->cancel(); - } + stream->cancel(); } } diff --git a/test/test_integration_range_scan.cxx b/test/test_integration_range_scan.cxx index e7aba299..859cb70c 100644 --- a/test/test_integration_range_scan.cxx +++ b/test/test_integration_range_scan.cxx @@ -1088,6 +1088,68 @@ TEST_CASE("integration: manager sampling scan with custom collection and up to 1 } } +TEST_CASE("integration: manager sampling scan with custom collection and up to 128 concurrent streams and batch item limit 0", "[integration]") +{ + test::utils::integration_test_guard integration; + + if (!integration.has_bucket_capability("range_scan")) { + SKIP("cluster does not support range_scan"); + } + + collection_guard new_collection(integration); + + auto collection = couchbase::cluster(integration.cluster) + .bucket(integration.ctx.bucket) + .scope(couchbase::scope::default_name) + .collection(new_collection.name()); + + auto ids = make_doc_ids(100, "samplingscan-"); + auto value = make_binary_value(100); + auto mutations = populate_documents_for_range_scan(collection, ids, value, std::chrono::seconds{ 300 }); + + auto vbucket_map = get_vbucket_map(integration); + + auto ag = couchbase::core::agent_group(integration.io, { { integration.cluster } }); + ag.open_bucket(integration.ctx.bucket); + auto agent = ag.get_agent(integration.ctx.bucket); + REQUIRE(agent.has_value()); + + couchbase::core::sampling_scan scan{ 10, 50 }; + couchbase::core::range_scan_orchestrator_options options{}; + options.consistent_with = mutations_to_mutation_state(mutations); + options.concurrency = 128; + options.batch_item_limit = 0; + couchbase::core::range_scan_orchestrator orchestrator( + integration.io, agent.value(), vbucket_map, couchbase::scope::default_name, new_collection.name(), scan, options); + + auto result = orchestrator.scan(); + EXPECT_SUCCESS(result); + + std::set entry_ids{}; + + auto now = std::chrono::system_clock::now(); + do { + auto entry = result->next(); + if (!entry) { + break; + } + + REQUIRE(entry->body); + REQUIRE_FALSE(entry->body->cas.empty()); + REQUIRE(entry->body->value == value); + REQUIRE(entry->body->expiry_time() > now); + + auto [_, inserted] = entry_ids.insert(entry->key); + REQUIRE(inserted); + } while (true); + + REQUIRE(ids.size() >= 10); + + for (const auto& id : entry_ids) { + REQUIRE(std::find(ids.begin(), ids.end(), id) != ids.end()); + } +} + TEST_CASE("integration: manager prefix scan without content and up to 5 concurrent streams", "[integration]") { test::utils::integration_test_guard integration; From dbc440505b67c15b0f4e4448096b5a9d87b2279d Mon Sep 17 00:00:00 2001 From: Dimitris Christodoulou Date: Wed, 19 Jul 2023 12:56:51 +0100 Subject: [PATCH 2/2] Fix clang-format error --- test/test_integration_range_scan.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_integration_range_scan.cxx b/test/test_integration_range_scan.cxx index 859cb70c..a4f1b905 100644 --- a/test/test_integration_range_scan.cxx +++ b/test/test_integration_range_scan.cxx @@ -1088,7 +1088,8 @@ TEST_CASE("integration: manager sampling scan with custom collection and up to 1 } } -TEST_CASE("integration: manager sampling scan with custom collection and up to 128 concurrent streams and batch item limit 0", "[integration]") +TEST_CASE("integration: manager sampling scan with custom collection and up to 128 concurrent streams and batch item limit 0", + "[integration]") { test::utils::integration_test_guard integration;