Skip to content

Commit

Permalink
compaction: do not swallow compaction_stopped_exception for reshape
Browse files Browse the repository at this point in the history
Loop in shard_reshaping_compaction_task_impl::run relies on whether
sstables::compaction_stopped_exception is thrown from run_custom_job.
The exception is swallowed for each type of compaction
in compaction_manager::perform_task.

Rethrow an exception in perfrom task for reshape compaction.

Fixes: scylladb#15058.

Closes scylladb#15067
  • Loading branch information
Deexie authored and denesb committed Aug 21, 2023
1 parent e13a2b6 commit e0ce711
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
23 changes: 13 additions & 10 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ compaction_task_executor::compaction_task_executor(compaction_manager& mgr, tabl
, _description(std::move(desc))
{}

future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task, throw_if_stopping do_throw_if_stopping) {
cmlog.debug("{}: started", *task);

try {
Expand All @@ -328,6 +328,9 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
co_return res;
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{}: stopped, reason: {}", *task, e.what());
if (do_throw_if_stopping) {
throw;
}
} catch (sstables::compaction_aborted_exception& e) {
cmlog.error("{}: aborted, reason: {}", *task, e.what());
_stats.errors++;
Expand Down Expand Up @@ -542,7 +545,7 @@ requires std::is_base_of_v<compaction_task_executor, TaskExecutor> &&
requires (compaction_manager& cm, Args&&... args) {
{TaskExecutor(cm, std::forward<Args>(args)...)} -> std::same_as<TaskExecutor>;
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(std::optional<tasks::task_info> parent_info, Args&&... args) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, std::optional<tasks::task_info> parent_info, Args&&... args) {
auto task_executor = seastar::make_shared<TaskExecutor>(*this, std::forward<Args>(args)...);
gate::holder gate_holder = task_executor->_compaction_state.gate.hold();
_tasks.push_back(task_executor);
Expand All @@ -558,15 +561,15 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_com
// We do not need to wait for the task to be done as compaction_task_executor side will take care of that.
}

co_return co_await perform_task(std::move(task_executor));
co_return co_await perform_task(std::move(task_executor), do_throw_if_stopping);
}

future<> compaction_manager::perform_major_compaction(table_state& t, std::optional<tasks::task_info> info) {
if (_state != state::enabled) {
co_return;
}

co_await perform_compaction<major_compaction_task_executor>(info, &t, info.value_or(tasks::task_info{}).id).discard_result();
co_await perform_compaction<major_compaction_task_executor>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id).discard_result();
}

namespace compaction {
Expand Down Expand Up @@ -617,12 +620,12 @@ class custom_compaction_task_executor : public compaction_task_executor, public

}

future<> compaction_manager::run_custom_job(table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job, std::optional<tasks::task_info> info) {
future<> compaction_manager::run_custom_job(table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job, std::optional<tasks::task_info> info, throw_if_stopping do_throw_if_stopping) {
if (_state != state::enabled) {
return make_ready_future<>();
}

return perform_compaction<custom_compaction_task_executor>(info, &t, info.value_or(tasks::task_info{}).id, type, desc, std::move(job)).discard_result();
return perform_compaction<custom_compaction_task_executor>(do_throw_if_stopping, info, &t, info.value_or(tasks::task_info{}).id, type, desc, std::move(job)).discard_result();
}

future<> compaction_manager::update_static_shares(float static_shares) {
Expand Down Expand Up @@ -1204,7 +1207,7 @@ void compaction_manager::submit(table_state& t) {

// OK to drop future.
// waited via compaction_task_executor::compaction_done()
(void)perform_compaction<regular_compaction_task_executor>(tasks::task_info{}, t).then_wrapped([] (auto f) { f.ignore_ready_future(); });
(void)perform_compaction<regular_compaction_task_executor>(throw_if_stopping::no, tasks::task_info{}, t).then_wrapped([] (auto f) { f.ignore_ready_future(); });
}

bool compaction_manager::can_perform_regular_compaction(table_state& t) {
Expand Down Expand Up @@ -1413,7 +1416,7 @@ future<bool> compaction_manager::perform_offstrategy(table_state& t, std::option
}

bool performed;
co_await perform_compaction<offstrategy_compaction_task_executor>(info, &t, info.value_or(tasks::task_info{}).id, performed);
co_await perform_compaction<offstrategy_compaction_task_executor>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id, performed);
co_return performed;
}

Expand Down Expand Up @@ -1527,7 +1530,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
return a->data_size() > b->data_size();
});
});
co_return co_await perform_compaction<TaskType>(info, &t, info.value_or(tasks::task_info{}).id, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward<Args>(args)...);
co_return co_await perform_compaction<TaskType>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward<Args>(args)...);
}

