Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Try to cleanup lingering transactions when restoring in exactly-once mode #271

Merged
merged 5 commits into from
Sep 1, 2023

Conversation

banmoy
Copy link
Collaborator

@banmoy banmoy commented Aug 28, 2023

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Which issues of this PR fixes :

Fixes #

Problem Summary(Required) :

What's the problem

When using exactly-once, the connector will not abort the PREPARED transactions when the flink job failovers or exits because it's 2PC mechanism. Some of those PREPARED transactions may be in a successful checkpoint, and will be committed when the job restores from the checkpoint, but some of them are just useless, and should be aborted, otherwise they will be lingering in StarRocks until timeout which maybe make StarRocks unstable. We should try to abort those lingering transactions when restoring

How to solve it

When flink job restores, the connector will try to find those lingering transactions, and abort them. The key is how to find those transactions because the labels of them are not stored in the checkpoint. Here we design a label generator (ExactlyOnceLabelGenerator) to solve it

  1. the user must set option sink.label-prefix which is used as the prefix of the labels, and it must be unique across all the ingestions, including flink connector, broker load and routine load, running on the same StarRocks cluster
  2. the connector will generate label in the format {labelPrefix}-{tableName}-{subtaskIndex}-{id}.
    • the subtaskIndex will make the label unique across subtasks if the sink writes parallel
    • id is incremental, and it will make the label unique across different transactions in a subtask
  3. when checkpointing, current id will be stored as the state in the checkpoint, and the labels whose ids are less than the current id must be successful, and only those labels whose ids are equal or larger than the current id can be lingering
  4. when restoring, read the current id from the checkpoint, construct the label with the id, and get label status from StarRocks. The transaction is lingering if it's in PREPARED state, and should abort it

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr will affect users' behaviors
  • This pr needs user documentation (for new or modified features or behaviors)
  • I have added documentation for my new feature or new function

@dyp12
Copy link

dyp12 commented Aug 29, 2023

This PREPARED transaction is because the import encountered an exception. So when the import encountered an exception,Should the PREPARED transaction be closed and then restarted ? Each task needs to be set with sink.label-prefix, which makes it easy to repeat

@banmoy
Copy link
Collaborator Author

banmoy commented Aug 29, 2023

@dyp12 Thanks for your comments

  1. The transaction is set to PREPARED because a checkpoint is triggered, and we need to prepare it, see StarRocksDynamicSinkFunctionV2#snapshotState(). It will be committed finally when StarRocksDynamicSinkFunctionV2#notifyCheckpointComplete() is called which indicates the Flink checkpoint is successful globally. This is the two-phase-commit mechanism of Flink to implement exactly-once. Before notifyCheckpointComplete, we can not abort it even an exception happens because it may lead to data loss if the checkpoint is successful globally but has not notified this subtask.
  2. It brings burden for users to keep sink.label-prefix unique, but it seems there is not a better solution currently. The solution is similar to that of Flink Kafka connector, see sink.transactional-id-prefix

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
@banmoy banmoy merged commit 83919ff into StarRocks:main Sep 1, 2023
3 of 4 checks passed
banmoy added a commit to banmoy/starrocks-connector-for-apache-flink that referenced this pull request Sep 2, 2023
… exactly-once mode (StarRocks#271)

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
banmoy added a commit to banmoy/starrocks-connector-for-apache-flink that referenced this pull request Sep 2, 2023
… exactly-once mode (StarRocks#271)

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
banmoy added a commit that referenced this pull request Sep 11, 2023
… exactly-once mode (#271)

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
banmoy added a commit that referenced this pull request Sep 11, 2023
… exactly-once mode (#271)

Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants