fix
tavplubix committed May 15, 2023
1 parent 1b2c774 commit 7298652
Showing 2 changed files with 62 additions and 38 deletions.
98 changes: 61 additions & 37 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -2500,10 +2500,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
     {
         std::vector<MergeTreePartInfo> infos;
         std::vector<DataPartsVector> parts;
-        std::vector<UInt64> split_level;
+        std::vector<UInt64> split_times;
     };
 
-    auto split_into_independent_ranges = [this](const DataPartsVector & parts_to_remove_, size_t split_level = 0) -> RemovalRanges
+    auto split_into_independent_ranges = [this](const DataPartsVector & parts_to_remove_, size_t split_times) -> RemovalRanges
     {
         ActiveDataPartSet independent_ranges_set(format_version);
         for (const auto & part : parts_to_remove_)
@@ -2518,7 +2518,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
         independent_ranges.infos = independent_ranges_set.getPartInfos();
         size_t num_ranges = independent_ranges.infos.size();
         independent_ranges.parts.resize(num_ranges);
-        independent_ranges.split_level.resize(num_ranges, split_level);
+        independent_ranges.split_times.resize(num_ranges, split_times);
         size_t avg_range_size = parts_to_remove_.size() / num_ranges;
 
         size_t sum_of_ranges = 0;
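
The lambda above groups outdated parts into disjoint block ranges so that each range can be removed by an independent worker; split_times records how many times a range has already been re-split. A minimal standalone sketch of that grouping idea, using a simplified (min_block, max_block) interval model in place of ClickHouse's ActiveDataPartSet (illustrative only, not the real implementation):

#include <algorithm>
#include <cstdint>
#include <map>
#include <tuple>
#include <utility>
#include <vector>

struct PartInfo { int64_t min_block = 0; int64_t max_block = 0; };

/// Illustrative stand-in for ActiveDataPartSet: group parts into independent
/// ranges keyed by their covering (widest) interval. Assumes block ranges
/// either nest or are disjoint, as MergeTree part ranges do.
std::map<std::pair<int64_t, int64_t>, std::vector<PartInfo>>
splitIntoIndependentRanges(std::vector<PartInfo> parts)
{
    /// Sort by min_block ascending, then max_block descending, so every
    /// covering part is visited before the parts it covers.
    std::sort(parts.begin(), parts.end(), [](const PartInfo & a, const PartInfo & b)
    {
        return std::tie(a.min_block, b.max_block) < std::tie(b.min_block, a.max_block);
    });

    std::map<std::pair<int64_t, int64_t>, std::vector<PartInfo>> ranges;
    std::pair<int64_t, int64_t> current{-1, -2};  /// empty sentinel range
    for (const PartInfo & part : parts)
    {
        /// Open a new independent range unless the current one covers this part
        if (!(current.first <= part.min_block && part.max_block <= current.second))
            current = {part.min_block, part.max_block};
        ranges[current].push_back(part);
    }
    return ranges;
}
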
@@ -2543,85 +2543,109 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
         return independent_ranges;
     };
 
-    RemovalRanges independent_ranges = split_into_independent_ranges(parts_to_remove);
+    auto schedule_parts_removal = [this, &pool, &part_names_mutex, part_names_succeed](
+        const MergeTreePartInfo & range, DataPartsVector && parts_in_range)
+    {
+        /// Below, range should be captured by copy to avoid use-after-scope on exception from pool
+        pool.scheduleOrThrowOnError(
+            [this, range, &part_names_mutex, part_names_succeed, thread_group = CurrentThread::getGroup(), batch = std::move(parts_in_range)]
+            {
+                SCOPE_EXIT_SAFE(
+                    if (thread_group)
+                        CurrentThread::detachFromGroupIfNotDetached();
+                );
+                if (thread_group)
+                    CurrentThread::attachToGroupIfDetached(thread_group);
+
+                LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartNameForLogs());
+
+                for (const auto & part : batch)
+                {
+                    asMutableDeletingPart(part)->remove();
+                    if (part_names_succeed)
+                    {
+                        std::lock_guard lock(part_names_mutex);
+                        part_names_succeed->insert(part->name);
+                    }
+                }
+            });
+    };
+
+    RemovalRanges independent_ranges = split_into_independent_ranges(parts_to_remove, /* split_times */ 0);
     DataPartsVector excluded_parts;
     size_t num_ranges = independent_ranges.infos.size();
     size_t sum_of_ranges = 0;
     size_t total_excluded = 0;
     for (size_t i = 0; i < num_ranges; ++i)
     {
         MergeTreePartInfo & range = independent_ranges.infos[i];
         DataPartsVector & parts_in_range = independent_ranges.parts[i];
-        UInt64 split_level = independent_ranges.split_level[i];
+        UInt64 split_times = independent_ranges.split_times[i];
 
         /// It may happen that we have a huge part covering thousands of small parts.
         /// In this case, we will get a huge range that will be processed by only one thread, causing really long tail latency.
         /// Let's try to exclude such parts in order to get smaller tasks for the thread pool and a more uniform distribution.
         if (settings->concurrent_part_removal_threshold < parts_in_range.size() &&
-            split_level < settings->zero_copy_concurrent_part_removal_max_split_level)
+            split_times < settings->zero_copy_concurrent_part_removal_max_split_times)
         {
-            auto top_level_parts_pred = [&range](const DataPartPtr & part)
+            auto smaller_parts_pred = [&range](const DataPartPtr & part)
             {
-                return part->info.min_block == range.min_block && part->info.max_block == range.max_block;
+                return !(part->info.min_block == range.min_block && part->info.max_block == range.max_block);
             };
 
-            size_t top_level_count = std::count_if(parts_in_range.begin(), parts_in_range.end(), top_level_parts_pred);
-            if (settings->zero_copy_concurrent_part_removal_max_postpone_ratio < static_cast<Float32>(top_level_count) / parts_in_range.size())
+            size_t covered_parts_count = std::count_if(parts_in_range.begin(), parts_in_range.end(), smaller_parts_pred);
+            size_t top_level_count = parts_in_range.size() - covered_parts_count;
+            chassert(top_level_count);
+            Float32 parts_to_exclude_ratio = static_cast<Float32>(top_level_count) / parts_in_range.size();
+            if (settings->zero_copy_concurrent_part_removal_max_postpone_ratio < parts_to_exclude_ratio)
             {
                 /// Most likely we have a long mutation chain here
                 LOG_DEBUG(log, "Block range {} contains {} parts including {} top-level parts, will not try to split it",
                           range.getPartNameForLogs(), parts_in_range.size(), top_level_count);
             }
             else
             {
-                auto new_end_it = std::remove_if(parts_in_range.begin(), parts_in_range.end(), top_level_parts_pred);
+                auto new_end_it = std::partition(parts_in_range.begin(), parts_in_range.end(), smaller_parts_pred);
                 std::move(new_end_it, parts_in_range.end(), std::back_inserter(excluded_parts));
                 parts_in_range.erase(new_end_it, parts_in_range.end());
 
-                RemovalRanges subranges = split_into_independent_ranges(parts_in_range, split_level + 1);
+                RemovalRanges subranges = split_into_independent_ranges(parts_in_range, split_times + 1);
 
                 LOG_DEBUG(log, "Block range {} contained {} parts, it was split into {} independent subranges after excluding {} top-level parts",
                           range.getPartNameForLogs(), parts_in_range.size() + top_level_count, subranges.infos.size(), top_level_count);
 
                 std::move(subranges.infos.begin(), subranges.infos.end(), std::back_inserter(independent_ranges.infos));
                 std::move(subranges.parts.begin(), subranges.parts.end(), std::back_inserter(independent_ranges.parts));
-                std::move(subranges.split_level.begin(), subranges.split_level.end(), std::back_inserter(independent_ranges.split_level));
+                std::move(subranges.split_times.begin(), subranges.split_times.end(), std::back_inserter(independent_ranges.split_times));
                 num_ranges += subranges.infos.size();
                 total_excluded += top_level_count;
                 continue;
             }
         }
 
         sum_of_ranges += parts_in_range.size();
 
-        pool.scheduleOrThrowOnError(
-            [this, range, &part_names_mutex, part_names_succeed, thread_group = CurrentThread::getGroup(), batch = std::move(parts_in_range)]
-            {
-                SCOPE_EXIT_SAFE(
-                    if (thread_group)
-                        CurrentThread::detachFromGroupIfNotDetached();
-                );
-                if (thread_group)
-                    CurrentThread::attachToGroupIfDetached(thread_group);
-
-                LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartNameForLogs());
-
-                for (const auto & part : batch)
-                {
-                    asMutableDeletingPart(part)->remove();
-                    if (part_names_succeed)
-                    {
-                        std::lock_guard lock(part_names_mutex);
-                        part_names_succeed->insert(part->name);
-                    }
-                }
-            });
+        schedule_parts_removal(range, std::move(parts_in_range));
     }
 
+    /// Remove excluded parts as well. They were reordered, so sort them again
+    std::sort(excluded_parts.begin(), excluded_parts.end(), [](const auto & x, const auto & y) { return x->info < y->info; });
+    LOG_TRACE(log, "Will remove {} big parts separately: {}", excluded_parts.size(), fmt::join(excluded_parts, ", "));
+
+    independent_ranges = split_into_independent_ranges(excluded_parts, /* split_times */ 0);
     pool.wait();
 
+    for (size_t i = 0; i < independent_ranges.infos.size(); ++i)
+    {
+        MergeTreePartInfo & range = independent_ranges.infos[i];
+        DataPartsVector & parts_in_range = independent_ranges.parts[i];
+        schedule_parts_removal(range, std::move(parts_in_range));
+    }
+
+    pool.wait();
+
-    if (parts_to_remove.size() != sum_of_ranges + total_excluded)
+    if (parts_to_remove.size() != sum_of_ranges + excluded_parts.size())
         throw Exception(ErrorCodes::LOGICAL_ERROR,
                         "Number of parts to remove was not equal to number of parts in independent ranges and excluded parts"
-                        "({} != {} + {}), it's a bug", parts_to_remove.size(), sum_of_ranges, total_excluded);
+                        "({} != {} + {}), it's a bug", parts_to_remove.size(), sum_of_ranges, excluded_parts.size());
 }
 
 size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
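
The core of the fix is visible in the else branch above: std::remove_if only guarantees that the kept elements land at the front, while the tail is left in a valid but unspecified (typically moved-from) state, so the subsequent std::move into excluded_parts could collect null part pointers instead of the excluded top-level parts. std::partition merely reorders without destroying any element, so the tail really holds the excluded parts. A self-contained illustration of the difference, using strings in place of data parts:

#include <algorithm>
#include <functional>
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

int main()
{
    auto is_small = [](const std::string & s) { return s.size() < 4; };

    /// Buggy pattern (pre-fix): remove_if keeps the "small" elements at the
    /// front but leaves the tail in a valid-but-unspecified (moved-from)
    /// state, so moving the tail out collects garbage, not the big elements.
    std::vector<std::string> parts{"a", "bb", "ccc", "dddd", "eeeee"};
    auto bad_end = std::remove_if(parts.begin(), parts.end(), std::not_fn(is_small));
    std::vector<std::string> excluded_bad(std::make_move_iterator(bad_end),
                                          std::make_move_iterator(parts.end()));
    std::cout << "buggy tail size: " << excluded_bad.size()
              << ", first element: '" << excluded_bad.front() << "'\n";  /// likely ''

    /// Fixed pattern (this commit): partition merely reorders, so the tail
    /// really contains the excluded big elements, intact and movable.
    std::vector<std::string> parts2{"a", "bb", "ccc", "dddd", "eeeee"};
    auto good_end = std::partition(parts2.begin(), parts2.end(), is_small);
    std::vector<std::string> excluded_good(std::make_move_iterator(good_end),
                                           std::make_move_iterator(parts2.end()));
    parts2.erase(good_end, parts2.end());
    for (const auto & s : excluded_good)
        std::cout << "excluded: " << s << '\n';  /// "dddd" and "eeeee", order unspecified
}

Note that the predicate is flipped in the commit (smaller_parts_pred instead of top_level_parts_pred) because std::partition keeps the elements satisfying the predicate at the front, and that std::partition does not preserve relative order, which is why the commit also re-sorts excluded_parts before logging and removing them.
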
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeSettings.h
@@ -146,7 +146,7 @@ struct Settings;
     M(MaxThreads, max_part_loading_threads, 0, "The number of threads to load data parts at startup.", 0) \
     M(MaxThreads, max_part_removal_threads, 0, "The number of threads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).", 0) \
     M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
-    M(UInt64, zero_copy_concurrent_part_removal_max_split_level, 3, "Max recursion depth for splitting independent Outdated parts ranges into smaller subranges (highly not recommended to change)", 0) \
+    M(UInt64, zero_copy_concurrent_part_removal_max_split_times, 5, "Max recursion depth for splitting independent Outdated parts ranges into smaller subranges (highly not recommended to change)", 0) \
     M(Float, zero_copy_concurrent_part_removal_max_postpone_ratio, static_cast<Float32>(0.05), "Max percentage of top level parts to postpone removal in order to get smaller independent ranges (highly not recommended to change)", 0) \
     M(String, storage_policy, "default", "Name of storage disk policy", 0) \
     M(String, disk, "", "Name of storage disk. Can be specified instead of storage policy.", 0) \
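
Taken together, the three removal-related settings gate the splitting loop in MergeTreeData.cpp: a range is only re-split while it holds more than concurrent_part_removal_threshold parts, has been split fewer than zero_copy_concurrent_part_removal_max_split_times times, and excluding its top-level parts would postpone no more than zero_copy_concurrent_part_removal_max_postpone_ratio of the range. A standalone restatement of that gating logic, with the caveat that the struct and function below are illustrative sketches, not ClickHouse code:

#include <cstddef>
#include <cstdint>

/// Hypothetical settings holder; field names mirror the MergeTree settings.
struct RemovalSettings
{
    size_t concurrent_part_removal_threshold = 100;
    uint64_t zero_copy_concurrent_part_removal_max_split_times = 5;
    float zero_copy_concurrent_part_removal_max_postpone_ratio = 0.05f;
};

/// Decide whether a range of range_size parts, already split split_times
/// times and containing top_level_count covering parts, should be split again.
bool shouldSplitRange(const RemovalSettings & s, size_t range_size,
                      uint64_t split_times, size_t top_level_count)
{
    if (range_size <= s.concurrent_part_removal_threshold)
        return false;  /// small enough for one worker already
    if (split_times >= s.zero_copy_concurrent_part_removal_max_split_times)
        return false;  /// recursion depth bound reached
    float exclude_ratio = static_cast<float>(top_level_count) / range_size;
    /// Splitting postpones removal of the excluded top-level parts; refuse if
    /// that would postpone too large a fraction (e.g. a long mutation chain).
    return exclude_ratio <= s.zero_copy_concurrent_part_removal_max_postpone_ratio;
}
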
