Skip to content

Commit

Permalink
Try to make parallel replicas sleepy
Browse files Browse the repository at this point in the history
  • Loading branch information
alesapin committed Apr 30, 2024
1 parent 71589c2 commit 02f1cce
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
20 changes: 20 additions & 0 deletions docker/test/stateful/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,26 @@ function start()
--keeper_server.tcp_port 29181 --keeper_server.server_id 3
fi

if [[ -n "$USE_PARALLEL_REPLICAS" ]] && [[ "$USE_PARALLEL_REPLICAS" -eq 1 ]]; then
export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000
export THREAD_FUZZER_SLEEP_PROBABILITY=0.1
export THREAD_FUZZER_SLEEP_TIME_US_MAX=100000

export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1

export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001
export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US_MAX=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US_MAX=10000
fi

counter=0
until clickhouse-client --query "SELECT 1"
do
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <iterator>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>

#include <Common/ThreadFuzzer.h>

namespace DB
{
Expand Down Expand Up @@ -45,6 +45,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
if (no_more_tasks_available)
return nullptr;

ThreadFuzzer::maybeInjectSleep();
if (buffered_ranges.empty())
{
auto result = extension.callback(ParallelReadRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
#include <Common/ThreadFuzzer.h>

namespace DB
{
Expand Down Expand Up @@ -70,6 +71,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
return std::nullopt;
};

ThreadFuzzer::maybeInjectSleep();
if (auto result = get_from_buffer(); result)
return createTask(per_part_infos[task_idx], std::move(*result), previous_task);

Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <boost/algorithm/string/split.hpp>
#include <fmt/core.h>
#include <fmt/format.h>
#include <Common/ThreadFuzzer.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
Expand Down Expand Up @@ -428,6 +429,7 @@ void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnno

LOG_DEBUG(log, "Initial request from replica {}: {}", announcement.replica_num, announcement.describe());

ThreadFuzzer::maybeInjectSleep();
initializeReadingState(std::move(announcement));

if (replica_num >= stats.size())
Expand Down Expand Up @@ -740,6 +742,8 @@ ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest reque
{
LOG_TRACE(log, "Handling request from replica {}, minimal marks size is {}", request.replica_num, request.min_number_of_marks);

ThreadFuzzer::maybeInjectSleep();

ParallelReadResponse response;

size_t current_mark_size = 0;
Expand Down

0 comments on commit 02f1cce

Please sign in to comment.