Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider lightweight deleted rows when selecting parts to merge #58223

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,15 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
return index_granularity.getMarksCount();
}

/// Estimates how many bytes of the part belong to rows that are not lightweight deleted.
///
/// The estimate is bytes_on_disk scaled by existing_rows_count / rows_count. It is used only when
/// the exclude_deleted_rows_for_part_size_in_merge setting is enabled, the part supports and has
/// a lightweight delete, and existing_rows_count is known and strictly less than rows_count.
/// In every other case bytes_on_disk is returned unchanged.
UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
{
    const bool can_estimate = storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge
        && supportLightweightDeleteMutate() && hasLightweightDelete()
        && existing_rows_count.has_value() && existing_rows_count.value() < rows_count && rows_count > 0;

    if (!can_estimate)
        return bytes_on_disk;

    /// bytes_on_disk * existing_rows_count does not fit into 64 bits for big parts
    /// (e.g. 20 GB with 10^9 rows), so the product is computed in 128 bits.
    /// The quotient always fits into UInt64 because existing_rows_count < rows_count.
    const auto scaled_bytes = static_cast<unsigned __int128>(bytes_on_disk) * existing_rows_count.value();
    return static_cast<UInt64>(scaled_bytes / rows_count);
}

size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
Expand Down Expand Up @@ -691,6 +700,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks

calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
loadExistingRowsCount(); /// Must be called after loadRowsCount() as it uses the value of `rows_count`.
loadPartitionAndMinMaxIndex();
if (!parent_part)
{
Expand Down Expand Up @@ -1313,6 +1323,87 @@ void IMergeTreeDataPart::loadRowsCount()
}
}

/// Fills existing_rows_count unless it already holds a value.
/// The _row_exists column is read only for a non-empty part that supports and has a lightweight
/// delete, and only when load_existing_rows_count_for_old_parts is enabled.
/// Otherwise existing_rows_count is set to rows_count.
void IMergeTreeDataPart::loadExistingRowsCount()
{
    if (existing_rows_count.has_value())
        return;

    const bool need_to_read_mask = rows_count && storage.getSettings()->load_existing_rows_count_for_old_parts
        && supportLightweightDeleteMutate() && hasLightweightDelete();

    if (need_to_read_mask)
        existing_rows_count = readExistingRowsCount();
    else
        existing_rows_count = rows_count;
}

/// Counts the rows of this part whose _row_exists value is non-zero by reading the
/// _row_exists column over all marks of the part.
/// Returns rows_count instead when the part has no marks, the reader cannot be created,
/// a read returns no rows, or the column that was read is not a ColumnUInt8.
UInt64 IMergeTreeDataPart::readExistingRowsCount()
{
    const size_t marks_total = getMarksCount();
    if (marks_total == 0)
        return rows_count;

    NamesAndTypesList columns_to_read;
    columns_to_read.emplace_back(RowExistsColumn::name, RowExistsColumn::type);

    StorageMetadataPtr metadata = storage.getInMemoryMetadataPtr();
    StorageSnapshotPtr snapshot = std::make_shared<StorageSnapshot>(storage, metadata);

    MergeTreeReaderPtr reader = getReader(
        columns_to_read,
        snapshot,
        MarkRanges{MarkRange(0, marks_total)},
        /*virtual_fields=*/ {},
        /*uncompressed_cache=*/{},
        storage.getContext()->getMarkCache().get(),
        std::make_shared<AlterConversions>(),
        MergeTreeReaderSettings{},
        ValueSizeMap{},
        ReadBufferFromFileBase::ProfileCallback{});

    if (!reader)
    {
        LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
        return rows_count;
    }

    size_t mark = 0;
    size_t rows_done = 0;
    size_t rows_alive = 0;

    while (rows_done < rows_count)
    {
        const size_t rows_wanted = index_granularity.getMarkRows(mark);
        /// The reader is asked to continue from its current position unless we are still at mark 0.
        const bool continue_reading = (mark != 0);

        Columns result;
        result.resize(1);

        const size_t rows_read = reader->readRows(mark, marks_total, continue_reading, rows_wanted, result);
        if (rows_read == 0)
        {
            LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
            return rows_count;
        }

        const auto * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
        if (row_exists_col == nullptr)
        {
            LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
            return rows_count;
        }

        for (const UInt8 flag : row_exists_col->getData())
            rows_alive += (flag != 0);

        rows_done += rows_read;

        /// Advance to the next mark only when the read returned as many rows as were requested.
        if (rows_read == rows_wanted)
            ++mark;
    }

    LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, rows_alive);
    return rows_alive;
}

void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");
Expand Down
13 changes: 13 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

size_t rows_count = 0;

/// Existing rows count (excluding lightweight deleted rows)
std::optional<size_t> existing_rows_count;

time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
Expand Down Expand Up @@ -373,6 +376,10 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }

/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
/// Otherwise returns bytes_on_disk
UInt64 getExistingBytesOnDisk() const;

size_t getFileSizeOrZero(const String & file_name) const;
auto getFilesChecksums() const { return checksums.files; }

Expand Down Expand Up @@ -499,6 +506,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// True if there is a lightweight delete mask file in the part.
bool hasLightweightDelete() const;

/// Read existing rows count from _row_exists column
UInt64 readExistingRowsCount();

void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);

/// Checks the consistency of this data part.
Expand Down Expand Up @@ -664,6 +674,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();

/// Load existing rows count from _row_exists column if load_existing_rows_count_for_old_parts is true.
void loadExistingRowsCount();

