Fix "Unexpected packet Data received from client" for Distributed queries #17254

Merged
merged 4 commits into from Nov 26, 2020
Changes from 3 commits
11 changes: 11 additions & 0 deletions src/DataStreams/RemoteQueryExecutor.cpp
@@ -151,6 +151,17 @@ void RemoteQueryExecutor::sendQuery()
     if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
         return;

+    /// The query cannot be cancelled in the middle of sending it,
+    /// since sending consists of multiple packets:
+    /// - Query
+    /// - Data (multiple times)
+    ///
+    /// After the Cancel packet no Data packet may be sent, otherwise the remote side will throw:
+    ///
+    ///     Unexpected packet Data received from client
+    ///
+    std::lock_guard guard(was_cancelled_mutex);
Member

This is OK, but it is not clear why cancel_mutex in MultiplexedConnections does not help:

void MultiplexedConnections::sendQuery(
    const ConnectionTimeouts & timeouts,
    const String & query,
    const String & query_id,
    UInt64 stage,
    const ClientInfo & client_info,
    bool with_pending_data)
{
    std::lock_guard lock(cancel_mutex);

Probably, because we send empty block here:

I suppose it may be more logical to add sendScalars() and sendExternalTables logic into MultiplexedConnections::sendQuery.

Collaborator Author

> Probably, because we send empty block here:

Indeed.

> I suppose it may be more logical to add sendScalars() and sendExternalTables logic into MultiplexedConnections::sendQuery.

Agree


     established = true;

     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
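
The locking discussed in this hunk and in the review thread can be illustrated with a small, self-contained sketch. It is not the ClickHouse implementation: FakeRemote, SketchExecutor, and the sent_query flag are hypothetical names introduced only for illustration, and the packet names are plain strings. The assumption is that cancel() takes the same mutex that sendQuery() holds for the whole Query + Data sequence, so a Cancel packet can only go out before the first packet or after the last Data packet. A lock held only while the Query packet is sent (as with cancel_mutex inside MultiplexedConnections::sendQuery) would be released before the Data packets that follow.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the remote server: it records packets and flags
// a protocol error if a Data packet arrives after Cancel.
struct FakeRemote
{
    std::vector<std::string> packets;
    bool cancel_received = false;
    bool protocol_error = false;

    void receive(const std::string & packet)
    {
        if (packet == "Data" && cancel_received)
            protocol_error = true; // models "Unexpected packet Data received from client"
        if (packet == "Cancel")
            cancel_received = true;
        packets.push_back(packet);
    }
};

// Hypothetical executor, not the real RemoteQueryExecutor.
class SketchExecutor
{
public:
    explicit SketchExecutor(FakeRemote & remote_) : remote(remote_) {}

    void sendQuery()
    {
        // One lock covers the whole multi-packet send.
        std::lock_guard<std::mutex> guard(was_cancelled_mutex);
        remote.receive("Query");
        remote.receive("Data"); // e.g. scalars
        remote.receive("Data"); // e.g. external tables
        sent_query = true;
    }

    void cancel()
    {
        // Same mutex as sendQuery(), so Cancel cannot land between packets.
        std::lock_guard<std::mutex> guard(was_cancelled_mutex);
        if (was_cancelled || !sent_query)
            return; // assumption of this sketch: nothing to cancel yet, or already cancelled
        was_cancelled = true;
        remote.receive("Cancel");
    }

private:
    FakeRemote & remote;
    std::mutex was_cancelled_mutex;
    bool was_cancelled = false;
    bool sent_query = false;
};
```

Calling sendQuery() followed by cancel() in this sketch yields the packet sequence Query, Data, Data, Cancel; a second cancel() is a no-op because was_cancelled is already set.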
3 changes: 2 additions & 1 deletion src/Interpreters/ClusterProxy/IStreamFactory.h
@@ -36,7 +36,8 @@ class IStreamFactory
         const SelectQueryInfo & query_info,
         std::vector<QueryPlanPtr> & res,
         Pipes & remote_pipes,
-        Pipes & delayed_pipes) = 0;
+        Pipes & delayed_pipes,
+        Poco::Logger * log) = 0;
 };

 }
5 changes: 4 additions & 1 deletion src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
@@ -112,7 +112,8 @@ void SelectStreamFactory::createForShard(
     const SelectQueryInfo &,
     std::vector<QueryPlanPtr> & plans,
     Pipes & remote_pipes,
-    Pipes & delayed_pipes)
+    Pipes & delayed_pipes,
+    Poco::Logger * log)
 {
     bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
     bool add_totals = false;
@@ -138,6 +139,8 @@ void SelectStreamFactory::createForShard(
     {
         auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
             shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
+        remote_query_executor->setLogger(log);
+
         remote_query_executor->setPoolMode(PoolMode::GET_MANY);
         if (!table_func_ptr)
             remote_query_executor->setMainTable(main_table);
3 changes: 2 additions & 1 deletion src/Interpreters/ClusterProxy/SelectStreamFactory.h
@@ -41,7 +41,8 @@ class SelectStreamFactory final : public IStreamFactory
         const SelectQueryInfo & query_info,
         std::vector<QueryPlanPtr> & plans,
         Pipes & remote_pipes,
-        Pipes & delayed_pipes) override;
+        Pipes & delayed_pipes,
+        Poco::Logger * log) override;

 private:
     const Block header;
7 changes: 6 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -119,7 +119,12 @@ void executeQuery(
         throttler = user_level_throttler;

     for (const auto & shard_info : query_info.cluster->getShardsInfo())
-        stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, plans, remote_pipes, delayed_pipes);
+    {
+        stream_factory.createForShard(shard_info, query, query_ast,
+            new_context, throttler, query_info, plans,
+            remote_pipes, delayed_pipes,
+            log);
+    }

     if (!remote_pipes.empty())
     {
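
The remaining hunks thread a logger pointer from executeQuery through createForShard into each remote executor. A minimal sketch of that plumbing pattern follows. All names here (SketchLogger, SketchRemoteExecutor, ISketchFactory, SketchSelectFactory) are hypothetical stand-ins rather than the ClickHouse or Poco types, and the null check before logging is an assumption of this sketch, not something the diff shows.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal logger; the PR itself passes a Poco::Logger *.
struct SketchLogger
{
    std::string name;
    std::vector<std::string> lines;

    void trace(const std::string & message) { lines.push_back(name + ": " + message); }
};

// Hypothetical executor that logs only if a logger was injected.
class SketchRemoteExecutor
{
public:
    void setLogger(SketchLogger * logger) { log = logger; }

    void sendQuery()
    {
        if (log) // assumption: a missing logger simply disables logging
            log->trace("Sending query");
    }

private:
    SketchLogger * log = nullptr;
};

using ExecutorPtr = std::shared_ptr<SketchRemoteExecutor>;

// The interface gains a trailing logger parameter, as IStreamFactory does in the diff.
class ISketchFactory
{
public:
    virtual ~ISketchFactory() = default;
    virtual void createForShard(int shard_num, std::vector<ExecutorPtr> & executors, SketchLogger * log) = 0;
};

class SketchSelectFactory final : public ISketchFactory
{
public:
    void createForShard(int /*shard_num*/, std::vector<ExecutorPtr> & executors, SketchLogger * log) override
    {
        auto executor = std::make_shared<SketchRemoteExecutor>();
        executor->setLogger(log); // hand the caller's logger to the executor
        executors.push_back(std::move(executor));
    }
};
```

Passing the logger as a parameter keeps the factory stateless with respect to logging: the caller decides which logger each shard's executor reports to.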