Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void Client::Impl::SendBlockData(const Block& block) {
if (compression_ == CompressionState::Enable) {
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size);

WriteBlock(block, buffered);
} else {
WriteBlock(block, *output_);
Expand Down Expand Up @@ -696,7 +696,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
if (!WireFormat::ReadString(input, &type)) {
return false;
}

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
uint8_t custom_format_len;
if (!WireFormat::ReadFixed(input, &custom_format_len)) {
Expand All @@ -705,7 +705,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
if (custom_format_len > 0) {
throw UnimplementedError(std::string("unsupported custom serialization"));
}
}
}

if (ColumnRef col = CreateColumnByType(type, create_column_settings)) {
if (num_rows && !col->Load(&input, num_rows)) {
Expand Down Expand Up @@ -909,7 +909,7 @@ void Client::Impl::SendQuery(const Query& query, bool finalize) {
}
WireFormat::WriteString(*output_, std::string()); // empty string after last param
}

if (finalize) {
FinalizeQuery();
}
Expand Down Expand Up @@ -1101,6 +1101,9 @@ Client::Client(const ClientOptions& opts,
Client::~Client()
{ }

Client::Client(Client&&) = default;
Client& Client::operator=(Client&&) = default;

void Client::Execute(const Query& query) {
impl_->ExecuteQuery(query);
}
Expand Down
8 changes: 7 additions & 1 deletion clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ class Client {
std::unique_ptr<SocketFactory> socket_factory);
~Client();

// movable only
Client(Client&&);
Client& operator=(Client&&);
Client(const Client&) = delete;
Client& operator=(const Client&) = delete;

/// Intends for execute arbitrary queries.
void Execute(const Query& query);

Expand Down Expand Up @@ -300,7 +306,7 @@ class Client {
static Version GetVersion();

private:
const ClientOptions options_;
ClientOptions options_;

class Impl;
std::unique_ptr<Impl> impl_;
Expand Down
197 changes: 196 additions & 1 deletion ut/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ostream>
#include <string_view>
#include <thread>
#include <tuple>
#include <chrono>

using namespace clickhouse;
Expand Down Expand Up @@ -1567,7 +1568,7 @@ TEST_P(ClientCase, ClientName) {

FlushLogs();

std::string query_log_query
std::string query_log_query
= "SELECT CAST(client_name, 'String') FROM system.query_log WHERE query_id = '" + query_id + "'";

size_t total_rows = 0;
Expand All @@ -1580,3 +1581,197 @@ TEST_P(ClientCase, ClientName) {
});
ASSERT_GT(total_rows, 0UL) << "Query with query_id " << query_id << " is not found";
}


namespace clickhouse {

// in cpp20, the equality operators can be defaulted and this code removed

bool operator==(
const ClientOptions::SSLOptions::CommandAndValue& lhs,
const ClientOptions::SSLOptions::CommandAndValue& rhs) {
return lhs.command == rhs.command &&
lhs.value == rhs.value;
}

bool operator!=(
const ClientOptions::SSLOptions::CommandAndValue& lhs,
const ClientOptions::SSLOptions::CommandAndValue& rhs) {
return !(lhs == rhs);
}

auto tied(const ClientOptions::SSLOptions& o) {
return std::tie(
o.ssl_context,
o.use_default_ca_locations,
o.path_to_ca_files,
o.path_to_ca_directory,
o.min_protocol_version,
o.max_protocol_version,
o.context_options,
o.use_sni,
o.skip_verification,
o.host_flags,
o.configuration
);
}

bool operator==(
const ClientOptions::SSLOptions& lhs,
const ClientOptions::SSLOptions& rhs) {
return tied(lhs) == tied(rhs);
}

bool operator!=(
const ClientOptions::SSLOptions& lhs,
const ClientOptions::SSLOptions& rhs) {
return !(lhs == rhs);
}


auto tied(const ClientOptions& o) {
return std::tie(
o.host,
o.port,
o.endpoints,
o.default_database,
o.user,
o.password,
o.rethrow_exceptions,
o.ping_before_query,
o.send_retries,
o.retry_timeout,
o.tcp_keepalive,
o.tcp_keepalive_idle,
o.tcp_keepalive_intvl,
o.tcp_keepalive_cnt,
o.tcp_nodelay,
o.connection_connect_timeout,
o.connection_recv_timeout,
o.connection_send_timeout,
o.backward_compatibility_lowcardinality_as_wrapped_column,
o.max_compression_chunk_size,
o.ssl_options
);
}

bool operator==(const ClientOptions& lhs, const ClientOptions& rhs) {
return tied(lhs) == tied(rhs);
}

bool operator!=(const ClientOptions& lhs, const ClientOptions& rhs) {
return !(lhs == rhs);
}

} // namespace clickhouse


TEST_P(ClientCase, ClientOptionsMove) {
using secs = std::chrono::seconds;

// purposely initialize with values different than default
auto sslopt = ClientOptions::SSLOptions{}
.SetUseDefaultCALocations(false)
.SetPathToCAFiles({"path"})
.SetPathToCADirectory({"pathdir"})
.SetMinProtocolVersion(42)
.SetMaxProtocolVersion(99)
.SetContextOptions(33)
.SetUseSNI(false)
.SetSkipVerification(true)
.SetHostVerifyFlags(66)
.SetConfiguration({{"cmd", "value"}});

auto opt = ClientOptions{}
.SetHost("host")
.SetPort(static_cast<uint16_t>(9010))
.SetEndpoints({{"ehost", 7777}})
.SetUser("user")
.SetPassword("pwd")
.SetDefaultDatabase("db")
.SetRethrowException(false)
.SetPingBeforeQuery(true)
.SetSendRetries(2)
.SetRetryTimeout(secs(6))
.TcpKeepAlive(true)
.SetTcpKeepAliveIdle(secs(7))
.SetTcpKeepAliveCount(4)
.TcpNoDelay(false)
.SetConnectionConnectTimeout(secs(8))
.SetConnectionRecvTimeout(secs(1))
.SetConnectionSendTimeout(secs(1))
.SetMaxCompressionChunkSize(30000)
#ifdef WITH_OPENSSL
.SetSSLOptions(sslopt)
#endif
;

ASSERT_NE(opt, ClientOptions{});
auto ogOpt = opt;
ASSERT_EQ(opt, ogOpt);
auto moveOptCtor{std::move(opt)};
ASSERT_EQ(moveOptCtor, ogOpt);
ASSERT_NE(moveOptCtor, opt);
ASSERT_NE(moveOptCtor, ClientOptions{});

ASSERT_NE(opt, ogOpt);
opt = ogOpt;
ASSERT_EQ(opt, ogOpt);
auto moveOptAssign = std::move(opt);
ASSERT_EQ(moveOptAssign, moveOptCtor);
ASSERT_EQ(moveOptAssign, ogOpt);
ASSERT_NE(moveOptAssign, opt);
ASSERT_NE(moveOptAssign, ClientOptions{});
}

TEST_P(ClientCase, ClientMoveConstructor) {
Client client{std::move(*client_.get())};
::createTableWithOneColumn<ColumnString>(client, table_name, column_name);

client.Execute("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')");

std::array<std::string, 2> data{"Foo", "Bar"};

size_t total_rows = 0;
client.Select(getOneColumnSelectQuery(), [&total_rows, &data](const Block& block) {
total_rows += block.GetRowCount();
if (block.GetRowCount() == 0) {
return;
}

ASSERT_EQ(1U, block.GetColumnCount());
if (auto col = block[0]->As<ColumnString>()) {
ASSERT_EQ(data.size(), col->Size());
for (size_t i = 0; i < col->Size(); ++i) {
EXPECT_EQ(data[i], (*col)[i]) << " at index: " << i;
}
}
});
ASSERT_EQ(total_rows, 2U);
}

TEST_P(ClientCase, ClientMoveAssign) {
Client client = std::move(*client_.get());
::createTableWithOneColumn<ColumnString>(client, table_name, column_name);

client.Execute("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')");

std::array<std::string, 2> data{"Foo", "Bar"};

size_t total_rows = 0;
client.Select(getOneColumnSelectQuery(), [&total_rows, &data](const Block& block) {
total_rows += block.GetRowCount();
if (block.GetRowCount() == 0) {
return;
}

ASSERT_EQ(1U, block.GetColumnCount());
if (auto col = block[0]->As<ColumnString>()) {
ASSERT_EQ(data.size(), col->Size());
for (size_t i = 0; i < col->Size(); ++i) {
EXPECT_EQ(data[i], (*col)[i]) << " at index: " << i;
}
}
});
ASSERT_EQ(total_rows, 2U);
}
Loading