Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ThreadPool for DistributedSink and use StrongTypedef for CurrentMetrics/ProfileEvents/StatusInfo to avoid further errors #48314

Merged
merged 8 commits into from Apr 2, 2023
2 changes: 1 addition & 1 deletion base/base/strong_typedef.h
Expand Up @@ -35,7 +35,7 @@ struct StrongTypedef
Self & operator=(T && rhs) { t = std::move(rhs); return *this;}

// NOLINTBEGIN(google-explicit-constructor)
operator const T & () const { return t; }
constexpr operator const T & () const { return t; }
operator T & () { return t; }
// NOLINTEND(google-explicit-constructor)

Expand Down
6 changes: 3 additions & 3 deletions programs/server/MetricsTransmitter.cpp
Expand Up @@ -87,7 +87,7 @@ void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_count

if (send_events)
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
Expand All @@ -100,7 +100,7 @@ void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_count

if (send_events_cumulative)
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
Expand All @@ -110,7 +110,7 @@ void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_count

if (send_metrics)
{
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
for (CurrentMetrics::Metric i = CurrentMetrics::Metric(0), end = CurrentMetrics::end(); i < end; ++i)
{
const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);

Expand Down
6 changes: 4 additions & 2 deletions src/Common/CurrentMetrics.cpp
Expand Up @@ -126,6 +126,8 @@
M(DDLWorkerThreadsActive, "Number of threads in the DDLWORKER thread pool for ON CLUSTER queries running a task.") \
M(StorageDistributedThreads, "Number of threads in the StorageDistributed thread pool.") \
M(StorageDistributedThreadsActive, "Number of threads in the StorageDistributed thread pool running a task.") \
M(DistributedInsertThreads, "Number of threads used for INSERT into Distributed.") \
M(DistributedInsertThreadsActive, "Number of threads used for INSERT into Distributed running a task.") \
M(StorageS3Threads, "Number of threads in the StorageS3 thread pool.") \
M(StorageS3ThreadsActive, "Number of threads in the StorageS3 thread pool running a task.") \
M(MergeTreePartsLoaderThreads, "Number of threads in the MergeTree parts loader thread pool.") \
Expand Down Expand Up @@ -184,10 +186,10 @@

namespace CurrentMetrics
{
#define M(NAME, DOCUMENTATION) extern const Metric NAME = __COUNTER__;
#define M(NAME, DOCUMENTATION) extern const Metric NAME = Metric(__COUNTER__);
APPLY_FOR_METRICS(M)
#undef M
constexpr Metric END = __COUNTER__;
constexpr Metric END = Metric(__COUNTER__);

std::atomic<Value> values[END] {}; /// Global variable, initialized by zeros.

Expand Down
3 changes: 2 additions & 1 deletion src/Common/CurrentMetrics.h
Expand Up @@ -6,6 +6,7 @@
#include <atomic>
#include <cassert>
#include <base/types.h>
#include <base/strong_typedef.h>

/** Allows to count number of simultaneously happening processes or current value of some metric.
* - for high-level profiling.
Expand All @@ -22,7 +23,7 @@
namespace CurrentMetrics
{
/// Metric identifier (index in array).
using Metric = size_t;
using Metric = StrongTypedef<size_t, struct MetricTag>;
using Value = DB::Int64;

/// Get name of metric by identifier. Returns statically allocated string.
Expand Down
10 changes: 5 additions & 5 deletions src/Common/ProfileEvents.cpp
Expand Up @@ -497,10 +497,10 @@ The server successfully detected this situation and will download merged part fr
namespace ProfileEvents
{

#define M(NAME, DOCUMENTATION) extern const Event NAME = __COUNTER__;
#define M(NAME, DOCUMENTATION) extern const Event NAME = Event(__COUNTER__);
APPLY_FOR_EVENTS(M)
#undef M
constexpr Event END = __COUNTER__;
constexpr Event END = Event(__COUNTER__);

/// Global variable, initialized by zeros.
Counter global_counters_array[END] {};
Expand All @@ -522,7 +522,7 @@ void Counters::resetCounters()
{
if (counters)
{
for (Event i = 0; i < num_counters; ++i)
for (Event i = Event(0); i < num_counters; ++i)
counters[i].store(0, std::memory_order_relaxed);
}
}
Expand All @@ -540,7 +540,7 @@ Counters::Snapshot::Snapshot()
Counters::Snapshot Counters::getPartiallyAtomicSnapshot() const
{
Snapshot res;
for (Event i = 0; i < num_counters; ++i)
for (Event i = Event(0); i < num_counters; ++i)
res.counters_holder[i] = counters[i].load(std::memory_order_relaxed);
return res;
}
Expand Down Expand Up @@ -616,7 +616,7 @@ CountersIncrement::CountersIncrement(Counters::Snapshot const & snapshot)
CountersIncrement::CountersIncrement(Counters::Snapshot const & after, Counters::Snapshot const & before)
{
init();
for (Event i = 0; i < Counters::num_counters; ++i)
for (Event i = Event(0); i < Counters::num_counters; ++i)
increment_holder[i] = static_cast<Increment>(after[i]) - static_cast<Increment>(before[i]);
}

Expand Down
5 changes: 3 additions & 2 deletions src/Common/ProfileEvents.h
@@ -1,7 +1,8 @@
#pragma once

#include <Common/VariableContext.h>
#include "base/types.h"
#include <base/types.h>
#include <base/strong_typedef.h>
#include <atomic>
#include <memory>
#include <cstddef>
Expand All @@ -14,7 +15,7 @@
namespace ProfileEvents
{
/// Event identifier (index in array).
using Event = size_t;
using Event = StrongTypedef<size_t, struct EventTag>;
using Count = size_t;
using Increment = Int64;
using Counter = std::atomic<Count>;
Expand Down
4 changes: 2 additions & 2 deletions src/Common/StatusInfo.cpp
Expand Up @@ -8,10 +8,10 @@

namespace CurrentStatusInfo
{
#define M(NAME, DOCUMENTATION, ENUM) extern const Status NAME = __COUNTER__;
#define M(NAME, DOCUMENTATION, ENUM) extern const Status NAME = Status(__COUNTER__);
APPLY_FOR_STATUS(M)
#undef M
constexpr Status END = __COUNTER__;
constexpr Status END = Status(__COUNTER__);

std::mutex locks[END] {};
std::unordered_map<String, Int8> values[END] {};
Expand Down
3 changes: 2 additions & 1 deletion src/Common/StatusInfo.h
Expand Up @@ -6,13 +6,14 @@
#include <atomic>
#include <vector>
#include <base/types.h>
#include <base/strong_typedef.h>
#include <mutex>
#include <unordered_map>


namespace CurrentStatusInfo
{
using Status = size_t;
using Status = StrongTypedef<size_t, struct StatusTag>;
using Key = std::string;

const char * getName(Status event);
Expand Down
9 changes: 8 additions & 1 deletion src/Common/tests/gtest_thread_pool_limit.cpp
@@ -1,16 +1,23 @@
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>

#include <gtest/gtest.h>

namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}

/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.

template <typename Pool>
int test()
{
Pool pool(10, 2, 10);
Pool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 10, 2, 10);

std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/TemporaryFileOnDisk.cpp
Expand Up @@ -27,7 +27,7 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_)
: TemporaryFileOnDisk(disk_, "")
{}

TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope)
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Metric metric_scope)
: TemporaryFileOnDisk(disk_)
{
sub_metric_increment.emplace(metric_scope);
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/TemporaryFileOnDisk.h
Expand Up @@ -17,7 +17,7 @@ class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Metric metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix);

~TemporaryFileOnDisk();
Expand Down
8 changes: 4 additions & 4 deletions src/Interpreters/DDLWorker.cpp
Expand Up @@ -1022,10 +1022,10 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
String str_buf = node_path.substr(query_path_prefix.length());
DB::ReadBufferFromString in(str_buf);
CurrentMetrics::Metric id;
readText(id, in);
id = std::max(*max_pushed_entry_metric, id);
CurrentMetrics::set(*max_pushed_entry_metric, id);
CurrentMetrics::Value pushed_entry;
readText(pushed_entry, in);
pushed_entry = std::max(CurrentMetrics::get(*max_pushed_entry_metric), pushed_entry);
CurrentMetrics::set(*max_pushed_entry_metric, pushed_entry);
}

/// We cannot create status dirs in a single transaction with previous request,
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/MetricLog.cpp
Expand Up @@ -50,7 +50,7 @@ void MetricLogElement::appendToBlock(MutableColumns & columns) const
columns[column_idx++]->insert(profile_events[i]);

for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
columns[column_idx++]->insert(current_metrics[i]);
columns[column_idx++]->insert(current_metrics[i].toUnderType());
}


Expand Down Expand Up @@ -97,7 +97,7 @@ void MetricLog::metricThreadFunction()
elem.milliseconds = timeInMilliseconds(current_time) - timeInSeconds(current_time) * 1000;

elem.profile_events.resize(ProfileEvents::end());
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
auto & old_value = prev_profile_events[i];
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/ProfileEventsExt.cpp
Expand Up @@ -32,7 +32,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
auto & value_column = tuple_column.getColumn(1);

size_t size = 0;
for (Event event = 0; event < Counters::num_counters; ++event)
for (Event event = Event(0); event < Counters::num_counters; ++event)
{
UInt64 value = counters[event];

Expand All @@ -54,7 +54,7 @@ static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::Mutabl
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
auto & value_column = columns[VALUE_COLUMN_INDEX];
for (Event event = 0; event < Counters::num_counters; ++event)
for (Event event = Event(0); event < Counters::num_counters; ++event)
{
Int64 value = snapshot.counters[event];

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/TemporaryDataOnDisk.cpp
Expand Up @@ -49,7 +49,7 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
{}

TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope)
TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
, current_metric_scope(metric_scope)
{}
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/TemporaryDataOnDisk.h
Expand Up @@ -85,7 +85,7 @@ class TemporaryDataOnDisk : private TemporaryDataOnDiskScope

explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_);

explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope);
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope);

/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
Expand All @@ -102,7 +102,7 @@ class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);

typename CurrentMetrics::Value current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
typename CurrentMetrics::Metric current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
};

/*
Expand Down
2 changes: 1 addition & 1 deletion src/Server/PrometheusMetricsWriter.cpp
Expand Up @@ -59,7 +59,7 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const
{
if (send_events)
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);

Expand Down
9 changes: 6 additions & 3 deletions src/Storages/Distributed/DistributedSink.cpp
Expand Up @@ -41,6 +41,8 @@
namespace CurrentMetrics
{
extern const Metric DistributedSend;
extern const Metric DistributedInsertThreads;
extern const Metric DistributedInsertThreadsActive;
}

namespace ProfileEvents
Expand Down Expand Up @@ -460,9 +462,10 @@ void DistributedSink::writeSync(const Block & block)

size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count);
size_t max_threads = std::min<size_t>(settings.max_distributed_connections, jobs_count);
pool.emplace(/* max_threads_= */ max_threads,
/* max_free_threads_= */ max_threads,
/* queue_size_= */ jobs_count);
pool.emplace(
CurrentMetrics::DistributedInsertThreads,
CurrentMetrics::DistributedInsertThreadsActive,
max_threads, max_threads, jobs_count);

if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemEvents.cpp
Expand Up @@ -18,7 +18,7 @@ NamesAndTypesList StorageSystemEvents::getNamesAndTypes()

void StorageSystemEvents::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
UInt64 value = ProfileEvents::global_counters[i];

Expand Down
1 change: 1 addition & 0 deletions utils/check-style/check-style
Expand Up @@ -78,6 +78,7 @@ EXTERN_TYPES_EXCLUDES=(

CurrentMetrics::add
CurrentMetrics::sub
CurrentMetrics::get
CurrentMetrics::set
CurrentMetrics::end
CurrentMetrics::Increment
Expand Down