Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profile event 'ParallelReplicasUsedCount' #58173

Merged
merged 5 commits into from Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/Common/ProfileEvents.cpp
Expand Up @@ -250,9 +250,9 @@ Number of times data after merge is not byte-identical to the data on another re
7. Manual modification of source data after server startup.
8. Manual modification of checksums stored in ZooKeeper.
9. Part format related settings like 'enable_mixed_granularity_parts' are different on different replicas.
The server successfully detected this situation and will download merged part from replica to force byte-identical result.
The server successfully detected this situation and will download merged part from the replica to force the byte-identical result.
)") \
M(DataAfterMutationDiffersFromReplica, "Number of times data after mutation is not byte-identical to the data on another replicas. In addition to the reasons described in 'DataAfterMergeDiffersFromReplica', it is also possible due to non-deterministic mutation.") \
M(DataAfterMutationDiffersFromReplica, "Number of times data after mutation is not byte-identical to the data on other replicas. In addition to the reasons described in 'DataAfterMergeDiffersFromReplica', it is also possible due to non-deterministic mutation.") \
M(PolygonsAddedToPool, "A polygon has been added to the cache (pool) for the 'pointInPolygon' function.") \
M(PolygonsInPoolAllocatedBytes, "The number of bytes for polygons added to the cache (pool) for the 'pointInPolygon' function.") \
\
Expand All @@ -272,12 +272,12 @@ The server successfully detected this situation and will download merged part fr
M(PartsLockWaitMicroseconds, "Total time spent waiting for data parts lock in MergeTree tables") \
\
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (note that this is a sum).") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user mode. This include time CPU pipeline was stalled due to main memory access, cache misses, branch mispredictions, hyper-threading, etc.") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user mode. This includes time CPU pipeline was stalled due to main memory access, cache misses, branch mispredictions, hyper-threading, etc.") \
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel mode. This is time spent in syscalls, excluding waiting time during blocking syscalls.") \
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
M(MemoryAllocatorPurge, "Total number of times memory allocator purge was requested") \
M(MemoryAllocatorPurgeTimeMicroseconds, "Total number of times memory allocator purge was requested") \
M(SoftPageFaults, "The number of soft page faults in query execution threads. Soft page fault usually means a miss in the memory allocator cache which required a new memory mapping from the OS and subsequent allocation of a page of physical memory.") \
M(SoftPageFaults, "The number of soft page faults in query execution threads. Soft page fault usually means a miss in the memory allocator cache, which requires a new memory mapping from the OS and subsequent allocation of a page of physical memory.") \
M(HardPageFaults, "The number of hard page faults in query execution threads. High values indicate either that you forgot to turn off swap on your server, or eviction of memory pages of the ClickHouse binary during very high memory pressure, or successful usage of the 'mmap' read method for the tables data.") \
\
M(OSIOWaitMicroseconds, "Total time a thread spent waiting for a result of IO operation, from the OS point of view. This is real IO that doesn't include page cache.") \
Expand All @@ -290,8 +290,8 @@ The server successfully detected this situation and will download merged part fr
\
M(PerfCpuCycles, "Total cycles. Be wary of what happens during CPU frequency scaling.") \
M(PerfInstructions, "Retired instructions. Be careful, these can be affected by various issues, most notably hardware interrupt counts.") \
M(PerfCacheReferences, "Cache accesses. Usually this indicates Last Level Cache accesses but this may vary depending on your CPU. This may include prefetches and coherency messages; again this depends on the design of your CPU.") \
M(PerfCacheMisses, "Cache misses. Usually this indicates Last Level Cache misses; this is intended to be used in con‐junction with the PERFCOUNTHWCACHEREFERENCES event to calculate cache miss rates.") \
M(PerfCacheReferences, "Cache accesses. Usually, this indicates Last Level Cache accesses, but this may vary depending on your CPU. This may include prefetches and coherency messages; again this depends on the design of your CPU.") \
M(PerfCacheMisses, "Cache misses. Usually this indicates Last Level Cache misses; this is intended to be used in conjunction with the PERFCOUNTHWCACHEREFERENCES event to calculate cache miss rates.") \
M(PerfBranchInstructions, "Retired branch instructions. Prior to Linux 2.6.35, this used the wrong event on AMD processors.") \
M(PerfBranchMisses, "Mispredicted branch instructions.") \
M(PerfBusCycles, "Bus cycles, which can be different from total cycles.") \
Expand Down Expand Up @@ -457,7 +457,7 @@ The server successfully detected this situation and will download merged part fr
M(FileSegmentWaitReadBufferMicroseconds, "Metric per file segment. Time spend waiting for internal read buffer (includes cache waiting)") \
M(FileSegmentReadMicroseconds, "Metric per file segment. Time spend reading from file") \
M(FileSegmentCacheWriteMicroseconds, "Metric per file segment. Time spend writing data to cache") \
M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent predownloading data to cache (predownloading - finishing file segment download (after someone who failed to do that) up to the point current thread was requested to do)") \
M(FileSegmentPredownloadMicroseconds, "Metric per file segment. Time spent pre-downloading data to cache (pre-downloading - finishing file segment download (after someone who failed to do that) up to the point current thread was requested to do)") \
M(FileSegmentUsedBytes, "Metric per file segment. How many bytes were actually used from current file segment") \
\
M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \
Expand All @@ -466,12 +466,12 @@ The server successfully detected this situation and will download merged part fr
M(SleepFunctionMicroseconds, "Time set to sleep in a sleep function (sleep, sleepEachRow).") \
M(SleepFunctionElapsedMicroseconds, "Time spent sleeping in a sleep function (sleep, sleepEachRow).") \
\
M(ThreadPoolReaderPageCacheHit, "Number of times the read inside ThreadPoolReader was done from page cache.") \
M(ThreadPoolReaderPageCacheHitBytes, "Number of bytes read inside ThreadPoolReader when it was done from page cache.") \
M(ThreadPoolReaderPageCacheHit, "Number of times the read inside ThreadPoolReader was done from the page cache.") \
M(ThreadPoolReaderPageCacheHitBytes, "Number of bytes read inside ThreadPoolReader when it was done from the page cache.") \
M(ThreadPoolReaderPageCacheHitElapsedMicroseconds, "Time spent reading data from page cache in ThreadPoolReader.") \
M(ThreadPoolReaderPageCacheMiss, "Number of times the read inside ThreadPoolReader was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissBytes, "Number of bytes read inside ThreadPoolReader when read was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from the page cache.") \
\
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads in asynchronous local read.") \
M(SynchronousReadWaitMicroseconds, "Time spent in waiting for synchronous reads in asynchronous local read.") \
Expand Down Expand Up @@ -512,7 +512,7 @@ The server successfully detected this situation and will download merged part fr
M(SchemaInferenceCacheSchemaHits, "Number of times the schema is found in schema cache during schema inference") \
M(SchemaInferenceCacheNumRowsHits, "Number of times the number of rows is found in schema cache during count from files") \
M(SchemaInferenceCacheMisses, "Number of times the requested source is not in schema cache") \
M(SchemaInferenceCacheSchemaMisses, "Number of times the requested source is in cache but the schema is not in cache while schema inference") \
M(SchemaInferenceCacheSchemaMisses, "Number of times the requested source is in cache but the schema is not in cache during schema inference") \
M(SchemaInferenceCacheNumRowsMisses, "Number of times the requested source is in cache but the number of rows is not in cache while count from files") \
M(SchemaInferenceCacheEvictions, "Number of times a schema from cache was evicted due to overflow") \
M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data") \
Expand Down Expand Up @@ -570,7 +570,7 @@ The server successfully detected this situation and will download merged part fr
\
M(ReadTaskRequestsSent, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.") \
M(MergeTreeReadTaskRequestsSent, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSent, "The number of announcement sent from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSent, "The number of announcements sent from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
M(ReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.") \
M(MergeTreeReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, "Time spent in sending the announcement from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
Expand All @@ -586,6 +586,8 @@ The server successfully detected this situation and will download merged part fr
M(LogWarning, "Number of log messages with level Warning") \
M(LogError, "Number of log messages with level Error") \
M(LogFatal, "Number of log messages with level Fatal") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
14 changes: 13 additions & 1 deletion src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
Expand Up @@ -23,6 +23,11 @@
#include <fmt/core.h>
#include <fmt/format.h>

namespace ProfileEvents
{
extern const Event ParallelReplicasUsedCount;
}

namespace DB
{
struct Part
Expand Down Expand Up @@ -573,7 +578,14 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR
initialize();
}

return pimpl->handleRequest(std::move(request));
auto response = pimpl->handleRequest(std::move(request));
if (!response.finish)
{
if (replicas_used.insert(request.replica_num).second)
ProfileEvents::increment(ProfileEvents::ParallelReplicasUsedCount);
}

return response;
}

void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number)
Expand Down
Expand Up @@ -39,6 +39,7 @@ class ParallelReplicasReadingCoordinator
std::atomic<bool> initialized{false};
std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
std::set<size_t> replicas_used;

