Skip to content

Commit

Permalink
task_manager: module: make_task: enter gate when the task is created
Browse files Browse the repository at this point in the history
Passing the gate_closed_exception to the task promise in start()
ends up with abandoned exception since no-one is waiting
for it.

Instead, enter the gate when the task is made
so it will fail make_task if the gate is already closed.

Fixes scylladb#15211

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Aug 31, 2023
1 parent 6012d24 commit 2e8190d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
12 changes: 6 additions & 6 deletions tasks/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>

#include "task_manager.hh"
#include "test_module.hh"
Expand Down Expand Up @@ -145,7 +146,7 @@ void task_manager::task::impl::finish_failed(std::exception_ptr ex) {
finish_failed(ex, fmt::format("{}", ex));
}

task_manager::task::task(task_impl_ptr&& impl) noexcept : _impl(std::move(impl)) {
task_manager::task::task(task_impl_ptr&& impl, gate::holder gh) noexcept : _impl(std::move(impl)), _gate_holder(std::move(gh)) {
register_task();
}

Expand Down Expand Up @@ -186,14 +187,13 @@ void task_manager::task::start() {
try {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
// After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way.
(void)with_gate(_impl->_module->async_gate(), [f = done(), module = _impl->_module, id = id()] () mutable {
return std::move(f).finally([module] {
auto module = _impl->_module;
(void)done().finally([module] {
return sleep_abortable(module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id] (auto f) {
}).then_wrapped([module, id = id()] (auto f) {
f.ignore_ready_future();
module->unregister_task(id);
});
});
_impl->_as.check();
_impl->_status.state = task_manager::task_state::running;
_impl->run_to_completion();
Expand Down Expand Up @@ -307,7 +307,7 @@ future<> task_manager::module::stop() noexcept {
}

future<task_manager::task_ptr> task_manager::module::make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr), async_gate().hold());
bool abort = false;
if (parent_d) {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable {
Expand Down
4 changes: 3 additions & 1 deletion tasks/task_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ public:
using task_impl_ptr = shared_ptr<impl>;
protected:
task_impl_ptr _impl;
private:
gate::holder _gate_holder;
public:
task(task_impl_ptr&& impl) noexcept;
task(task_impl_ptr&& impl, gate::holder) noexcept;

task_id id();
std::string type() const;
Expand Down

0 comments on commit 2e8190d

Please sign in to comment.