Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rare race condition related to Memory allocation failure #56303

Merged
merged 5 commits into from Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Common/FailPoint.cpp
Expand Up @@ -39,6 +39,7 @@ static struct InitFiu
REGULAR(cluster_discovery_faults) \
REGULAR(check_table_query_delay_for_part) \
REGULAR(dummy_failpoint) \
REGULAR(prefeteched_reader_pool_failpoint) \
alesapin marked this conversation as resolved.
Show resolved Hide resolved
alesapin marked this conversation as resolved.
Show resolved Hide resolved
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
PAUSEABLE(dummy_pausable_failpoint)

Expand Down
51 changes: 41 additions & 10 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp
Expand Up @@ -12,6 +12,8 @@
#include <base/getThreadId.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>


namespace ProfileEvents
{
Expand All @@ -28,6 +30,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace FailPoints
{
extern const char prefeteched_reader_pool_failpoint[];
alesapin marked this conversation as resolved.
Show resolved Hide resolved
}

bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other) const
{
chassert(task->priority >= 0);
Expand All @@ -37,17 +44,41 @@ bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other
return task->priority > other.task->priority; /// Less is better.
}


MergeTreePrefetchedReadPool::PrefetechedReaders::~PrefetechedReaders()
{
for (auto & prefetch_future : prefetch_futures)
if (prefetch_future.valid())
prefetch_future.wait();
}

MergeTreePrefetchedReadPool::PrefetechedReaders::PrefetechedReaders(
MergeTreeReadTask::Readers readers_,
Priority priority_,
MergeTreePrefetchedReadPool & pool_)
: is_valid(true)
, readers(std::move(readers_))
{
prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));
try
{
prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));

for (const auto & reader : readers.prewhere)
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));

fiu_do_on(FailPoints::prefeteched_reader_pool_failpoint,
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefeteched reader enabled");
});
}
catch (...) /// in case of memory exceptions we have to wait
{
for (auto & prefetch_future : prefetch_futures)
antonio2368 marked this conversation as resolved.
Show resolved Hide resolved
if (prefetch_future.valid())
prefetch_future.wait();

for (const auto & reader : readers.prewhere)
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));
throw;
}
}

void MergeTreePrefetchedReadPool::PrefetechedReaders::wait()
Expand Down Expand Up @@ -125,12 +156,12 @@ std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTree

void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & task)
{
if (task.readers_future.valid())
if (task.isValidReadersFuture())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task already has a reader");

auto extras = getExtras();
auto readers = MergeTreeReadTask::createReaders(task.read_info, extras, task.ranges);
task.readers_future = PrefetechedReaders(std::move(readers), task.priority, *this);
task.readers_future = std::make_unique<PrefetechedReaders>(std::move(readers), task.priority, *this);
}

void MergeTreePrefetchedReadPool::startPrefetches()
Expand Down Expand Up @@ -210,7 +241,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->readers_future.valid(); });
[](const auto & task) { return task->isValidReadersFuture(); });

if (task_it == thread_tasks.end())
{
Expand All @@ -237,7 +268,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->readers_future.valid(); });
[](const auto & task) { return task->isValidReadersFuture(); });

assert(task_it != thread_tasks.end());
auto thread_task = std::move(*task_it);
Expand Down Expand Up @@ -285,13 +316,13 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, Merge

MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::createTask(ThreadTask & task, MergeTreeReadTask * previous_task)
{
if (task.readers_future.valid())
if (task.isValidReadersFuture())
{
auto size_predictor = task.read_info->shared_size_predictor
? std::make_unique<MergeTreeBlockSizePredictor>(*task.read_info->shared_size_predictor)
: nullptr;

return std::make_unique<MergeTreeReadTask>(task.read_info, task.readers_future.get(), task.ranges, std::move(size_predictor));
return std::make_unique<MergeTreeReadTask>(task.read_info, task.readers_future->get(), task.ranges, std::move(size_predictor));
}

return MergeTreeReadPoolBase::createTask(task.read_info, task.ranges, previous_task);
Expand Down Expand Up @@ -563,7 +594,7 @@ std::string MergeTreePrefetchedReadPool::dumpTasks(const TasksPerThread & tasks)
{
result << '\t';
result << ++no << ": ";
result << "reader future: " << task->readers_future.valid() << ", ";
result << "reader future: " << task->isValidReadersFuture() << ", ";
result << "part: " << task->read_info->data_part->name << ", ";
result << "ranges: " << toString(task->ranges);
}
Expand Down
12 changes: 9 additions & 3 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.h
Expand Up @@ -57,6 +57,7 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo
void wait();
MergeTreeReadTask::Readers get();
bool valid() const { return is_valid; }
~PrefetechedReaders();

private:
bool is_valid = false;
Expand All @@ -75,14 +76,19 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo

~ThreadTask()
{
if (readers_future.valid())
readers_future.wait();
if (readers_future && readers_future->valid())
readers_future->wait();
}

bool isValidReadersFuture() const
{
return readers_future && readers_future->valid();
}

InfoPtr read_info;
MarkRanges ranges;
Priority priority;
PrefetechedReaders readers_future;
std::unique_ptr<PrefetechedReaders> readers_future;
};

struct TaskHolder
Expand Down
Empty file.
24 changes: 24 additions & 0 deletions tests/queries/0_stateless/02910_prefetch_unexpceted_exception.sql
@@ -0,0 +1,24 @@
-- Tags: no-parallel, no-random-settings, no-random-merge-tree-settings
-- no-parallel -- enables failpoint
-- no-random-settings -- depend on type of part, should always fail
drop table if exists prefetched_table;

CREATE TABLE prefetched_table(key UInt64, s String) Engine = MergeTree() order by key;

INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);
INSERT INTO prefetched_table SELECT rand(), randomString(5) from numbers(1000);

SET local_filesystem_read_prefetch=1;
SET allow_prefetched_read_pool_for_remote_filesystem=1;
SET allow_prefetched_read_pool_for_local_filesystem=1;

SYSTEM ENABLE FAILPOINT prefeteched_reader_pool_failpoint;

SELECT * FROM prefetched_table FORMAT Null; --{serverError 36}

SYSTEM DISABLE FAILPOINT prefeteched_reader_pool_failpoint;

drop table if exists prefetched_table;