Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use forward declaration of ThreadPool #48519

Merged
merged 1 commit into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/Access/DiskAccessStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <Interpreters/Access/InterpreterCreateUserQuery.h>
#include <Interpreters/Access/InterpreterShowGrantsQuery.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
Expand All @@ -19,6 +20,7 @@
#include <base/range.h>
#include <filesystem>
#include <fstream>
#include <memory>


namespace DB
Expand Down Expand Up @@ -317,15 +319,15 @@ void DiskAccessStorage::scheduleWriteLists(AccessEntityType type)
return; /// If the lists' writing thread is still waiting we can update `types_of_lists_to_write` easily,
/// without restarting that thread.

if (lists_writing_thread.joinable())
lists_writing_thread.join();
if (lists_writing_thread && lists_writing_thread->joinable())
lists_writing_thread->join();

/// Create the 'need_rebuild_lists.mark' file.
/// This file will be used later to find out if writing lists is successful or not.
std::ofstream out{getNeedRebuildListsMarkFilePath(directory_path)};
out.close();

lists_writing_thread = ThreadFromGlobalPool{&DiskAccessStorage::listsWritingThreadFunc, this};
lists_writing_thread = std::make_unique<ThreadFromGlobalPool>(&DiskAccessStorage::listsWritingThreadFunc, this);
lists_writing_thread_is_waiting = true;
}

Expand All @@ -349,10 +351,10 @@ void DiskAccessStorage::listsWritingThreadFunc()

void DiskAccessStorage::stopListsWritingThread()
{
if (lists_writing_thread.joinable())
if (lists_writing_thread && lists_writing_thread->joinable())
{
lists_writing_thread_should_exit.notify_one();
lists_writing_thread.join();
lists_writing_thread->join();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Access/DiskAccessStorage.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Access/MemoryAccessStorage.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <boost/container/flat_set.hpp>


Expand Down Expand Up @@ -81,7 +81,7 @@ class DiskAccessStorage : public IAccessStorage
bool failed_to_write_lists TSA_GUARDED_BY(mutex) = false;

/// List files are written in a separate thread.
ThreadFromGlobalPool lists_writing_thread;
std::unique_ptr<ThreadFromGlobalPool> lists_writing_thread;

/// Signals `lists_writing_thread` to exit.
std::condition_variable lists_writing_thread_should_exit;
Expand Down
8 changes: 5 additions & 3 deletions src/Access/ReplicatedAccessStorage.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <memory>
#include <Access/AccessEntityIO.h>
#include <Access/MemoryAccessStorage.h>
#include <Access/ReplicatedAccessStorage.h>
Expand All @@ -15,6 +16,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <base/range.h>
#include <base/sleep.h>
#include <boost/range/algorithm_ext/erase.hpp>
Expand Down Expand Up @@ -72,7 +74,7 @@ void ReplicatedAccessStorage::startWatchingThread()
{
bool prev_watching_flag = watching.exchange(true);
if (!prev_watching_flag)
watching_thread = ThreadFromGlobalPool(&ReplicatedAccessStorage::runWatchingThread, this);
watching_thread = std::make_unique<ThreadFromGlobalPool>(&ReplicatedAccessStorage::runWatchingThread, this);
}

void ReplicatedAccessStorage::stopWatchingThread()
Expand All @@ -81,8 +83,8 @@ void ReplicatedAccessStorage::stopWatchingThread()
if (prev_watching_flag)
{
watched_queue->finish();
if (watching_thread.joinable())
watching_thread.join();
if (watching_thread && watching_thread->joinable())
watching_thread->join();
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Access/ReplicatedAccessStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <atomic>

#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
Expand All @@ -21,7 +21,7 @@ class ReplicatedAccessStorage : public IAccessStorage
static constexpr char STORAGE_TYPE[] = "replicated";

ReplicatedAccessStorage(const String & storage_name, const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper, AccessChangesNotifier & changes_notifier_, bool allow_backup);
virtual ~ReplicatedAccessStorage() override;
~ReplicatedAccessStorage() override;

const char * getStorageType() const override { return STORAGE_TYPE; }

Expand All @@ -43,7 +43,7 @@ class ReplicatedAccessStorage : public IAccessStorage
std::mutex cached_zookeeper_mutex;

std::atomic<bool> watching = false;
ThreadFromGlobalPool watching_thread;
std::unique_ptr<ThreadFromGlobalPool> watching_thread;
std::shared_ptr<ConcurrentBoundedQueue<UUID>> watched_queue;

std::optional<UUID> insertImpl(const AccessEntityPtr & entity, bool replace_if_exists, bool throw_if_exists) override;
Expand Down
2 changes: 1 addition & 1 deletion src/AggregateFunctions/IAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <Interpreters/Context_fwd.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Core/IResolvedFunction.h>

#include "config.h"
Expand Down
1 change: 1 addition & 0 deletions src/Backups/BackupCoordinationFileInfos.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Backups/BackupCoordinationFileInfos.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>


namespace DB
Expand Down
1 change: 1 addition & 0 deletions src/Backups/BackupEntriesCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <base/sleep.h>
#include <Common/escapeForFileName.h>
#include <boost/range/algorithm/copy.hpp>
#include <base/scope_guard.h>
#include <filesystem>

namespace fs = std::filesystem;
Expand Down
1 change: 1 addition & 0 deletions src/Backups/BackupFileInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/HashingReadBuffer.h>


Expand Down
3 changes: 2 additions & 1 deletion src/Backups/BackupFileInfo.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once

#include <Core/Types.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>

namespace Poco { class Logger; }

namespace DB
{
Expand Down
1 change: 0 additions & 1 deletion src/Backups/BackupUtils.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <Parsers/ASTBackupQuery.h>
#include <Common/ThreadPool.h>


namespace DB
Expand Down
19 changes: 10 additions & 9 deletions src/Backups/BackupsWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>


namespace CurrentMetrics
Expand Down Expand Up @@ -182,8 +183,8 @@ namespace


BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_)
: backups_thread_pool(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
, restores_thread_pool(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
: backups_thread_pool(std::make_unique<ThreadPool>(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads))
, restores_thread_pool(std::make_unique<ThreadPool>(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads))
, log(&Poco::Logger::get("BackupsWorker"))
, allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_)
Expand Down Expand Up @@ -248,7 +249,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context

if (backup_settings.async)
{
backups_thread_pool.scheduleOrThrowOnError(
backups_thread_pool->scheduleOrThrowOnError(
[this, backup_query, backup_id, backup_name_for_logging, backup_info, backup_settings, backup_coordination, context_in_use, mutable_context]
{
doBackup(
Expand Down Expand Up @@ -435,7 +436,7 @@ void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, con
LOG_TRACE(log, "{}", Stage::BUILDING_FILE_INFOS);
backup_coordination->setStage(Stage::BUILDING_FILE_INFOS, "");
backup_coordination->waitForStage(Stage::BUILDING_FILE_INFOS);
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), backups_thread_pool));
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), *backups_thread_pool));
}


Expand Down Expand Up @@ -522,7 +523,7 @@ void BackupsWorker::writeBackupEntries(BackupMutablePtr backup, BackupEntries &&
}
};

if (always_single_threaded || !backups_thread_pool.trySchedule([job] { job(true); }))
if (always_single_threaded || !backups_thread_pool->trySchedule([job] { job(true); }))
job(false);
}

