Skip to content

Commit

Permalink
Merge pull request #59544 from jrdi/fix-update-insert-deduplication-t…
Browse files Browse the repository at this point in the history
…oken

Fix corner case when passing `update_insert_deduplication_token_in_dependent_materialized_views`
  • Loading branch information
KochetovNicolai committed Feb 5, 2024
2 parents 08c1799 + 87cc319 commit 89bcebf
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 29 deletions.
2 changes: 1 addition & 1 deletion docs/en/operations/settings/settings.md
Expand Up @@ -2097,7 +2097,7 @@ SELECT * FROM test_table

## update_insert_deduplication_token_in_dependent_materialized_views {#update-insert-deduplication-token-in-dependent-materialized-views}

Allows to update `insert_deduplication_token` with table identifier during insert in dependent materialized views, if setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set.
Allows updating `insert_deduplication_token` with the view identifier during inserts into dependent materialized views, if the setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set.

Possible values:

Expand Down
72 changes: 44 additions & 28 deletions src/Processors/Transforms/buildPushingToViewsChain.cpp
Expand Up @@ -247,30 +247,6 @@ Chain buildPushingToViewsChain(
{
insert_context->setSetting("insert_deduplicate", Field{false});
}
else if (insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Update deduplication token passed to dependent MV with current table id. So it is possible to properly handle
* deduplication in complex INSERT flows.
*
* Example:
*
* landing -┬--> mv_1_1 ---> ds_1_1 ---> mv_2_1 --┬-> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* └--> mv_1_2 ---> ds_1_2 ---> mv_2_2 --┘
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;

if (table_id.hasUUID())
insert_deduplication_token += "_" + toString(table_id.uuid);
else
insert_deduplication_token += "_" + table_id.getFullNameNotQuoted();

insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}

// Processing of blocks for MVs is done block by block, and there will
// be no parallel reading after (plus it is not a costless operation)
Expand Down Expand Up @@ -327,6 +303,46 @@ Chain buildPushingToViewsChain(
auto & target_name = runtime_stats->target_name;
auto * view_counter_ms = &runtime_stats->elapsed_ms;

const auto & insert_settings = insert_context->getSettingsRef();
ContextMutablePtr view_insert_context = insert_context;

if (!disable_deduplication_for_children &&
insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Update deduplication token passed to dependent MV with current view id. So it is possible to properly handle
* deduplication in complex INSERT flows.
*
* Example:
*
* landing -┬--> mv_1_1 ---> ds_1_1 ---> mv_2_1 --┬-> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* └--> mv_1_2 ---> ds_1_2 ---> mv_2_2 --┘
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*
* We are forced to use the view id instead of the table id because in some INSERT flows several views write
* directly into the same target table, with no intermediate tables whose ids could tell their blocks apart.
*
* Example:
*
* landing -┬--> mv_1_1 --┬-> ds_1_1
* | |
* └--> mv_1_2 --┘
*
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;

if (view_id.hasUUID())
insert_deduplication_token += "_" + toString(view_id.uuid);
else
insert_deduplication_token += "_" + view_id.getFullNameNotQuoted();

view_insert_context = Context::createCopy(insert_context);
view_insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}

if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get()))
{
auto lock = materialized_view->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
Expand Down Expand Up @@ -394,7 +410,7 @@ Chain buildPushingToViewsChain(
insert_columns.emplace_back(column.name);
}

InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);
InterpreterInsertQuery interpreter(nullptr, view_insert_context, false, false, false);
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
out.addStorageHolder(view);
out.addStorageHolder(inner_table);
Expand All @@ -404,7 +420,7 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
}
Expand All @@ -413,13 +429,13 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;
query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert);
}
else
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ false,
thread_status_holder, running_group, view_counter_ms, async_insert);

Expand Down
@@ -0,0 +1,5 @@
0
ds_1_1 all_1_1_0 0
ds_1_1 all_2_2_0 0
landing all_1_1_0 0
10
@@ -0,0 +1,53 @@
-- Scenario: two materialized views read the same source table (landing) and
-- write identical blocks into the same target table (ds_1_1), while the user
-- supplies a fixed insert_deduplication_token.
--
--   landing -+--> mv_1_1 --+-> ds_1_1
--            |             |
--            +--> mv_1_2 --+
--
-- Enable insert deduplication, extend it to dependent materialized views, and
-- allow the token to be updated per dependent view. Every INSERT below runs
-- with the same token 'test'.
SET insert_deduplicate = 1;
SET deduplicate_blocks_in_dependent_materialized_views = 1;
SET update_insert_deduplication_token_in_dependent_materialized_views = 1;
SET insert_deduplication_token = 'test';

-- Source table. non_replicated_deduplication_window is set explicitly
-- (presumably because plain MergeTree does not deduplicate inserts without it
-- -- confirm against the MergeTree settings documentation).
DROP TABLE IF EXISTS landing;
CREATE TABLE landing
(
timestamp UInt64,
value UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;

-- Shared target table for both materialized views, created with the same
-- deduplication window as landing.
DROP TABLE IF EXISTS ds_1_1;
CREATE TABLE ds_1_1
(
t UInt64,
v UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;

-- First view: aggregates landing by timestamp and writes into ds_1_1.
DROP VIEW IF EXISTS mv_1_1;
CREATE MATERIALIZED VIEW mv_1_1 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;

-- Second view: same query and same target as mv_1_1, so for any insert into
-- landing both views emit identical blocks for ds_1_1.
DROP VIEW IF EXISTS mv_1_2;
CREATE MATERIALIZED VIEW mv_1_2 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;

-- First insert: 10 identical rows into landing, pushed through both views.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);

-- Pause for 3 seconds before repeating the insert; the result row of this
-- SELECT is part of the test output.
-- NOTE(review): the reason for the pause is not stated here -- confirm.
SELECT sleep(3);

-- Second insert: identical data and identical token as the first one.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);

-- Flush system logs so that part_log can be queried, then list the logged
-- parts (table, part name, error code) for the current database in a stable
-- order. The reference output expects two parts for ds_1_1 and one for
-- landing, all with error 0.
SYSTEM FLUSH LOGS;
SELECT table, name, error FROM system.part_log
WHERE database = currentDatabase()
ORDER BY table, name;

-- Row count of landing after both inserts (the reference output expects 10).
SELECT count() FROM landing;

-- Cleanup.
DROP TABLE landing;

DROP TABLE ds_1_1;
DROP VIEW mv_1_1;
DROP VIEW mv_1_2;

0 comments on commit 89bcebf

Please sign in to comment.