Skip to content

Commit

Permalink
Merge pull request #17254 from azat/fix-dist-query-cancelation
Browse files Browse the repository at this point in the history
Fix "Unexpected packet Data received from client"  for Distributed queries
  • Loading branch information
alexey-milovidov committed Nov 26, 2020
2 parents c438b43 + 8138668 commit fb3a69b
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 4 deletions.
11 changes: 11 additions & 0 deletions src/DataStreams/RemoteQueryExecutor.cpp
Expand Up @@ -151,6 +151,17 @@ void RemoteQueryExecutor::sendQuery()
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;

/// Query cannot be canceled in the middle of the send query,
/// since there are multiple packages:
/// - Query
/// - Data (multiple times)
///
/// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw:
///
/// Unexpected packet Data received from client
///
std::lock_guard guard(was_cancelled_mutex);

established = true;

auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/ClusterProxy/IStreamFactory.h
Expand Up @@ -36,7 +36,8 @@ class IStreamFactory
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & res,
Pipes & remote_pipes,
Pipes & delayed_pipes) = 0;
Pipes & delayed_pipes,
Poco::Logger * log) = 0;
};

}
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
Expand Up @@ -117,7 +117,8 @@ void SelectStreamFactory::createForShard(
const SelectQueryInfo &,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,
Pipes & delayed_pipes)
Pipes & delayed_pipes,
Poco::Logger * log)
{
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
Expand All @@ -143,6 +144,8 @@ void SelectStreamFactory::createForShard(
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
remote_query_executor->setLogger(log);

remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/ClusterProxy/SelectStreamFactory.h
Expand Up @@ -41,7 +41,8 @@ class SelectStreamFactory final : public IStreamFactory
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,
Pipes & delayed_pipes) override;
Pipes & delayed_pipes,
Poco::Logger * log) override;

private:
const Block header;
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.cpp
Expand Up @@ -119,7 +119,11 @@ void executeQuery(
throttler = user_level_throttler;

for (const auto & shard_info : query_info.cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, plans, remote_pipes, delayed_pipes);
{
stream_factory.createForShard(shard_info, query, query_ast,
new_context, throttler, query_info, plans,
remote_pipes, delayed_pipes, log);
}

if (!remote_pipes.empty())
{
Expand Down

0 comments on commit fb3a69b

Please sign in to comment.