Fixes for storage S3Queue
#54422
Conversation
/// It is possible that we created an ephemeral processing node
/// but the session expired and someone else created a new ephemeral processing node.
/// To avoid deleting this new node, check processing_id.
I didn't quite understand when it's possible. Can you explain in more detail?
Let's say we have `clickhouse-server1` and `clickhouse-server2`, and we create an `S3Queue` table on both servers pointing to the same keeper path. Say server 1 started processing `file1`, but its keeper session expired before it finished. The expired session leads to the expiration of the ephemeral "processing" node. Server 2 sees that there is no "processing", "failed" or "processed" node for `file1`, so it creates a "processing" node for `file1` itself. Then server 1 restores its keeper connection; it knows from memory that it was processing `file1` and did not finalize the state in keeper, so it tries to finalize it with either a "failed" or "finished" state. But that would be incorrect in the described scenario.
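A minimal sketch of the guard the code comment above describes, assuming the ephemeral "processing" node stores a per-attempt `processing_id` in its data (the `FakeKeeper` client, the path, and the function names below are illustrative stand-ins, not the actual ClickHouse interfaces): before deleting the node, re-read it and remove it only if it still carries our own `processing_id`.

```cpp
#include <iostream>
#include <map>
#include <optional>
#include <string>

/// Toy in-memory stand-in for a keeper client; illustrative only, not the real ClickHouse API.
struct FakeKeeper
{
    std::map<std::string, std::string> nodes;

    std::optional<std::string> tryGet(const std::string & path) const
    {
        auto it = nodes.find(path);
        if (it == nodes.end())
            return std::nullopt;
        return it->second;
    }

    bool remove(const std::string & path) { return nodes.erase(path) > 0; }
};

/// Remove our ephemeral "processing" node only if it still carries our processing_id.
/// If our session expired and another server re-created the node, its data holds a
/// different processing_id, so we must leave the new node alone.
bool removeProcessingNodeIfOwned(FakeKeeper & keeper, const std::string & path, const std::string & our_processing_id)
{
    auto data = keeper.tryGet(path);
    if (!data)
        return false;                   /// node already expired together with our session
    if (*data != our_processing_id)
        return false;                   /// node was re-created by another server
    return keeper.remove(path);
}

int main()
{
    FakeKeeper keeper;
    keeper.nodes["/s3queue/processing/file1"] = "server2-uuid";   /// server 2 re-created the node

    /// server 1 (processing_id = "server1-uuid") must not delete it
    std::cout << removeProcessingNodeIfOwned(keeper, "/s3queue/processing/file1", "server1-uuid") << "\n";   /// prints 0
}
```

In the real storage this get-and-remove would have to be atomic (for example, a multi-request with a version check); the toy client above glosses over that.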
So we need to think about deduplication here. If I understood correctly, the solution is to modify the read offset in zk (or the number of read rows, see my comments below) after each block read in the source, so that if we cannot modify it because the session expired, we don't insert this block. Right?
Yeah, and there is also the case where we update the counter in keeper after we read the block but before we push the block to the MV: there could be an exception during the push, leaving an incorrect state in keeper. Then, to make things even worse, the keeper session may expire, so we cannot fix the keeper state without a potential race with another server whose S3Queue starts processing the same file, so it is again not straightforward what to do here.
So (at least for now) I'd rather document that the user is strongly recommended to use a destination table for the S3Queue MV with a table engine that supports deduplication. Then an occasional duplicate is not a problem.
/// Anyway we cannot do anything in case of SIGTERM, so the destination table must support deduplication anyway,
/// so here we will rely on it as well.
Also, in case of an exception during parsing we could already have inserted some data into the destination table, and after retries there will be duplicates. We should definitely add a note about it in the documentation. And I don't think we can solve it by saving the file offsets that we processed, because of formats with metadata and random access like Parquet/ORC/Arrow. Maybe we can store the number of processed rows for each file, and when we start reading the file again, skip the already processed rows by just ignoring them here after `reader->pull()` (also, in the future we can add an optimization for it: a `skipRows(size_t rows)` method for input formats, because most formats can skip rows fast enough). Or combine both methods (saving the offset and saving the processed rows) and use one of them depending on the format: for CSV/TSV/JSONEachRow/etc we can save the offset and start reading from it, and for Parquet/ORC/etc we can save the previously read rows and skip them before returning blocks (and for such formats skipping rows can be optimized using their metadata).
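A toy sketch of the "skip already processed rows" idea above, assuming the number of rows already processed for the file is known (e.g. stored in keeper); the `FakeReader` and `Chunk` types and all names are stand-ins for illustration, not the actual ClickHouse reader API:

```cpp
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

/// Toy stand-ins for the reader/chunk machinery.
using Row = std::string;
using Chunk = std::vector<Row>;

struct FakeReader
{
    std::deque<Chunk> chunks;
    bool pull(Chunk & out)
    {
        if (chunks.empty())
            return false;
        out = std::move(chunks.front());
        chunks.pop_front();
        return true;
    }
};

/// Re-reading a partially processed file: the first `rows_to_skip` rows were already
/// pushed to the destination on the previous attempt, so we drop them from the pulled
/// chunks instead of inserting them twice.
void readSkippingProcessedRows(FakeReader & reader, size_t rows_to_skip)
{
    Chunk chunk;
    while (reader.pull(chunk))
    {
        if (rows_to_skip >= chunk.size())
        {
            rows_to_skip -= chunk.size();
            continue;                                                   /// whole chunk was already processed
        }
        if (rows_to_skip > 0)
        {
            chunk.erase(chunk.begin(), chunk.begin() + rows_to_skip);   /// drop the processed prefix
            rows_to_skip = 0;
        }
        for (const auto & row : chunk)                                  /// "insert" the remaining rows
            std::cout << row << "\n";
    }
}

int main()
{
    FakeReader reader{{{"r1", "r2", "r3"}, {"r4", "r5"}}};
    readSkippingProcessedRows(reader, 3);                               /// prints r4, r5
}
```

With an actual `skipRows()`-style method on input formats (as suggested above), the skip could happen inside the format itself rather than after pulling chunks.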
> Maybe we can store processed rows for each file and when we start reading the file again, we can skip already processed rows by just ignoring them here after `reader->pull()` (also in the future we can add an optimization for it: a `skipRows(size_t rows)` method for input formats, because most formats can skip rows fast enough).
Good idea. Though if we update the processed rows count in keeper for every block, it will result in too many keeper requests...
> We should definitely add a note about it in the documentation.
Will do 👌🏻 .
> Though if we update the processed rows count in keeper for every block, it will result in too many keeper requests...

Yeah, for sure. Maybe we can do it under a setting disabled by default and tell users that enabling deduplication can result in too many keeper requests. Or we can try to find another solution (maybe we can ask for other opinions in a weekly meeting or in the dev chat).
/// This is possible to achieve in case of parallel processing
/// but for local processing we explicitly disable parallel mode and do everything in a single thread.
Can we also create two S3Queue storages with the same zk path on one instance?
Do we have other problems besides retries with parallel processing in Ordered mode?
If it's only retries, then maybe instead of always setting `s3queue_processing_threads_num` to 1, we can set it to 1 only if retries are enabled? Or even throw an exception if retries are enabled in Ordered mode, with a message that it can break in a distributed scenario (and allow forcing retries with Ordered mode, in which case we set the threads number to 1 and don't care about the distributed case). Or maybe you already know how to solve this problem and we can just keep it as is and fix it later.
> Or maybe you already know how to solve this problem and we can just keep it as is and fix it later.
Yeah, I have an idea. I think this can be solved if Ordered mode were implemented as a combination of the Ordered and Unordered modes. Let's say there is a setting `processing_window`, equal to, for example, 100. This window defines a range of files. Within this window we process files in parallel and do not process any other files until all files within the window are marked as either unretriably failed or processed. While we process a window of files we keep track of them in keeper just like we track files in Unordered mode, but once the full window is processed, we change the information in keeper to contain only `max_processed_file` (what we aim to do in Ordered mode), and go to the next window afterwards.
> Do we have other problems besides retries with parallel processing in Ordered mode?

If the server gets terminated with, for example, SIGABRT and we did not finish processing `file1`, but another thread already processed `file2`, then on restart we'll think we already processed `file1` (even though we didn't). But this issue can also be solved by the idea described above.
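A toy, in-memory model of the `processing_window` idea sketched above (`WindowTracker`, the enum, and the function names are illustrative; in the actual storage this state would live in keeper):

```cpp
#include <algorithm>
#include <iostream>
#include <map>
#include <string>

/// Within a window of `processing_window` files we track per-file state as in Unordered
/// mode; once every file in the window is either processed or unretriably failed, the
/// per-file state collapses into a single max_processed_file and the next window starts.
enum class FileState { Processing, Processed, FailedUnretriably };

struct WindowTracker
{
    size_t processing_window;
    std::map<std::string, FileState> current_window;   /// per-file state, as in Unordered mode
    std::string max_processed_file;                     /// the only thing kept once a window is done

    /// No new files are admitted while the window is full.
    bool windowIsFull() const { return current_window.size() >= processing_window; }

    bool windowIsFinished() const
    {
        return !current_window.empty()
            && std::all_of(current_window.begin(), current_window.end(),
                           [](const auto & p) { return p.second != FileState::Processing; });
    }

    /// Collapse the finished window: keep only the max processed file name
    /// (files are assumed to be named so that lexicographic order is the processing order).
    void finalizeWindow()
    {
        for (const auto & [file, state] : current_window)
            if (state == FileState::Processed)
                max_processed_file = std::max(max_processed_file, file);
        current_window.clear();
    }
};

int main()
{
    WindowTracker tracker{2, {}, {}};
    tracker.current_window = {{"file_001", FileState::Processed}, {"file_002", FileState::FailedUnretriably}};
    if (tracker.windowIsFinished())
        tracker.finalizeWindow();
    std::cout << "max_processed_file = " << tracker.max_processed_file << "\n";   /// file_001
}
```

The point is that per-file bookkeeping exists only while a window is open; once it closes, only `max_processed_file` remains, which keeps Ordered mode's compact keeper state while still allowing parallel processing inside the window.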
> Yeah, I have an idea. ...

Started implementing it. I think it will be better to put it in a separate PR, after this one is merged, for easier review.
test_zookeeper_config
test_delayed_replica_failover/test.py::test
test_postgresql_replica_database_engine_2 - #55772
02479_race_condition_between_insert_and_droppin_mv
test_delayed_replica_failover/test.py::test
00385_storage_file_and_clickhouse-local_app_long
02479_race_condition_between_insert_and_droppin_mv
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Rewrote storage S3Queue completely: changed the way we keep information in zookeeper, which allows making fewer zookeeper requests; added caching of the zookeeper state in cases where we know the state will not change; made the polling from s3 less aggressive; changed the way the TTL and the max size of the set of tracked files are maintained, now it is a background process. Added `system.s3queue` and `system.s3queue_log` tables. Closes #54998.