Skip to content

Commit

Permalink
repair: release resources of shard_repair_task_impl
Browse files Browse the repository at this point in the history
Before integration with task manager the state of one shard repair
was kept in repair_info. repair_info object was destroyed immediately
after shard repair was finished.

In an integration process repair_info's fields were moved to
shard_repair_task_impl as the two served the similar purposes.
Though, shard_repair_task_impl isn't immediately destoyed, but is
kept in task manager for task_ttl seconds after it's complete.
Thus, some of repair_info's fields have their lifetime prolonged,
which makes the repair state change delayed.

Release shard_repair_task_impl resources immediately after shard
repair is finished.

Fixes: scylladb#15505.
  • Loading branch information
Deexie committed Sep 21, 2023
1 parent a56a4b6 commit a7f771c
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 3 deletions.
2 changes: 0 additions & 2 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,6 @@ compaction::compaction_state::~compaction_state() {
void sstables_task_executor::release_resources() noexcept {
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
_sstables = {};
compaction_task_executor::release_resources();
}

future<compaction_manager::compaction_stats_opt> compaction_task_executor::run_compaction() noexcept {
Expand Down Expand Up @@ -1675,7 +1674,6 @@ class cleanup_sstables_compaction_task_executor : public compaction_task_executo
_pending_cleanup_jobs = {};
_compacting.release_all();
_owned_ranges_ptr = nullptr;
compaction_task_executor::release_resources();
}

virtual tasks::is_internal is_internal() const noexcept override {
Expand Down
1 change: 0 additions & 1 deletion compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ public:
compaction_task_executor(compaction_task_executor&&) = delete;
compaction_task_executor(const compaction_task_executor&) = delete;

virtual void release_resources() noexcept {}
virtual ~compaction_task_executor() = default;

// called when a compaction replaces the exhausted sstables with the new set
Expand Down
17 changes: 17 additions & 0 deletions repair/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,17 @@ struct repair_options {
}
};

void repair::shard_repair_task_impl::release_resources() noexcept {
erm = {};
cfs = {};
data_centers = {};
hosts = {};
ignore_nodes = {};
neighbors = {};
dropped_tables = {};
nodes_down = {};
}

future<> repair::shard_repair_task_impl::do_repair_ranges() {
// Repair tables in the keyspace one after another
assert(table_names().size() == table_ids.size());
Expand Down Expand Up @@ -1272,6 +1283,9 @@ future<> repair::user_requested_repair_task_impl::run() {
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
auto release_task_resources = defer([&] () noexcept {
task->release_resources();
});
co_await task->done();
});
repair_results.push_back(std::move(f));
Expand Down Expand Up @@ -1390,6 +1404,9 @@ future<> repair::data_sync_repair_task_impl::run() {
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
task_impl_ptr->neighbors = std::move(neighbors);
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
auto release_task_resources = defer([&] () noexcept {
task->release_resources();
});
task->start();
co_await task->done();
});
Expand Down
2 changes: 2 additions & 0 deletions repair/task_manager_module.hh
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public:
future<> repair_range(const dht::token_range& range, table_info table);

size_t ranges_size() const noexcept;

virtual void release_resources() noexcept override;
protected:
future<> do_repair_ranges();
virtual future<tasks::task_manager::task::progress> get_progress() const override;
Expand Down
4 changes: 4 additions & 0 deletions tasks/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ bool task_manager::task::is_complete() const noexcept {
return _impl->is_complete();
}

void task_manager::task::release_resources() noexcept {
return _impl->release_resources();
}

task_manager::module::module(task_manager& tm, std::string name) noexcept : _tm(tm), _name(std::move(name)) {
_abort_subscription = _tm.abort_source().subscribe([this] () noexcept {
abort_source().request_abort();
Expand Down
2 changes: 2 additions & 0 deletions tasks/task_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public:
virtual tasks::is_internal is_internal() const noexcept;
virtual future<> abort() noexcept;
bool is_complete() const noexcept;
virtual void release_resources() noexcept {}
protected:
virtual future<> run() = 0;
void run_to_completion();
Expand Down Expand Up @@ -167,6 +168,7 @@ public:
void unregister_task() noexcept;
const foreign_task_vector& get_children() const noexcept;
bool is_complete() const noexcept;
void release_resources() noexcept;

friend class test_task;
friend class ::repair::task_manager_module;
Expand Down

0 comments on commit a7f771c

Please sign in to comment.