Skip to content

Commit

Permalink
Get existing rows count on new data part writing
Browse files Browse the repository at this point in the history
  • Loading branch information
jewelzqiu committed Jan 10, 2024
1 parent bb28d2b commit a63c8cd
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
14 changes: 14 additions & 0 deletions src/Storages/MergeTree/MergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();

/// In a mutation, existing_rows_count is already calculated in PartMergerWriter.
/// In a merge, lightweight-deleted rows were physically deleted, so existing_rows_count equals rows_count.
if (new_part->existing_rows_count > rows_count)
new_part->existing_rows_count = rows_count;

if (default_codec != nullptr)
new_part->default_codec = default_codec;

Expand Down Expand Up @@ -311,6 +316,15 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
"It is a bug.", new_part->name);
}

/// For backward compatibility, we don't add existing_count.txt into checksums
if (new_part->supportLightweightDeleteMutate() && new_part->hasLightweightDelete() && new_part->existing_rows_count <= rows_count)
{
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::EXISTING_COUNT_FILE_NAME, 4096, write_settings);
writeIntText(new_part->existing_rows_count, *out);
out->preFinalize();
written_files.emplace_back(std::move(out));
}

{
/// Write file with checksums.
auto out = new_part->getDataPartStorage().writeFile("checksums.txt", 4096, write_settings);
Expand Down
69 changes: 46 additions & 23 deletions src/Storages/MergeTree/MutateTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
return true;
}

/// Counts the rows of `block` whose value in the lightweight-delete filter column
/// (LightweightDeleteDescription::FILTER_COLUMN) is non-zero.
/// If that column is not a ColumnUInt8, a warning is logged and every row of the block
/// is reported as existing (block.rows() is returned).
static UInt64 getExistingRowsCount(const Block & block)
{
    const auto mask_holder = block.getByName(LightweightDeleteDescription::FILTER_COLUMN.name).column;
    const auto * mask = typeid_cast<const ColumnUInt8 *>(mask_holder.get());

    if (mask == nullptr)
    {
        LOG_WARNING(&Poco::Logger::get("MutationHelpers::getExistingRowsCount"), "_row_exists column type is not UInt8");
        return block.rows();
    }

    /// Each non-zero flag contributes exactly one to the total.
    UInt64 alive_rows = 0;
    for (const UInt8 flag : mask->getData())
        alive_rows += (flag != 0) ? 1 : 0;

    return alive_rows;
}

/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
Expand Down Expand Up @@ -858,6 +878,15 @@ void finalizeMutatedPart(
written_files.push_back(std::move(out_columns));
}

/// Existing parts with lightweight delete generate existing_count.txt on loading, and that file is not added into checksums.
/// For compatibility, we don't add existing_count.txt into checksums here either.
if (new_data_part->supportLightweightDeleteMutate() && new_data_part->hasLightweightDelete())
{
auto out_existing_count = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::EXISTING_COUNT_FILE_NAME, 4096, context->getWriteSettings());
writeIntText(new_data_part->existing_rows_count, *out_existing_count);
written_files.push_back(std::move(out_existing_count));
}

for (auto & file : written_files)
{
file->finalize();
Expand Down Expand Up @@ -957,7 +986,7 @@ struct MutationContext
scope_guard temporary_directory_lock;

/// Whether this mutation contains lightweight delete
bool has_lightweight_delete = false;
bool has_lightweight_delete;
};

using MutationContextPtr = std::shared_ptr<MutationContext>;
Expand Down Expand Up @@ -1218,6 +1247,9 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()

ctx->out->write(cur_block);

if (ctx->has_lightweight_delete)
ctx->new_data_part->existing_rows_count += MutationHelpers::getExistingRowsCount(cur_block);

for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
const auto & projection = *ctx->projections_to_build[i];
Expand Down Expand Up @@ -1874,7 +1906,6 @@ MutateTask::MutateTask(
ctx->txn = txn;
ctx->source_part = ctx->future_part->parts[0];
ctx->need_prefix = need_prefix_;
ctx->has_lightweight_delete = commands_->hasLightWeightDeleteCommands();

auto storage_snapshot = ctx->data->getStorageSnapshotWithoutData(ctx->metadata_snapshot, context_);
extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
Expand All @@ -1900,27 +1931,6 @@ bool MutateTask::execute()
if (task->executeStep())
return true;

/// Save existing_rows_count before finishing mutation
if (ctx->new_data_part->supportLightweightDeleteMutate() && ctx->new_data_part->hasLightweightDelete())
{
UInt64 existing_rows_count = ctx->source_part->existing_rows_count;
if (ctx->has_lightweight_delete)
existing_rows_count = ctx->new_data_part->readExistingRowsCount();

ctx->new_data_part->existing_rows_count = existing_rows_count;

/// If existing_rows_count >= rows_count, there must be something wrong while reading it
if (existing_rows_count < ctx->new_data_part->rows_count)
{
/// Write existing rows count file
auto out_existing_count = ctx->new_data_part->getDataPartStorage().writeFile(
IMergeTreeDataPart::EXISTING_COUNT_FILE_NAME, 4096, ctx->context->getWriteSettings());
writeIntText(existing_rows_count, *out_existing_count);
out_existing_count->finalize();
out_existing_count->sync();
}
}

// The `new_data_part` is a shared pointer and must be moved to allow
// part deletion in case it is needed in `MutateFromLogEntryTask::finalize`.
//
Expand Down Expand Up @@ -2126,6 +2136,19 @@ bool MutateTask::prepare()
if (ctx->mutating_pipeline_builder.initialized())
ctx->execute_ttl_type = MutationHelpers::shouldExecuteTTL(ctx->metadata_snapshot, ctx->interpreter->getColumnDependencies());

if (ctx->updated_header.has(LightweightDeleteDescription::FILTER_COLUMN.name))
{
/// This mutation contains a lightweight delete, so reset existing_rows_count of the new data part to 0.
/// It will be updated while writing the _row_exists column.
ctx->has_lightweight_delete = true;
ctx->new_data_part->existing_rows_count = 0;
}
else
{
ctx->has_lightweight_delete = false;
ctx->new_data_part->existing_rows_count = ctx->source_part->existing_rows_count;
}

/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())
Expand Down
11 changes: 0 additions & 11 deletions src/Storages/MutationCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <Common/quoteString.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/LightweightDeleteDescription.h>


namespace DB
Expand Down Expand Up @@ -250,16 +249,6 @@ bool MutationCommands::hasNonEmptyMutationCommands() const
return false;
}

/// Returns true if at least one command in this list is an UPDATE whose
/// column_to_update_expression contains the lightweight-delete filter column
/// (LightweightDeleteDescription::FILTER_COLUMN); returns false otherwise.
bool MutationCommands::hasLightWeightDeleteCommands() const
{
    const auto & filter_column_name = LightweightDeleteDescription::FILTER_COLUMN.name;

    for (const auto & mutation : *this)
    {
        /// Only UPDATE commands are inspected.
        if (mutation.type != MutationCommand::Type::UPDATE)
            continue;

        if (mutation.column_to_update_expression.contains(filter_column_name))
            return true;
    }

    return false;
}

bool MutationCommands::containBarrierCommand() const
{
for (const auto & command : *this)
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MutationCommands.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class MutationCommands : public std::vector<MutationCommand>
void readText(ReadBuffer & in);
std::string toString() const;
bool hasNonEmptyMutationCommands() const;
bool hasLightWeightDeleteCommands() const;

/// This set of commands contains a barrier command and shouldn't
/// stick with other commands. Commands from one set have already been validated
Expand Down

0 comments on commit a63c8cd

Please sign in to comment.