diff --git a/README.md b/README.md index 3b9f7303..bd485753 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,47 @@ target_link_libraries(${PROJECT_NAME} PRIVATE clickhouse-cpp-lib) - run `rm -rf build && cmake -B build -S . && cmake --build build -j32` to remove remainders of the previous builds, run CMake and build the application. The generated binary is located in location `build/application-example`. +## Batch Insertion + +In addition to the `Insert` method, which inserts all the data in a block in a +single call, you can use the `BeginInsert` / `InsertData` / `EndInsert` +pattern to insert batches of data. This can be useful for managing larger data +sets without inflating memory with the entire set. + +To use it, pass `BeginInsert` an `INSERT` statement ending in `VALUES` but with +no actual values. Use the resulting `Block` to append batches of data, sending +each to the server with `InsertData`. Finally, call `EndInsert` (or let the +client go out of scope) to signal the server that insertion is complete. +Example: + +```cpp +// Start the insertion. +auto block = client->BeginInsert("INSERT INTO foo (id, name) VALUES"); + +// Grab the columns from the block. +auto col1 = block[0]->As<ColumnUInt64>(); +auto col2 = block[1]->As<ColumnString>(); + +// Add a couple of records to the block. +col1->Append(1); +col1->Append(2); +col2->Append("holden"); +col2->Append("naomi"); + +// Send those records. +block.RefreshRowCount(); +client->InsertData(block); +block.Clear(); + +// Add another record. +col1->Append(3); +col2->Append("amos"); + +// Send it and finish. +block.RefreshRowCount(); +client->EndInsert(block); +``` + ## Thread-safety ⚠ Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques. 
⚠
diff --git a/clickhouse/block.h b/clickhouse/block.h
index 8a42ae65..a32cd029 100644
--- a/clickhouse/block.h
+++ b/clickhouse/block.h
@@ -85,10 +85,10 @@ class Block {
         return columns_.at(idx).name;
     }
 
-    /// Convinience method to wipe out all rows from all columns
+    /// Convenience method to wipe out all rows from all columns
     void Clear();
 
-    /// Convinience method to do Reserve() on all columns
+    /// Convenience method to do Reserve() on all columns
     void Reserve(size_t new_cap);
 
     /// Reference to column by index in the block.
diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp
index da7f9774..36a4e74d 100644
--- a/clickhouse/client.cpp
+++ b/clickhouse/client.cpp
@@ -161,6 +161,12 @@ class Client::Impl {
 
     void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
 
+    Block BeginInsert(Query query);
+
+    void InsertData(const Block& block);
+
+    void EndInsert();
+
     void Ping();
 
     void ResetConnection();
@@ -175,6 +181,7 @@ class Client::Impl {
     bool Handshake();
 
     bool ReceivePacket(uint64_t* server_packet = nullptr);
+    bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
 
     void SendQuery(const Query& query, bool finalize = true);
     void FinalizeQuery();
@@ -208,6 +215,7 @@ class Client::Impl {
     }
 
 private:
+    bool inserting_ = false;  // true while an INSERT is open, i.e. its end-of-data block is not sent yet
     /// In case of network errors tries to reconnect to server and
     /// call fuc several times.
     void RetryGuard(std::function<void()> func);
@@ -280,10 +288,15 @@ Client::Impl::Impl(const ClientOptions& opts,
     }
 }
 
-Client::Impl::~Impl()
-{ }
+Client::Impl::~Impl() {
+    // Wrap up an insert if one is in progress; a destructor must not throw, so a failure here is dropped.
+    try { EndInsert(); } catch (...) {}
+}
 
 void Client::Impl::ExecuteQuery(Query query) {
+    if (inserting_) {
+        throw ProtocolError("cannot execute query while inserting");
+    }
     EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
 
     if (options_.ping_before_query) {
@@ -299,6 +312,9 @@ void Client::Impl::ExecuteQuery(Query query) {
 
 
 void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
+    if (inserting_) {
+        throw ProtocolError("cannot execute query while inserting");
+    }
     if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
         throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
     }
@@ -362,6 +378,9 @@ std::string NameToQueryString(const std::string &input)
 }
 
 void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
+    if (inserting_) {
+        throw ProtocolError("cannot execute query while inserting");
+    }
     if (options_.ping_before_query) {
         RetryGuard([this]() { Ping(); });
     }
@@ -397,10 +416,28 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
     }
 
     // Send data.
+    inserting_ = true;
     SendData(block);
-    // Send empty block as marker of
-    // end of data.
+    EndInsert();
+}
+
+/// Sends one batch of rows for the INSERT opened by BeginInsert().
+/// Throws ProtocolError when no insert is in progress.
+void Client::Impl::InsertData(const Block& block) {
+    if (!inserting_) {
+        throw ProtocolError("illegal call to InsertData without first calling BeginInsert");
+    }
+    SendData(block);
+}
+
+/// Finishes the INSERT in progress: sends the empty end-of-data block, then
+/// waits for EOS. Does nothing when no insert is in progress.
+void Client::Impl::EndInsert() {
+    if (!inserting_) return;
+
+    // Send empty block as marker of end of data.
     SendData(Block());
+    inserting_ = false;
 
     // Wait for EOS.
     uint64_t eos_packet{0};
@@ -416,6 +453,9 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
 }
 
 void Client::Impl::Ping() {
+    if (inserting_) {
+        throw ProtocolError("cannot execute query while inserting");
+    }
     WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
     output_->Flush();
 
@@ -429,6 +469,7 @@ void Client::Impl::Ping() {
 
 void Client::Impl::ResetConnection() {
     InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
+    inserting_ = false;
 
     if (!Handshake()) {
         throw ProtocolError("fail to connect to " + options_.host);
@@ -648,6 +689,81 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
     }
 }
 
+/// Reads server packets after an INSERT query was sent, until the first Data packet.
+/// Returns true once ReceiveData() has handled that packet; returns false on a server
+/// exception packet or when a Log/TableColumns packet cannot be read.
+/// Throws on an unreadable packet type, an unreadable Data packet, or an unexpected packet.
+bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
+    uint64_t packet_type = 0;
+
+    while (true) {
+        if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
+            // The read itself failed, so there is no packet type to report.
+            throw std::runtime_error("fail to read packet type for insert query");
+        }
+        if (server_packet) {
+            *server_packet = packet_type;
+        }
+
+        switch (packet_type) {
+        case ServerCodes::Data: {
+            if (!ReceiveData()) {
+                throw ProtocolError("can't read data packet from input stream");
+            }
+            return true;
+        }
+
+        case ServerCodes::Exception: {
+            ReceiveException();
+            return false;
+        }
+
+        case ServerCodes::ProfileInfo:
+        case ServerCodes::Progress:
+        case ServerCodes::Pong:
+        case ServerCodes::Hello:
+            continue;
+
+        case ServerCodes::Log: {
+            // log tag
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+            Block block;
+
+            // Use uncompressed stream since log blocks usually contain only one row
+            if (!ReadBlock(*input_, &block)) {
+                return false;
+            }
+
+            if (events_) {
+                events_->OnServerLog(block);
+            }
+            continue;
+        }
+
+        case ServerCodes::TableColumns: {
+            // external table name
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+
+            // columns metadata
+            if (!WireFormat::SkipString(*input_)) {
+                return false;
+            }
+            continue;
+        }
+
+        // No others expected.
+        case ServerCodes::EndOfStream:
+        case ServerCodes::ProfileEvents:
+        default:
+            throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
+        }
+    }
+}
+
 bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
     // Additional information about block.
     if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1039,6 @@ void Client::Impl::FinalizeQuery() {
     output_->Flush();
 }
 
-
 void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
     // Additional information about block.
     if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1178,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
             }
         }
     }
