Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close all item channels when scan is cancelled #431

Merged
merged 2 commits into from Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions core/range_scan_orchestrator.cxx
Expand Up @@ -154,8 +154,8 @@ class range_scan_stream : public std::enable_shared_from_this<range_scan_stream>
agent_.range_scan_cancel(uuid(), vbucket_id_, {}, [](auto /* res */, auto /* ec */) {});
}

items_.close();
items_.cancel();
items_.close();

bool fatal{};
if (ec == errc::key_value::document_not_found || ec == errc::common::authentication_failure ||
Expand Down Expand Up @@ -199,7 +199,11 @@ class range_scan_stream : public std::enable_shared_from_this<range_scan_stream>

void cancel()
{
should_cancel_ = true;
if (!should_cancel_) {
should_cancel_ = true;
items_.cancel();
items_.close();
}
}

template<typename Handler>
Expand Down Expand Up @@ -443,9 +447,7 @@ class range_scan_orchestrator_impl
{
cancelled_ = true;
for (const auto& [vbucket_id, stream] : streams_) {
if (stream->is_running()) {
stream->cancel();
}
stream->cancel();
}
}

Expand Down
63 changes: 63 additions & 0 deletions test/test_integration_range_scan.cxx
Expand Up @@ -1088,6 +1088,69 @@ TEST_CASE("integration: manager sampling scan with custom collection and up to 1
}
}

TEST_CASE("integration: manager sampling scan with custom collection and up to 128 concurrent streams and batch item limit 0",
"[integration]")
{
test::utils::integration_test_guard integration;

if (!integration.has_bucket_capability("range_scan")) {
SKIP("cluster does not support range_scan");
}

collection_guard new_collection(integration);

auto collection = couchbase::cluster(integration.cluster)
.bucket(integration.ctx.bucket)
.scope(couchbase::scope::default_name)
.collection(new_collection.name());

auto ids = make_doc_ids(100, "samplingscan-");
auto value = make_binary_value(100);
auto mutations = populate_documents_for_range_scan(collection, ids, value, std::chrono::seconds{ 300 });

auto vbucket_map = get_vbucket_map(integration);

auto ag = couchbase::core::agent_group(integration.io, { { integration.cluster } });
ag.open_bucket(integration.ctx.bucket);
auto agent = ag.get_agent(integration.ctx.bucket);
REQUIRE(agent.has_value());

couchbase::core::sampling_scan scan{ 10, 50 };
couchbase::core::range_scan_orchestrator_options options{};
options.consistent_with = mutations_to_mutation_state(mutations);
options.concurrency = 128;
options.batch_item_limit = 0;
couchbase::core::range_scan_orchestrator orchestrator(
integration.io, agent.value(), vbucket_map, couchbase::scope::default_name, new_collection.name(), scan, options);

auto result = orchestrator.scan();
EXPECT_SUCCESS(result);

std::set<std::string> entry_ids{};

auto now = std::chrono::system_clock::now();
do {
auto entry = result->next();
if (!entry) {
break;
}

REQUIRE(entry->body);
REQUIRE_FALSE(entry->body->cas.empty());
REQUIRE(entry->body->value == value);
REQUIRE(entry->body->expiry_time() > now);

auto [_, inserted] = entry_ids.insert(entry->key);
REQUIRE(inserted);
} while (true);

REQUIRE(ids.size() >= 10);

for (const auto& id : entry_ids) {
REQUIRE(std::find(ids.begin(), ids.end(), id) != ids.end());
}
}

TEST_CASE("integration: manager prefix scan without content and up to 5 concurrent streams", "[integration]")
{
test::utils::integration_test_guard integration;
Expand Down