Skip to content

Commit

Permalink
compaction_manager: on_compaction_completion: erase sstables from sst…
Browse files Browse the repository at this point in the history
…ables_requiring_cleanup

Move the management of compaction_state::sstables_requiring_cleanup
from compacting_sstable_registration to on_compaction_completion
so that it is performed for all compaction types, since those sstables
have been compacted and are about to be deleted.

Adjust cleanup_incremental_compaction_test to check
that cleanup doesn't attempt to clean up already-deleted
sstables that were left over by offstrategy compaction
in sstables_requiring_cleanup.

Fixes scylladb#14304

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Jul 27, 2023
1 parent de93ee8 commit 43733bf
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
17 changes: 10 additions & 7 deletions compaction/compaction_manager.cc
Expand Up @@ -81,13 +81,6 @@ class compacting_sstable_registration {
// Explicitly release compacting sstables.
// Deregisters the given sstables from the compaction manager, then erases
// each of them from _compacting and from _cs.sstables_requiring_cleanup.
// NOTE(review): this text comes from a diff whose +/- markers were lost.
// Per the commit description, the sstables_requiring_cleanup /
// owned_ranges_ptr handling is being moved from this class to
// compaction_manager::on_compaction_completion -- confirm which of the
// lines below survive in the committed file.
void release_compacting(const std::vector<sstables::shared_sstable>& sstables) {
_cm.deregister_compacting_sstables(sstables);
for (const auto& sst : sstables) {
_compacting.erase(sst);
_cs.sstables_requiring_cleanup.erase(sst);
}
// Once no sstable requires cleanup anymore, reset owned_ranges_ptr to nullptr.
if (_cs.sstables_requiring_cleanup.empty()) {
_cs.owned_ranges_ptr = nullptr;
}
}

class update_me : public compaction_task_executor::on_replacement {
Expand Down Expand Up @@ -348,6 +341,16 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
}

// Updates the table's cleanup bookkeeping for a finished compaction and then
// forwards the completion descriptor to the table state.
//
// Every sstable listed in desc.old_sstables that is not also listed in
// desc.new_sstables is erased from the compaction state's
// sstables_requiring_cleanup set. When that set is empty afterwards,
// owned_ranges_ptr is reset to nullptr.
//
// Returns the future produced by t.on_compaction_completion().
future<> compaction_manager::on_compaction_completion(table_state& t, sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) {
auto& cs = get_compaction_state(&t);
// Copy the outputs into a hash set so the membership test in the loop below is a set lookup.
auto new_sstables = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc.new_sstables);
for (const auto& sst : desc.old_sstables) {
// An sstable that appears in both old_sstables and new_sstables is left in the set.
if (!new_sstables.contains(sst)) {
cs.sstables_requiring_cleanup.erase(sst);
}
}
// Nothing left that requires cleanup: reset the owned ranges pointer.
if (cs.sstables_requiring_cleanup.empty()) {
cs.owned_ranges_ptr = nullptr;
}
// desc is moved from here, so it must not be used after this call.
return t.on_compaction_completion(std::move(desc), offstrategy);
}

Expand Down
38 changes: 21 additions & 17 deletions test/boost/sstable_compaction_test.cc
Expand Up @@ -4961,6 +4961,7 @@ SEASTAR_TEST_CASE(compaction_optimization_to_avoid_bloom_filter_checks) {

SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
return test_env::do_with_async([] (test_env& env) {
for (auto offstrategy : {sstables::offstrategy::no, sstables::offstrategy::yes}) {
auto builder = schema_builder("tests", "test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type);
Expand All @@ -4982,7 +4983,7 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
std::vector<utils::observer<sstable&>> observers;
std::vector<shared_sstable> ssts;
size_t sstables_closed = 0;
size_t sstables_closed_during_cleanup = 0;
size_t sstables_missing_on_delete = 0;
static constexpr size_t sstables_nr = 10;

dht::token_range_vector owned_token_ranges;
Expand All @@ -5005,38 +5006,40 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
std::move(mut2)
});
sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
sst->set_sstable_level(1);
// Set level to 0 to trigger offstrategy compaction
sst->set_sstable_level(offstrategy ? 0 : 1);

// every sstable will be eligible for cleanup, by having both an owned and unowned token.
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));

gens.insert(sst->generation());
ssts.push_back(std::move(sst));

if (offstrategy) {
// Force a new run_id to trigger offstrategy compaction
run_identifier = run_id::create_random_id();
}
}

