Skip to content

Commit

Permalink
Merge 'send-progress' into 'cnch-dev'
Browse files Browse the repository at this point in the history
feat(clickhousech@m-17104648): send progress in optimizer mode

See merge request: https://code.byted.org/dp/ClickHouse/merge_requests/19287

# Conflicts:
#	src/Core/Settings.h
#	src/Interpreters/DistributedStages/PlanSegmentExecutor.cpp
#	src/Interpreters/DistributedStages/PlanSegmentManagerRpcService.cpp
#	src/Interpreters/SegmentScheduler.cpp
  • Loading branch information
jay-bytedance committed May 8, 2024
1 parent 61950a6 commit 78a5c07
Show file tree
Hide file tree
Showing 49 changed files with 818 additions and 190 deletions.
6 changes: 5 additions & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ enum PreloadLevelSettings : UInt64
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
M(UInt64, interactive_delay, 100000, "The interval in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(UInt64, interactive_delay_optimizer_mode, 0, "The interval(in optimizer mode) in microseconds to check if the request is cancelled, and to send progress info.", 0) \
M(Seconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.", 0) \
M(Milliseconds, \
connect_timeout_with_failover_ms, \
Expand Down Expand Up @@ -1724,9 +1725,12 @@ enum PreloadLevelSettings : UInt64
0) \
M(Bool, exchange_enable_keep_order_parallel_shuffle, false, "Whether enable parallel shuffle when exchange need keep order", 0) \
M(Bool, exchange_enable_force_remote_mode, false, "Force exchange data transfer through network", 0) \
M(Bool, enable_wait_for_post_processing, false, "Whether a query needs to wait for post processing rpcs done before end", 0) \
M(Bool, exchange_enable_force_keep_order, false, "Force exchange keep data order", 0) \
M(Bool, exchange_force_use_buffer, false, "Force exchange use buffer as possible", 0) \
M(UInt64, distributed_query_wait_exception_ms, 1000, "Wait final planSegment exception from segmentScheduler.", 0) \
M(Bool, exchange_enable_node_stable_hash, false, "Force exchange use buffer as possible", 0) \
M(UInt64, wait_for_post_processing_timeout_ms, 1000, "Timeout for waiting post processing rpc from workers.", 0) \
M(UInt64, distributed_query_wait_exception_ms, 1000,"Wait final planSegment exception from segmentScheduler.", 0) \
M(UInt64, distributed_max_parallel_size, false, "Max distributed execution parallel size", 0) \
\
/** Runtime Filter settings */ \
Expand Down
4 changes: 0 additions & 4 deletions src/DataStreams/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,6 @@ void RemoteQueryExecutor::parseQueryWorkerMetrics(const QueryWorkerMetricElement
context->getQueryContext()->insertQueryWorkerMetricsElement(*element);
else if (context->getServerType() == ServerType::cnch_worker) /// For cnch aggre worker, store the elements and forward them to cnch server
context->getQueryContext()->addQueryWorkerMetricElements(std::move(element));

auto internal_progress_callback = context->getInternalProgressCallback();
if (internal_progress_callback)
internal_progress_callback({element->read_rows, element->read_bytes, 0, 0, element->read_cached_bytes});
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/DataStreams/copyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::func
copyDataImpl(from, to, is_cancelled, doNothing);
}

void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress)
void copyData(
IBlockInputStream & from,
IBlockOutputStream & to,
const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress)
{
copyDataImpl(from, to, is_cancelled, progress);
}
Expand Down
8 changes: 5 additions & 3 deletions src/DataStreams/copyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::func

void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);

void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress);

void copyData(
IBlockInputStream & from,
IBlockOutputStream & to,
const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress);
}
136 changes: 135 additions & 1 deletion src/IO/Progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
*/

#include "Progress.h"
#include <atomic>

#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Protos/plan_segment_manager.pb.h>


namespace DB
Expand Down Expand Up @@ -86,6 +88,99 @@ void ProgressValues::writeJSON(WriteBuffer & out) const
writeCString("\"}", out);
}

Protos::Progress ProgressValues::toProto() const
{
Protos::Progress proto;
proto.set_read_rows(this->read_rows);
proto.set_read_bytes(this->read_bytes);
proto.set_read_raw_bytes(this->read_raw_bytes);
proto.set_written_rows(this->written_rows);
proto.set_written_bytes(this->written_bytes);
proto.set_total_rows_to_read(this->total_rows_to_read);
proto.set_total_raw_bytes_to_read(this->total_raw_bytes_to_read);
proto.set_disk_cache_read_bytes(this->disk_cache_read_bytes);
return proto;
}

void ProgressValues::fromProto(const Protos::Progress & progress)
{
this->read_rows = progress.read_rows();
this->read_bytes = progress.read_bytes();
this->read_raw_bytes = progress.read_raw_bytes();
this->written_rows = progress.written_rows();
this->written_bytes = progress.written_bytes();
this->total_rows_to_read = progress.total_rows_to_read();
this->total_raw_bytes_to_read = progress.total_raw_bytes_to_read();
this->disk_cache_read_bytes = progress.disk_cache_read_bytes();
}

