Skip to content

Commit

Permalink
Merge pull request #54368 from ClickHouse/fix_replacing_final_error
Browse files Browse the repository at this point in the history
Fix logical error in vertical merge + replacing merge tree + optimize cleanup
  • Loading branch information
robot-ch-test-poll2 committed Sep 7, 2023
2 parents fd4ddc4 + 05446d8 commit f28ad1e
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 16 deletions.
24 changes: 17 additions & 7 deletions src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
Expand Up @@ -21,10 +21,14 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_)
bool cleanup_,
size_t * cleanedup_rows_count_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes)
, cleanup(cleanup_)
, cleanedup_rows_count(cleanedup_rows_count_)
{

if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
Expand Down Expand Up @@ -74,10 +78,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// Write the data for the previous primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();
Expand All @@ -91,7 +98,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
if (out_row_sources_buf)
current_row_sources.emplace_back(current.impl->order, true);

if ((is_deleted_column_number!=-1))
if (is_deleted_column_number != -1)
{
const UInt8 is_deleted = assert_cast<const ColumnUInt8 &>(*current->all_columns[is_deleted_column_number]).getData()[current->getRow()];
if ((is_deleted != 1) && (is_deleted != 0))
Expand Down Expand Up @@ -129,10 +136,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
{
if (is_deleted_column_number!=-1)
if (is_deleted_column_number != -1)
{
if (!(cleanup && assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num]))
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
*cleanedup_rows_count += current_row_sources.size();
}
else
insertRow();
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h
Expand Up @@ -27,7 +27,8 @@ class ReplacingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false);
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr);

Status merge() override;

Expand All @@ -37,6 +38,7 @@ class ReplacingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
ssize_t is_deleted_column_number = -1;
ssize_t version_column_number = -1;
bool cleanup = false;
size_t * cleanedup_rows_count = nullptr;

using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.
Expand Down
6 changes: 4 additions & 2 deletions src/Processors/Merges/ReplacingSortedTransform.h
Expand Up @@ -19,7 +19,8 @@ class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedA
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false)
bool cleanup = false,
size_t * cleanedup_rows_count = nullptr)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
Expand All @@ -31,7 +32,8 @@ class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedA
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes,
cleanup)
cleanup,
cleanedup_rows_count)
{
}

Expand Down
14 changes: 8 additions & 6 deletions src/Storages/MergeTree/MergeTask.cpp
Expand Up @@ -487,6 +487,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const

size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
size_t cleanedup_rows_count = global_ctx->cleanedup_rows_count;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);

Expand All @@ -499,12 +500,13 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// In special case, when there is only one source part, and no rows were skipped, we may have
/// skipped writing rows_sources file. Otherwise rows_sources_count must be equal to the total
/// number of input rows.
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1) && sum_input_rows_exact != rows_sources_count + input_rows_filtered)
if ((rows_sources_count > 0 || global_ctx->future_part->parts.size() > 1)
&& sum_input_rows_exact != rows_sources_count + input_rows_filtered + cleanedup_rows_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, rows_sources_count);
ErrorCodes::LOGICAL_ERROR,
"Number of rows in source parts ({}) excluding filtered rows ({}) and cleaned up rows ({}) differs from number "
"of bytes written to rows_sources file ({}). It is a bug.",
sum_input_rows_exact, input_rows_filtered, cleanedup_rows_count, rows_sources_count);

ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(ctx->tmp_disk->readFile(fileName(ctx->rows_sources_file->path())));

Expand Down Expand Up @@ -975,7 +977,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup, &global_ctx->cleanedup_rows_count);
break;

case MergeTreeData::MergingParams::Graphite:
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTask.h
Expand Up @@ -145,6 +145,7 @@ class MergeTask
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
size_t cleanedup_rows_count{0};

NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};
Expand Down
@@ -0,0 +1,13 @@
== Only last version remains after OPTIMIZE W/ CLEANUP ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
d6 3 0
== OPTIMIZE W/ CLEANUP (remove d6) ==
d1 5 0
d2 1 0
d3 1 0
d4 1 0
d5 1 0
@@ -0,0 +1,23 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (uid String, version UInt32, is_deleted UInt8) ENGINE = ReplacingMergeTree(version, is_deleted) Order by (uid) SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;

-- Expect d6 to be version=3 is_deleted=false
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 0);
-- Insert previous version of 'd6' but only v=3 is_deleted=false will remain
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 2, 1);
SELECT '== Only last version remains after OPTIMIZE W/ CLEANUP ==';
OPTIMIZE TABLE test FINAL CLEANUP;
select * from test order by uid;

-- insert d6 v=3 is_deleted=true (timestamp more recent so this version should be the one take into acount)
INSERT INTO test (*) VALUES ('d1', 1, 0), ('d1', 2, 1), ('d1', 3, 0), ('d1', 4, 1), ('d1', 5, 0), ('d2', 1, 0), ('d3', 1, 0), ('d4', 1, 0), ('d5', 1, 0), ('d6', 1, 0), ('d6', 3, 1);

SELECT '== OPTIMIZE W/ CLEANUP (remove d6) ==';
OPTIMIZE TABLE test FINAL CLEANUP;
-- No d6 anymore
select * from test order by uid;

DROP TABLE IF EXISTS test;

0 comments on commit f28ad1e

Please sign in to comment.