Skip to content

Commit

Permalink
nonforwardable reader: no partition_end for empty reader
Browse files Browse the repository at this point in the history
The patch introduces the _partition_is_open flag,
inject partition_end only if there was some data
in the input reader.

A simple unit test has been added for
the nonforwardable reader which checks this
new behaviour.
Merge "Cqlsh serverless v2" from Karol Baryla

This PR adds serverless support for CQLSH. It was previosuly merged, but dtests later discovered a bug in python driver. The bug should be fixed now, so let's merge this again.

@fruch As you had more luck reproducing the issue with dtests than us, could you try and run them with this PR to make sure it works now?

Fixes scylladb#317 (again)
Closes scylladb#323

* github.com:scylladb/scylla-tools-java:
  Add Scylla Cloud serverless support
  Switch cqlsh to use scylla-driver
install.sh: drop locale workaround from python3 thunk

Since scylladb#7408 does not occur on current python3 version (3.11.0), let's drop
the workarond.

Closes scylladb#32
tls: add missing include <map>

std::multimap is later used in the file.

Closes scylladb#1314
  • Loading branch information
Petr Gusev committed Feb 27, 2023
1 parent a46df5a commit 42fed06
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
15 changes: 11 additions & 4 deletions readers/mutation_readers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,19 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
flat_mutation_reader_v2 _underlying;
bool _single_partition;
bool _static_row_done = false;
bool _partition_is_open = false;
bool is_end_end_of_underlying_stream() const {
return _underlying.is_buffer_empty() && _underlying.is_end_of_stream();
}
future<> on_end_of_underlying_stream() {
if (!_static_row_done) {
_static_row_done = true;
return _underlying.fast_forward_to(position_range::all_clustered_rows());
if (_partition_is_open) {
if (!_static_row_done) {
_static_row_done = true;
return _underlying.fast_forward_to(position_range::all_clustered_rows());
}
push_mutation_fragment(*_schema, _permit, partition_end());
_partition_is_open = false;
}
push_mutation_fragment(*_schema, _permit, partition_end());
if (_single_partition) {
_end_of_stream = true;
return make_ready_future<>();
Expand All @@ -440,6 +444,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
virtual future<> fill_buffer() override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return fill_buffer_from(_underlying).then([this] (bool underlying_finished) {
if (!_partition_is_open && !is_buffer_empty()) {
_partition_is_open = true;
}
if (underlying_finished) {
return on_end_of_underlying_stream();
}
Expand Down
26 changes: 26 additions & 0 deletions test/boost/flat_mutation_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "readers/from_fragments_v2.hh"
#include "readers/forwardable_v2.hh"
#include "readers/compacting.hh"
#include "readers/nonforwardable.hh"

struct mock_consumer {
struct result {
Expand Down Expand Up @@ -727,6 +728,31 @@ SEASTAR_TEST_CASE(test_make_forwardable) {
});
}

SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
const auto permit = semaphore.make_permit();

auto make_reader = [&](std::vector<mutation> mutations,
bool single_partition,
const dht::partition_range& pr)
{
auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
permit,
std::move(mutations),
pr,
streamed_mutation::forwarding::yes);
result = make_nonforwardable(std::move(result), single_partition);
return assert_that(std::move(result)).exact();
};

// no input -> no output
{
auto rd = make_reader({}, false, query::full_partition_range);
rd.produces_end_of_stream();
}
}

SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
Expand Down

0 comments on commit 42fed06

Please sign in to comment.