diff --git a/base/base/strong_typedef.h b/base/base/strong_typedef.h index 2ddea6412f56..b3b8bced688c 100644 --- a/base/base/strong_typedef.h +++ b/base/base/strong_typedef.h @@ -35,7 +35,7 @@ struct StrongTypedef Self & operator=(T && rhs) { t = std::move(rhs); return *this;} // NOLINTBEGIN(google-explicit-constructor) - operator const T & () const { return t; } + constexpr operator const T & () const { return t; } operator T & () { return t; } // NOLINTEND(google-explicit-constructor) diff --git a/programs/server/MetricsTransmitter.cpp b/programs/server/MetricsTransmitter.cpp index 2f28f0a1d162..ae9fa5ecc2c9 100644 --- a/programs/server/MetricsTransmitter.cpp +++ b/programs/server/MetricsTransmitter.cpp @@ -87,7 +87,7 @@ void MetricsTransmitter::transmit(std::vector & prev_count if (send_events) { - for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i) { const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); const auto counter_increment = counter - prev_counters[i]; @@ -100,7 +100,7 @@ void MetricsTransmitter::transmit(std::vector & prev_count if (send_events_cumulative) { - for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i) { const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); std::string key{ProfileEvents::getName(static_cast(i))}; @@ -110,7 +110,7 @@ void MetricsTransmitter::transmit(std::vector & prev_count if (send_metrics) { - for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) + for (CurrentMetrics::Metric i = CurrentMetrics::Metric(0), end = CurrentMetrics::end(); i < end; ++i) { const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 542c48148c8b..cfe9f41befeb 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -126,6 +126,8 @@ M(DDLWorkerThreadsActive, "Number of threads in the DDLWORKER thread pool for ON CLUSTER queries running a task.") \ M(StorageDistributedThreads, "Number of threads in the StorageDistributed thread pool.") \ M(StorageDistributedThreadsActive, "Number of threads in the StorageDistributed thread pool running a task.") \ + M(DistributedInsertThreads, "Number of threads used for INSERT into Distributed.") \ + M(DistributedInsertThreadsActive, "Number of threads used for INSERT into Distributed running a task.") \ M(StorageS3Threads, "Number of threads in the StorageS3 thread pool.") \ M(StorageS3ThreadsActive, "Number of threads in the StorageS3 thread pool running a task.") \ M(MergeTreePartsLoaderThreads, "Number of threads in the MergeTree parts loader thread pool.") \ @@ -184,10 +186,10 @@ namespace CurrentMetrics { - #define M(NAME, DOCUMENTATION) extern const Metric NAME = __COUNTER__; + #define M(NAME, DOCUMENTATION) extern const Metric NAME = Metric(__COUNTER__); APPLY_FOR_METRICS(M) #undef M - constexpr Metric END = __COUNTER__; + constexpr Metric END = Metric(__COUNTER__); std::atomic values[END] {}; /// Global variable, initialized by zeros. diff --git a/src/Common/CurrentMetrics.h b/src/Common/CurrentMetrics.h index 0ae16e2d08d7..a1ef254485dd 100644 --- a/src/Common/CurrentMetrics.h +++ b/src/Common/CurrentMetrics.h @@ -6,6 +6,7 @@ #include #include #include +#include /** Allows to count number of simultaneously happening processes or current value of some metric. * - for high-level profiling. @@ -22,7 +23,7 @@ namespace CurrentMetrics { /// Metric identifier (index in array). - using Metric = size_t; + using Metric = StrongTypedef; using Value = DB::Int64; /// Get name of metric by identifier. Returns statically allocated string. diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 3cee4a8e7181..1d035952f13c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -497,10 +497,10 @@ The server successfully detected this situation and will download merged part fr namespace ProfileEvents { -#define M(NAME, DOCUMENTATION) extern const Event NAME = __COUNTER__; +#define M(NAME, DOCUMENTATION) extern const Event NAME = Event(__COUNTER__); APPLY_FOR_EVENTS(M) #undef M -constexpr Event END = __COUNTER__; +constexpr Event END = Event(__COUNTER__); /// Global variable, initialized by zeros. Counter global_counters_array[END] {}; @@ -522,7 +522,7 @@ void Counters::resetCounters() { if (counters) { - for (Event i = 0; i < num_counters; ++i) + for (Event i = Event(0); i < num_counters; ++i) counters[i].store(0, std::memory_order_relaxed); } } @@ -540,7 +540,7 @@ Counters::Snapshot::Snapshot() Counters::Snapshot Counters::getPartiallyAtomicSnapshot() const { Snapshot res; - for (Event i = 0; i < num_counters; ++i) + for (Event i = Event(0); i < num_counters; ++i) res.counters_holder[i] = counters[i].load(std::memory_order_relaxed); return res; } @@ -616,7 +616,7 @@ CountersIncrement::CountersIncrement(Counters::Snapshot const & snapshot) CountersIncrement::CountersIncrement(Counters::Snapshot const & after, Counters::Snapshot const & before) { init(); - for (Event i = 0; i < Counters::num_counters; ++i) + for (Event i = Event(0); i < Counters::num_counters; ++i) increment_holder[i] = static_cast(after[i]) - static_cast(before[i]); } diff --git a/src/Common/ProfileEvents.h b/src/Common/ProfileEvents.h index 867b5b551c62..a36e68742cf5 100644 --- a/src/Common/ProfileEvents.h +++ b/src/Common/ProfileEvents.h @@ -1,7 +1,8 @@ #pragma once #include -#include "base/types.h" +#include +#include #include #include #include @@ -14,7 +15,7 @@ namespace ProfileEvents { /// Event identifier (index in array). - using Event = size_t; + using Event = StrongTypedef; using Count = size_t; using Increment = Int64; using Counter = std::atomic; diff --git a/src/Common/StatusInfo.cpp b/src/Common/StatusInfo.cpp index 32afc8330013..1f9ddfaf4b9e 100644 --- a/src/Common/StatusInfo.cpp +++ b/src/Common/StatusInfo.cpp @@ -8,10 +8,10 @@ namespace CurrentStatusInfo { - #define M(NAME, DOCUMENTATION, ENUM) extern const Status NAME = __COUNTER__; + #define M(NAME, DOCUMENTATION, ENUM) extern const Status NAME = Status(__COUNTER__); APPLY_FOR_STATUS(M) #undef M - constexpr Status END = __COUNTER__; + constexpr Status END = Status(__COUNTER__); std::mutex locks[END] {}; std::unordered_map values[END] {}; diff --git a/src/Common/StatusInfo.h b/src/Common/StatusInfo.h index 9aa185cd0c36..91e6d4d3b850 100644 --- a/src/Common/StatusInfo.h +++ b/src/Common/StatusInfo.h @@ -6,13 +6,14 @@ #include #include #include +#include #include #include namespace CurrentStatusInfo { - using Status = size_t; + using Status = StrongTypedef; using Key = std::string; const char * getName(Status event); diff --git a/src/Common/tests/gtest_thread_pool_limit.cpp b/src/Common/tests/gtest_thread_pool_limit.cpp index bc67ffd0bc16..17f79d17894f 100644 --- a/src/Common/tests/gtest_thread_pool_limit.cpp +++ b/src/Common/tests/gtest_thread_pool_limit.cpp @@ -1,16 +1,23 @@ #include #include #include +#include #include +namespace CurrentMetrics +{ + extern const Metric LocalThread; + extern const Metric LocalThreadActive; +} + /// Test for thread self-removal when number of free threads in pool is too large. /// Just checks that nothing weird happens. template int test() { - Pool pool(10, 2, 10); + Pool pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, 10, 2, 10); std::atomic counter{0}; for (size_t i = 0; i < 10; ++i) diff --git a/src/Disks/TemporaryFileOnDisk.cpp b/src/Disks/TemporaryFileOnDisk.cpp index d31b09b2185f..6fe6fd5a1c9a 100644 --- a/src/Disks/TemporaryFileOnDisk.cpp +++ b/src/Disks/TemporaryFileOnDisk.cpp @@ -27,7 +27,7 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_) : TemporaryFileOnDisk(disk_, "") {} -TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope) +TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Metric metric_scope) : TemporaryFileOnDisk(disk_) { sub_metric_increment.emplace(metric_scope); diff --git a/src/Disks/TemporaryFileOnDisk.h b/src/Disks/TemporaryFileOnDisk.h index 9ba59c3eaf0a..4c376383087f 100644 --- a/src/Disks/TemporaryFileOnDisk.h +++ b/src/Disks/TemporaryFileOnDisk.h @@ -17,7 +17,7 @@ class TemporaryFileOnDisk { public: explicit TemporaryFileOnDisk(const DiskPtr & disk_); - explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope); + explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Metric metric_scope); explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix); ~TemporaryFileOnDisk(); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 22bece0ef042..c4529af2c516 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -1022,10 +1022,10 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry) { String str_buf = node_path.substr(query_path_prefix.length()); DB::ReadBufferFromString in(str_buf); - CurrentMetrics::Metric id; - readText(id, in); - id = std::max(*max_pushed_entry_metric, id); - CurrentMetrics::set(*max_pushed_entry_metric, id); + CurrentMetrics::Value pushed_entry; + readText(pushed_entry, in); + pushed_entry = std::max(CurrentMetrics::get(*max_pushed_entry_metric), pushed_entry); + CurrentMetrics::set(*max_pushed_entry_metric, pushed_entry); } /// We cannot create status dirs in a single transaction with previous request, diff --git a/src/Interpreters/MetricLog.cpp b/src/Interpreters/MetricLog.cpp index 6e98f84bc82f..578cc118a6b5 100644 --- a/src/Interpreters/MetricLog.cpp +++ b/src/Interpreters/MetricLog.cpp @@ -50,7 +50,7 @@ void MetricLogElement::appendToBlock(MutableColumns & columns) const columns[column_idx++]->insert(profile_events[i]); for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) - columns[column_idx++]->insert(current_metrics[i]); + columns[column_idx++]->insert(current_metrics[i].toUnderType()); } @@ -97,7 +97,7 @@ void MetricLog::metricThreadFunction() elem.milliseconds = timeInMilliseconds(current_time) - timeInSeconds(current_time) * 1000; elem.profile_events.resize(ProfileEvents::end()); - for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i) { const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); auto & old_value = prev_profile_events[i]; diff --git a/src/Interpreters/ProfileEventsExt.cpp b/src/Interpreters/ProfileEventsExt.cpp index 7fbbe3c662b0..44977524c565 100644 --- a/src/Interpreters/ProfileEventsExt.cpp +++ b/src/Interpreters/ProfileEventsExt.cpp @@ -32,7 +32,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, auto & value_column = tuple_column.getColumn(1); size_t size = 0; - for (Event event = 0; event < Counters::num_counters; ++event) + for (Event event = Event(0); event < Counters::num_counters; ++event) { UInt64 value = counters[event]; @@ -54,7 +54,7 @@ static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::Mutabl size_t rows = 0; auto & name_column = columns[NAME_COLUMN_INDEX]; auto & value_column = columns[VALUE_COLUMN_INDEX]; - for (Event event = 0; event < Counters::num_counters; ++event) + for (Event event = Event(0); event < Counters::num_counters; ++event) { Int64 value = snapshot.counters[event]; diff --git a/src/Interpreters/TemporaryDataOnDisk.cpp b/src/Interpreters/TemporaryDataOnDisk.cpp index 25252f8226bc..c57de88d964c 100644 --- a/src/Interpreters/TemporaryDataOnDisk.cpp +++ b/src/Interpreters/TemporaryDataOnDisk.cpp @@ -49,7 +49,7 @@ TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_) : TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0) {} -TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope) +TemporaryDataOnDisk::TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope) : TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0) , current_metric_scope(metric_scope) {} diff --git a/src/Interpreters/TemporaryDataOnDisk.h b/src/Interpreters/TemporaryDataOnDisk.h index f0e02f16fb69..f7a6249a1ee1 100644 --- a/src/Interpreters/TemporaryDataOnDisk.h +++ b/src/Interpreters/TemporaryDataOnDisk.h @@ -85,7 +85,7 @@ class TemporaryDataOnDisk : private TemporaryDataOnDiskScope explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_); - explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope); + explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Metric metric_scope); /// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0); @@ -102,7 +102,7 @@ class TemporaryDataOnDisk : private TemporaryDataOnDiskScope mutable std::mutex mutex; std::vector streams TSA_GUARDED_BY(mutex); - typename CurrentMetrics::Value current_metric_scope = CurrentMetrics::TemporaryFilesUnknown; + typename CurrentMetrics::Metric current_metric_scope = CurrentMetrics::TemporaryFilesUnknown; }; /* diff --git a/src/Server/PrometheusMetricsWriter.cpp b/src/Server/PrometheusMetricsWriter.cpp index abf2a2c0b6bc..2331e455225f 100644 --- a/src/Server/PrometheusMetricsWriter.cpp +++ b/src/Server/PrometheusMetricsWriter.cpp @@ -59,7 +59,7 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const { if (send_events) { - for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i) { const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 11b938cd722b..720a951299a1 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -41,6 +41,8 @@ namespace CurrentMetrics { extern const Metric DistributedSend; + extern const Metric DistributedInsertThreads; + extern const Metric DistributedInsertThreadsActive; } namespace ProfileEvents @@ -460,9 +462,10 @@ void DistributedSink::writeSync(const Block & block) size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count); size_t max_threads = std::min(settings.max_distributed_connections, jobs_count); - pool.emplace(/* max_threads_= */ max_threads, - /* max_free_threads_= */ max_threads, - /* queue_size_= */ jobs_count); + pool.emplace( + CurrentMetrics::DistributedInsertThreads, + CurrentMetrics::DistributedInsertThreadsActive, + max_threads, max_threads, jobs_count); if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes)) { diff --git a/src/Storages/System/StorageSystemEvents.cpp b/src/Storages/System/StorageSystemEvents.cpp index be2d3f8d49ef..b9b07cfe0ac5 100644 --- a/src/Storages/System/StorageSystemEvents.cpp +++ b/src/Storages/System/StorageSystemEvents.cpp @@ -18,7 +18,7 @@ NamesAndTypesList StorageSystemEvents::getNamesAndTypes() void StorageSystemEvents::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const { - for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) + for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i) { UInt64 value = ProfileEvents::global_counters[i]; diff --git a/utils/check-style/check-style b/utils/check-style/check-style index 7a1fa6ce123f..a6cc20bb7c85 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -78,6 +78,7 @@ EXTERN_TYPES_EXCLUDES=( CurrentMetrics::add CurrentMetrics::sub + CurrentMetrics::get CurrentMetrics::set CurrentMetrics::end CurrentMetrics::Increment