Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel replicas always skip unavailable ones #50293

Merged
merged 14 commits into from May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions programs/server/config.xml
Expand Up @@ -909,6 +909,11 @@
<host>127.0.0.10</host>
<port>9000</port>
</replica>
<!-- Unavailable replica -->
<replica>
<host>127.0.0.11</host>
<port>1234</port>
</replica>
</shard>
</parallel_replicas>
<test_cluster_two_shards_localhost>
Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -458,6 +458,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}

/// Set skip_unavailable_shards to true only if it wasn't disabled explicitly
if (settings.allow_experimental_parallel_reading_from_replicas > 0 && !settings.skip_unavailable_shards && !settings.isChanged("skip_unavailable_shards"))
{
context->setSetting("skip_unavailable_shards", true);
}

/// Check support for JOIN for parallel replicas with custom key
if (joined_tables.tablesCount() > 1 && !settings.parallel_replicas_custom_key.value.empty())
{
Expand Down
39 changes: 27 additions & 12 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Expand Up @@ -47,8 +47,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, extension(extension_)
{}

RemoteQueryExecutor::RemoteQueryExecutor(
Expand Down Expand Up @@ -90,8 +89,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, extension(extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
Expand All @@ -108,8 +106,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, extension(extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
Expand Down Expand Up @@ -247,6 +244,13 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
finished = true;
sent_query = true;

/// We need to tell the coordinator not to wait for this replica.
if (extension && extension->parallel_reading_coordinator)
{
chassert(extension->replica_info);
extension->parallel_reading_coordinator->markReplicaAsUnavailble(extension->replica_info->number_of_current_replica);
}

return;
}

Expand Down Expand Up @@ -360,7 +364,18 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context->resume();

if (needToSkipUnavailableShard())
{
/// We need to tell the coordinator not to wait for this replica.
/// But at this point it may lead to an incomplete result set, because
/// this replica committed to reading a part of the data and then died.
if (extension && extension->parallel_reading_coordinator)
{
chassert(extension->parallel_reading_coordinator);
extension->parallel_reading_coordinator->markReplicaAsUnavailble(extension->replica_info->number_of_current_replica);
}

return ReadResult(Block());
}

/// Check if packet is not ready yet.
if (read_context->isInProgress())
Expand Down Expand Up @@ -524,30 +539,30 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)

/// Serve a read-task request coming from a remote replica: pull the next task
/// from the initiator-side task iterator and send it back over the connection.
/// Throws LOGICAL_ERROR if this executor was created without a task iterator.
void RemoteQueryExecutor::processReadTaskRequest()
{
    if (!extension || !extension->task_iterator)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");

    ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
    auto response = (*extension->task_iterator)();
    connections->sendReadTaskResponse(response);
}

/// Serve a parallel-replicas read-task request: forward it to the reading
/// coordinator and return the produced response to the requesting replica.
/// Throws LOGICAL_ERROR if this executor was created without a coordinator.
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
{
    if (!extension || !extension->parallel_reading_coordinator)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");

    ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
    auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
    connections->sendMergeTreeReadTaskResponse(response);
}

/// Pass a replica's initial "all ranges" announcement on to the reading
/// coordinator. (Note: the identifier keeps the historical misspelling
/// "Announecement" because the declaration lives in the header.)
/// Throws LOGICAL_ERROR if this executor was created without a coordinator.
void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
{
    if (!extension || !extension->parallel_reading_coordinator)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");

    /// The parameter is a by-value copy, so it is safe to move it onward,
    /// consistent with std::move(request) in processMergeTreeReadTaskRequest.
    extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
}

void RemoteQueryExecutor::finish()
Expand Down
4 changes: 2 additions & 2 deletions src/QueryPipeline/RemoteQueryExecutor.h
Expand Up @@ -212,11 +212,11 @@ class RemoteQueryExecutor
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;

std::optional<Extension> extension;
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;

std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;

/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica or the count of replicas at all.
Expand Down
46 changes: 43 additions & 3 deletions src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
Expand Up @@ -19,6 +19,7 @@
#include "Storages/MergeTree/RequestResponse.h"
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/IntersectionsIndexes.h>
#include <fmt/core.h>
#include <fmt/format.h>

namespace DB
Expand Down Expand Up @@ -61,18 +62,22 @@ class ParallelReplicasReadingCoordinator::ImplInterface
{
size_t number_of_requests{0};
size_t sum_marks{0};
bool is_unavailable{false};
};
using Stats = std::vector<Stat>;
/// Render per-replica request/mark statistics into one log-friendly line,
/// e.g. "Statistics: replica 0 - {requests: 3 marks: 10}; replica 1 is unavailable - {requests: 0 marks: 0}".
static String toString(Stats stats)
{
    String result = "Statistics: ";
    std::vector<String> stats_by_replica;
    for (size_t i = 0; i < stats.size(); ++i)
        stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}", i, stats[i].is_unavailable ? " is unavailable" : "", stats[i].number_of_requests, stats[i].sum_marks));
    result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
    return result;
}

Stats stats;
size_t replicas_count;
size_t replicas_count{0};
size_t unavailable_replicas_count{0};

explicit ImplInterface(size_t replicas_count_)
: stats{replicas_count_}
Expand All @@ -82,6 +87,7 @@ class ParallelReplicasReadingCoordinator::ImplInterface
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
virtual void markReplicaAsUnavailble(size_t replica_number) = 0;
nikitamikhaylov marked this conversation as resolved.
Show resolved Hide resolved
};

