Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logical error in vertical merge + replacing merge tree + optimize cleanup #54368

Merged
merged 2 commits into from Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
Expand Up @@ -21,10 +21,14 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_)
bool cleanup_,
size_t * cleanedup_rows_count_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes)
, cleanup(cleanup_)
, cleanedup_rows_count(cleanedup_rows_count_)
{

if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
Expand Down Expand Up @@ -74,10 +78,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();
Expand All @@ -91,7 +98,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);

if ((is_deleted_column_number!=-1))
if (is_deleted_column_number != -1)
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if ((is_deleted != 1) && (is_deleted != 0))
Expand Down Expand Up @@ -129,10 +136,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h
Expand Up @@ -27,7 +27,8 @@ class ReplacingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false);
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr);

Status merge() override;

Expand All @@ -37,6 +38,7 @@ class ReplacingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
size_t * cleanedup_rows_count = nullptr;

using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.
Expand Down
6 changes: 4 additions & 2 deletions src/Processors/Merges/ReplacingSortedTransform.h
Expand Up @@ -19,7 +19,8 @@ class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedA
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false)
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
Expand All @@ -31,7 +32,8 @@ class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedA
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes,
cleanup)
cleanup,
cleanedup_rows_count)
{
}

Expand Down
14 changes: 8 additions & 6 deletions src/Storages/MergeTree/MergeTask.cpp
Expand Up @@ -487,6 +487,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const

size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
size_t cleanedup_rows_count = global_ctx->cleanedup_rows_count;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);

Expand All @@ -499,12 +500,13 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1)
&& sum_input_rows_exact != rows_sources_count + input_rows_filtered + cleanedup_rows_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) and cleaned up rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, cleanedup_rows_count, rows_sources_count);

ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(ctx->tmp_disk->readFile(fileName(ctx->rows_sources_file->path())));

Expand Down Expand Up @@ -975,7 +977,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup, &global_ctx->cleanedup_rows_count);
break;

case MergeTreeData::MergingParams::Graphite:
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTask.h
Expand Up @@ -145,6 +145,7 @@ class MergeTask
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
size_t cleanedup_rows_count{0};

NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};
Expand Down
@@ -0,0 +1,13 @@
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
@@ -0,0 +1,23 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;

-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert previous version of 'd6' but only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;

-- insert d6 v=3 is_deleted=true (timestamp more recent so this version should be the one take into acount)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);

SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;

DROP TABLE IF EXISTS test;