Skip to content

Commit

Permalink
table: clone staging sstables into table dir
Browse files Browse the repository at this point in the history
Clone staging sstables so their content may be compacted while
views are built.  When done, the hard-linked copy in the staging
subdirectory will simply be unlinked.

Fixes scylladb#9559

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Jun 13, 2022
1 parent 3d0f099 commit e0a5982
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 14 deletions.
2 changes: 1 addition & 1 deletion db/view/view_update_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ future<> view_update_generator::start() {
auto& [t, sstables] = *it;
try {
inject_failure("view_update_generator_move_staging_sstable");
t->move_sstables_from_staging(sstables).get();
t->remove_sstables_from_staging(sstables).get();
} catch (...) {
// Move from staging will be retried upon restart.
vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception());
Expand Down
2 changes: 1 addition & 1 deletion replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public:
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
future<> remove_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
io_error_handler_gen error_handler_gen);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
Expand Down
44 changes: 32 additions & 12 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,13 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
}
if (sstable->requires_view_building()) {
on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename()));
}
// allow in-progress reads to continue using old list
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
new_sstables->insert(sstable);
if (sstable->requires_view_building()) {
_sstables_staging.emplace(sstable->generation(), sstable);
} else if (backlog_tracker) {
if (backlog_tracker) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable);
}
// update sstable set last in case either updating
Expand Down Expand Up @@ -437,6 +438,28 @@ void table::enable_off_strategy_trigger() {

future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
if (sst->requires_view_building()) {
auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst);
if (!inserted) {
on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation()));
}

// clone staging sstables so their content may be compacted while
// views are built. When done, the hard-linked copy in the staging
// subdirectory will be simply unlinked.
//
// Note that after restart, we don't know whether we already cloned
// the staging sstable or we might have restarted right after sealing it
// and before cloning here, so we might be resurrecting an sstable
// in the base directory in this rare corner case.
try {
sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table());
} catch (...) {
_sstables_staging.erase(it);
throw;
}
}

auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
Expand Down Expand Up @@ -2245,21 +2268,18 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}

future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
future<> table::remove_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
auto units = co_await get_units(_sstable_deletion_sem, 1);
auto dirs_to_sync = std::set<sstring>({dir()});
auto main_sstables = _main_sstables->all();
std::set<sstring> dirs_to_sync;

for (auto sst : sstables) {
dirs_to_sync.emplace(sst->get_dir());
tlogger.debug("Removing sstable {} from staging", sst->get_filename());
try {
co_await sst->move_to_new_dir(dir(), sst->generation(), false);
co_await sst->unlink();
_sstables_staging.erase(sst->generation());
// Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy
if (main_sstables->contains(sst)) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
}
} catch (...) {
tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception());
throw;
}
}
Expand Down
11 changes: 11 additions & 0 deletions sstables/sstables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2143,6 +2143,17 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) {
co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs);
}

// Clone this sstable into new_dir and return a loaded sstable object for the clone.
//
// new_dir must not resolve to the same canonical path as this sstable's own
// directory; that is reported through on_internal_error.
// opt_gen selects the generation of the clone; when it is empty the clone
// keeps this sstable's generation.
future<shared_sstable> sstable::clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen) {
    // Compare canonical paths so that different spellings of the same
    // directory are still detected.
    const auto target_path = fs::canonical(fs::path(new_dir));
    const auto source_path = fs::canonical(fs::path(_dir));
    if (target_path == source_path) {
        on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename()));
    }

    auto clone_gen = opt_gen ? *opt_gen : _generation;

    // Link the components into the target directory first, then build an
    // sstable object on top of them.
    co_await create_links(new_dir, clone_gen);
    auto clone = _manager.make_sstable(_schema, new_dir, clone_gen, _version, _format);

    // Load the clone from this sstable's open info.
    co_await clone->load(co_await get_open_info());
    co_return clone;
}

flat_mutation_reader_v2
sstable::make_reader(
schema_ptr schema,
Expand Down
6 changes: 6 additions & 0 deletions sstables/sstables.hh
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ public:
// will move it into a quarantine_dir subdirectory of its current directory.
future<> move_to_quarantine(bool do_sync_dirs = true);

// Clone the sstable at a new directory.
// Hardlinks all sstable components into new_dir and returns
// a new, shared sstable object for the clone.
// The clone uses opt_gen as its generation when provided;
// otherwise it keeps this sstable's generation.
future<shared_sstable> clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen);

generation_type generation() const {
return _generation;
}
Expand Down

0 comments on commit e0a5982

Please sign in to comment.