Skip to content

Commit

Permalink
Add existing_count.txt & Consider deleted rows when selecting parts t…
Browse files Browse the repository at this point in the history
…o merge
  • Loading branch information
jewelzqiu committed Jan 4, 2024
1 parent ce13b21 commit 5a03270
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ void DataPartStorageOnDiskBase::clearDirectory(
request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
request.emplace_back(fs::path(dir) / "txn_version.txt", true);
request.emplace_back(fs::path(dir) / "metadata_version.txt", true);
request.emplace_back(fs::path(dir) / "existing_count.txt", true);

disk->removeSharedFiles(request, !can_remove_shared_data, names_not_to_remove);
disk->removeDirectory(dir);
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/DataPartsExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&
file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME &&
file_name != IMergeTreeDataPart::EXISTING_COUNT_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
}

Expand Down
108 changes: 108 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,15 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
return index_granularity.getMarksCount();
}

UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
{
if (storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge && existing_rows_count < rows_count
&& supportLightweightDeleteMutate() && hasLightweightDelete() && rows_count > 0)
return bytes_on_disk * existing_rows_count / rows_count;
else
return bytes_on_disk;
}

size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
Expand Down Expand Up @@ -672,6 +681,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
loadExistingRowsCount(); /// Must be called after loadRowsCount() as it uses the value of `rows_count`.
loadPartitionAndMinMaxIndex();
if (!parent_part)
{
Expand Down Expand Up @@ -859,6 +869,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
if (getDataPartStorage().exists(METADATA_VERSION_FILE_NAME))
result.emplace(METADATA_VERSION_FILE_NAME);

if (getDataPartStorage().exists(EXISTING_COUNT_FILE_NAME))
result.emplace(EXISTING_COUNT_FILE_NAME);

return result;
}

Expand Down Expand Up @@ -1287,6 +1300,101 @@ void IMergeTreeDataPart::loadRowsCount()
}
}

void IMergeTreeDataPart::loadExistingRowsCount()
{
if (!rows_count)
{
existing_rows_count = 0;
}
else if (!supportLightweightDeleteMutate() || !hasLightweightDelete())
{
existing_rows_count = rows_count;
}
else if (getDataPartStorage().exists(EXISTING_COUNT_FILE_NAME))
{
auto buf = metadata_manager->read(EXISTING_COUNT_FILE_NAME);
readIntText(existing_rows_count, *buf);
assertEOF(*buf);
}
else
{
existing_rows_count = readExistingRowsCount();

auto out_existing_count = getDataPartStorage().writeFile(EXISTING_COUNT_FILE_NAME, 4096, storage.getContext()->getWriteSettings());
writeIntText(existing_rows_count, *out_existing_count);
out_existing_count->finalize();
out_existing_count->sync();
}
}

UInt64 IMergeTreeDataPart::readExistingRowsCount()
{
const size_t total_mark = getMarksCount();
if (!total_mark)
return rows_count;

NamesAndTypesList cols;
cols.push_back(LightweightDeleteDescription::FILTER_COLUMN);

StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);

MergeTreeReaderPtr reader = getReader(
cols,
storage_snapshot_ptr,
MarkRanges{MarkRange(0, total_mark)},
nullptr,
storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{},
ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{});

if (!reader)
{
LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
return rows_count;
}

size_t current_mark = 0;
bool continue_reading = false;
size_t current_row = 0;
size_t existing_count = 0;

while (current_row < rows_count)
{
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
continue_reading = (current_mark != 0);

Columns result;
result.resize(1);

size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
if (!rows_read)
{
LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
return rows_count;
}

current_row += rows_read;
current_mark += (rows_to_read == rows_read);

const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
if (!row_exists_col)
{
LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
return rows_count;
}

for (UInt8 row_exists : row_exists_col->getData())
if (row_exists)
existing_count++;
}

LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_count);
return existing_count;
}

void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");
Expand Down
20 changes: 20 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

size_t rows_count = 0;

/// Existing rows count (excluding lightweight deleted rows)
/// UINT64_MAX -> uninitialized
/// 0 -> all rows were deleted
/// if reading failed, it will be set to rows_count
size_t existing_rows_count = UINT64_MAX;

time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
Expand Down Expand Up @@ -374,6 +380,10 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }

/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
/// Otherwise returns bytes_on_disk
UInt64 getExistingBytesOnDisk() const;

size_t getFileSizeOrZero(const String & file_name) const;
auto getFilesChecksums() const { return checksums.files; }

Expand Down Expand Up @@ -452,6 +462,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

static inline constexpr auto METADATA_VERSION_FILE_NAME = "metadata_version.txt";

/// File that contains existing (excluding lightweight deleted) rows count of the part
static inline constexpr auto EXISTING_COUNT_FILE_NAME = "existing_count.txt";

/// One of part files which is used to check how many references (I'd like
/// to say hardlinks, but it will confuse even more) we have for the part
/// for zero copy replication. Sadly it's very complex.
Expand Down Expand Up @@ -500,6 +513,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); }

