Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async connect to hosts with multiple ips #51934

Merged
merged 7 commits into from Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Client/Connection.cpp
Expand Up @@ -105,6 +105,8 @@ void Connection::connect(const ConnectionTimeouts & timeouts)

for (auto it = addresses.begin(); it != addresses.end();)
{
have_more_addresses_to_connect = it != std::prev(addresses.end());

if (connected)
disconnect();

Expand Down
4 changes: 4 additions & 0 deletions src/Client/Connection.h
Expand Up @@ -159,6 +159,8 @@ class Connection : public IServerConnection
out->setAsyncCallback(async_callback);
}

bool haveMoreAddressesToConnect() const { return have_more_addresses_to_connect; }

private:
String host;
UInt16 port;
Expand Down Expand Up @@ -227,6 +229,8 @@ class Connection : public IServerConnection
std::shared_ptr<WriteBuffer> maybe_compressed_out;
std::unique_ptr<NativeWriter> block_out;

bool have_more_addresses_to_connect = false;

/// Logger is created lazily, for avoid to run DNS request in constructor.
class LoggerWrapper
{
Expand Down
7 changes: 6 additions & 1 deletion src/Client/ConnectionEstablisher.cpp
Expand Up @@ -179,7 +179,7 @@ bool ConnectionEstablisherAsync::checkTimeout()
is_timeout_alarmed = true;
}

if (is_timeout_alarmed && !is_socket_ready)
if (is_timeout_alarmed && !is_socket_ready && !haveMoreAddressesToConnect())
{
/// In not async case timeout exception would be thrown and caught in ConnectionEstablisher::run,
/// but in async case we process timeout outside and cannot throw exception. So, we just save fail message.
Expand Down Expand Up @@ -225,6 +225,11 @@ void ConnectionEstablisherAsync::resetResult()
}
}

bool ConnectionEstablisherAsync::haveMoreAddressesToConnect()
{
return !result.entry.isNull() && result.entry->haveMoreAddressesToConnect();
}

#endif

}
2 changes: 2 additions & 0 deletions src/Client/ConnectionEstablisher.h
Expand Up @@ -104,6 +104,8 @@ class ConnectionEstablisherAsync : public AsyncTaskExecutor

void resetResult();

bool haveMoreAddressesToConnect();

ConnectionEstablisher connection_establisher;
TryResult result;
std::string fail_message;
Expand Down
Empty file.
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<use_hedged_requests>1</use_hedged_requests>
</default>
</profiles>
</clickhouse>
@@ -0,0 +1,4 @@
<clickhouse>
<listen_host>::</listen_host>
</clickhouse>

72 changes: 72 additions & 0 deletions tests/integration/test_async_connect_to_multiple_ips/test.py
@@ -0,0 +1,72 @@
import pytest
from helpers.cluster import ClickHouseCluster


cluster = ClickHouseCluster(__file__)


@pytest.fixture(scope="module")
def cluster_without_dns_cache_update():
try:
cluster.start()

yield cluster

except Exception as ex:
print(ex)

finally:
cluster.shutdown()
pass


node1 = cluster.add_instance(
"node1",
main_configs=["configs/listen_host.xml"],
user_configs=["configs/enable_hedged.xml"],
with_zookeeper=True,
ipv4_address="10.5.95.11",
)

node2 = cluster.add_instance(
"node2",
main_configs=["configs/listen_host.xml"],
user_configs=["configs/enable_hedged.xml"],
with_zookeeper=True,
ipv4_address="10.5.95.12",
)


# node1 - source with table, have invalid ipv6
# node2 - destination, doing remote query
def test(cluster_without_dns_cache_update):
node1.query(
"CREATE TABLE test(t Date, label UInt8) ENGINE = MergeTree PARTITION BY t ORDER BY label;"
)
node1.query("INSERT INTO test SELECT toDate('2022-12-28'), 1;")
assert node1.query("SELECT count(*) FROM test") == "1\n"

wrong_ip = "2001:3984:3989::1:1118"

node2.exec_in_container(
(["bash", "-c", "echo '{} {}' >> /etc/hosts".format(wrong_ip, node1.name)])
)
node2.exec_in_container(
(
[
"bash",
"-c",
"echo '{} {}' >> /etc/hosts".format(node1.ipv4_address, node1.name),
]
)
)

assert node1.query("SELECT count(*) from test") == "1\n"
node2.query("SYSTEM DROP DNS CACHE")
node1.query("SYSTEM DROP DNS CACHE")
assert (
node2.query(
f"SELECT count(*) FROM remote('{node1.name}', default.test) limit 1;"
)
== "1\n"
)