From 0c4f5498ff772f2492c2e6cc31da7d29e670aca3 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 26 Nov 2020 05:22:35 +0300 Subject: [PATCH] Backport #17254 to 20.12: Fix \"Unexpected packet Data received from client\" for Distributed queries --- src/DataStreams/RemoteQueryExecutor.cpp | 11 +++++++++++ src/Interpreters/ClusterProxy/IStreamFactory.h | 3 ++- src/Interpreters/ClusterProxy/SelectStreamFactory.cpp | 5 ++++- src/Interpreters/ClusterProxy/SelectStreamFactory.h | 3 ++- src/Interpreters/ClusterProxy/executeQuery.cpp | 6 +++++- ...imize_distributed_group_by_sharding_key.reference} | 0 ...44_optimize_distributed_group_by_sharding_key.sql} | 0 7 files changed, 24 insertions(+), 4 deletions(-) rename tests/queries/0_stateless/{01247_optimize_distributed_group_by_sharding_key.reference => 01244_optimize_distributed_group_by_sharding_key.reference} (100%) rename tests/queries/0_stateless/{01247_optimize_distributed_group_by_sharding_key.sql => 01244_optimize_distributed_group_by_sharding_key.sql} (100%) diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 38486aa63684..6ee78d805831 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -151,6 +151,17 @@ void RemoteQueryExecutor::sendQuery() if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size()) return; + /// Query cannot be canceled in the middle of the send query, + /// since there are multiple packages: + /// - Query + /// - Data (multiple times) + /// + /// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw: + /// + /// Unexpected packet Data received from client + /// + std::lock_guard guard(was_cancelled_mutex); + established = true; auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); diff --git a/src/Interpreters/ClusterProxy/IStreamFactory.h b/src/Interpreters/ClusterProxy/IStreamFactory.h index 80be585d15e7..c0b887e0489b 100644 --- a/src/Interpreters/ClusterProxy/IStreamFactory.h +++ b/src/Interpreters/ClusterProxy/IStreamFactory.h @@ -36,7 +36,8 @@ class IStreamFactory const SelectQueryInfo & query_info, std::vector & res, Pipes & remote_pipes, - Pipes & delayed_pipes) = 0; + Pipes & delayed_pipes, + Poco::Logger * log) = 0; }; } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 0020db52d3c4..56f306595ac1 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -117,7 +117,8 @@ void SelectStreamFactory::createForShard( const SelectQueryInfo &, std::vector & plans, Pipes & remote_pipes, - Pipes & delayed_pipes) + Pipes & delayed_pipes, + Poco::Logger * log) { bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; bool add_totals = false; @@ -143,6 +144,8 @@ void SelectStreamFactory::createForShard( { auto remote_query_executor = std::make_shared( shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage); + remote_query_executor->setLogger(log); + remote_query_executor->setPoolMode(PoolMode::GET_MANY); if (!table_func_ptr) remote_query_executor->setMainTable(main_table); diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 9b57b92ed50e..b51ac109a11c 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -41,7 +41,8 @@ class SelectStreamFactory final : public IStreamFactory const SelectQueryInfo & query_info, std::vector & plans, Pipes & remote_pipes, - Pipes & delayed_pipes) override; + Pipes & delayed_pipes, + Poco::Logger * log) override; private: const Block header; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 6b4f28694a93..c79b17eac2a9 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -119,7 +119,11 @@ void executeQuery( throttler = user_level_throttler; for (const auto & shard_info : query_info.cluster->getShardsInfo()) - stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, plans, remote_pipes, delayed_pipes); + { + stream_factory.createForShard(shard_info, query, query_ast, + new_context, throttler, query_info, plans, + remote_pipes, delayed_pipes, log); + } if (!remote_pipes.empty()) { diff --git a/tests/queries/0_stateless/01247_optimize_distributed_group_by_sharding_key.reference b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference similarity index 100% rename from tests/queries/0_stateless/01247_optimize_distributed_group_by_sharding_key.reference rename to tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.reference diff --git a/tests/queries/0_stateless/01247_optimize_distributed_group_by_sharding_key.sql b/tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql similarity index 100% rename from tests/queries/0_stateless/01247_optimize_distributed_group_by_sharding_key.sql rename to tests/queries/0_stateless/01244_optimize_distributed_group_by_sharding_key.sql