/// To initialize `pimpl` we need to know the coordinator mode. We can know it only from initial announcement or regular request.
/// The problem is `markReplicaAsUnavailable` might be called before any of these requests happened.
Expand Down
@@ -0,0 +1,8 @@
100 4950
1
89
90
91
92
93
1
25 changes: 25 additions & 0 deletions tests/queries/0_stateless/02950_parallel_replicas_used_count.sql
@@ -0,0 +1,25 @@
DROP TABLE IF EXISTS test;

CREATE TABLE test (k UInt64, v String)
ENGINE = MergeTree
ORDER BY k;

INSERT INTO test SELECT number, toString(number) FROM numbers(100);

SET allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 3, prefer_localhost_replica = 0, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';

-- default coordinator
SELECT count(), sum(k)
FROM test
SETTINGS log_comment = '02950_parallel_replicas_used_replicas_count';

SYSTEM FLUSH LOGS;
SELECT ProfileEvents['ParallelReplicasUsedCount'] FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0;

-- In order coordinator
SELECT k FROM test order by k limit 5 offset 89 SETTINGS optimize_read_in_order=1, log_comment='02950_parallel_replicas_used_replicas_count_2';

SYSTEM FLUSH LOGS;
SELECT ProfileEvents['ParallelReplicasUsedCount'] FROM system.query_log WHERE type = 'QueryFinish' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = '02950_parallel_replicas_used_replicas_count_2' AND type = 'QueryFinish' AND initial_query_id = query_id) SETTINGS allow_experimental_parallel_reading_from_replicas=0;

DROP TABLE test;