bool ProgressValues::empty() const
{
return read_rows == 0 && read_bytes == 0 && read_raw_bytes == 0 && written_rows == 0 && written_bytes == 0 && total_rows_to_read == 0
&& disk_cache_read_bytes == 0 && total_raw_bytes_to_read == 0;
}

ProgressValues ProgressValues::operator-(const ProgressValues & other) const
{
ProgressValues v;
v.read_rows = read_rows >= other.read_rows ? read_rows - other.read_rows : 0;
v.read_bytes = read_bytes >= other.read_bytes ? read_bytes - other.read_bytes : 0;
v.read_raw_bytes = read_raw_bytes >= other.read_raw_bytes ? read_raw_bytes - other.read_raw_bytes : 0;

v.written_rows = written_rows >= other.written_rows ? written_rows - other.written_rows : 0;
v.written_bytes = written_bytes >= other.written_bytes ? written_bytes - other.written_bytes : 0;

v.disk_cache_read_bytes
= disk_cache_read_bytes >= other.disk_cache_read_bytes ? disk_cache_read_bytes - other.disk_cache_read_bytes : 0;

v.total_rows_to_read = total_rows_to_read >= other.total_rows_to_read ? total_rows_to_read - other.total_rows_to_read : 0;
v.total_raw_bytes_to_read
= total_raw_bytes_to_read >= other.total_raw_bytes_to_read ? total_raw_bytes_to_read - other.total_raw_bytes_to_read : 0;

return v;
}

ProgressValues ProgressValues::operator+(const ProgressValues & other) const
{
ProgressValues v;
v.read_rows = read_rows + other.read_rows;
v.read_bytes = read_bytes + other.read_bytes;
v.read_raw_bytes = read_raw_bytes + other.read_raw_bytes;

v.written_rows = written_rows + other.written_rows;
v.written_bytes = written_bytes + other.written_bytes;

v.disk_cache_read_bytes = disk_cache_read_bytes + other.disk_cache_read_bytes;

v.total_rows_to_read = total_rows_to_read + other.total_rows_to_read;
v.total_raw_bytes_to_read = total_raw_bytes_to_read + other.total_raw_bytes_to_read;

return v;
}

bool ProgressValues::operator==(const ProgressValues & other) const
{
return read_rows == other.read_rows && read_bytes == other.read_bytes && read_raw_bytes == other.read_raw_bytes
&& written_rows == other.written_rows && written_bytes == other.written_bytes && total_rows_to_read == other.total_rows_to_read
&& disk_cache_read_bytes == other.disk_cache_read_bytes && total_raw_bytes_to_read == other.total_raw_bytes_to_read;
}

String ProgressValues::toString() const
{
return fmt::format(
"progress[read_rows={}, read_bytes={}, read_raw_bytes={}, written_rows={}, written_bytes={}, disk_cache_read_bytes={}, "
"total_rows_to_read={}, "
"total_raw_bytes_to_read={}]",
read_rows,
read_bytes,
read_raw_bytes,
written_rows,
written_bytes,
disk_cache_read_bytes,
total_rows_to_read,
total_raw_bytes_to_read);
}

bool Progress::incrementPiecewiseAtomically(const Progress & rhs)
{
read_rows += rhs.read_rows;
Expand Down Expand Up @@ -202,4 +297,43 @@ void Progress::writeJSON(WriteBuffer & out) const
getValues().writeJSON(out);
}

Protos::Progress Progress::toProto() const
{
Protos::Progress proto;
proto.set_read_rows(read_rows.load(std::memory_order_relaxed));
proto.set_read_bytes(read_bytes.load(std::memory_order_relaxed));
proto.set_read_raw_bytes(read_raw_bytes.load(std::memory_order_relaxed));
proto.set_total_rows_to_read(total_rows_to_read.load(std::memory_order_relaxed));
proto.set_total_raw_bytes_to_read(total_raw_bytes_to_read.load(std::memory_order_relaxed));
proto.set_disk_cache_read_bytes(disk_cache_read_bytes.load(std::memory_order_relaxed));
proto.set_written_rows(written_rows.load(std::memory_order_relaxed));
proto.set_written_bytes(written_bytes.load(std::memory_order_relaxed));
proto.set_written_elapsed_milliseconds(written_elapsed_milliseconds.load(std::memory_order_relaxed));
return proto;
}