-    // Connectiong with current_endpoint_ are broken.
+    // Connections with current_endpoint_ are broken.
     // Trying to establish with the another one from the list.
     size_t connection_attempts_count = GetConnectionAttempts();
     for (size_t i = 0; i < connection_attempts_count;)
@@ -1085,6 +1200,37 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
     }
 }
 
+/// Starts a batch INSERT: sends \p query and waits for the server's Data packet,
+/// whose block is returned for the caller to fill and pass to InsertData().
+/// The client stays in inserting mode until EndInsert() or ResetConnection().
+Block Client::Impl::BeginInsert(Query query) {
+    if (inserting_) {
+        throw ProtocolError("cannot execute query while inserting");
+    }
+    EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
+
+    if (options_.ping_before_query) {
+        RetryGuard([this]() { Ping(); });
+    }
+
+    // Create a callback to extract the block with the proper query columns.
+    Block block;
+    query.OnData([&block](const Block& b) {
+        block = b;  // b is const: this is a copy, std::move() would not change that
+        return true;
+    });
+
+    SendQuery(query);  // send the Query itself so that its query id is not dropped
+
+    // Receive data packet but keep the query/connection open.
+    if (!ReceivePreparePackets()) {
+        throw std::runtime_error("fail to receive data packet");
+    }
+
+    inserting_ = true;
+    return block;
+}
+
 Client::Client(const ClientOptions& opts)
     : options_(opts)
     , impl_(new Impl(opts))
@@ -1149,6 +1295,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
     impl_->Insert(table_name, query_id, block);
 }
 
+Block Client::BeginInsert(const std::string& query) {
+    return impl_->BeginInsert(Query(query));
+}
+
+Block Client::BeginInsert(const std::string& query, const std::string& query_id) {
+    return impl_->BeginInsert(Query(query, query_id));
+}
+
+void Client::InsertData(const Block& block) {
+    impl_->InsertData(block);
+}
+
+void Client::EndInsert(const Block& block) {
+    impl_->InsertData(block);
+    impl_->EndInsert();
+}
+
+void Client::EndInsert() {
+    impl_->EndInsert();
+}
+
 void Client::Ping() {
     impl_->Ping();
 }