using Parts = std::set<Part>;
Expand Down Expand Up @@ -128,6 +134,7 @@ class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterf

ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailble(size_t replica_number) override;

void updateReadingState(const InitialAllRangesAnnouncement & announcement);
void finalizeReadingState();
Expand Down Expand Up @@ -199,6 +206,17 @@ void DefaultCoordinator::updateReadingState(const InitialAllRangesAnnouncement &
}
}

/// Record that a replica will never send its announcement or any requests,
/// so the coordinator must stop waiting for it.
void DefaultCoordinator::markReplicaAsUnavailble(size_t replica_number)
{
    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);

    stats[replica_number].is_unavailable = true;
    ++unavailable_replicas_count;

    /// If every replica that is still available has already sent its initial
    /// request, the reading state can be finalized right away.
    const size_t available_replicas = replicas_count - unavailable_replicas_count;
    if (sent_initial_requests == available_replicas)
        finalizeReadingState();
}

void DefaultCoordinator::finalizeReadingState()
{
/// Clear all the delayed queue
Expand Down Expand Up @@ -345,12 +363,23 @@ class InOrderCoordinator : public ParallelReplicasReadingCoordinator::ImplInterf

ParallelReadResponse handleRequest([[ maybe_unused ]] ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement([[ maybe_unused ]] InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailble(size_t replica_number) override;

Parts all_parts_to_read;

Poco::Logger * log = &Poco::Logger::get(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
};

/// Record that a replica will never send its announcement or any requests.
template <CoordinationMode mode>
void InOrderCoordinator<mode>::markReplicaAsUnavailble(size_t replica_number)
{
    LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);

    ++unavailable_replicas_count;
    stats[replica_number].is_unavailable = true;

    /// Nothing else needs to be done here.
}

template <CoordinationMode mode>
void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
Expand Down Expand Up @@ -388,7 +417,6 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
}
}


template <CoordinationMode mode>
ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest request)
{
Expand Down Expand Up @@ -512,6 +540,18 @@ void ParallelReplicasReadingCoordinator::setMode(CoordinationMode mode_)
mode = mode_;
nikitamikhaylov marked this conversation as resolved.
Show resolved Hide resolved
}

/// Thread-safe entry point: forwards the "replica is unavailable" notification
/// to the concrete coordinator implementation.
void ParallelReplicasReadingCoordinator::markReplicaAsUnavailble(size_t replica_number)
{
    std::lock_guard lock(mutex);

    /// A replica may be detected as unavailable before any announcement or
    /// request has arrived, i.e. before the implementation was created.
    if (!pimpl)
        initialize();

    pimpl->markReplicaAsUnavailble(replica_number);
}

void ParallelReplicasReadingCoordinator::initialize()
{
switch (mode)
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h
Expand Up @@ -22,6 +22,12 @@ class ParallelReplicasReadingCoordinator
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
ParallelReadResponse handleRequest(ParallelReadRequest request);

/// Called when some replica is unavailable and we skipped it.
/// This is needed to "finalize" reading state e.g. spread all the marks using
/// consistent hashing, because otherwise coordinator will continue working in
/// "pending" state waiting for the unavailable replica to send the announcement.
void markReplicaAsUnavailble(size_t replica_number);

private:
void initialize();

Expand Down
@@ -0,0 +1 @@
1
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh


# Set up a small non-replicated MergeTree table to read through parallel replicas.
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple()"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10)"

# Read with parallel replicas over the 'parallel_replicas' cluster, which
# presumably contains an unreachable replica (see the config.xml entry with
# port 1234) -- TODO confirm against the server config used by CI.
# max_parallel_replicas is set higher than the number of live replicas and
# skip_unavailable_shards=1 so the query must still succeed and return the
# full count. stderr is merged into stdout so any error would show up in
# the reference comparison.
${CLICKHOUSE_CLIENT} --send_logs_level 'debug' --skip_unavailable_shards 1 --allow_experimental_parallel_reading_from_replicas 1 --max_parallel_replicas 11 --use_hedged_requests 0 --cluster_for_parallel_replicas 'parallel_replicas' --parallel_replicas_for_non_replicated_merge_tree 1 -q "SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*)" 2>&1

# Clean up.
${CLICKHOUSE_CLIENT} --query="DROP TABLE test_parallel_replicas_unavailable_shards"