Fix materialized column as sharding key #28637
Conversation
@@ -126,21 +132,6 @@ void DistributedSink::consume(Chunk chunk)

auto ordinary_block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());

if (!allow_materialized)
Here was a previous solution for avoiding the error Cannot insert column, because it is MATERIALIZED column after a materialized column is calculated and then sent to a remote table. The previous solution just removed materialized columns from blocks before sending them to remote tables. It helped with the error but produced another problem: it made ClickHouse calculate a materialized column twice - on the initiator host and on a shard - and also made it difficult to use a materialized column as a sharding key.
So I decided to try another approach: always send a materialized column to the shards and also send the setting insert_allow_materialized_columns forced to true (no matter what the value of insert_allow_materialized_columns on the initiator actually is). Thus a shard will not calculate values for materialized columns by itself; it will always use the values calculated on the initiator.
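For illustration, a minimal sketch of the scenario this targets, written in the style of the integration tests further down (node1 and the cluster name 'test_cluster' are taken from those tests; the table and column names here are made up, this is not code from the PR):

node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, shard_key Int32 MATERIALIZED x % 2) ENGINE = MergeTree() ORDER BY x")
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, shard_key Int32 MATERIALIZED x % 2) ENGINE = Distributed('test_cluster', currentDatabase(), local, shard_key)")
# With the approach described above, shard_key is computed once on the initiator and
# forwarded to the shards together with insert_allow_materialized_columns forced on,
# so a MATERIALIZED column can act as the sharding key.
node1.query("INSERT INTO dist (x) VALUES (1), (2), (3), (4)")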
> Here was a previous solution for avoiding the error Cannot insert column, because it is MATERIALIZED column after a materialized column is calculated and then sent to a remote table. The previous solution just removed materialized columns from blocks before sending them to remote tables. It helped with the error but produced another problem: it made ClickHouse calculate a materialized column twice - on the initiator host and on a shard - and also made it difficult to use a materialized column as a sharding key.

Indeed.

> So I decided to try another approach: always send a materialized column to the shards and also send the setting insert_allow_materialized_columns forced to true (no matter what the value of insert_allow_materialized_columns on the initiator actually is). Thus a shard will not calculate values for materialized columns by itself; it will always use the values calculated on the initiator.
The reason for the previous solution was that some local table may have a dictGet that makes sense only on the local node, regardless of whether it was in the INSERT'ed block or not (#23349 (comment)); after this patch it will not be possible to restrict such cases, but personally I'm fine with this change (it's a pretty specific use case anyway).
So sometimes we may want a materialized column to be calculated in a distributed table, and sometimes we may want it to be calculated on a shard. It's getting complicated. Maybe we should add a new setting for distributed tables named calculate_defaults_on_shards to cover more cases.
Let me try to write out the possible cases.
insert_allow_materialized_columns=0
Any write of a materialized column to any table (including Distributed) will fail.
But the problem is that if the Distributed table has the same MATERIALIZED column as the underlying table, then it will materialize it, and later it will try to write it into the underlying table and fail, without #23349.
And so in this case the MATERIALIZED column will indeed be calculated twice, on the initiator and on the shards, and the result calculated on the initiator will simply be discarded.
create table data (key Int, value Int materialized 100) engine=Null();
create table dist (key Int, value Int materialized 100) engine=Distributed(test_cluster_two_shards, currentDatabase(), data, key);
INSERT INTO dist VALUES (1);
[localhost] 2021.09.09 16:57:13.641736 [ 22781 ] {64e81fd5-4f69-48c5-aba3-4f776f47b336} <Debug> DistributedBlockOutputStream: default.dist (876d7f85-aa5a-4c1d-876d-7f85aa5a8c1d): column value will be removed, because it is MATERIALIZED
...
insert_allow_materialized_columns=1
With this setting set, the MATERIALIZED column will be passed to the underlying storage, so it will not be calculated multiple times, only on the initiator.
But there are some caveats here (mentioned in the previous comments).
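A small sketch of this case, assuming a test-harness node like node1 from the test fragments below and the data/dist tables from the SQL example above (illustrative only, not code from this PR):

# With the setting enabled on the INSERT, the `value` computed by the Distributed
# table on the initiator should be forwarded to the underlying `data` table rather
# than recalculated on the shard.
node1.query("INSERT INTO dist (key) VALUES (1)",
            settings={"insert_allow_materialized_columns": 1})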
distributed_materialize_defaults_on_shards
You suggested calculate_defaults_on_shards, but I took the liberty of renaming it, since this looks better to me.
If this setting is set (distributed_materialize_defaults_on_shards=1), the MATERIALIZED column will always be calculated only on the initiator, and so the INSERT will fail without insert_allow_materialized_columns=1.
Maybe this setting should instead forbid INSERTs without all columns, including materialized ones (since it expects that the Distributed table has already filled them); this would make it strict and ensure that the Distributed table has the same structure (since if the structure does not match, in terms of some new materialized columns, the INSERT will fail). Thoughts?
P.S. Actually, if someone wants strict behavior here, it is better to add MATERIALIZED columns only in the Distributed table or only in the underlying table.
P.S. 00952_insert_into_distributed_with_materialized_colum should catch your change, but it uses the same default expression in the Distributed and underlying tables, hence it fails to do so.
I've changed my solution to make it more cautious. I put back your code removing materialized columns if insert_allow_materialized_columns=0, but now I do that after the sharding key is calculated. It seems the final version should solve all the current problems without breaking anything.
@azat Can you take a look please?
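For readers following the discussion, a rough sketch of that ordering (plain Python for illustration only; the actual change is in the C++ DistributedSink, and the function and parameter names here are invented):

def split_for_shards(rows, materialized_columns, num_shards, sharding_key_fn, allow_materialized):
    # 1. Compute the shard for each row while the materialized columns are still
    #    present, so a MATERIALIZED column can serve as the sharding key.
    shard_of_row = [sharding_key_fn(row) % num_shards for row in rows]
    # 2. Only afterwards, if materialized columns are not allowed in the insert,
    #    strip them from the rows that get forwarded to the shards.
    if not allow_materialized:
        rows = [{name: value for name, value in row.items()
                 if name not in materialized_columns} for row in rows]
    return list(zip(shard_of_row, rows))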
The failures (
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
/// the sample block instead.
Oops, looks like the comment is missing now, although it looks useful.
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 100, z Int32 DEFAULT x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)") | ||
node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 200, z Int32 DEFAULT x - y) ENGINE = MergeTree() ORDER BY y") | ||
|
||
for insert_sync in [0, 1]: |
This can be done in a better way, using the @pytest.mark.parametrize decorator.
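For example, a possible shape of that suggestion (a sketch only: the started_cluster fixture, the module-level node1, and the settings and error text are assumptions based on the surrounding test fragments, not the actual code from this PR):

import pytest

@pytest.mark.parametrize("insert_sync", [0, 1])
def test_materialized_column_disallow_insert_materialized(started_cluster, insert_sync):
    # Each value of insert_sync becomes its own test case instead of a loop iteration.
    settings = {"insert_distributed_sync": insert_sync, "insert_allow_materialized_columns": 0}
    expected_error = "..."  # placeholder for the error text checked by the real test
    assert expected_error in node1.query_and_get_error(
        "INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)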
assert expected_error in node1.query_and_get_error("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
# Almost the same as the previous test `test_materialized_column_disallow_insert_materialized`, but the sharding key has different values. |
This is just to ensure that partitioning by sharding key indeed works, right?
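One way such a test can check that rows actually land on different shards (a sketch, assuming node1/node2 cluster fixtures and the dist/local tables defined in the test fragments above):

node1.query("INSERT INTO dist (x) VALUES (1), (2), (3), (4)",
            settings={"insert_distributed_sync": 1})
# If the sharding key takes different values, both shards should receive some rows.
assert int(node1.query("SELECT count() FROM local")) > 0
assert int(node2.query("SELECT count() FROM local")) > 0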
Backport #28637 to 21.10: Fix materialized column as sharding key
Backport #28637 to 21.9: Fix materialized column as sharding key
Backport #28637 to 21.8: Fix materialized column as sharding key
Backport #28637 to 21.7: Fix materialized column as sharding key
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category:
Changelog entry:
Allow using a materialized column as the sharding key in a distributed table even if insert_allow_materialized_columns=0.