Skip to content

Commit

Permalink
sstable_set: add clone_excluding
Browse files Browse the repository at this point in the history
Used by table::make_reader_v2_excluding_sstables to
create a sstable_set based on the current one
that includes all non-excluded sstables.

It checks if the majority of the sstables is included
or excluded and picks one of 2 strategies:
either cloning the existing set and erasing the
excluded sstables out of it, if they are the minority,
or making a new set and inserting to it all the non-excluded
sstables if the included sstables are the minority.

We do that especially for partitioned_sstable_set
since inserting to or erasing from its _leveled_sstables interval_map
is expensive and we want to avoid that as much as possible
to prevent reactor stalls.

Refs scylladb#14244

The reason this change doesn't fix the issue entirely is
that it will deal with a typical case where the number of excluded sstables
(that are staging and processed by the view update generator)
is relatively small.

But when we have many of them, say about about half of the
total number of sstables, calling partitioned_sstable_set::insert or erase
enough times without preemption might still stall.

Refs scylladb#14089

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Jun 14, 2023
1 parent 8a54e47 commit ab3ac68
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 10 deletions.
10 changes: 1 addition & 9 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2558,15 +2558,7 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
readers.reserve(memtable_count + 1);
});

auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
_sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable {
if (excluded_ssts.contains(sst)) {
return;
}
effective_sstables->insert(sst);
});

auto effective_sstables = make_lw_shared(_sstables->clone_excluding(excluded));
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}
Expand Down
35 changes: 35 additions & 0 deletions sstables/sstable_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,29 @@ sstable_set::size() const noexcept {
return _impl->size();
}

sstable_set sstable_set::clone_excluding(const std::vector<shared_sstable>& excluded_sstables) {
if (excluded_sstables.size() < size() / 2) {
// Typically, the number of excluded sstables is expected to be much less than the
// total number of sstables.
// Clone the whole set and erase the excluded sstables from the clone
auto cloned = sstable_set(*this);
for (const auto& sst : excluded_sstables) {
cloned.erase(sst);
}
return cloned;
} else {
// Make an empty set and insert to it all sstables that aren't excluded.
auto excluded = std::unordered_set<shared_sstable>(excluded_sstables.begin(), excluded_sstables.end());
auto cloned_impl = _impl->clone_empty();
for_each_sstable([&] (const shared_sstable& sst) {
if (!excluded.contains(sst)) {
cloned_impl->insert(sst);
}
});
return sstable_set(std::move(cloned_impl), _schema);
}
}

sstable_set::~sstable_set() = default;

sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
Expand Down Expand Up @@ -305,6 +328,10 @@ std::unique_ptr<sstable_set_impl> partitioned_sstable_set::clone() const {
return std::make_unique<partitioned_sstable_set>(_schema, _unleveled_sstables, _leveled_sstables, _all, _all_runs, _use_level_metadata);
}

std::unique_ptr<sstable_set_impl> partitioned_sstable_set::clone_empty() const {
return std::make_unique<partitioned_sstable_set>(_schema, _use_level_metadata);
}

std::vector<shared_sstable> partitioned_sstable_set::select(const dht::partition_range& range) const {
auto ipair = query(range);
auto b = std::move(ipair.first);
Expand Down Expand Up @@ -469,6 +496,10 @@ std::unique_ptr<sstable_set_impl> time_series_sstable_set::clone() const {
return std::make_unique<time_series_sstable_set>(*this);
}

std::unique_ptr<sstable_set_impl> time_series_sstable_set::clone_empty() const {
return std::make_unique<time_series_sstable_set>(_schema, _enable_optimized_twcs_queries);
}

std::vector<shared_sstable> time_series_sstable_set::select(const dht::partition_range& range) const {
return boost::copy_range<std::vector<shared_sstable>>(*_sstables | boost::adaptors::map_values);
}
Expand Down Expand Up @@ -1013,6 +1044,10 @@ std::unique_ptr<sstable_set_impl> compound_sstable_set::clone() const {
return std::make_unique<compound_sstable_set>(_schema, std::move(cloned_sets));
}

std::unique_ptr<sstable_set_impl> compound_sstable_set::clone_empty() const {
return std::make_unique<compound_sstable_set>(_schema);
}

std::vector<shared_sstable> compound_sstable_set::select(const dht::partition_range& range) const {
std::vector<shared_sstable> ret;
for (auto& set : _sets) {
Expand Down
2 changes: 2 additions & 0 deletions sstables/sstable_set.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class sstable_set_impl {
public:
virtual ~sstable_set_impl() {}
virtual std::unique_ptr<sstable_set_impl> clone() const = 0;
virtual std::unique_ptr<sstable_set_impl> clone_empty() const = 0;
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
virtual lw_shared_ptr<const sstable_list> all() const = 0;
Expand Down Expand Up @@ -121,6 +122,7 @@ public:
void insert(shared_sstable sst);
void erase(shared_sstable sst);
size_t size() const noexcept;
sstable_set clone_excluding(const std::vector<shared_sstable>& excluded_sstables);

// Used to incrementally select sstables from sstable set using ring-position.
// sstable set must be alive during the lifetime of the selector.
Expand Down
5 changes: 4 additions & 1 deletion sstables/sstable_set_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public:
bool use_level_metadata);

virtual std::unique_ptr<sstable_set_impl> clone() const override;
virtual std::unique_ptr<sstable_set_impl> clone_empty() const override;
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
Expand Down Expand Up @@ -89,6 +90,7 @@ public:
time_series_sstable_set(const time_series_sstable_set& s);

virtual std::unique_ptr<sstable_set_impl> clone() const override;
virtual std::unique_ptr<sstable_set_impl> clone_empty() const override;
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const override;
Expand Down Expand Up @@ -125,9 +127,10 @@ class compound_sstable_set : public sstable_set_impl {
schema_ptr _schema;
std::vector<lw_shared_ptr<sstable_set>> _sets;
public:
compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets);
compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets = {});

virtual std::unique_ptr<sstable_set_impl> clone() const override;
virtual std::unique_ptr<sstable_set_impl> clone_empty() const override;
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
virtual std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const override;
virtual lw_shared_ptr<const sstable_list> all() const override;
Expand Down

0 comments on commit ab3ac68

Please sign in to comment.