Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick #59544 to 23.11: Fix corner case when passing update_insert_deduplication_token_in_dependent_materialized_views #59576

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/operations/settings/settings.md
Expand Up @@ -2097,7 +2097,7 @@ SELECT * FROM test_table

## update_insert_deduplication_token_in_dependent_materialized_views {#update-insert-deduplication-token-in-dependent-materialized-views}

Allows to update `insert_deduplication_token` with table identifier during insert in dependent materialized views, if setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set.
Allows updating `insert_deduplication_token` with the view identifier during insert into dependent materialized views, if the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set.

Possible values:

Expand Down
72 changes: 44 additions & 28 deletions src/Processors/Transforms/buildPushingToViewsChain.cpp
Expand Up @@ -247,30 +247,6 @@ Chain buildPushingToViewsChain(
{
insert_context->setSetting("insert_deduplicate", Field{false});
}
else if (insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Update deduplication token passed to dependent MV with current table id. So it is possible to properly handle
* deduplication in complex INSERT flows.
*
* Example:
*
* landing -┬--> mv_1_1 ---> ds_1_1 ---> mv_2_1 --┬-> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* └--> mv_1_2 ---> ds_1_2 ---> mv_2_2 --┘
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;

if (table_id.hasUUID())
insert_deduplication_token += "_" + toString(table_id.uuid);
else
insert_deduplication_token += "_" + table_id.getFullNameNotQuoted();

insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}

// Processing of blocks for MVs is done block by block, and there will
// be no parallel reading after (plus it is not a costless operation)
Expand Down Expand Up @@ -327,6 +303,46 @@ Chain buildPushingToViewsChain(
auto & target_name = runtime_stats->target_name;
auto * view_counter_ms = &runtime_stats->elapsed_ms;

const auto & insert_settings = insert_context->getSettingsRef();
ContextMutablePtr view_insert_context = insert_context;

if (!disable_deduplication_for_children &&
insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Append the current view id to the deduplication token passed to the dependent MV, so that deduplication
* is handled properly in complex INSERT flows.
*
* Example:
*
* landing -┬--> mv_1_1 ---> ds_1_1 ---> mv_2_1 --┬-> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* └--> mv_1_2 ---> ds_1_2 ---> mv_2_2 --┘
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*
* We have to use the view id rather than the table id because in some INSERT flows several views write
* directly into the same table, with no intermediate tables between them to tell their blocks apart.
*
* Example:
*
* landing -┬--> mv_1_1 --┬-> ds_1_1
* | |
* └--> mv_1_2 --┘
*
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;

if (view_id.hasUUID())
insert_deduplication_token += "_" + toString(view_id.uuid);
else
insert_deduplication_token += "_" + view_id.getFullNameNotQuoted();

view_insert_context = Context::createCopy(insert_context);
view_insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}

if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get()))
{
auto lock = materialized_view->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
Expand Down Expand Up @@ -394,7 +410,7 @@ Chain buildPushingToViewsChain(
insert_columns.emplace_back(column.name);
}

InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);
InterpreterInsertQuery interpreter(nullptr, view_insert_context, false, false, false);
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
out.addStorageHolder(view);
out.addStorageHolder(inner_table);
Expand All @@ -404,7 +420,7 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
}
Expand All @@ -413,13 +429,13 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;
query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert);
}
else
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ false,
thread_status_holder, running_group, view_counter_ms, async_insert);

Expand Down
@@ -0,0 +1,5 @@
0
ds_1_1 all_1_1_0 0
ds_1_1 all_2_2_0 0
landing all_1_1_0 0
10
@@ -0,0 +1,53 @@
-- Test: two materialized views read from the same source table (`landing`) and write
-- into the same target table (`ds_1_1`), while a user-supplied deduplication token is
-- propagated to dependent materialized views.

-- Enable insert deduplication, deduplication in dependent MVs, and per-view token updates,
-- then fix the user-supplied token so both INSERTs below carry the same one.
SET insert_deduplicate = 1;
SET deduplicate_blocks_in_dependent_materialized_views = 1;
SET update_insert_deduplication_token_in_dependent_materialized_views = 1;
SET insert_deduplication_token = 'test';

-- Source table that receives the INSERTs.
DROP TABLE IF EXISTS landing;
CREATE TABLE landing
(
timestamp UInt64,
value UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;

-- Target table shared by both materialized views.
DROP TABLE IF EXISTS ds_1_1;
CREATE TABLE ds_1_1
(
t UInt64,
v UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;

-- First materialized view: aggregates `landing` rows by timestamp into `ds_1_1`.
DROP VIEW IF EXISTS mv_1_1;
CREATE MATERIALIZED VIEW mv_1_1 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;

-- Second materialized view: same query and same target as `mv_1_1`, so both views
-- produce identical blocks for `ds_1_1`.
DROP VIEW IF EXISTS mv_1_2;
CREATE MATERIALIZED VIEW mv_1_2 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;

-- First INSERT: 10 identical rows, sent with the token set above.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);

-- NOTE(review): presumably spaces the two INSERTs apart in time; confirm why the pause
-- is needed. The result of this SELECT is emitted as part of the test output.
SELECT sleep(3);

-- Second INSERT: same data and same token as the first one.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);

-- Flush logs so system.part_log can be queried, then list the parts recorded for this
-- database together with their error codes.
-- NOTE(review): the intent appears to be that `ds_1_1` gets one part per view while the
-- repeated INSERT adds nothing new; verify against the test's reference file.
SYSTEM FLUSH LOGS;
SELECT table, name, error FROM system.part_log
WHERE database = currentDatabase()
ORDER BY table, name;

-- Row count of the source table after both INSERTs.
SELECT count() FROM landing;

-- Clean up all objects created by the test.
DROP TABLE landing;

DROP TABLE ds_1_1;
DROP VIEW mv_1_1;
DROP VIEW mv_1_2;