diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
index 31a772b0fe02..42a6b316a8bf 100644
--- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp
+++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
@@ -1261,6 +1261,21 @@ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionFor
     return {std::make_shared<ExpressionActions>(std::move(actions)), sign_filter->getColumnName()};
 }
 
+/// Builds the predicate `<is_deleted_column_name> = 0` against the column of that name found in `header`.
+/// Returns the ExpressionActions built from the predicate together with the name of the predicate's result column,
+/// so the caller can feed both into a FilterTransform that drops rows marked as deleted.
+static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionForIsDeleted(const String & is_deleted_column_name, const Block & header, const ContextPtr & context)
+{
+    ASTPtr is_deleted_identifier = std::make_shared<ASTIdentifier>(is_deleted_column_name);
+    ASTPtr is_deleted_filter = makeASTFunction("equals", is_deleted_identifier, std::make_shared<ASTLiteral>(Field(static_cast<UInt8>(0))));
+
+    const auto & is_deleted_column = header.getByName(is_deleted_column_name);
+
+    auto syntax_result = TreeRewriter(context).analyze(is_deleted_filter, {{is_deleted_column.name, is_deleted_column.type}});
+    auto actions = ExpressionAnalyzer(is_deleted_filter, syntax_result, context).getActionsDAG(false);
+    return {std::make_shared<ExpressionActions>(std::move(actions)), is_deleted_filter->getColumnName()};
+}
+
 bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
 {
     const auto & settings = context->getSettingsRef();
@@ -1354,7 +1369,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
         bool no_merging_final = do_not_merge_across_partitions_select_final &&
             std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
             parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
-            data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
+            !reader_settings.read_in_order;
 
         if (no_merging_final)
         {
@@ -1386,11 +1401,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
                 info.use_uncompressed_cache);
         };
 
-        /// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
-        /// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
+        /// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column.
+        /// Non-intersecting ranges will just go through extra filter added by createExpressionForIsDeleted() to filter
+        /// deleted rows.
         bool split_parts_ranges_into_intersecting_and_non_intersecting_final
-            = settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final]
-            && data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
+            = settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final] &&
+            !reader_settings.read_in_order;
 
         SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
             storage_snapshot->metadata->getPrimaryKey(),
@@ -1477,6 +1493,23 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
                     return std::make_shared<FilterTransform>(header, expression, filter_name, true);
                 });
             }
+            else if (!data.merging_params.is_deleted_column.empty())
+            {
+                /// These ranges are read without the merging step, so rows marked as deleted are dropped by an explicit filter.
+                /// The is_deleted column has to be read for that even when the query did not request it.
+                auto columns_with_is_deleted = origin_column_names;
+                if (std::ranges::find(columns_with_is_deleted, data.merging_params.is_deleted_column) == columns_with_is_deleted.end())
+                    columns_with_is_deleted.push_back(data.merging_params.is_deleted_column);
+
+                pipe = spreadMarkRangesAmongStreams(
+                    std::move(non_intersecting_parts_by_primary_key), num_streams, columns_with_is_deleted);
+                auto [expression, filter_name] = createExpressionForIsDeleted(data.merging_params.is_deleted_column, pipe.getHeader(), context);
+
+                pipe.addSimpleTransform([&](const Block & header)
+                {
+                    return std::make_shared<FilterTransform>(header, expression, filter_name, true);
+                });
+            }
             else
             {
                 pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
diff --git a/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.reference b/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.reference
new file mode 100644
index 000000000000..a14298ab95f2
--- /dev/null
+++ b/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.reference
@@ -0,0 +1,7 @@
+10000
+9950
+0
+10000
+17700
+17700
+17700
diff --git a/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.sql b/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.sql
new file mode 100644
index 000000000000..dab053d3a30e
--- /dev/null
+++ b/tests/queries/0_stateless/02490_replacing_merge_tree_is_deleted_column_transform_opt.sql
@@ -0,0 +1,72 @@
+-- Test that FINAL queries on ReplacingMergeTree with is_deleted make use of optimizations.
+
+DROP TABLE IF EXISTS tab;
+
+CREATE TABLE tab (
+    pkey String,
+    id Int32,
+    v Int32,
+    version UInt64,
+    is_deleted UInt8
+) Engine = ReplacingMergeTree(version,is_deleted)
+PARTITION BY pkey ORDER BY id
+SETTINGS index_granularity=512;
+
+-- insert 10000 rows in partition 'A' and delete half of them and merge the 2 parts
+INSERT INTO tab SELECT 'A', number, number, 1, 0 FROM numbers(10000);
+INSERT INTO tab SELECT 'A', number, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
+
+OPTIMIZE TABLE tab SETTINGS mutations_sync = 2;
+
+SYSTEM STOP MERGES tab;
+
+-- insert 10000 rows in partition 'B' and delete half of them, but keep 2 parts
+INSERT INTO tab SELECT 'B', number+1000000, number, 1, 0 FROM numbers(10000);
+INSERT INTO tab SELECT 'B', number+1000000, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);
+
+SET do_not_merge_across_partitions_select_final=1;
+
+-- verify : 10000 rows expected
+SELECT count()
+FROM tab FINAL;
+
+-- add a filter : 9950 rows expected
+SELECT count()
+FROM tab FINAL
+WHERE id >= 100;
+
+-- only even ids are left - 0 rows expected
+SELECT count()
+FROM tab FINAL
+WHERE (id % 2) = 1;
+
+-- 10000 rows expected
+SELECT count()
+FROM tab FINAL
+WHERE (id % 2) = 0;
+
+-- create some more partitions
+INSERT INTO tab SELECT 'C', number+2000000, number, 1, 0 FROM numbers(100);
+
+-- insert and delete some rows to get intersecting/non-intersecting ranges in same partition
+INSERT INTO tab SELECT 'D', number+3000000, number, 1, 0 FROM numbers(10000);
+INSERT INTO tab SELECT 'D', number+3000000, number + 1, 1, IF(number % 2 = 0, 0, 1) FROM numbers(5000);
+
+INSERT INTO tab SELECT 'E', number+4000000, number, 1, 0 FROM numbers(100);
+
+-- Total 10000 (From A & B) + 100 (From C) + 7500 (From D) + 100 (From E) = 17700 rows
+SELECT count()
+FROM tab FINAL
+SETTINGS do_not_merge_across_partitions_select_final=0,split_intersecting_parts_ranges_into_layers_final=0;
+
+SELECT count()
+FROM tab FINAL
+SETTINGS do_not_merge_across_partitions_select_final=1,split_intersecting_parts_ranges_into_layers_final=1;
+
+SYSTEM START MERGES tab;
+OPTIMIZE TABLE tab FINAL SETTINGS mutations_sync = 2;
+
+SELECT count()
+FROM tab FINAL;
+
+DROP TABLE IF EXISTS tab;