Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 68 additions & 140 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class Client::Impl {

Block BeginInsert(Query query);

void InsertData(const Block& block);
void SendInsertBlock(const Block& block);

void EndInsert();

Expand All @@ -181,7 +181,6 @@ class Client::Impl {
bool Handshake();

bool ReceivePacket(uint64_t* server_packet = nullptr);
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);

void SendQuery(const Query& query, bool finalize = true);
void FinalizeQuery();
Expand Down Expand Up @@ -215,7 +214,6 @@ class Client::Impl {
}

private:
bool inserting;
/// In case of network errors tries to reconnect to server and
/// call fuc several times.
void RetryGuard(std::function<void()> func);
Expand Down Expand Up @@ -259,6 +257,8 @@ class Client::Impl {
std::optional<Endpoint> current_endpoint_;

ServerInfo server_info_;

bool inserting_;
};

ClientOptions modifyClientOptions(ClientOptions opts)
Expand Down Expand Up @@ -294,9 +294,10 @@ Client::Impl::~Impl() {
}

void Client::Impl::ExecuteQuery(Query query) {
if (inserting) {
throw ProtocolError("cannot execute query while inserting");
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}

EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

if (options_.ping_before_query) {
Expand All @@ -312,9 +313,10 @@ void Client::Impl::ExecuteQuery(Query query) {


void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
if (inserting) {
throw ProtocolError("cannot execute query while inserting");
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}

if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
}
Expand Down Expand Up @@ -378,15 +380,18 @@ std::string NameToQueryString(const std::string &input)
}

void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
if (inserting) {
throw ProtocolError("cannot execute query while inserting");
if (inserting_) {
throw ValidationError("cannot execute query while inserting, use SendInsertData instead");
}

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}

inserting_ = true;

std::stringstream fields_section;
const auto num_columns = block.GetColumnCount();
const auto num_columns = block.GetColumnCount();

for (unsigned int i = 0; i < num_columns; ++i) {
if (i == num_columns - 1) {
Expand All @@ -399,41 +404,67 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
SendQuery(query);

uint64_t server_packet;
// Receive data packet.
while (true) {
bool ret = ReceivePacket(&server_packet);

if (!ret) {
throw ProtocolError("fail to receive data packet");
}
// Wait for a data packet and return
uint64_t server_packet = 0;
while (ReceivePacket(&server_packet)) {
if (server_packet == ServerCodes::Data) {
break;
SendData(block);
EndInsert();
return;
}
if (server_packet == ServerCodes::Progress) {
continue;
}

throw ProtocolError("fail to receive data packet");
}

Block Client::Impl::BeginInsert(Query query) {
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}

EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}

inserting_ = true;

// Create a callback to extract the block with the proper query columns.
Block block;
query.OnData([&block](const Block& b) {
block = std::move(b);
return true;
});

SendQuery(query.GetText());

// Wait for a data packet and return
uint64_t server_packet = 0;
while (ReceivePacket(&server_packet)) {
if (server_packet == ServerCodes::Data) {
return block;
}
}

// Send data.
inserting = true;
SendData(block);
EndInsert();
throw ProtocolError("fail to receive data packet");
}

void Client::Impl::InsertData(const Block& block) {
if (!inserting) {
throw ProtocolError("illegal call to InsertData without first calling BeginInsert");
void Client::Impl::SendInsertBlock(const Block& block) {
if (!inserting_) {
throw ValidationError("illegal call to InsertData without first calling BeginInsert");
}

SendData(block);
}

void Client::Impl::EndInsert() {
if (!inserting) return;
if (!inserting_) {
return;
}

// Send empty block as marker of end of data.
SendData(Block());
inserting = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it here because I got an error from the bits waiting for a result when I had it where you've now moved it.


// Wait for EOS.
uint64_t eos_packet{0};
Expand All @@ -446,12 +477,14 @@ void Client::Impl::EndInsert() {
throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
+ (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
}
inserting_ = false;
}

void Client::Impl::Ping() {
if (inserting) {
throw ProtocolError("cannot execute query while inserting");
if (inserting_) {
throw ValidationError("cannot execute query while inserting");
}

WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
output_->Flush();

Expand All @@ -465,7 +498,7 @@ void Client::Impl::Ping() {

void Client::Impl::ResetConnection() {
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
inserting = false;
inserting_ = false;

if (!Handshake()) {
throw ProtocolError("fail to connect to " + options_.host);
Expand Down Expand Up @@ -685,78 +718,6 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
}
}

bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
uint64_t packet_type = 0;

while (true) {
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
throw std::runtime_error("unexpected package type " +
std::to_string((int)packet_type) + " for insert query");
}
if (server_packet) {
*server_packet = packet_type;
}

switch (packet_type) {
case ServerCodes::Data: {
if (!ReceiveData()) {
throw ProtocolError("can't read data packet from input stream");
}
return true;
}

case ServerCodes::Exception: {
ReceiveException();
return false;
}

case ServerCodes::ProfileInfo:
case ServerCodes::Progress:
case ServerCodes::Pong:
case ServerCodes::Hello:
continue;

case ServerCodes::Log: {
// log tag
if (!WireFormat::SkipString(*input_)) {
return false;
}
Block block;

// Use uncompressed stream since log blocks usually contain only one row
if (!ReadBlock(*input_, &block)) {
return false;
}

if (events_) {
events_->OnServerLog(block);
}
continue;
}

case ServerCodes::TableColumns: {
// external table name
if (!WireFormat::SkipString(*input_)) {
return false;
}

// columns metadata
if (!WireFormat::SkipString(*input_)) {
return false;
}
continue;
}

// No others expected.
case ServerCodes::EndOfStream:
case ServerCodes::ProfileEvents:
default:
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
break;
}
}
}

bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
Expand Down Expand Up @@ -1193,34 +1154,6 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
}
}

Block Client::Impl::BeginInsert(Query query) {
if (inserting) {
throw ProtocolError("cannot execute query while inserting");
}
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);

if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}

// Create a callback to extract the block with the proper query columns.
Block block;
query.OnData([&block](const Block& b) {
block = std::move(b);
return true;
});

SendQuery(query.GetText());

// Receive data packet but keep the query/connection open.
if (!ReceivePreparePackets()) {
throw std::runtime_error("fail to receive data packet");
}

inserting = true;
return block;
}

Client::Client(const ClientOptions& opts)
: options_(opts)
, impl_(new Impl(opts))
Expand Down Expand Up @@ -1293,13 +1226,8 @@ Block Client::BeginInsert(const std::string& query, const std::string& query_id)
return impl_->BeginInsert(Query(query, query_id));
}

void Client::InsertData(const Block& block) {
impl_->InsertData(block);
}

void Client::EndInsert(const Block& block) {
impl_->InsertData(block);
impl_->EndInsert();
void Client::SendInsertBlock(const Block& block) {
impl_->SendInsertBlock(block);
}

void Client::EndInsert() {
Expand Down
3 changes: 1 addition & 2 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,10 @@ class Client {
Block BeginInsert(const std::string& query, const std::string& query_id);

/// Insert data using a \p block returned by \p BeginInsert.
void InsertData(const Block& block);
void SendInsertBlock(const Block& block);

/// End an \p INSERT session started by \p BeginInsert.
void EndInsert();
void EndInsert(const Block& block);

/// Ping server for aliveness.
void Ping();
Expand Down
4 changes: 2 additions & 2 deletions ut/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ TEST_P(ClientCase, InsertData) {
f->Append(td.f);
}
block.RefreshRowCount();
client_->InsertData(block);
client_->SendInsertBlock(block);
block.Clear();

// Insert some more values.
Expand All @@ -412,7 +412,7 @@ TEST_P(ClientCase, InsertData) {
f->Append(td.f);
}
block.RefreshRowCount();
client_->InsertData(block);
client_->SendInsertBlock(block);
block.Clear();
client_->EndInsert();
// Second call to EndInsert should be no-op.
Expand Down
Loading