Skip to content

Commit

Permalink
Merge 'Update sstable_requiring_cleanup on compaction completion' fro…
Browse files Browse the repository at this point in the history
…m Benny Halevy

Currently `sstable_requiring_cleanup` is updated using `compacting_sstable_registration`, but that mechanism is not used by offstrategy compaction, leading to scylladb#14304.

This series introduces `compaction_manager::on_compaction_completion` that intercepts the call
to the table::on_compaction_completion. This allows us to update `sstable_requiring_cleanup` right before the compacted sstables are deleted, making sure they are no leaked to `sstable_requiring_cleanup`, which would hold a reference to them until cleanup attempts to clean them up.

`cleanup_incremental_compaction_test` was adjusted to observe the sstables `on_delete` (by adding a new observer event) to detect the case where cleanup attempts to delete the leaked sstables and fails since they were already deleted from the file system by offstrategy compaction. The test fails with the fix and passes with it.

\Fixes scylladb#14304

\Closes scylladb#14858

* github.com:scylladb/scylladb:
  compaction_manager: on_compaction_completion: erase sstables from sstables_requiring_cleanup
  compaction/leveled_compaction_strategy: ideal_level_for_input: special case max_sstable_size==0
  sstable: add on_delete observer
  compaction_manager: add on_compaction_completion
  sstable_compaction_test: cleanup_incremental_compaction_test: verify sstables_requiring_cleanup is empty

(cherry picked from commit 108e510)
  • Loading branch information
bhalevy committed Nov 13, 2023
1 parent 869fa37 commit 86a64ee
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 5 deletions.
23 changes: 21 additions & 2 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,20 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
co_return std::nullopt;
}

future<> compaction_manager::on_compaction_completion(table_state& t, sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) {
auto& cs = get_compaction_state(&t);
auto new_sstables = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc.new_sstables);
for (const auto& sst : desc.old_sstables) {
if (!new_sstables.contains(sst)) {
cs.sstables_requiring_cleanup.erase(sst);
}
}
if (cs.sstables_requiring_cleanup.empty()) {
cs.owned_ranges_ptr = nullptr;
}
return t.on_compaction_completion(std::move(desc), offstrategy);
}

future<sstables::compaction_result> compaction_manager::task::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement& on_replace, can_purge_tombstones can_purge) {
if (!descriptor.sstables.size()) {
// if there is nothing to compact, just return.
Expand Down Expand Up @@ -393,7 +407,7 @@ future<sstables::compaction_result> compaction_manager::task::compact_sstables(s
// - are not being compacted.
on_replace.on_addition(desc.new_sstables);
auto old_sstables = desc.old_sstables;
t.on_compaction_completion(std::move(desc), offstrategy).get();
_cm.on_compaction_completion(t, std::move(desc), offstrategy).get();
on_replace.on_removal(old_sstables);
};

Expand Down Expand Up @@ -1214,7 +1228,7 @@ class compaction_manager::offstrategy_compaction_task : public compaction_manage
.old_sstables = sstables, // removes from maintenance set.
.new_sstables = sstables, // adds into main set.
};
co_await t.on_compaction_completion(std::move(completion_desc), sstables::offstrategy::yes);
co_await _cm.on_compaction_completion(t, std::move(completion_desc), sstables::offstrategy::yes);
}

if (err) {
Expand Down Expand Up @@ -1582,6 +1596,11 @@ bool compaction_manager::requires_cleanup(table_state& t, const sstables::shared
return cs.sstables_requiring_cleanup.contains(sst);
}

const std::unordered_set<sstables::shared_sstable>& compaction_manager::sstables_requiring_cleanup(table_state& t) const {
const auto& cs = get_compaction_state(&t);
return cs.sstables_requiring_cleanup;
}

future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) {
constexpr auto sleep_duration = std::chrono::seconds(10);
constexpr auto max_idle_duration = std::chrono::seconds(300);
Expand Down
3 changes: 3 additions & 0 deletions compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ private:

// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);

future<> on_compaction_completion(table_state& t, sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy);
public:
// Submit a table to be upgraded and wait for its termination.
future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version);
Expand Down Expand Up @@ -547,6 +549,7 @@ public:

// checks if the sstable is in the respective compaction_state.sstables_requiring_cleanup set.
bool requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const;
const std::unordered_set<sstables::shared_sstable>& sstables_requiring_cleanup(table_state& t) const;

friend class compacting_sstable_registration;
friend class compaction_weight_registration;
Expand Down
2 changes: 2 additions & 0 deletions sstables/sstables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3036,6 +3036,8 @@ future<> sstable::filesystem_storage::wipe(const sstable& sst) noexcept {

future<>
sstable::unlink() noexcept {
_on_delete(*this);

auto remove_fut = _storage.wipe(*this);

try {
Expand Down
5 changes: 5 additions & 0 deletions sstables/sstables.hh
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ public:
return _on_closed.observe(on_closed_handler);
}

utils::observer<sstable&> add_on_delete_handler(std::function<void (sstable&)> on_delete_handler) noexcept {
return _on_delete.observe(on_delete_handler);
}

template<typename Func, typename... Args>
requires std::is_nothrow_move_constructible_v<Func>
auto sstable_write_io_check(Func&& func, Args&&... args) const noexcept {
Expand Down Expand Up @@ -544,6 +548,7 @@ private:
std::optional<dht::decorated_key> _last;
run_id _run_identifier;
utils::observable<sstable&> _on_closed;
utils::observable<sstable&> _on_delete;

lw_shared_ptr<file_input_stream_history> _single_partition_history = make_lw_shared<file_input_stream_history>();
lw_shared_ptr<file_input_stream_history> _partition_range_history = make_lw_shared<file_input_stream_history>();
Expand Down
98 changes: 98 additions & 0 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5133,6 +5133,7 @@ static future<> run_incremental_compaction_test(sstables::offstrategy offstrateg
ssts = {}; // releases references
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
run_compaction(t, std::move(owned_ranges_ptr)).get();
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty());
testlog.info("Cleanup has finished");
}

Expand Down Expand Up @@ -5160,6 +5161,103 @@ SEASTAR_TEST_CASE(offstrategy_incremental_compaction_test) {
});
}

SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) {
return test_env::do_with_async([] (test_env& env) {
auto builder = schema_builder("tests", "test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
builder.set_gc_grace_seconds(10000);
builder.set_compaction_strategy(sstables::compaction_strategy_type::leveled);
std::map<sstring, sstring> opts = {
{ "sstable_size_in_mb", "0" }, // makes sure that every mutation produces one fragment, to trigger incremental compaction
};
builder.set_compaction_strategy_options(std::move(opts));
auto s = builder.build();
auto tmp = tmpdir();
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () {
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
};

auto make_insert = [&] (partition_key key) {
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), api::new_timestamp());
return m;
};

std::vector<utils::observer<sstable&>> observers;
std::vector<shared_sstable> ssts;
size_t sstables_closed = 0;
size_t sstables_missing_on_delete = 0;
static constexpr size_t sstables_nr = 10;

dht::token_range_vector owned_token_ranges;

std::set<mutation, mutation_decorated_key_less_comparator> merged;
for (unsigned i = 0; i < sstables_nr * 2; i++) {
merged.insert(make_insert(partition_key::from_exploded(*s, {to_bytes(to_sstring(i))})));
}

std::unordered_set<sstables::generation_type> gens; // input sstable generations
auto merged_it = merged.begin();
for (unsigned i = 0; i < sstables_nr; i++) {
auto mut1 = std::move(*merged_it);
merged_it++;
auto mut2 = std::move(*merged_it);
merged_it++;
auto sst = make_sstable_containing(sst_gen, {
std::move(mut1),
std::move(mut2)
});
// Force a new run_id to trigger offstrategy compaction
sstables::test(sst).set_run_identifier(run_id::create_random_id());
// Set level to 0 to trigger offstrategy compaction
sst->set_sstable_level(0);

// every sstable will be eligible for cleanup, by having both an owned and unowned token.
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));

gens.insert(sst->generation());
ssts.push_back(std::move(sst));
}

{
table_for_tests t(env.manager(), s, tmp.path().string());
auto& cm = t->get_compaction_manager();
auto stop = deferred_stop(t);
t->disable_auto_compaction().get();
const dht::token_range_vector empty_owned_ranges;
for (auto&& sst : ssts) {
testlog.info("run id {}", sst->run_identifier());
column_family_test(t).add_sstable(sst, sstables::offstrategy::yes).get();
column_family_test::update_sstables_known_generation(*t, generation_value(sst->generation()));
observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
auto sstables = t->get_sstables();
testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), sstables->size());
sstables_closed++;
}));
observers.push_back(sst->add_on_delete_handler([&] (sstable& sst) mutable {
auto missing = !file_exists(sst.get_filename()).get();
testlog.info("Deleting sstable of generation {}: missing={}", sst.generation(), missing);
sstables_missing_on_delete += missing;
}));
}
ssts = {}; // releases references
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty());
testlog.info("Cleanup has finished");
}

while (sstables_closed != sstables_nr) {
yield().get();
}

testlog.info("Closed sstables {}, missing on delete {}", sstables_closed, sstables_missing_on_delete);

BOOST_REQUIRE_EQUAL(sstables_closed, sstables_nr);
BOOST_REQUIRE_EQUAL(sstables_missing_on_delete, 0);
});
}

SEASTAR_TEST_CASE(test_sstables_excluding_staging_correctness) {
return test_env::do_with_async([] (test_env& env) {
Expand Down
10 changes: 7 additions & 3 deletions test/boost/sstable_test.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ class column_family_test {
public:
column_family_test(lw_shared_ptr<replica::column_family> cf) : _cf(cf) {}

future<> add_sstable(sstables::shared_sstable sstable) {
future<> add_sstable(sstables::shared_sstable sstable, sstables::offstrategy offstrategy = sstables::offstrategy::no) {
if (offstrategy) {
// Otherwise, on_compaction_completion always adds the new_sstabes to the main set
return _cf->add_sstable_and_update_cache(sstable, offstrategy);
}
auto new_sstables = { sstable };
return _cf->as_table_state().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no);
}

future<> rebuild_sstable_list(compaction::table_state& table_s, const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
return table_s.on_compaction_completion(sstables::compaction_completion_desc{ .old_sstables = sstables_to_remove, .new_sstables = new_sstables }, sstables::offstrategy::no);
const std::vector<sstables::shared_sstable>& sstables_to_remove, sstables::offstrategy offstrategy = sstables::offstrategy::no) {
return table_s.on_compaction_completion(sstables::compaction_completion_desc{ .old_sstables = sstables_to_remove, .new_sstables = new_sstables }, offstrategy);
}

static void update_sstables_known_generation(replica::column_family& cf, unsigned generation) {
Expand Down

0 comments on commit 86a64ee

Please sign in to comment.