future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, std::optional<tasks::task_info> info, can_purge_tombstones can_purge) {
Expand Down Expand Up @@ -1604,7 +1607,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
return perform_compaction<validate_sstables_compaction_task_executor>(info, &t, info.value_or(tasks::task_info{}).id, std::move(all_sstables));
return perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id, std::move(all_sstables));
}

namespace compaction {
Expand Down
14 changes: 7 additions & 7 deletions compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public:
};

namespace compaction {
using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;

class compaction_task_executor;
class sstables_task_executor;
class major_compaction_task_executor;
Expand Down Expand Up @@ -160,7 +162,7 @@ private:
tombstone_gc_state _tombstone_gc_state;
private:
// Requires task->_compaction_state.gate to be held and task to be registered in _tasks.
future<compaction_stats_opt> perform_task(shared_ptr<compaction::compaction_task_executor> task);
future<compaction_stats_opt> perform_task(shared_ptr<compaction::compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);

// parent_info set to std::nullopt means that task manager should not register this task executor.
// To create a task manager task with no parent, parent_info argument should contain empty task_info.
Expand All @@ -170,7 +172,7 @@ private:
requires (compaction_manager& cm, Args&&... args) {
{TaskExecutor(cm, std::forward<Args>(args)...)} -> std::same_as<TaskExecutor>;
}
future<compaction_manager::compaction_stats_opt> perform_compaction(std::optional<tasks::task_info> parent_info, Args&&... args);
future<compaction_manager::compaction_stats_opt> perform_compaction(throw_if_stopping do_throw_if_stopping, std::optional<tasks::task_info> parent_info, Args&&... args);

future<> stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason);
future<> update_throughput(uint32_t value_mbs);
Expand Down Expand Up @@ -337,7 +339,7 @@ public:
// parameter type is the compaction type the operation can most closely be
// associated with, use compaction_type::Compaction, if none apply.
// parameter job is a function that will carry the operation
future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job, std::optional<tasks::task_info> info);
future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job, std::optional<tasks::task_info> info, throw_if_stopping do_throw_if_stopping);

class compaction_reenabler {
compaction_manager& _cm;
Expand Down Expand Up @@ -497,8 +499,6 @@ public:
protected:
virtual future<compaction_manager::compaction_stats_opt> do_run() = 0;

using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;

state switch_state(state new_state);

future<semaphore_units<named_semaphore_exception_factory>> acquire_semaphore(named_semaphore& sem, size_t units = 1);
Expand Down Expand Up @@ -575,8 +575,8 @@ public:
requires (compaction_manager& cm, Args&&... args) {
{TaskExecutor(cm, std::forward<Args>(args)...)} -> std::same_as<TaskExecutor>;
}
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(std::optional<tasks::task_info> parent_info, Args&&... args);
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task);
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, std::optional<tasks::task_info> parent_info, Args&&... args);
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
friend fmt::formatter<compaction_task_executor>;
};

Expand Down
4 changes: 2 additions & 2 deletions compaction/task_manager_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
// input sstables are moved, to guarantee their resources are released once we're done
// resharding them.
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
}, parent_info);
}, parent_info, throw_if_stopping::no);
});
}

Expand Down Expand Up @@ -500,7 +500,7 @@ future<> shard_reshaping_compaction_task_impl::run() {
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state());
co_await dir.remove_unshared_sstables(std::move(sstlist));
co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no);
}, info);
}, info, throw_if_stopping::yes);
} catch (...) {
ex = std::current_exception();
}
Expand Down

0 comments on commit e0ce711

Please sign in to comment.