Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve concurrent parts removal with zero copy replication #49630

Merged
merged 5 commits into from May 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
123 changes: 102 additions & 21 deletions src/Storages/MergeTree/MergeTreeData.cpp
Expand Up @@ -19,6 +19,7 @@
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFuzzer.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
Expand Down Expand Up @@ -2447,9 +2448,13 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
}

/// Parallel parts removal.
size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts_to_remove.size());
size_t num_threads = settings->max_part_removal_threads;
if (!num_threads)
num_threads = getNumberOfPhysicalCPUCores() * 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't understand why size of this pure IO-thread pool depend on number of cores.... But maybe it make sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for max_part_removal_threads = 0 which means "auto", and it's default value. Previously it was just getNumberOfPhysicalCPUCores() and also min(threads, parts) did not work correctly when thread = 0. We can change the default value to, for example, 32 or 128, and throw an exception if it's set to 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, agree

num_threads = std::min<size_t>(num_threads, parts_to_remove.size());
std::mutex part_names_mutex;
ThreadPool pool(CurrentMetrics::MergeTreePartsCleanerThreads, CurrentMetrics::MergeTreePartsCleanerThreadsActive, num_threads);
ThreadPool pool(CurrentMetrics::MergeTreePartsCleanerThreads, CurrentMetrics::MergeTreePartsCleanerThreadsActive,
num_threads, num_threads, /* unlimited queue size */ 0);

bool has_zero_copy_parts = false;
if (settings->allow_remote_fs_zero_copy_replication && dynamic_cast<StorageReplicatedMergeTree *>(this) != nullptr)
Expand Down Expand Up @@ -2506,27 +2511,102 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
/// We remove disjoint subsets of parts in parallel.
/// The problem is that it's not trivial to divide Outdated parts into disjoint subsets,
/// because Outdated parts legally can be intersecting (but intersecting parts must be separated by a DROP_RANGE).
/// So we ignore level and version and use block numbers only.
ActiveDataPartSet independent_ranges_set(format_version);
for (const auto & part : parts_to_remove)
/// So we ignore level and version and use block numbers only (they cannot intersect by block numbers unless we have a bug).

struct RemovalRanges
{
MergeTreePartInfo range_info = part->info;
range_info.level = static_cast<UInt32>(range_info.max_block - range_info.min_block);
range_info.mutation = 0;
independent_ranges_set.add(range_info, range_info.getPartNameV1());
}
std::vector<MergeTreePartInfo> infos;
std::vector<DataPartsVector> parts;
std::vector<UInt64> split_level;
};

auto split_into_independent_ranges = [this](const DataPartsVector & parts_to_remove_, size_t split_level = 0) -> RemovalRanges
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please, let's make split_level without default value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it's not level (collision with term merge level). It's just how many times we tried to split source range? (split_times?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a kind of "reversed level", each time we increase it, we get parts with lower merge levels. Okay, let's rename

{
ActiveDataPartSet independent_ranges_set(format_version);
for (const auto & part : parts_to_remove_)
{
MergeTreePartInfo range_info = part->info;
range_info.level = static_cast<UInt32>(range_info.max_block - range_info.min_block);
range_info.mutation = 0;
independent_ranges_set.add(range_info, range_info.getPartNameV1());
}

RemovalRanges independent_ranges;
independent_ranges.infos = independent_ranges_set.getPartInfos();
size_t num_ranges = independent_ranges.infos.size();
independent_ranges.parts.resize(num_ranges);
independent_ranges.split_level.resize(num_ranges, split_level);
size_t avg_range_size = parts_to_remove_.size() / num_ranges;

size_t sum_of_ranges = 0;
for (size_t i = 0; i < num_ranges; ++i)
{
MergeTreePartInfo & range = independent_ranges.infos[i];
DataPartsVector & parts_in_range = independent_ranges.parts[i];
range.level = MergeTreePartInfo::MAX_LEVEL;
range.mutation = MergeTreePartInfo::MAX_BLOCK_NUMBER;

parts_in_range.reserve(avg_range_size * 2);
for (const auto & part : parts_to_remove_)
if (range.contains(part->info))
parts_in_range.push_back(part);
sum_of_ranges += parts_in_range.size();
}

if (parts_to_remove_.size() != sum_of_ranges)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of removed parts is not equal to number of parts in independent ranges "
"({} != {}), it's a bug", parts_to_remove_.size(), sum_of_ranges);

return independent_ranges;
};