diff --git a/clickhouse/client.h b/clickhouse/client.h
index dd4048cc..73a974f7 100644
--- a/clickhouse/client.h
+++ b/clickhouse/client.h
@@ -273,6 +273,21 @@ class Client {
     void Insert(const std::string& table_name, const Block& block);
     void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
 
+    /// Start a batch \p INSERT. \p query must be an INSERT statement ending in VALUES
+    /// with no actual values. Returns the block to fill and pass to \p InsertData.
+    /// Select, Execute, Insert and Ping throw until the insert is ended.
+    Block BeginInsert(const std::string& query);
+    Block BeginInsert(const std::string& query, const std::string& query_id);
+
+    /// Send one batch of rows using a \p block returned by \p BeginInsert.
+    /// Throws if no insert is in progress.
+    void InsertData(const Block& block);
+
+    /// End an \p INSERT session started by \p BeginInsert; a no-op when no insert is
+    /// in progress. The overload taking a \p block sends it as the final batch first.
+    void EndInsert();
+    void EndInsert(const Block& block);
+
     /// Ping server for aliveness.
     void Ping();
 
diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp
index 27a7c75f..514b8eae 100644
--- a/ut/client_ut.cpp
+++ b/ut/client_ut.cpp
@@ -358,6 +358,97 @@ TEST_P(ClientCase, Generic) {
     EXPECT_EQ(sizeof(TEST_DATA)/sizeof(TEST_DATA[0]), row);
 }
 
+TEST_P(ClientCase, InsertData) {
+    client_->Execute(
+        "CREATE TEMPORARY TABLE IF NOT EXISTS test_clickhouse_cpp_insert (id UInt64, name String, f Bool)");
+
+    const struct {
+        uint64_t id;
+        std::string name;
+        bool f;
+    } TEST_DATA[] = {
+        { 1, "id", true },
+        { 3, "foo", false },
+        { 5, "bar", true },
+        { 7, "name", false },
+    };
+
+    const struct {
+        uint64_t id;
+        std::string name;
+        bool f;
+    } TEST_DATA2[] = {
+        { 2, "holden", true },
+        { 4, "naomi", false },
+        { 6, "amos", true },
+        { 8, "alex", false },
+    };
+
+    /// Insert some values.
+    {
+        // Prepare the insert.
+        auto block = client_->BeginInsert("INSERT INTO test_clickhouse_cpp_insert VALUES");
+        EXPECT_EQ(size_t(3), block.GetColumnCount());
+
+        // Fetch the derived columns.
+        auto id = block[0]->As<ColumnUInt64>();
+        auto name = block[1]->As<ColumnString>();
+        auto f = block[2]->As<ColumnUInt8>();
+
+        // Insert some values.
+        for (auto const& td : TEST_DATA) {
+            id->Append(td.id);
+            name->Append(td.name);
+            f->Append(td.f);
+        }
+        block.RefreshRowCount();
+        client_->InsertData(block);
+        block.Clear();
+
+        // Insert some more values.
+        for (auto const& td : TEST_DATA2) {
+            id->Append(td.id);
+            name->Append(td.name);
+            f->Append(td.f);
+        }
+        block.RefreshRowCount();
+        client_->InsertData(block);
+        block.Clear();
+        client_->EndInsert();
+        // Second call to EndInsert should be no-op.
+        client_->EndInsert();
+    }
+
+    /// Select values inserted in the previous steps.
+    size_t row = 0;
+    client_->Select("SELECT id, name, f FROM test_clickhouse_cpp_insert", [TEST_DATA, TEST_DATA2, &row](const Block& block)
+        {
+            if (block.GetRowCount() == 0) {
+                return;
+            }
+            EXPECT_EQ("id", block.GetColumnName(0));
+            EXPECT_EQ("name", block.GetColumnName(1));
+            size_t block_two_row_num = sizeof(TEST_DATA)/sizeof(TEST_DATA[0]);
+
+            for (size_t c = 0; c < block.GetRowCount(); ++c, ++row) {
+                // Choose the expected record per row, not per block, so that a block
+                // spanning both batches is still checked against the right records.
+                if (row < block_two_row_num) {
+                    EXPECT_EQ(TEST_DATA[row].id, (*block[0]->As<ColumnUInt64>())[c]);
+                    EXPECT_EQ(TEST_DATA[row].name, (*block[1]->As<ColumnString>())[c]);
+                    EXPECT_EQ(TEST_DATA[row].f, (*block[2]->As<ColumnUInt8>())[c]);
+                } else {
+                    EXPECT_EQ(TEST_DATA2[row-block_two_row_num].id, (*block[0]->As<ColumnUInt64>())[c]);
+                    EXPECT_EQ(TEST_DATA2[row-block_two_row_num].name, (*block[1]->As<ColumnString>())[c]);
+                    EXPECT_EQ(TEST_DATA2[row-block_two_row_num].f, (*block[2]->As<ColumnUInt8>())[c]);
+                }
+            }
+        }
+    );
+    auto exp = sizeof(TEST_DATA)/sizeof(TEST_DATA[0]) + sizeof(TEST_DATA2)/sizeof(TEST_DATA2[0]);
+    EXPECT_EQ(exp, row);
+}
+
 TEST_P(ClientCase, Nullable) {
     /// Create a table.
     client_->Execute(
@@ -949,7 +1040,7 @@ TEST_P(ClientCase, Query_ID) {
     EXPECT_EQ(5u, total_count);
 }
 
-// Spontaneosly fails on INSERTint data.
+// Spontaneously fails on INSERTing data.
 TEST_P(ClientCase, DISABLED_ArrayArrayUInt64) {
     // Based on https://github.com/ClickHouse/clickhouse-cpp/issues/43
     std::cerr << "Connected to: " << client_->GetServerInfo() << std::endl;