Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to make parallel replicas [sl]eepy #63194

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions docker/test/stateful/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,26 @@ function start()
--keeper_server.tcp_port 29181 --keeper_server.server_id 3
fi

if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000
export THREAD_FUZZER_SLEEP_PROBABILITY=0.1
export THREAD_FUZZER_SLEEP_TIME_US_MAX=100000

export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1

export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US_MAX=10000
fi

counter=0
until clickhouse-client --query "SELECT 1"
do
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <iterator>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>

#include <Common/ThreadFuzzer.h>

namespace DB
{
Expand Down Expand Up @@ -45,6 +45,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
if (no_more_tasks_available)
return nullptr;

ThreadFuzzer::maybeInjectSleep();
if (buffered_ranges.empty())
{
auto result = extension.callback(ParallelReadRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
#include <Common/ThreadFuzzer.h>

namespace DB
{
Expand Down Expand Up @@ -70,6 +71,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
return std::nullopt;
};

ThreadFuzzer::maybeInjectSleep();
if (auto result = get_from_buffer(); result)
return createTask(per_part_infos[task_idx], std::move(*result), previous_task);

Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <boost/algorithm/string/split.hpp>
#include <fmt/core.h>
#include <fmt/format.h>
#include <Common/ThreadFuzzer.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
Expand Down Expand Up @@ -428,6 +429,7 @@ void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnno

LOG_DEBUG(log, "Initial request from replica {}: {}", announcement.replica_num, announcement.describe());

ThreadFuzzer::maybeInjectSleep();
initializeReadingState(std::move(announcement));

if (replica_num >= stats.size())
Expand Down Expand Up @@ -740,6 +742,8 @@ ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest reque
{
LOG_TRACE(log, "Handling request from replica {}, minimal marks size is {}", request.replica_num, request.min_number_of_marks);

ThreadFuzzer::maybeInjectSleep();

ParallelReadResponse response;

size_t current_mark_size = 0;
Expand Down