Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation of callbacks (read tasks of parallel replicas) for distributed queries #46313

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
M(ThreadsInOvercommitTracker, "Number of waiting threads inside of OvercommitTracker") \
M(IOUringPendingEvents, "Number of io_uring SQEs waiting to be submitted") \
M(IOUringInFlightEvents, "Number of io_uring SQEs in flight") \
M(ReadTaskRequestsSent, "The current number of callback requests in flight from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.") \
M(MergeTreeReadTaskRequestsSent, "The current number of callback requests in flight from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSent, "The current number of announcement being sent in flight from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.")

namespace CurrentMetrics
{
Expand Down
10 changes: 10 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ The server successfully detected this situation and will download merged part fr
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
M(IOUringCQEsCompleted, "Total number of successfully completed io_uring CQEs") \
M(IOUringCQEsFailed, "Total number of completed io_uring CQEs with failures") \
\
M(ReadTaskRequestsReceived, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the initiator server side.") \
M(MergeTreeReadTaskRequestsReceived, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the initiator server side.") \
\
M(ReadTaskRequestsSent, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.") \
M(MergeTreeReadTaskRequestsSent, "The number of callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSent, "The number of announcement sent from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
M(ReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for s3Cluster table function and similar). Measured on the remote server side.") \
M(MergeTreeReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, "Time spent in sending the announcement from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.")

namespace ProfileEvents
{
Expand Down
12 changes: 9 additions & 3 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
#include <Columns/ColumnConst.h>
#include <Common/CurrentThread.h>
#include "Core/Protocol.h"
#include "IO/ReadHelpers.h"
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
Expand All @@ -25,7 +23,6 @@
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <IO/ReadBufferFromString.h>


namespace CurrentMetrics
Expand All @@ -34,6 +31,12 @@ namespace CurrentMetrics
extern const Metric ActiveSyncDrainedConnections;
}

namespace ProfileEvents
{
extern const Event ReadTaskRequestsReceived;
extern const Event MergeTreeReadTaskRequestsReceived;
}

namespace DB
{

Expand Down Expand Up @@ -490,6 +493,8 @@ void RemoteQueryExecutor::processReadTaskRequest()
{
if (!task_iterator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");

ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
auto response = (*task_iterator)();
connections->sendReadTaskResponse(response);
}
Expand All @@ -499,6 +504,7 @@ void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest re
if (!parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");

ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
auto response = parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response);
}
Expand Down
32 changes: 30 additions & 2 deletions src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ using namespace DB;
namespace CurrentMetrics
{
extern const Metric QueryThread;
extern const Metric ReadTaskRequestsSent;
extern const Metric MergeTreeReadTaskRequestsSent;
extern const Metric MergeTreeAllRangesAnnouncementsSent;
}

namespace ProfileEvents
{
extern const Event ReadTaskRequestsSent;
extern const Event MergeTreeReadTaskRequestsSent;
extern const Event MergeTreeAllRangesAnnouncementsSent;
extern const Event ReadTaskRequestsSentElapsedMicroseconds;
extern const Event MergeTreeReadTaskRequestsSentElapsedMicroseconds;
extern const Event MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds;
}

namespace
Expand Down Expand Up @@ -355,34 +368,49 @@ void TCPHandler::runImpl()
/// This callback is needed for requesting read tasks inside pipeline for distributed processing
query_context->setReadTaskCallback([this]() -> String
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);

std::lock_guard lock(task_callback_mutex);

if (state.is_cancelled)
return {};

sendReadTaskRequestAssumeLocked();
return receiveReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSent);
auto res = receiveReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
return res;
});

query_context->setMergeTreeAllRangesCallback([this](InitialAllRangesAnnouncement announcement)
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
std::lock_guard lock(task_callback_mutex);

if (state.is_cancelled)
return;

sendMergeTreeAllRangesAnnounecementAssumeLocked(announcement);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSent);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, watch.elapsedMicroseconds());
});

query_context->setMergeTreeReadTaskCallback([this](ParallelReadRequest request) -> std::optional<ParallelReadResponse>
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);

if (state.is_cancelled)
return std::nullopt;

sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
return receivePartitionMergeTreeReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSent);
auto res = receivePartitionMergeTreeReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
return res;
});

/// Processing Query
Expand Down