auto independent_ranges_infos = independent_ranges_set.getPartInfos();
RemovalRanges independent_ranges = split_into_independent_ranges(parts_to_remove);
size_t num_ranges = independent_ranges.infos.size();
size_t sum_of_ranges = 0;
for (auto range : independent_ranges_infos)
size_t total_excluded = 0;
for (size_t i = 0; i < num_ranges; ++i)
{
range.level = MergeTreePartInfo::MAX_LEVEL;
range.mutation = MergeTreePartInfo::MAX_BLOCK_NUMBER;
MergeTreePartInfo & range = independent_ranges.infos[i];
DataPartsVector & parts_in_range = independent_ranges.parts[i];
UInt64 split_level = independent_ranges.split_level[i];

/// It may happen that we have a huge part covering thousands small parts.
/// In this case, we will get a huge range that will be process by only one thread causing really long tail latency.
/// Let's try to exclude such parts in order to get smaller tasks for thread pool and more uniform distribution.
if (settings->concurrent_part_removal_threshold < parts_in_range.size() &&
split_level < settings->zero_copy_concurrent_part_removal_max_split_level)
{
auto top_level_parts_pred = [&range](const DataPartPtr & part)
{
return part->info.min_block == range.min_block && part->info.max_block == range.max_block;
};

size_t top_level_count = std::count_if(parts_in_range.begin(), parts_in_range.end(), top_level_parts_pred);
if (settings->zero_copy_concurrent_part_removal_max_postpone_ratio < static_cast<Float32>(top_level_count) / parts_in_range.size())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is not obvious. So if we have range with 100 parts where 1 is covering and 99 is covered (merge from [0, ..., 99] to [0_99]) it will be true (while it should be false, isn't it?). But I'm not sure about right condition here...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should it be false? The purpose of this condition is to limit excluded parts percentage. For example, if we have 100 parts and 50 of them are top-level (it's possible when we have long mutations chain or repeated merges of the same blocks range), then excluding 50 parts probably will make it worse

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

{
/// Most likely we have a long mutations chain here
LOG_DEBUG(log, "Block range {} contains {} parts including {} top-level parts, will not try to split it",
range.getPartNameForLogs(), parts_in_range.size(), top_level_count);
}
else
{
auto new_end_it = std::remove_if(parts_in_range.begin(), parts_in_range.end(), top_level_parts_pred);
parts_in_range.erase(new_end_it, parts_in_range.end());

RemovalRanges subranges = split_into_independent_ranges(parts_in_range, split_level + 1);

LOG_DEBUG(log, "Block range {} contained {} parts, it was split into {} independent subranges after excluding {} top-level parts",
range.getPartNameForLogs(), parts_in_range.size() + top_level_count, subranges.infos.size(), top_level_count);

std::move(subranges.infos.begin(), subranges.infos.end(), std::back_inserter(independent_ranges.infos));
std::move(subranges.parts.begin(), subranges.parts.end(), std::back_inserter(independent_ranges.parts));
std::move(subranges.split_level.begin(), subranges.split_level.end(), std::back_inserter(independent_ranges.split_level));
num_ranges += subranges.infos.size();
total_excluded += top_level_count;
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly is redundant and why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just last line of loop, looks like we will continue without this continue :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the last line

}
}

DataPartsVector parts_in_range;
for (const auto & part : parts_to_remove)
if (range.contains(part->info))
parts_in_range.push_back(part);
sum_of_ranges += parts_in_range.size();

pool.scheduleOrThrowOnError(
Expand Down Expand Up @@ -2555,9 +2635,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t

pool.wait();

if (parts_to_remove.size() != sum_of_ranges)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of removed parts is not equal to number of parts in independent ranges "
"({} != {}), it's a bug", parts_to_remove.size(), sum_of_ranges);
if (parts_to_remove.size() != sum_of_ranges + total_excluded)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Number of parts to remove was not equal to number of parts in independent ranges and excluded parts"
"({} != {} + {}), it's a bug", parts_to_remove.size(), sum_of_ranges, total_excluded);
}

size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -153,6 +153,8 @@ struct Settings;
M(MaxThreads, max_part_loading_threads, 0, "The number of threads to load data parts at startup.", 0) \
M(MaxThreads, max_part_removal_threads, 0, "The number of threads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).", 0) \
M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
M(UInt64, zero_copy_concurrent_part_removal_max_split_level, 3, "Max recursion depth for splitting independent Outdated parts ranges into smaller subranges (highly not recommended to change)", 0) \
M(Float, zero_copy_concurrent_part_removal_max_postpone_ratio, static_cast<Float32>(0.05), "Max percentage of top level parts to postpone removal in order to get smaller independent ranges (highly not recommended to change)", 0) \
M(String, storage_policy, "default", "Name of storage disk policy", 0) \
M(String, disk, "", "Name of storage disk. Can be specified instead of storage policy.", 0) \
M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
Expand Down