Skip to content

Commit

Permalink
Merge pull request #58223 from jewelzqiu/existing-count
Browse files Browse the repository at this point in the history
Consider lightweight deleted rows when selecting parts to merge
  • Loading branch information
yakov-olkhovskiy committed Mar 15, 2024
2 parents 3190e46 + be4554b commit a31f551
Show file tree
Hide file tree
Showing 15 changed files with 213 additions and 10 deletions.
91 changes: 91 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,15 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
return index_granularity.getMarksCount();
}

UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
{
if (storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge && supportLightweightDeleteMutate() && hasLightweightDelete()
&& existing_rows_count.has_value() && existing_rows_count.value() < rows_count && rows_count > 0)
return bytes_on_disk * existing_rows_count.value() / rows_count;
else
return bytes_on_disk;
}

size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
Expand Down Expand Up @@ -691,6 +700,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks

calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
loadExistingRowsCount(); /// Must be called after loadRowsCount() as it uses the value of `rows_count`.
loadPartitionAndMinMaxIndex();
if (!parent_part)
{
Expand Down Expand Up @@ -1334,6 +1344,87 @@ void IMergeTreeDataPart::loadRowsCount()
}
}

void IMergeTreeDataPart::loadExistingRowsCount()
{
if (existing_rows_count.has_value())
return;

if (!rows_count || !storage.getSettings()->load_existing_rows_count_for_old_parts || !supportLightweightDeleteMutate()
|| !hasLightweightDelete())
existing_rows_count = rows_count;
else
existing_rows_count = readExistingRowsCount();
}

UInt64 IMergeTreeDataPart::readExistingRowsCount()
{
const size_t total_mark = getMarksCount();
if (!total_mark)
return rows_count;

NamesAndTypesList cols;
cols.emplace_back(RowExistsColumn::name, RowExistsColumn::type);

StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);

MergeTreeReaderPtr reader = getReader(
cols,
storage_snapshot_ptr,
MarkRanges{MarkRange(0, total_mark)},
/*virtual_fields=*/ {},
/*uncompressed_cache=*/{},
storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{},
ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{});

if (!reader)
{
LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
return rows_count;
}

size_t current_mark = 0;
bool continue_reading = false;
size_t current_row = 0;
size_t existing_count = 0;

while (current_row < rows_count)
{
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
continue_reading = (current_mark != 0);

Columns result;
result.resize(1);

size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
if (!rows_read)
{
LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
return rows_count;
}

current_row += rows_read;
current_mark += (rows_to_read == rows_read);

const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
if (!row_exists_col)
{
LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
return rows_count;
}

for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;
}

LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_count);
return existing_count;
}

void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");
Expand Down
13 changes: 13 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

size_t rows_count = 0;

/// Existing rows count (excluding lightweight deleted rows)
std::optional<size_t> existing_rows_count;

time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
Expand Down Expand Up @@ -373,6 +376,10 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }

/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
/// Otherwise returns bytes_on_disk
UInt64 getExistingBytesOnDisk() const;

size_t getFileSizeOrZero(const String & file_name) const;
auto getFilesChecksums() const { return checksums.files; }

Expand Down Expand Up @@ -499,6 +506,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const;

/// Read existing rows count from _row_exists column
UInt64 readExistingRowsCount();

void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);

/// Checks the consistency of this data part.
Expand Down Expand Up @@ -664,6 +674,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();

/// Load existing rows count from _row_exists column if load_existing_rows_count_for_old_parts is true.
void loadExistingRowsCount();

static void appendFilesOfRowsCount(Strings & files);

/// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}

/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);

/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8263,6 +8263,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE

new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();

new_data_part->partition = partition;

Expand Down
11 changes: 7 additions & 4 deletions src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
}

IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.size = part->getExistingBytesOnDisk();
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
Expand Down Expand Up @@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
return SelectPartsDecision::CANNOT_SELECT;
}

sum_bytes += (*it)->getBytesOnDisk();
sum_bytes += (*it)->getExistingBytesOnDisk();

prev_it = it;
++it;
Expand Down Expand Up @@ -793,7 +793,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
}


size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
Expand All @@ -804,7 +804,10 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
if (part_max_ttl && part_max_ttl <= current_time)
continue;