static void appendFilesOfRowsCount(Strings & files);

/// Loads ttl infos in json format from file ttl.txt. If file doesn't exist assigns ttl infos with all zeros
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}

/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);

/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8261,6 +8261,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE

new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();

new_data_part->partition = partition;

Expand Down
11 changes: 7 additions & 4 deletions src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
}

IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.size = part->getExistingBytesOnDisk();
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
Expand Down Expand Up @@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
return SelectPartsDecision::CANNOT_SELECT;
}

sum_bytes += (*it)->getBytesOnDisk();
sum_bytes += (*it)->getExistingBytesOnDisk();

prev_it = it;
++it;
Expand Down Expand Up @@ -793,7 +793,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
}


size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
Expand All @@ -804,7 +804,10 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
if (part_max_ttl && part_max_ttl <= current_time)
continue;

res += part->getBytesOnDisk();
if (account_for_deleted)
res += part->getExistingBytesOnDisk();
else
res += part->getBytesOnDisk();
}

return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataMergerMutator.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class MergeTreeDataMergerMutator


/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted = false);

private:
/** Select all parts belonging to the same partition.
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(

new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct Settings;
M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(Bool, load_existing_rows_count_for_old_parts, false, "Whether to load existing_rows_count for existing parts. If false, existing_rows_count will be equal to rows_count for existing parts.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
Expand Down Expand Up @@ -79,6 +80,7 @@ struct Settings;
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/MergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();

/// In a mutation, existing_rows_count is already calculated in PartMergerWriter.
/// In a merge, lightweight deleted rows were physically deleted, so existing_rows_count equals rows_count.
if (!new_part->existing_rows_count.has_value())
new_part->existing_rows_count = rows_count;

if (default_codec != nullptr)
new_part->default_codec = default_codec;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MutateFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}

/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);

if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
Expand Down
54 changes: 54 additions & 0 deletions src/Storages/MergeTree/MutateTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
return true;
}

/// Returns the number of rows in `block` whose _row_exists value is non-zero.
/// If the _row_exists column is not a ColumnUInt8, logs a warning and returns block.rows().
static UInt64 getExistingRowsCount(const Block & block)
{
    const auto row_exists_holder = block.getByName(RowExistsColumn::name).column;
    const auto * row_exists_mask = typeid_cast<const ColumnUInt8 *>(row_exists_holder.get());

    if (row_exists_mask == nullptr)
    {
        LOG_WARNING(&Poco::Logger::get("MutationHelpers::getExistingRowsCount"), "_row_exists column type is not UInt8");
        return block.rows();
    }

    UInt64 rows_alive = 0;
    for (const UInt8 flag : row_exists_mask->getData())
        rows_alive += (flag != 0);

    return rows_alive;
}

/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
Expand Down Expand Up @@ -997,6 +1017,9 @@ struct MutationContext
bool need_prefix = true;

scope_guard temporary_directory_lock;

/// Whether this mutation contains lightweight delete.
/// Assigned in MutateTask::prepare(); defaults to false so that it is never read uninitialized.
bool has_lightweight_delete = false;
};

using MutationContextPtr = std::shared_ptr<MutationContext>;
Expand Down Expand Up @@ -1191,6 +1214,7 @@ class PartMergerWriter
}
case State::SUCCESS:
{
finalize();
return false;
}
}
Expand Down Expand Up @@ -1226,6 +1250,11 @@ class PartMergerWriter
const ProjectionsDescription & projections;

ExecutableTaskPtr merge_projection_parts_task_ptr;

/// Existing rows count calculated during part writing.
/// It is zero by default and reset to zero in prepare(), accumulated in
/// mutateOriginalPartAndPrepareProjections() and set to new_data_part in finalize().
size_t existing_rows_count = 0;
};


Expand All @@ -1238,6 +1267,8 @@ void PartMergerWriter::prepare()
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}

existing_rows_count = 0;
}


Expand All @@ -1251,6 +1282,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()

ctx->out->write(cur_block);

if (ctx->has_lightweight_delete)
existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);
yakov-olkhovskiy marked this conversation as resolved.
Show resolved Hide resolved

for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
Expand Down Expand Up @@ -1340,6 +1374,12 @@ bool PartMergerWriter::iterateThroughAllProjections()
return true;
}

/// Stores the existing rows count accumulated by this writer into the new data part.
/// Does nothing when the mutation has no lightweight delete.
void PartMergerWriter::finalize()
{
    if (!ctx->has_lightweight_delete)
        return;

    ctx->new_data_part->existing_rows_count = existing_rows_count;
}

class MutateAllPartColumnsTask : public IExecutableTask
{
public:
Expand Down Expand Up @@ -2185,6 +2225,20 @@ bool MutateTask::prepare()
if (ctx->mutating_pipeline_builder.initialized())
ctx->execute_ttl_type = MutationHelpers::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());

if (ctx->updated_header.has(RowExistsColumn::name))
{
/// This mutation contains a lightweight delete. existing_rows_count of the new data part is not set here:
/// PartMergerWriter counts it while writing the _row_exists column and assigns it when it finishes.
ctx->has_lightweight_delete = true;
}
else
{
ctx->has_lightweight_delete = false;

/// This mutation does not contain a lightweight delete, so copy existing_rows_count from the source part
ctx->new_data_part->existing_rows_count = ctx->source_part->existing_rows_count.value_or(ctx->source_part->rows_count);
}

/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
{
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MERGE_PARTS)
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
Expand Down