
[RFC] Replacing merge tree new engine #41005

Merged: 103 commits merged into ClickHouse:master on Feb 16, 2023

Conversation

youennL-cs
Contributor

@youennL-cs youennL-cs commented Sep 5, 2022

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Enrichment of the existing ReplacingMergeTree engine to allow insertion of duplicates. It combines the power of both ReplacingMergeTree and CollapsingMergeTree in one MergeTree engine. Deleted data is not returned when queried, but it is not removed from disk either.

Change details

Following the discussion with the ClickHouse Support Team (@melvynator and engineering team), we propose an extension of the ReplacingMergeTree engine with collapsing: ReplacingCollapsingMergeTree. In this version, we enrich the existing ReplacingMergeTree engine. However, it could also be a fully new engine.

Our proposal adds an extra sign column (possible values: -1 / 1) to the ReplacingMergeTree.
The goal is to take advantage of the ReplacingMergeTree and CollapsingMergeTree features in one MergeTree engine and allow insertion of duplicates.
This extra sign column is optional; however, if enabled, the version column becomes mandatory. This "new engine" stays backward compatible with previous versions of ReplacingMergeTree, since the behavior is an option chosen at table creation.

No matter the operation on the data, the version must be increased. If two inserted rows have the same version number, the last inserted one is kept.

It allows:

  • to easily "update" a row by inserting a row with an (arbitrary) greater version (the "replacing" merge algorithm collapses all rows with the same key into one row with the greatest version)
  • to easily "delete" a row by inserting a row with an (arbitrary) greater version and a -1 sign (the "replacing" merge will leave only one row with sign = -1, which is then filtered out)
  • to "update" key columns by deleting a row (as described above) and inserting a new one. The "replacing" merge algorithm removes duplicates as well, so it's safe to insert multiple rows with the same version.
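The update/delete semantics above can be sketched as a toy model. This is a simplification for illustration only (hypothetical helper names, not the actual engine code, which does this inside the merge and FINAL machinery):

```python
# Toy model of the proposed ReplacingMergeTree + sign semantics.
# A row is (key, version, sign); sign is 1 (insert/update) or -1 (delete).

def replacing_merge(rows):
    """Keep, for each key, the row with the greatest version
    (on equal versions, the last inserted row wins)."""
    latest = {}
    for row in rows:  # rows are iterated in insertion order
        key, version, _sign = row
        if key not in latest or version >= latest[key][1]:
            latest[key] = row
    return list(latest.values())

def query(rows):
    """What a SELECT ... FINAL would return: merge, then hide deletes."""
    return [r for r in replacing_merge(rows) if r[2] != -1]

rows = [
    ("a", 1, 1),    # insert
    ("a", 2, 1),    # update
    ("b", 1, 1),    # insert
    ("b", 2, -1),   # delete
]
print(query(rows))            # [('a', 2, 1)]
print(replacing_merge(rows))  # the deleted row is still kept on "disk"
```

Note how the tombstone for key "b" survives the merge; only the query-time filter hides it.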

From our perspective, deleted data must be filtered out when queried, not removed from disk. When partitioned across several shards, data can have several versions on different partitions. If rows with a -1 sign were deleted, we would lose the information that the data was deleted, which could eventually lead to incorrect KPIs.

Required help

We don't know how to filter out rows whose sign is set to "-1" at query time. We don't want to delete data from disk when a -1 sign is received; we want to filter it out when queried and not return it, while keeping the information.

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

@robot-ch-test-poll robot-ch-test-poll added the pr-feature Pull request with new product feature label Sep 5, 2022
@youennL-cs youennL-cs marked this pull request as ready for review September 6, 2022 09:30
@Avogar Avogar added the can be tested Allows running workflows for external contributors label Sep 7, 2022
@filimonov filimonov self-assigned this Sep 8, 2022
@filimonov
Contributor

Actually, that implements #2074 (comment)

And reimplements, in a clearer / more explicit way, the hackish approach used by MaterializedMySQL - see #4006 (comment) + the ReadFinalForExternalReplicaStorage code

Several questions:

Deleted data is not returned when queried

That will require applying FINAL, won't it? (BTW, we are currently working on that: #39463 )

We don't want to delete data from the disk when a -1 sign is received;

Why do you want to keep it?

We don't want to delete data from the disk when a -1 sign is received;
we want to filter out data when queried and don't return them

That means the FINAL logic should differ from the merge logic, I'm not sure if it's a good idea.

Also: what do you think about using single version value like described in issue. i.e.

val='a', version=1 (insert)
val='b' version=2 (update)
val='c', version=3 (update)
val='', version=-4 (delete)

Pros: fewer columns to read, the logic should be very simple. Cons: a bit confusing to have negative versions, and ordering by absolute value is (a bit) more complex.
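The single-column alternative above can also be sketched as a toy model (Python; illustrative only, not the actual implementation), folding the sign into the version: a negative version is a tombstone, and ordering is by absolute value:

```python
# Toy model of the single-version alternative: a negative version means
# "deleted"; the winning row per key is the one with the greatest abs(version).

def merge_signed_version(rows):
    """rows: (key, value, version). Keep the row with the greatest
    abs(version) per key; a negative winner is a tombstone."""
    latest = {}
    for key, value, version in rows:
        if key not in latest or abs(version) >= abs(latest[key][2]):
            latest[key] = (key, value, version)
    return list(latest.values())

def query_signed(rows):
    """Hide tombstones (negative versions) at query time."""
    return [r for r in merge_signed_version(rows) if r[2] > 0]

rows = [
    ("k", "a", 1),   # insert
    ("k", "b", 2),   # update
    ("k", "c", 3),   # update
    ("k", "", -4),   # delete
]
print(query_signed(rows))  # []
```

The `abs(version)` comparison is exactly the "ordering by absolute value" cost mentioned in the cons.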

@tavplubix
Member

tavplubix commented Sep 8, 2022

using single version value like described in #2074

Let's avoid adding negative versions, it's too tricky and misleading

@youennL-cs
Contributor Author

That will require applying FINAL, won't it? (BTW, we are currently working on that: #39463 )

Yes, it would be applied with FINAL

Why do you want to keep it?

For two reasons. First, because it would allow out-of-order inserts.
Second, in the case of several shards, the data, as well as the versions of a given element, can be split across shards. So if we deleted the -1 rows from disk, we would either have to remove all previous versions from all shards too, or we would lose the information that a new version with the -1 sign was inserted and return a stale previous version, which would lead to wrong results.

using single version value like described in #2074

Let's avoid adding negative versions, it's too tricky and misleading

Agreed, managing versions that can be negative would be far too complex for ordering and filtering (and other cases)

@filimonov
Contributor

insert in an out-of-order way.

ok.

in the case of several shards, the data, as well as the versions of a given element, can be split across shards.

I hope you're aware that Distributed is not able to apply FINAL or do Replacing / Collapsing etc. logic across shards.

@filimonov
Contributor

filimonov commented Sep 8, 2022

We don't know how to filter out rows whose sign is set to "-1" at query time. We don't want to delete data from disk when a -1 sign is received; we want to filter it out when queried and not return it, while keeping the information.

We don't want to delete data from disk when a -1 sign is received; we want to filter it out when queried and not return it

That means the FINAL logic should differ from the merge logic

That means you cannot simply modify ReplacingSortedTransform to make it respect the sign unconditionally (similarly to CollapsingSortedTransform, removing the last row if its sign is -1), because that would impact both merges and selects.

But maybe you can make it adjustable, controlled by some flag passed to the constructor of ReplacingSortedTransform / ReplacingSortedAlgorithm. If ReplacingSortedTransform is called from a merge, it should work as it does now; if it is called from SELECT ... FINAL, it should additionally look at the sign column.

You certainly can't apply that at the PREWHERE stage, because in that case the last row would be filtered out before the replacing logic is applied, which could end up showing the previous state of the row (before it was deleted).

Another alternative is injecting that at the WHERE stage, but maybe ReplacingSortedTransform would be better.
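The PREWHERE ordering problem described above can be demonstrated with a toy model (Python; a sketch of the filter-vs-merge ordering only, not the actual pipeline code):

```python
# Why the sign filter must run AFTER the replacing logic, not before
# (as a PREWHERE-style pre-filter would do).

def replacing_merge(rows):
    """Keep the row with the greatest version per key."""
    latest = {}
    for key, version, sign in rows:
        if key not in latest or version >= latest[key][1]:
            latest[key] = (key, version, sign)
    return list(latest.values())

rows = [("id1", 1, 1), ("id1", 2, -1)]  # insert, then delete

# Correct: merge first, then hide the tombstone -> the row is gone.
after = [r for r in replacing_merge(rows) if r[2] != -1]
print(after)  # []

# Wrong: filter first, then merge -> the old state resurfaces.
before = replacing_merge([r for r in rows if r[2] != -1])
print(before)  # [('id1', 1, 1)]
```

Filtering first discards the tombstone before it can suppress the older version, which is exactly the "previous state of the row" problem.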

@tavplubix is it ok for you?

@youennL-cs can you try to implement that? And also add merges to your test (i.e. optimize final). You will also need to check other places where ReplacingSortedTransform / ReplacingSortedAlgorithm is used - for example

return std::make_shared<CollapsingSortedAlgorithm>(
)

@tavplubix tavplubix changed the title Replacing merge tree new engine [RFC] Replacing merge tree new engine Sep 8, 2022
@tavplubix tavplubix added the st-discussion The story requires discussion /research / expert help / design & decomposition before will be taken label Sep 8, 2022
@tavplubix
Member

an extra sign column (possible values: -1 / 1)

I would suggest 0 and 1 (1 is for insert and 0 is for delete). It will make filtering faster and more natural.

We don't want to delete data from the disk when a -1 sign is received;

This is really questionable, because deleted rows will never be cleaned up (btw, it's one of the major disadvantages of MaterializedMySQL). I didn't get your point about sharding (sharding is not related to MergeTree at all), please explain it in more detail. We should provide a way to clean up old deleted rows (maybe by something like TTL, or some constraints that limit out-of-order writes, so it will be safe to clear old enough versions).

Another alternative - is injecting that to a WHERE stage

We should avoid AST rewriting during query execution.

@filimonov
Contributor

an extra sign column (possible values: -1 / 1)

I would suggest 0 and 1 (1 is for insert and 0 is for delete). It will make filtering faster and more natural.

I also thought about the same. And maybe calling it sign is strange in that case?

We don't want to delete data from the disk when a -1 sign is received;

This is really questionable, because deleted rows will never be cleaned up (btw, it's one of the major disadvantages of MaterializedMySQL).

Yep.

I didn't get your point about sharding (sharding is not related to MergeTree at all), please explain it in more detail.

I also didn't get the point with shards. But out-of-order things on different replicas can be the issue.

replica 1:
v: 1000, s: 1
v: 1010, s: -1

merge of those -> removes the row

replica 2:
v: 1005, s: 1
v: 1006, s: 1

merge of those -> keeps 1006,

Now they exchange parts, and the record with version 1006 comes back.
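The resurrection scenario above can be reproduced with a toy model (Python; a simplification of the merge logic, not the actual replication code):

```python
# Reproducing the resurrection problem: if a merge physically removes the
# deleted row, a later merge with another part brings an old version back.

def merge(rows, drop_deleted):
    """Keep the greatest version per key; optionally drop sign = -1 rows."""
    latest = {}
    for key, version, sign in rows:
        if key not in latest or version >= latest[key][1]:
            latest[key] = (key, version, sign)
    out = list(latest.values())
    if drop_deleted:
        out = [r for r in out if r[2] != -1]
    return out

part1 = [("id1", 1000, 1), ("id1", 1010, -1)]   # replica 1
part2 = [("id1", 1005, 1), ("id1", 1006, 1)]    # replica 2

# Each replica merges locally and removes the deleted row:
m1 = merge(part1, drop_deleted=True)          # [] -- tombstone gone
m2 = merge(part2, drop_deleted=True)          # [('id1', 1006, 1)]
print(merge(m1 + m2, drop_deleted=True))      # [('id1', 1006, 1)] resurrected!

# Keeping tombstones avoids it:
k1 = merge(part1, drop_deleted=False)
k2 = merge(part2, drop_deleted=False)
print(merge(k1 + k2, drop_deleted=True))      # []
```

The version-1010 tombstone must survive the first merge to outrank version 1006 in the second.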

We should provide a way to clean up old deleted rows (maybe by something like TTL ... so it will be safe to clear old enough versions).

That would be cool, but I'm not yet sure how to do that without overcomplicating things too much. Maybe based on the part age? I.e., if we merge parts older than X minutes/hours, we are allowed to eliminate removed rows?

But cleaning up old rows can be added later

@tavplubix
Member

But cleaning up old rows can be added later

Authors of MaterializedMySQL promised to add old rows cleanup later... :)

@filimonov
Contributor

But cleaning up old rows can be added later

Authors of MaterializedMySQL promised to add old rows cleanup later... :)

I thought about that a bit - actually the only safe scenarios of deletion are:

  1. When inserts come out of order: only when ALL parts of the partition are merged into one, AND the partition is old enough to not get new rows anymore (because the sequence of merges is not guaranteed).
  2. When we know for sure that inserts come in order.

Maybe we can just have a switch like keep_deleted_rows for the table, default 1, which can be set to 0 if the end user is sure that inserts go in order.

@gontarzpawel

gontarzpawel commented Sep 9, 2022

Hello @filimonov @tavplubix ,

I'm joining the discussion, we're colleagues with @youennL-cs :)

Regarding the confusion around shards and out-of-order inserts: the concepts got mixed up, and what @youennL-cs meant was the issue that can exist between unmerged parts within a partition. We assume that records sharing the same PK reside in the same shard.

Referring to the example:

replica 1:
v: 1000, s: 1
v: 1010, s: -1

merge of those -> removes the row
replica 2:

v: 1005, s: 1
v: 1006, s: 1

merge of those -> keeps 1006,

we could face an analogous issue across unmerged parts, if replica 1 were part 1 and replica 2 part 2 instead. As a result, merged part 1 would keep one record:

v: 1010, s: -1
and the later merge would yield no value (v: 1006 < v: 1010).

Therefore, we believe it is important not to remove deleted rows.

If our assumption of data locality is wrong, then indeed the same issue appears also across nodes.

@gontarzpawel

I thought about that a bit - actually the only safe scenarios of deletion are:

When inserts come out of order: only when ALL parts of the partition are merged into one, AND the partition is old enough to not get new rows anymore (because the sequence of merges is not guaranteed)
We know for sure that inserts come in order.
Maybe we can just have a switch like keep_deleted_rows for the table, default 1, which can be set to 0 if the end user is sure that inserts go in order.

IMO that could be a nice tradeoff. Maybe "the partition is old enough" could also be configurable by the user?

@filimonov
Contributor

filimonov commented Sep 13, 2022

Maybe the partition is old enough could be also configurable by the user?

Yep, but if inserts to that old partition were going out of order and we merge only some subset of the parts, it seems the same problem as above is possible:

┌────────────────────────────────────┐   ┌──────────────────────────────┐
│                                    │   │                              │
│                                    │   │                              │
│    ┌──────────┐   ┌──────────┐     │   │ ┌──────────┐   ┌──────────┐  │
│    │2020_1_1_0│   │2020_2_2_0│     │   │ │2020_3_3_0│   │2020_4_4_0│  │
│    ├──────────┤   ├──────────┤     │   │ ├──────────┤   ├──────────┤  │
│    │id: 1     │   │id: 1     │     │   │ │id: 1     │   │id: 1     │  │
│    │ v: 1000  │   │ v: 1010  │     │   │ │ v: 1005  │   │ v: 1006  │  │
│    │ s: 1     │   │ s: -1    │     │   │ │ s: 1     │   │ s: 1     │  │
│    └──────────┘   └──────────┘     │   │ └──────────┘   └──────────┘  │
│                                    │   │                              │
│  MERGE 1                           │   │ MERGE 2                      │
│  last state is deleted             │   │ keeps version 1006 :/        │
│  partition is old, row removed     │   │                              │
└────────────────────────────────────┘   └──────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│    Merge all 4 is safe. Will find the latest version 1010 and   │
│    remove                                                       │
│    ┌──────────┐   ┌──────────┐    ┌──────────┐   ┌──────────┐   │
│    │2020_1_1_0│   │2020_2_2_0│    │2020_3_3_0│   │2020_4_4_0│   │
│    ├──────────┤   ├──────────┤    ├──────────┤   ├──────────┤   │
│    │id: 1     │   │id: 1     │    │id: 1     │   │id: 1     │   │
│    │ v: 1000  │   │ v: 1010  │    │ v: 1005  │   │ v: 1006  │   │
│    │ s: 1     │   │ s: -1    │    │ s: 1     │   │ s: 1     │   │
│    └──────────┘   └──────────┘    └──────────┘   └──────────┘   │
│                                                                 │
│    Only if all parts of the partition are merged                │
│    and we are sure that new inserts will not come to the        │
│    partition we can do the remove                               │
└─────────────────────────────────────────────────────────────────┘

But checking inside the merge whether it has collected all the parts of the partition is not something natural... :|

Maybe we can do some manual command, like

OPTIMIZE TABLE foo PARTITION ID '2020' FINAL WITH CLEANUP

?

@gontarzpawel

Maybe we can do some manual command, like

OPTIMIZE TABLE foo PARTITION ID '2020' FINAL WITH CLEANUP

?

Do you mean letting cancelled rows stay on disk unless an OPTIMIZE command with WITH CLEANUP is launched manually?
Also, is OPTIMIZE with the WITH CLEANUP parameter already available?

@filimonov
Contributor

Do you mean letting cancelled rows stay on disk unless an OPTIMIZE command with WITH CLEANUP is launched manually?

Yep

Also, is OPTIMIZE with the WITH CLEANUP parameter already available?

No

@gontarzpawel

gontarzpawel commented Sep 13, 2022

Also, is OPTIMIZE with the WITH CLEANUP parameter already available?

No

Okay, it sounds good! Any tips for the implementation? Is everyone involved in the discussion aligned with the strategy?

@hodgesrm
Contributor

Hello. Great discussion. Deleting rows is subtle. If there is an OPTIMIZE WITH CLEANUP command I suggest we consider implementing a way to run this automatically after some period of time. After all, if it's safe for users to do it after some [configurable] period of time, it is safe for ClickHouse to do it.

The alternative is that users will need to implement it themselves, like PostgreSQL VACUUM in days of yore. It was a major headache. There are use cases like running TRUNCATE on an upstream table that can generate very large numbers of deletes that will just hang around in ClickHouse. Users should not have to think about this case.

@gontarzpawel

Hello. Great discussion. Deleting rows is subtle. If there is an OPTIMIZE WITH CLEANUP command I suggest we consider implementing a way to run this automatically after some period of time. After all, if it's safe for users to do it after some [configurable] period of time, it is safe for ClickHouse to do it.

The alternative is that users will need to implement it themselves, like PostgreSQL VACUUM in days of yore. It was a major headache. There are use cases like running TRUNCATE on an upstream table that can generate very large numbers of deletes that will just hang around in ClickHouse. Users should not have to think about this case.

I agree with this approach.
We'll try to implement a first version of OPTIMIZE ... WITH CLEANUP. After that, once you've had a first look, we'll automate it and make the execution of those optimize processes configurable.

@filimonov
Contributor

Is everyone involved in the discussion aligned with the strategy?

@tavplubix ? wrap up:

-- is_deleted is a simple 0/1 UInt8 flag, simpler than sign
CREATE TABLE (...) Engine=ReplacingMergeTree(version, is_deleted)
SETTINGS clean_deleted_rows='never';
-- 'never' is the default; 'always' can be enabled if inserts come in order;
-- in the future other modes can be added


OPTIMIZE TABLE ... FINAL WITH CLEANUP
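The agreed-upon semantics in the wrap-up above can be modeled with a toy sketch (Python; illustrative only, not the actual C++ implementation): merges keep is_deleted rows by default ('never'), and they are only removed by an explicit OPTIMIZE ... FINAL WITH CLEANUP (or by the 'always' mode when inserts come in order):

```python
# Toy model of the wrap-up semantics: (key, version, is_deleted) rows,
# with is_deleted in {0, 1} instead of a -1/1 sign.

def merge(rows, clean_deleted_rows="never", cleanup=False):
    """Keep the greatest version per key; drop tombstones only when the
    table mode is 'always' or an explicit cleanup was requested."""
    latest = {}
    for key, version, is_deleted in rows:
        if key not in latest or version >= latest[key][1]:
            latest[key] = (key, version, is_deleted)
    out = list(latest.values())
    if cleanup or clean_deleted_rows == "always":
        out = [r for r in out if r[2] == 0]
    return out

rows = [("id1", 1, 0), ("id1", 2, 1)]   # insert, then delete

print(merge(rows))                # [('id1', 2, 1)]  tombstone kept ('never')
print(merge(rows, cleanup=True))  # []  like OPTIMIZE ... FINAL WITH CLEANUP
```

The 0/1 flag also makes the query-time filter a simple `is_deleted == 0` check, as suggested earlier in the thread.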

@filimonov
Contributor

Hello. Great discussion. Deleting rows is subtle. If there is an OPTIMIZE WITH CLEANUP command I suggest we consider implementing a way to run this automatically after some period of time. After all, if it's safe for users to do it after some [configurable] period of time, it is safe for ClickHouse to do it.

Would love to have something like that too... The problem is that currently there is no straightforward & generic way of getting the deletion timestamp... We cannot base it on the part timestamp (they are shifted with every merge). We could try to somehow use the value of the version column (for example, allow automatic deletion of rows whose version is much older than the current version; that can work if a timestamp or an offset in the replication log is used as the version, but for that we need to extract / know the 'latest' version somehow. It could be stored at the table level; it's quite easy to fill while inserts are coming, but can be problematic to recover after a server restart without rereading the column).

@gontarzpawel

Would love to have something like that too... The problem is that currently there is no straightforward & generic way of getting the deletion timestamp... We cannot base it on the part timestamp (they are shifted with every merge). We could try to somehow use the value of the version column (for example, allow automatic deletion of rows whose version is much older than the current version; that can work if a timestamp or an offset in the replication log is used as the version, but for that we need to extract / know the 'latest' version somehow. It could be stored at the table level; it's quite easy to fill while inserts are coming, but can be problematic to recover after a server restart without rereading the column).

@filimonov It seems to me that it's not that easy to implement and will likely bring some interesting edge cases to investigate. Maybe we could consider that a V2 iteration and keep this one as an experimental feature? Maybe more ideas will land on the table too.

@filimonov
Contributor

Also related: #41817

Member

@tavplubix tavplubix left a comment


In general LGTM, but the failed integration tests need to be fixed

if (arg_cnt - arg_num == 2 && !engine_args[arg_cnt - 1]->as<ASTLiteral>() && is_extended_storage_def)
{
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.is_deleted_column))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "is_deleted column name must be an unquoted string {}", verbose_help_message);
Member


Suggested change
throw Exception(ErrorCodes::BAD_ARGUMENTS, "is_deleted column name must be an unquoted string {}", verbose_help_message);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "is_deleted column name must be an identifier {}", verbose_help_message);

@youennL-cs
Contributor Author

@tavplubix, is this stress test considered a blocker too?
I can't find what's wrong with the Hung check, and consequently whether it's related or not.
Otherwise, all other tests passed.

@tavplubix
Member

No, this failure is unrelated - #45372

@tavplubix tavplubix merged commit 6526c2a into ClickHouse:master Feb 16, 2023
@filimonov
Contributor

@filimonov It seems to me that it's not that easy to implement and will likely bring some interesting edge cases to investigate. Maybe we could consider that a V2 iteration and keep this one as an experimental feature? Maybe more ideas will land on the table too.

@youennL-cs @gontarzpawel maybe let's think about cleanup in older parts? Maybe something similar to #35836 can work.

@filimonov
Contributor

Also, can you please send the PR with updated documentation?
