Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,18 @@ static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionFor
return {std::make_shared<ExpressionActions>(std::move(actions)), sign_filter->getColumnName()};
}

static std::pair<std::shared_ptr<ExpressionActions>, String> createExpressionForIsDeleted(const String & is_deleted_column_name, const Block & header, const ContextPtr & context)
{
ASTPtr is_deleted_identifier = std::make_shared<ASTIdentifier>(is_deleted_column_name);
ASTPtr is_deleted_filter = makeASTFunction("equals", is_deleted_identifier, std::make_shared<ASTLiteral>(Field(static_cast<Int8>(0))));

const auto & is_deleted_column = header.getByName(is_deleted_column_name);

auto syntax_result = TreeRewriter(context).analyze(is_deleted_filter, {{is_deleted_column.name, is_deleted_column.type}});
auto actions = ExpressionAnalyzer(is_deleted_filter, syntax_result, context).getActionsDAG(false);
return {std::make_shared<ExpressionActions>(std::move(actions)), is_deleted_filter->getColumnName()};
}

bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
{
const auto & settings = context->getSettingsRef();
Expand Down Expand Up @@ -1354,7 +1366,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
bool no_merging_final = do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0 &&
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
!reader_settings.read_in_order;

if (no_merging_final)
{
Expand Down Expand Up @@ -1386,11 +1398,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
info.use_uncompressed_cache);
};

/// Parts of non-zero level still may contain duplicate PK values to merge on FINAL if there's is_deleted column,
/// so we have to process all ranges. It would be more optimal to remove this flag and add an extra filtering step.
/// Parts of non-zero level may still contain duplicate PK values to merge on FINAL if there is an is_deleted column.
/// Non-intersecting ranges simply go through an extra filter, built by createExpressionForIsDeleted(), that drops
/// deleted rows.
bool split_parts_ranges_into_intersecting_and_non_intersecting_final
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final]
&& data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
= settings[Setting::split_parts_ranges_into_intersecting_and_non_intersecting_final] &&
!reader_settings.read_in_order;

SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
storage_snapshot->metadata->getPrimaryKey(),
Expand Down Expand Up @@ -1477,6 +1490,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
});
}
else if (!data.merging_params.is_deleted_column.empty())
{
auto columns_with_is_deleted = origin_column_names;
if (std::ranges::find(columns_with_is_deleted, data.merging_params.is_deleted_column) == columns_with_is_deleted.end())
columns_with_is_deleted.push_back(data.merging_params.is_deleted_column);

pipe = spreadMarkRangesAmongStreams(
std::move(non_intersecting_parts_by_primary_key), num_streams, columns_with_is_deleted);
auto [expression, filter_name] = createExpressionForIsDeleted(data.merging_params.is_deleted_column, pipe.getHeader(), context);

pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header, expression, filter_name, true);
});
}
else
{
pipe = spreadMarkRangesAmongStreams(std::move(non_intersecting_parts_by_primary_key), num_streams, origin_column_names);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
10000
9950
0
10000
17700
17700
17700
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- Test that a FINAL query on ReplacingMergeTree with an is_deleted column makes use of optimizations.
-- Every SELECT below prints a single count; the expected value is noted next to each query.

DROP TABLE IF EXISTS tab;

-- Partitioned by pkey, ordered by id; `version` and `is_deleted` are passed as the engine arguments.
CREATE TABLE tab (
pkey String,
id Int32,
v Int32,
version UInt64,
is_deleted UInt8
) Engine = ReplacingMergeTree(version,is_deleted)
PARTITION BY pkey ORDER BY id
SETTINGS index_granularity=512;

-- Partition 'A': insert 10000 rows (ids 0..9999), then re-insert the same ids with version 2,
-- marking the odd ids as deleted (half of the rows), and merge the 2 parts.
INSERT INTO tab SELECT 'A', number, number, 1, 0 FROM numbers(10000);
INSERT INTO tab SELECT 'A', number, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);

OPTIMIZE TABLE tab SETTINGS mutations_sync = 2;

-- From here on merges are stopped, so every following INSERT stays in its own part.
SYSTEM STOP MERGES tab;

-- Partition 'B': same pattern with ids 1000000..1009999 (odd ids deleted by the version 2 rows),
-- but the 2 parts are kept unmerged.
INSERT INTO tab SELECT 'B', number+1000000, number, 1, 0 FROM numbers(10000);
INSERT INTO tab SELECT 'B', number+1000000, number + 1, 2, IF(number % 2 = 0, 0, 1) FROM numbers(10000);

SET do_not_merge_across_partitions_select_final=1;

-- verify: 10000 rows expected (5000 non-deleted rows from 'A' + 5000 from 'B')
SELECT count()
FROM tab FINAL;

-- add a filter: 9950 rows expected (drops the 50 even ids 0..98 of partition 'A')
SELECT count()
FROM tab FINAL
WHERE id >= 100;

-- only even ids are left, so filtering on odd ids gives 0 rows
SELECT count()
FROM tab FINAL
WHERE (id % 2) = 1;

-- all remaining rows have even ids: 10000 rows expected
SELECT count()
FROM tab FINAL
WHERE (id % 2) = 0;

-- create some more partitions
-- Partition 'C': 100 rows in a single part, nothing deleted.
INSERT INTO tab SELECT 'C', number+2000000, number, 1, 0 FROM numbers(100);

-- Partition 'D': insert and delete some rows to get intersecting/non-intersecting ranges in the same partition.
-- The second insert covers only the first 5000 ids and marks the odd ones (2500 rows) as deleted.
-- NOTE: both inserts use version 1; the expected total below assumes the rows of the later insert win.
INSERT INTO tab SELECT 'D', number+3000000, number, 1, 0 FROM numbers(10000);
INSERT INTO tab SELECT 'D', number+3000000, number + 1, 1, IF(number % 2 = 0, 0, 1) FROM numbers(5000);

-- Partition 'E': 100 rows in a single part, nothing deleted.
INSERT INTO tab SELECT 'E', number+4000000, number, 1, 0 FROM numbers(100);

-- Total 10000 (from A & B) + 100 (from C) + 7500 (from D) + 100 (from E) = 17700 rows
-- First with both settings turned off...
SELECT count()
FROM tab FINAL
SETTINGS do_not_merge_across_partitions_select_final=0,split_intersecting_parts_ranges_into_layers_final=0;

-- ...then with both settings turned on; the count must be the same (17700).
SELECT count()
FROM tab FINAL
SETTINGS do_not_merge_across_partitions_select_final=1,split_intersecting_parts_ranges_into_layers_final=1;

-- Re-enable merges and merge everything; the reference output expects 17700 again.
SYSTEM START MERGES tab;
OPTIMIZE TABLE tab FINAL SETTINGS mutations_sync = 2;

SELECT count()
FROM tab FINAL;

DROP TABLE IF EXISTS tab;