/// Read existing rows count from _row_exists column
UInt64 readExistingRowsCount();

void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);

/// Checks the consistency of this data part.
Expand Down Expand Up @@ -656,6 +672,10 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();

/// Load existing rows count for this part from disk if existing_count.txt exists.
/// Otherwise read from _row_exists column.
void loadExistingRowsCount();

static void appendFilesOfRowsCount(Strings & files);

/// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
}

/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);

/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;
Expand Down
11 changes: 7 additions & 4 deletions src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
}

IMergeSelector::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.size = part->getExistingBytesOnDisk();
part_info.age = res.current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
Expand Down Expand Up @@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
return SelectPartsDecision::CANNOT_SELECT;
}

sum_bytes += (*it)->getBytesOnDisk();
sum_bytes += (*it)->getExistingBytesOnDisk();

prev_it = it;
++it;
Expand Down Expand Up @@ -793,7 +793,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
}


size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
Expand All @@ -804,7 +804,10 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
if (part_max_ttl && part_max_ttl <= current_time)
continue;

res += part->getBytesOnDisk();
if (is_merge)
res += part->getExistingBytesOnDisk();
else
res += part->getBytesOnDisk();
}

return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataMergerMutator.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class MergeTreeDataMergerMutator


/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge);

private:
/** Select all parts belonging to the same partition.
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct Settings;
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MutateFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}

/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);

if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
Expand Down
25 changes: 25 additions & 0 deletions src/Storages/MergeTree/MutateTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,9 @@ struct MutationContext
bool need_prefix = true;

scope_guard temporary_directory_lock;

/// Whether this mutation contains lightweight delete
bool has_lightweight_delete = false;
};

using MutationContextPtr = std::shared_ptr<MutationContext>;
Expand Down Expand Up @@ -1871,6 +1874,7 @@ MutateTask::MutateTask(
ctx->txn = txn;
ctx->source_part = ctx->future_part->parts[0];
ctx->need_prefix = need_prefix_;
ctx->has_lightweight_delete = commands_->hasLightWeightDeleteCommands();

auto storage_snapshot = ctx->data->getStorageSnapshotWithoutData(ctx->metadata_snapshot, context_);
extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
Expand All @@ -1896,6 +1900,27 @@ bool MutateTask::execute()
if (task->executeStep())
return true;

/// Save existing_rows_count before finishing mutation
if (ctx->new_data_part->supportLightweightDeleteMutate() && ctx->new_data_part->hasLightweightDelete())
{
UInt64 existing_rows_count = ctx->source_part->existing_rows_count;
if (ctx->has_lightweight_delete)
existing_rows_count = ctx->new_data_part->readExistingRowsCount();

ctx->new_data_part->existing_rows_count = existing_rows_count;

/// If existing_rows_count >= rows_count, there must be something wrong while reading it
if (existing_rows_count < ctx->new_data_part->rows_count)
{
/// Write existing rows count file
auto out_existing_count = ctx->new_data_part->getDataPartStorage().writeFile(
IMergeTreeDataPart::EXISTING_COUNT_FILE_NAME, 4096, ctx->context->getWriteSettings());
writeIntText(existing_rows_count, *out_existing_count);
out_existing_count->finalize();
out_existing_count->sync();
}
}

// The `new_data_part` is a shared pointer and must be moved to allow
// part deletion in case it is needed in `MutateFromLogEntryTask::finalize`.
//
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (auto part_in_memory = asInMemoryPart(part))
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else if (entry.type == LogEntry::MERGE_PARTS)
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();
}
Expand Down
11 changes: 11 additions & 0 deletions src/Storages/MutationCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <Common/quoteString.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/LightweightDeleteDescription.h>


namespace DB
Expand Down Expand Up @@ -249,6 +250,16 @@ bool MutationCommands::hasNonEmptyMutationCommands() const
return false;
}

bool MutationCommands::hasLightWeightDeleteCommands() const
{
for (const auto & command : *this)
if (command.type == MutationCommand::Type::UPDATE
&& command.column_to_update_expression.contains(LightweightDeleteDescription::FILTER_COLUMN.name))
return true;

return false;
}

bool MutationCommands::containBarrierCommand() const
{
for (const auto & command : *this)
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MutationCommands.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class MutationCommands : public std::vector<MutationCommand>
void readText(ReadBuffer & in);
std::string toString() const;
bool hasNonEmptyMutationCommands() const;
bool hasLightWeightDeleteCommands() const;

/// These set of commands contain barrier command and shouldn't
/// stick with other commands. Commands from one set have already been validated
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
if (isTTLMergeType(future_part->merge_type))
getContext()->getMergeList().bookMergeWithTTL();

merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
}

Expand Down Expand Up @@ -1303,7 +1303,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
future_part->name = part->getNewName(new_part_info);
future_part->part_format = part->getFormat();

tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2
2
1

0 comments on commit 5a03270

Please sign in to comment.