size_t last_input_sstable_count = sstables_nr;
{
auto t = env.make_table_for_tests(s);
auto& cm = t->get_compaction_manager();
auto stop = deferred_stop(t);
t->disable_auto_compaction().get();
const dht::token_range_vector empty_owned_ranges;
for (auto&& sst : ssts) {
testlog.info("run id {}", sst->run_identifier());
column_family_test(t).add_sstable(sst).get();
testlog.info("run id {}, offstrategy={}", sst->run_identifier(), offstrategy);
column_family_test(t).add_sstable(sst, offstrategy).get();
column_family_test::update_sstables_known_generation(*t, sst->generation());
observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
auto sstables = t->get_sstables();
auto input_sstable_count = std::count_if(sstables->begin(), sstables->end(), [&] (const shared_sstable& sst) {
return gens.count(sst->generation());
});

testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), input_sstable_count);
testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), sstables->size());
sstables_closed++;
if (std::cmp_less(input_sstable_count, last_input_sstable_count)) {
sstables_closed_during_cleanup++;
last_input_sstable_count = input_sstable_count;
}
}));
observers.push_back(sst->add_on_delete_handler([&] (sstable& sst) mutable {
auto missing = !file_exists(sst.get_filename()).get();
testlog.info("Deleting sstable of generation {}: missing={}", sst.generation(), missing);
sstables_missing_on_delete += missing;
}));
}
ssts = {}; // releases references
Expand All @@ -5050,10 +5053,11 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
yield().get();
}

testlog.info("Closed sstables {}, Closed during cleanup {}", sstables_closed, sstables_closed_during_cleanup);
testlog.info("Closed sstables {}, missing on delete {}", sstables_closed, sstables_missing_on_delete);

BOOST_REQUIRE(sstables_closed == sstables_nr);
BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
BOOST_REQUIRE_EQUAL(sstables_closed, sstables_nr);
BOOST_REQUIRE_EQUAL(sstables_missing_on_delete, 0);
}
});
}

Expand Down
10 changes: 7 additions & 3 deletions test/boost/sstable_test.hh
Expand Up @@ -32,14 +32,18 @@ class column_family_test {
public:
column_family_test(lw_shared_ptr<replica::column_family> cf) : _cf(cf) {}

future<> add_sstable(sstables::shared_sstable sstable) {
future<> add_sstable(sstables::shared_sstable sstable, sstables::offstrategy offstrategy = sstables::offstrategy::no) {
if (offstrategy) {
// Otherwise, on_compaction_completion always adds the new_sstables to the main set
return _cf->add_sstable_and_update_cache(sstable, offstrategy);
}
auto new_sstables = { sstable };
return _cf->as_table_state().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no);
}

future<> rebuild_sstable_list(compaction::table_state& table_s, const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
return table_s.on_compaction_completion(sstables::compaction_completion_desc{ .old_sstables = sstables_to_remove, .new_sstables = new_sstables }, sstables::offstrategy::no);
const std::vector<sstables::shared_sstable>& sstables_to_remove, sstables::offstrategy offstrategy = sstables::offstrategy::no) {
return table_s.on_compaction_completion(sstables::compaction_completion_desc{ .old_sstables = sstables_to_remove, .new_sstables = new_sstables }, offstrategy);
}

static void update_sstables_known_generation(replica::column_family& cf, sstables::generation_type generation) {
Expand Down

0 comments on commit 43733bf

Please sign in to comment.