Expand Down Expand Up @@ -581,7 +582,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt

if (restore_settings.async)
{
restores_thread_pool.scheduleOrThrowOnError(
restores_thread_pool->scheduleOrThrowOnError(
[this, restore_query, restore_id, backup_name_for_logging, backup_info, restore_settings, restore_coordination, context_in_use]
{
doRestore(
Expand Down Expand Up @@ -716,7 +717,7 @@ void BackupsWorker::doRestore(
}

/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), restores_thread_pool);
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), *restores_thread_pool);

/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
Expand Down Expand Up @@ -941,8 +942,8 @@ void BackupsWorker::shutdown()
if (has_active_backups_and_restores)
LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);

backups_thread_pool.wait();
restores_thread_pool.wait();
backups_thread_pool->wait();
restores_thread_pool->wait();

if (has_active_backups_and_restores)
LOG_INFO(log, "All backup and restore tasks have finished");
Expand Down
7 changes: 4 additions & 3 deletions src/Backups/BackupsWorker.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <Backups/BackupStatus.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Core/UUID.h>
#include <Parsers/IAST_fwd.h>
#include <unordered_map>
Expand Down Expand Up @@ -132,8 +133,8 @@ class BackupsWorker
void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 total_size, size_t num_entries,
UInt64 uncompressed_size, UInt64 compressed_size, size_t num_read_files, UInt64 num_read_bytes);

ThreadPool backups_thread_pool;
ThreadPool restores_thread_pool;
std::unique_ptr<ThreadPool> backups_thread_pool;
std::unique_ptr<ThreadPool> restores_thread_pool;

std::unordered_map<OperationID, Info> infos;
std::condition_variable status_changed;
Expand Down
1 change: 1 addition & 0 deletions src/Bridge/IBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <base/errnoToString.h>
#include <base/range.h>
#include <base/scope_guard.h>

#include <sys/time.h>
#include <sys/resource.h>
Expand Down
13 changes: 6 additions & 7 deletions src/Common/SystemLogBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>
#include <Common/ThreadPool.h>

#include <Common/logger_useful.h>
#include <base/scope_guard.h>
Expand All @@ -35,34 +36,32 @@ namespace
constexpr size_t DBMS_SYSTEM_LOG_QUEUE_SIZE = 1048576;
}

ISystemLog::~ISystemLog() = default;

void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);

if (!saving_thread.joinable())
{
if (!saving_thread || !saving_thread->joinable())
return;
}

if (is_shutdown)
{
return;
}

is_shutdown = true;

/// Tell thread to shutdown.
flush_event.notify_all();
}

saving_thread.join();
saving_thread->join();
}

void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
saving_thread = std::make_unique<ThreadFromGlobalPool>([this] { savingThreadFunction(); });
}

static thread_local bool recursive_add_call = false;
Expand Down
6 changes: 3 additions & 3 deletions src/Common/SystemLogBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>

#define SYSTEM_LOG_ELEMENTS(M) \
M(AsynchronousMetricLogElement) \
Expand Down Expand Up @@ -60,12 +60,12 @@ class ISystemLog
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;

virtual ~ISystemLog() = default;
virtual ~ISystemLog();

virtual void savingThreadFunction() = 0;

protected:
ThreadFromGlobalPool saving_thread;
std::unique_ptr<ThreadFromGlobalPool> saving_thread;

/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
Expand Down
1 change: 1 addition & 0 deletions src/Common/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/ThreadStatus.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool_fwd.h>
#include <base/scope_guard.h>

/** Very simple thread pool similar to boost::threadpool.
Expand Down
13 changes: 13 additions & 0 deletions src/Common/ThreadPool_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once
azat marked this conversation as resolved.
Show resolved Hide resolved

template <typename Thread>
class ThreadPoolImpl;

template <bool propagate_opentelemetry_context>
class ThreadFromGlobalPoolImpl;

using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;

using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;

using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;
1 change: 1 addition & 0 deletions src/Coordination/Changelog.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <libnuraft/nuraft.hxx>
#include <libnuraft/raft_server.hxx>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions src/Coordination/KeeperStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <base/hex.h>
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
Expand Down