Skip to content

Commit

Permalink
Merge pull request #56303 from ClickHouse/fix_obscure_segfault
Browse files Browse the repository at this point in the history
Fix rare race condition related to Memory allocation failure
  • Loading branch information
alesapin committed Nov 3, 2023
2 parents 9e21f83 + 3b212a2 commit f697964
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/Common/FailPoint.cpp
Expand Up @@ -39,6 +39,7 @@ static struct InitFiu
REGULAR(cluster_discovery_faults) \
REGULAR(check_table_query_delay_for_part) \
REGULAR(dummy_failpoint) \
REGULAR(prefetched_reader_pool_failpoint) \
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
PAUSEABLE(dummy_pausable_failpoint)

Expand Down
59 changes: 46 additions & 13 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp
Expand Up @@ -12,6 +12,8 @@
#include <base/getThreadId.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>


namespace ProfileEvents
{
Expand All @@ -28,6 +30,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace FailPoints
{
extern const char prefetched_reader_pool_failpoint[];
}

bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other) const
{
chassert(task->priority >= 0);
Expand All @@ -37,27 +44,53 @@ bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other
return task->priority > other.task->priority; /// Less is better.
}

MergeTreePrefetchedReadPool::PrefetechedReaders::PrefetechedReaders(

MergeTreePrefetchedReadPool::PrefetchedReaders::~PrefetchedReaders()
{
for (auto & prefetch_future : prefetch_futures)
if (prefetch_future.valid())
prefetch_future.wait();
}

MergeTreePrefetchedReadPool::PrefetchedReaders::PrefetchedReaders(
MergeTreeReadTask::Readers readers_,
Priority priority_,
MergeTreePrefetchedReadPool & pool_)
: is_valid(true)
, readers(std::move(readers_))
{
prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));
try
{
prefetch_futures.reserve(1 + readers.prewhere.size());

prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));

for (const auto & reader : readers.prewhere)
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));

fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
});
}
catch (...) /// in case of memory exceptions we have to wait
{
for (auto & prefetch_future : prefetch_futures)
if (prefetch_future.valid())
prefetch_future.wait();

for (const auto & reader : readers.prewhere)
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));
throw;
}
}

void MergeTreePrefetchedReadPool::PrefetechedReaders::wait()
void MergeTreePrefetchedReadPool::PrefetchedReaders::wait()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
for (auto & prefetch_future : prefetch_futures)
prefetch_future.wait();
}

MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetechedReaders::get()
MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
{
SCOPE_EXIT({ is_valid = false; });
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
Expand Down Expand Up @@ -125,12 +158,12 @@ std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTree

void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & task)
{
if (task.readers_future.valid())
if (task.isValidReadersFuture())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task already has a reader");

auto extras = getExtras();
auto readers = MergeTreeReadTask::createReaders(task.read_info, extras, task.ranges);
task.readers_future = PrefetechedReaders(std::move(readers), task.priority, *this);
task.readers_future = std::make_unique<PrefetchedReaders>(std::move(readers), task.priority, *this);
}

void MergeTreePrefetchedReadPool::startPrefetches()
Expand Down Expand Up @@ -210,7 +243,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->readers_future.valid(); });
[](const auto & task) { return task->isValidReadersFuture(); });

if (task_it == thread_tasks.end())
{
Expand All @@ -237,7 +270,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->readers_future.valid(); });
[](const auto & task) { return task->isValidReadersFuture(); });

assert(task_it != thread_tasks.end());
auto thread_task = std::move(*task_it);
Expand Down Expand Up @@ -285,13 +318,13 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::createTask(ThreadTask & task, MergeTreeReadTask * previous_task)
{
if (task.readers_future.valid())
if (task.isValidReadersFuture())
{
auto size_predictor = task.read_info->shared_size_predictor
? std::make_unique<MergeTreeBlockSizePredictor>(*task.read_info->shared_size_predictor)
: nullptr;

return std::make_unique<MergeTreeReadTask>(task.read_info, task.readers_future.get(), task.ranges, std::move(size_predictor));
return std::make_unique<MergeTreeReadTask>(task.read_info, task.readers_future->get(), task.ranges, std::move(size_predictor));
}

return MergeTreeReadPoolBase::createTask(task.read_info, task.ranges, previous_task);
Expand Down Expand Up @@ -563,7 +596,7 @@ std::string MergeTreePrefetchedReadPool::dumpTasks(const TasksPerThread & tasks)
{
result << '\t';
result << ++no << ": ";
result << "reader future: " << task->readers_future.valid() << ", ";
result << "reader future: " << task->isValidReadersFuture() << ", ";
result << "part: " << task->read_info->data_part->name << ", ";
result << "ranges: " << toString(task->ranges);
}
Expand Down
18 changes: 12 additions & 6 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.h
Expand Up @@ -48,15 +48,16 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo
size_t required_readers_num = 0;
};

class PrefetechedReaders
class PrefetchedReaders
{
public:
PrefetechedReaders() = default;
PrefetechedReaders(MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & pool_);
PrefetchedReaders() = default;
PrefetchedReaders(MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & pool_);

void wait();
MergeTreeReadTask::Readers get();
bool valid() const { return is_valid; }
~PrefetchedReaders();

private:
bool is_valid = false;
Expand All @@ -75,14 +76,19 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo

~ThreadTask()
{
if (readers_future.valid())
readers_future.wait();
if (readers_future && readers_future->valid())
readers_future->wait();
}

bool isValidReadersFuture() const
{
return readers_future && readers_future->valid();
}

InfoPtr read_info;
MarkRanges ranges;
Priority priority;
PrefetechedReaders readers_future;
std::unique_ptr<PrefetchedReaders> readers_future;
};

struct TaskHolder
Expand Down
Empty file.
24 changes: 24 additions & 0 deletions tests/queries/0_stateless/02910_prefetch_unexpceted_exception.sql
@@ -0,0 +1,24 @@
-- Tags: no-parallel, no-random-settings, no-random-merge-tree-settings
-- no-parallel -- enables failpoint
-- no-random-settings -- depend on type of part, should always fail
drop table if exists prefetched_table;

CREATE TABLE prefetched_table(key UInt64, s String) Engine = MergeTree() order by key;

INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);

SET local_filesystem_read_prefetch=1;
SET allow_prefetched_read_pool_for_remote_filesystem=1;
SET allow_prefetched_read_pool_for_local_filesystem=1;

SYSTEM ENABLE FAILPOINT prefetched_reader_pool_failpoint;

SELECT * FROM prefetched_table FORMAT Null; --{serverError 36}

SYSTEM DISABLE FAILPOINT prefetched_reader_pool_failpoint;

drop table if exists prefetched_table;

0 comments on commit f697964

Please sign in to comment.