res += part->getBytesOnDisk();
if (account_for_deleted)
res += part->getExistingBytesOnDisk();
else
res += part->getBytesOnDisk();
}

return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataMergerMutator.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class MergeTreeDataMergerMutator


/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & account_for_deleted = false);

private:
/** Select all parts belonging to the same partition.
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(

new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->existing_rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct Settings;
M(UInt64, compact_parts_max_bytes_to_buffer, 128 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_max_granules_to_buffer, 128, "Only available in ClickHouse Cloud", 0) \
M(UInt64, compact_parts_merge_max_bytes_to_prefetch_part, 16 * 1024 * 1024, "Only available in ClickHouse Cloud", 0) \
M(Bool, load_existing_rows_count_for_old_parts, false, "Whether to load existing_rows_count for existing parts. If false, existing_rows_count will be equal to rows_count for existing parts.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default has the same value as `index_granularity`.", 0) \
Expand Down Expand Up @@ -79,6 +80,7 @@ struct Settings;
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/MergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();

/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count
if (!new_part->existing_rows_count.has_value())
new_part->existing_rows_count = rows_count;

if (default_codec != nullptr)
new_part->default_codec = default_codec;

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MutateFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}

/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);

if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
Expand Down
55 changes: 55 additions & 0 deletions src/Storages/MergeTree/MutateTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
return true;
}

static UInt64 getExistingRowsCount(const Block & block)
{
auto column = block.getByName(RowExistsColumn::name).column;
const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(column.get());

if (!row_exists_col)
{
LOG_WARNING(&Poco::Logger::get("MutationHelpers::getExistingRowsCount"), "_row_exists column type is not UInt8");
return block.rows();
}

UInt64 existing_count = 0;

for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;

return existing_count;
}

/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
Expand Down Expand Up @@ -997,6 +1017,9 @@ struct MutationContext
bool need_prefix = true;

scope_guard temporary_directory_lock;

/// Whether we need to count lightweight delete rows in this mutation
bool count_lightweight_deleted_rows;
};

using MutationContextPtr = std::shared_ptr<MutationContext>;
Expand Down Expand Up @@ -1191,6 +1214,7 @@ class PartMergerWriter
}
case State::SUCCESS:
{
finalize();
return false;
}
}
Expand Down Expand Up @@ -1226,6 +1250,11 @@ class PartMergerWriter
const ProjectionsDescription & projections;

ExecutableTaskPtr merge_projection_parts_task_ptr;

/// Existing rows count calculated during part writing.
/// It is initialized in prepare(), calculated in mutateOriginalPartAndPrepareProjections()
/// and set to new_data_part in finalize()
size_t existing_rows_count;
};


Expand All @@ -1238,6 +1267,8 @@ void PartMergerWriter::prepare()
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}

existing_rows_count = 0;
}


Expand All @@ -1251,6 +1282,10 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()

ctx->out->write(cur_block);

/// TODO: move this calculation to DELETE FROM mutation
if (ctx->count_lightweight_deleted_rows)
existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);

for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
Expand Down Expand Up @@ -1340,6 +1375,12 @@ bool PartMergerWriter::iterateThroughAllProjections()
return true;
}

void PartMergerWriter::finalize()
{
if (ctx->count_lightweight_deleted_rows)
ctx->new_data_part->existing_rows_count = existing_rows_count;
}

class MutateAllPartColumnsTask : public IExecutableTask
{
public:
Expand Down Expand Up @@ -2185,6 +2226,20 @@ bool MutateTask::prepare()
if (ctx->mutating_pipeline_builder.initialized())
ctx->execute_ttl_type = MutationHelpers::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());

if (ctx->data->getSettings()->exclude_deleted_rows_for_part_size_in_merge && ctx->updated_header.has(RowExistsColumn::name))
{
/// This mutation contains lightweight delete and we need to count the deleted rows,
/// Reset existing_rows_count of new data part to 0 and it will be updated while writing _row_exists column
ctx->count_lightweight_deleted_rows = true;
}
else
{
ctx->count_lightweight_deleted_rows = false;

/// No need to count deleted rows, copy existing_rows_count from source part
ctx->new_data_part->existing_rows_count = ctx->source_part->existing_rows_count.value_or(ctx->source_part->rows_count);
}

/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
{
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MERGE_PARTS)
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
Expand Down

0 comments on commit a31f551

Please sign in to comment.