void Progress::fromProto(const Protos::Progress & progress)
{
read_rows.store(progress.read_rows(), std::memory_order_relaxed);
read_bytes.store(progress.read_bytes(), std::memory_order_relaxed);
read_raw_bytes.store(progress.read_raw_bytes(), std::memory_order_relaxed);

total_rows_to_read.store(progress.total_rows_to_read(), std::memory_order_relaxed);
total_raw_bytes_to_read.store(progress.total_raw_bytes_to_read(), std::memory_order_relaxed);
disk_cache_read_bytes.store(progress.disk_cache_read_bytes(), std::memory_order_relaxed);

written_rows.store(progress.written_rows(), std::memory_order_relaxed);
written_bytes.store(progress.written_bytes(), std::memory_order_relaxed);

written_elapsed_milliseconds.store(progress.written_elapsed_milliseconds(), std::memory_order_relaxed);
}

bool Progress::empty() const
{
return read_rows.load(std::memory_order_relaxed) == 0 && read_bytes.load(std::memory_order_relaxed) == 0
&& read_raw_bytes.load(std::memory_order_relaxed) == 0 && total_rows_to_read.load(std::memory_order_relaxed)
&& total_raw_bytes_to_read.load(std::memory_order_relaxed) == 0 && disk_cache_read_bytes.load(std::memory_order_relaxed)
&& written_rows.load(std::memory_order_relaxed) && written_bytes.load(std::memory_order_relaxed) == 0
&& written_elapsed_milliseconds.load(std::memory_order_relaxed);
}
}
24 changes: 24 additions & 0 deletions src/IO/Progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <common/types.h>

#include <Core/ProtocolDefines.h>
#include <Protos/plan_segment_manager.pb.h>
#include <Common/Stopwatch.h>


Expand Down Expand Up @@ -53,6 +54,13 @@ struct ProgressValues
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
void writeJSON(WriteBuffer & out) const;
Protos::Progress toProto() const;
void fromProto(const Protos::Progress & progress);
bool empty() const;
ProgressValues operator-(const ProgressValues & other) const;
ProgressValues operator+(const ProgressValues & other) const;
bool operator==(const ProgressValues & other) const;
String toString() const;
};

struct ReadProgress
Expand Down Expand Up @@ -122,6 +130,18 @@ struct Progress
, written_elapsed_milliseconds(written_elapsed_milliseconds_)
, disk_cache_read_bytes(disk_cache_read_bytes_) {}

explicit Progress(const ProgressValues & v)
: read_rows(v.read_rows)
, read_bytes(v.read_bytes)
, read_raw_bytes(v.read_raw_bytes)
, total_rows_to_read(v.total_rows_to_read)
, total_raw_bytes_to_read(v.total_raw_bytes_to_read)
, written_rows(v.written_rows)
, written_bytes(v.written_bytes)
, disk_cache_read_bytes(v.disk_cache_read_bytes)
{
}

explicit Progress(ReadProgress read_progress)
: read_rows(read_progress.read_rows)
, read_bytes(read_progress.read_bytes)
Expand All @@ -141,6 +161,10 @@ struct Progress
/// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
void writeJSON(WriteBuffer & out) const;

Protos::Progress toProto() const;
void fromProto(const Protos::Progress & progress);
bool empty() const;

/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs);

Expand Down
11 changes: 0 additions & 11 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1587,7 +1587,6 @@ String Context::formatUserName(const String & name)
{
//CNCH multi-tenant user name pattern from gateway client: {tenant_id}`{user_name}
String user = name;
bool pushed = false;
if (auto pos = user.find('`'); pos != String::npos)
{
auto tenant_id = String(user.c_str(), pos);
Expand Down Expand Up @@ -2602,16 +2601,6 @@ ProgressCallback Context::getProgressCallback() const
return progress_callback;
}

void Context::setInternalProgressCallback(ProgressCallback callback)
{
internal_progress_callback = callback;
}

ProgressCallback Context::getInternalProgressCallback() const
{
return internal_progress_callback;
}

void Context::setProcessListEntry(std::shared_ptr<ProcessListEntry> process_list_entry_)
{
process_list_entry = process_list_entry_;
Expand Down
4 changes: 0 additions & 4 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ class Context : public std::enable_shared_from_this<Context>

using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
ProgressCallback internal_progress_callback;

using FileProgressCallback = std::function<void(const FileProgress & progress)>;
FileProgressCallback file_progress_callback; /// Callback for tracking progress of file loading.
Expand Down Expand Up @@ -1067,9 +1066,6 @@ class Context : public std::enable_shared_from_this<Context>
/// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
ProgressCallback getProgressCallback() const;

void setInternalProgressCallback(ProgressCallback callback);
ProgressCallback getInternalProgressCallback() const;

void setFileProgressCallback(FileProgressCallback && callback) { file_progress_callback = callback; }
FileProgressCallback getFileProgressCallback() const { return file_progress_callback; }

Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/DAGGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ struct DAGGraph

PlanSegment * getPlanSegmentPtr(size_t id);

/// all segments containing only table scan
Source sources;
/// all segments contain at least table scan
/// all segments containing at least one table scan
Source any_tables;
size_t final = std::numeric_limits<size_t>::max();
std::set<size_t> scheduled_segments;
Expand Down

0 comments on commit 78a5c07

Please sign in to comment.