From 605a6bae4098482c56fd3823ce35e48ca1bd4605 Mon Sep 17 00:00:00 2001 From: Andrew Slabko Date: Thu, 13 Nov 2025 19:18:16 +0100 Subject: [PATCH 1/2] Clean up and simplify Begin/EndInsert - Renamed `InsertData` to `SendInsertBlock`, to prevent it from appearing first when searching for `Insert...` - Removed `EndInsert` with a block. Instead, users should now use `SendInsertBlock` followed by `EndInsert`. - Removed `ReceivePreparePackets` and replaced it with a call to `ReceivePacket`, similar to what is done in `Insert`. - Renamed `inserting` to `inserting_` to follow the member variable naming convention. --- clickhouse/client.cpp | 168 ++++++++++++------------------------------ clickhouse/client.h | 3 +- ut/client_ut.cpp | 4 +- 3 files changed, 51 insertions(+), 124 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 36a4e74d..a56b5614 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -163,7 +163,7 @@ class Client::Impl { Block BeginInsert(Query query); - void InsertData(const Block& block); + void SendInsertBlock(const Block& block); void EndInsert(); @@ -181,7 +181,6 @@ class Client::Impl { bool Handshake(); bool ReceivePacket(uint64_t* server_packet = nullptr); - bool ReceivePreparePackets(uint64_t* server_packet = nullptr); void SendQuery(const Query& query, bool finalize = true); void FinalizeQuery(); @@ -215,7 +214,6 @@ class Client::Impl { } private: - bool inserting; /// In case of network errors tries to reconnect to server and /// call fuc several times. 
void RetryGuard(std::function func); @@ -259,6 +257,8 @@ class Client::Impl { std::optional current_endpoint_; ServerInfo server_info_; + + bool inserting_; }; ClientOptions modifyClientOptions(ClientOptions opts) @@ -294,9 +294,10 @@ Client::Impl::~Impl() { } void Client::Impl::ExecuteQuery(Query query) { - if (inserting) { - throw ProtocolError("cannot execute query while inserting"); + if (inserting_) { + throw ValidationError("cannot execute query while inserting"); } + EnsureNull en(static_cast(&query), &events_); if (options_.ping_before_query) { @@ -312,9 +313,10 @@ void Client::Impl::ExecuteQuery(Query query) { void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) { - if (inserting) { - throw ProtocolError("cannot execute query while inserting"); + if (inserting_) { + throw ValidationError("cannot execute query while inserting"); } + if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables"); } @@ -378,15 +380,18 @@ std::string NameToQueryString(const std::string &input) } void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) { - if (inserting) { - throw ProtocolError("cannot execute query while inserting"); + if (inserting_) { + throw ValidationError("cannot execute query while inserting, use SendInsertData instead"); } + if (options_.ping_before_query) { RetryGuard([this]() { Ping(); }); } + inserting_ = true; + std::stringstream fields_section; - const auto num_columns = block.GetColumnCount(); + const auto num_columns = block.GetColumnCount(); for (unsigned int i = 0; i < num_columns; ++i) { if (i == num_columns - 1) { @@ -399,41 +404,34 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id); SendQuery(query); - uint64_t server_packet; 
- // Receive data packet. - while (true) { - bool ret = ReceivePacket(&server_packet); - - if (!ret) { - throw ProtocolError("fail to receive data packet"); - } + // Wait for a data packet and return + uint64_t server_packet = 0; + while (ReceivePacket(&server_packet)) { if (server_packet == ServerCodes::Data) { - break; - } - if (server_packet == ServerCodes::Progress) { - continue; + SendData(block); + EndInsert(); + return; } } - // Send data. - inserting = true; - SendData(block); - EndInsert(); + throw ProtocolError("fail to receive data packet"); } -void Client::Impl::InsertData(const Block& block) { - if (!inserting) { - throw ProtocolError("illegal call to InsertData without first calling BeginInsert"); +void Client::Impl::SendInsertBlock(const Block& block) { + if (!inserting_) { + throw ValidationError("illegal call to InsertData without first calling BeginInsert"); } + SendData(block); } void Client::Impl::EndInsert() { - if (!inserting) return; + if (!inserting_) { + return; + } // Send empty block as marker of end of data. SendData(Block()); - inserting = false; // Wait for EOS. uint64_t eos_packet{0}; @@ -446,12 +444,14 @@ void Client::Impl::EndInsert() { throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "} + (eos_packet ? 
std::to_string(eos_packet) : "nothing") + ")"); } + inserting_ = false; } void Client::Impl::Ping() { - if (inserting) { - throw ProtocolError("cannot execute query while inserting"); + if (inserting_) { + throw ValidationError("cannot execute query while inserting"); } + WireFormat::WriteUInt64(*output_, ClientCodes::Ping); output_->Flush(); @@ -465,7 +465,7 @@ void Client::Impl::Ping() { void Client::Impl::ResetConnection() { InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value())); - inserting = false; + inserting_ = false; if (!Handshake()) { throw ProtocolError("fail to connect to " + options_.host); @@ -685,78 +685,6 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { } } -bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) { - uint64_t packet_type = 0; - - while (true) { - if (!WireFormat::ReadVarint64(*input_, &packet_type)) { - throw std::runtime_error("unexpected package type " + - std::to_string((int)packet_type) + " for insert query"); - } - if (server_packet) { - *server_packet = packet_type; - } - - switch (packet_type) { - case ServerCodes::Data: { - if (!ReceiveData()) { - throw ProtocolError("can't read data packet from input stream"); - } - return true; - } - - case ServerCodes::Exception: { - ReceiveException(); - return false; - } - - case ServerCodes::ProfileInfo: - case ServerCodes::Progress: - case ServerCodes::Pong: - case ServerCodes::Hello: - continue; - - case ServerCodes::Log: { - // log tag - if (!WireFormat::SkipString(*input_)) { - return false; - } - Block block; - - // Use uncompressed stream since log blocks usually contain only one row - if (!ReadBlock(*input_, &block)) { - return false; - } - - if (events_) { - events_->OnServerLog(block); - } - continue; - } - - case ServerCodes::TableColumns: { - // external table name - if (!WireFormat::SkipString(*input_)) { - return false; - } - - // columns metadata - if (!WireFormat::SkipString(*input_)) { - return false; - } - continue; - } 
- - // No others expected. - case ServerCodes::EndOfStream: - case ServerCodes::ProfileEvents: - default: - throw UnimplementedError("unimplemented " + std::to_string((int)packet_type)); - break; - } - } -} - bool Client::Impl::ReadBlock(InputStream& input, Block* block) { // Additional information about block. if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { @@ -1194,15 +1122,18 @@ void Client::Impl::RetryGuard(std::function func) { } Block Client::Impl::BeginInsert(Query query) { - if (inserting) { - throw ProtocolError("cannot execute query while inserting"); + if (inserting_) { + throw ValidationError("cannot execute query while inserting"); } + EnsureNull en(static_cast(&query), &events_); if (options_.ping_before_query) { RetryGuard([this]() { Ping(); }); } + inserting_ = true; + // Create a callback to extract the block with the proper query columns. Block block; query.OnData([&block](const Block& b) { @@ -1212,13 +1143,15 @@ Block Client::Impl::BeginInsert(Query query) { SendQuery(query.GetText()); - // Receive data packet but keep the query/connection open. 
- if (!ReceivePreparePackets()) { - throw std::runtime_error("fail to receive data packet"); + // Wait for a data packet and return + uint64_t server_packet = 0; + while (ReceivePacket(&server_packet)) { + if (server_packet == ServerCodes::Data) { + return block; + } } - inserting = true; - return block; + throw ProtocolError("fail to receive data packet"); } Client::Client(const ClientOptions& opts) @@ -1293,13 +1226,8 @@ Block Client::BeginInsert(const std::string& query, const std::string& query_id) return impl_->BeginInsert(Query(query, query_id)); } -void Client::InsertData(const Block& block) { - impl_->InsertData(block); -} - -void Client::EndInsert(const Block& block) { - impl_->InsertData(block); - impl_->EndInsert(); +void Client::SendInsertBlock(const Block& block) { + impl_->SendInsertBlock(block); } void Client::EndInsert() { diff --git a/clickhouse/client.h b/clickhouse/client.h index 73a974f7..3ca9133b 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -278,11 +278,10 @@ class Client { Block BeginInsert(const std::string& query, const std::string& query_id); /// Insert data using a \p block returned by \p BeginInsert. - void InsertData(const Block& block); + void SendInsertBlock(const Block& block); /// End an \p INSERT session started by \p BeginInsert. void EndInsert(); - void EndInsert(const Block& block); /// Ping server for aliveness. void Ping(); diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 514b8eae..e6106d25 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -402,7 +402,7 @@ TEST_P(ClientCase, InsertData) { f->Append(td.f); } block.RefreshRowCount(); - client_->InsertData(block); + client_->SendInsertBlock(block); block.Clear(); // Insert some more values. @@ -412,7 +412,7 @@ TEST_P(ClientCase, InsertData) { f->Append(td.f); } block.RefreshRowCount(); - client_->InsertData(block); + client_->SendInsertBlock(block); block.Clear(); client_->EndInsert(); // Second call to EndInsert should be no-op. 
From 8b96f8c62d2c6455eec001e8d8336f17861ecd5a Mon Sep 17 00:00:00 2001 From: Andrew Slabko Date: Thu, 13 Nov 2025 21:25:23 +0100 Subject: [PATCH 2/2] Move `BeginInsert` to match the declaration order Move the `BeginInsert` body to match the order of the class declaration, placing it between `Insert` and `SendInsertBlock`. --- clickhouse/client.cpp | 66 +++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index a56b5614..0fdfc49d 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -417,6 +417,39 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer throw ProtocolError("fail to receive data packet"); } +Block Client::Impl::BeginInsert(Query query) { + if (inserting_) { + throw ValidationError("cannot execute query while inserting"); + } + + EnsureNull en(static_cast(&query), &events_); + + if (options_.ping_before_query) { + RetryGuard([this]() { Ping(); }); + } + + inserting_ = true; + + // Create a callback to extract the block with the proper query columns. 
+ Block block; + query.OnData([&block](const Block& b) { + block = std::move(b); + return true; + }); + + SendQuery(query.GetText()); + + // Wait for a data packet and return + uint64_t server_packet = 0; + while (ReceivePacket(&server_packet)) { + if (server_packet == ServerCodes::Data) { + return block; + } + } + + throw ProtocolError("fail to receive data packet"); +} + void Client::Impl::SendInsertBlock(const Block& block) { if (!inserting_) { throw ValidationError("illegal call to InsertData without first calling BeginInsert"); @@ -1121,39 +1154,6 @@ void Client::Impl::RetryGuard(std::function func) { } } -Block Client::Impl::BeginInsert(Query query) { - if (inserting_) { - throw ValidationError("cannot execute query while inserting"); - } - - EnsureNull en(static_cast(&query), &events_); - - if (options_.ping_before_query) { - RetryGuard([this]() { Ping(); }); - } - - inserting_ = true; - - // Create a callback to extract the block with the proper query columns. - Block block; - query.OnData([&block](const Block& b) { - block = std::move(b); - return true; - }); - - SendQuery(query.GetText()); - - // Wait for a data packet and return - uint64_t server_packet = 0; - while (ReceivePacket(&server_packet)) { - if (server_packet == ServerCodes::Data) { - return block; - } - } - - throw ProtocolError("fail to receive data packet"); -} - Client::Client(const ClientOptions& opts) : options_(opts) , impl_(new Impl(opts))