@AHeise AHeise commented Jun 19, 2020

What is the purpose of the change

Adds documentation about unaligned checkpoints.

Brief change log

  • Added Python API to enable unaligned checkpoints.

Doc is split into 3 parts to simulate the description of aligned checkpointing:

  • It's added on conceptual level in stateful-stream-processing.md with new/revised pics. It's written in a way that it could survive 1.12 without change.
  • A small change to dev/stream/state/checkpointing.md to show how it is enabled programmatically in Java/Scala/Python. Might need to be extended for 1.12 when new options become available (depending whether they can be programmatically changed or not).
  • A larger discussion in ops/state/checkpoints.md which includes the current limitations and a small glimpse into the next steps (will be in much more detail in blog post). This part needs to be largely rewritten for 1.12+ to reflect the new options.

Verifying this change

For Python API, added case in test_check_point_config.py.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d162c92 (Fri Jun 19 12:47:13 UTC 2020)

Warnings:

  • Documentation files were touched, but no .zh.md files: Update Chinese documentation or file Jira ticket.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@AHeise
Contributor Author

AHeise commented Jun 19, 2020

For ease of reviewing, I made a screenshot of the page with changed graphics.

@flinkbot
Collaborator

flinkbot commented Jun 19, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@morsapaes morsapaes left a comment

Left some minor comments. Thanks for sharing, @AHeise , I'll "align" the release blogpost with these docs.

Contributor

@rkhachatryan rkhachatryan left a comment

Nice graphics :)

I left some comments. Most of them are nits, except for mentioning the feature status.

concepts/stateful-stream-processing

See [Restart Strategies]({% link dev/task_failure_recovery.md
%}#restart-strategies) for more information.

### Unaligned Checkpointing
Contributor

Should we mention that this is an experimental feature?
I think it should be a separate statement at the end of the section.

Contributor Author

My understanding was that this document is rather an expanded glossary and talks about the concept and not the implementation. Thus, I'd leave the implementation state out of this place. The ops link will directly say that it's experimental in 1.11.

Contributor

The page already goes quite deep into the details so I don't see why it shouldn't be mentioned here. Some users could benefit by ruling out the feature earlier if they are considering Flink or its configuration.

Contributor Author

Since we decided at a different point to drop the experimental label, I'd leave this section as is.

Contributor

@rkhachatryan rkhachatryan left a comment

I'm adding some more comments. Sorry for the intermittent review.

Looking at the configuration reference (ops/config.md) I couldn't find any relation between execution.checkpointing.unaligned and execution.checkpointing.max-concurrent-checkpoints. I think we should mention explicitly that when the former is enabled, the latter must be 1.
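The constraint described above could be checked before job submission with something like the following. This is only a minimal sketch in plain Python, not Flink code: `check_unaligned_config` is a hypothetical helper, the configuration is modelled as a simple mapping of option names to strings, and the defaults used here (`false` for the unaligned flag, `1` for the concurrency limit) are assumptions.

```python
from typing import Mapping

# Option names as mentioned in the review comment.
UNALIGNED_KEY = "execution.checkpointing.unaligned"
MAX_CONCURRENT_KEY = "execution.checkpointing.max-concurrent-checkpoints"


def check_unaligned_config(conf: Mapping[str, str]) -> None:
    """Raise ValueError if unaligned checkpoints are combined with a
    concurrency limit other than 1.

    Hypothetical helper for illustration only; not part of Flink.
    """
    # Assumed defaults: unaligned checkpoints off, one concurrent checkpoint.
    unaligned = str(conf.get(UNALIGNED_KEY, "false")).strip().lower() == "true"
    max_concurrent = int(conf.get(MAX_CONCURRENT_KEY, "1"))

    if unaligned and max_concurrent != 1:
        raise ValueError(
            f"{UNALIGNED_KEY}=true requires {MAX_CONCURRENT_KEY}=1, "
            f"but got {max_concurrent}"
        )
```

A configuration with the unaligned flag set to `true` and a concurrency limit of `2` would be rejected, while either option on its own passes.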

Comment on lines 135 to 136
- You cannot rescale from unaligned checkpoints. You have to take a savepoint
before rescaling. Savepoints are always aligned independent of the alignment
Contributor

Suggested change
- You cannot rescale from unaligned checkpoints. You have to take a savepoint
before rescaling. Savepoints are always aligned independent of the alignment
- You cannot rescale or change job graph with unaligned checkpoints. You have to take a savepoint
before rescaling. Savepoints are always aligned independent of the alignment

Contributor Author

Can you change the job graph with current checkpoints? I was always assuming that you need savepoints.

Contributor

@rkhachatryan rkhachatryan Jun 22, 2020

Hmm... the current Flink docs say that retained checkpoints:

do not support Flink specific features like rescaling.

...and nothing about the job graph.

Besides that, UC doesn't currently support Local recovery.

Edit:
Local recovery limitation should probably be described in
Tuning Checkpoints section.

Contributor

Yes, technically, but it's incidental. The community hasn't made any backward-compatibility guarantees around that behavior.

Contributor

I see, thanks.
I think we can leave it as is then.

Contributor Author

@AHeise AHeise left a comment

Thank you for the feedback @morsapaes and @rkhachatryan . I followed most of your suggestions, but I have some unresolved issues coming from @rkhachatryan .

Contributor

@pnowojski pnowojski left a comment

Thanks for writing this down @AHeise, mostly LGTM :)


/**
- * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
+ * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure (experimental).
Contributor

As it's stable on our builds, maybe we could label it more production ready?

Contributor Author

Then just leave out experimental and just link to limitations?

operations can asynchronously snapshot their state.

Since Flink 1.11, checkpoints can be taken with or without alignment. In the
following, we describe aligned checkpoints first.
Contributor

following section?

Unaligned checkpointing ensures that barriers are arriving at the sink as fast
as possible. It's especially suited for applications with at least one slow
moving data path, where alignment times can reach hours. However, since it's
adding additional I/O pressure to state backends, it doesn't help when the I/O
Contributor

I/O pressure to state backends -> I/O pressure, as it's not using state backends per se.

Comment on lines 138 to 140
- Flink currently does not support concurrent unaligned checkpoints. However,
due to the more predictable and shorter checkpointing times, concurrent
checkpoints might not be needed at all.
Contributor

We should mention here that aligned savepoints also cannot happen concurrently with an unaligned checkpoint.

Comment on lines 143 to 154
Currently, Flink generates the watermark as a first step of recovery instead of
storing the latest watermark in the operators to ease rescaling. In unaligned
checkpoints, that means on recovery, **Flink generates watermarks after it
restores in-flight data**. If your pipeline uses an **operator that applies the
latest watermark on each record**, it will produce **incorrect results** during
recovery if the watermark is not directly or indirectly part of the operator
state. Thus, **SQL OVER operator should not be used with unaligned
checkpoints**, while window operators are safe to use. The workaround is to
store the watermark in the operator state. If rescaling may occur, watermarks
should be stored per key-group in a union-state. We mostly likely will
implement this approach as a general solution (didn't make it into Flink
1.11.0).
Contributor

I think this paragraph is a bit too strong. As far as I understand, it's not that UC will produce incorrect results, just that some records during reprocessing might not be counted as late data, right?

Contributor Author

I can tone it down, but basically we are breaking with the old assumption that watermarks don't need to be stored at the operator because they are sent first.
I'm especially referring to the OverITCases, which use a weird way to inject watermarks and logically should persist them. But now that I'm thinking about it, it's more a matter of the test setup itself.
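
The effect discussed in this thread can be illustrated with a toy model. The sketch below is plain Python, not Flink's operator code: `classify_late`, the `("record", ts)` / `("watermark", ts)` tuples, and the rule "a record is late when its timestamp is not greater than the current watermark" are all assumptions made for illustration. It replays restored in-flight data before the regenerated watermark arrives, once without and once with a watermark kept in operator state.

```python
from typing import Iterable, List, Tuple

Event = Tuple[str, int]  # ("record", timestamp) or ("watermark", timestamp)


def classify_late(
    events: Iterable[Event], initial_watermark: float = float("-inf")
) -> List[Tuple[int, bool]]:
    """Return (timestamp, is_late) for every record, in arrival order.

    Toy model: the operator applies the latest watermark it has seen
    to each record. `initial_watermark` stands in for a watermark that
    was persisted in operator state and restored on recovery.
    """
    watermark = initial_watermark
    result = []
    for kind, ts in events:
        if kind == "watermark":
            # Watermarks only move forward.
            watermark = max(watermark, ts)
        else:
            result.append((ts, ts <= watermark))
    return result


# After recovery: the in-flight record (ts=5) is replayed before the
# watermark (10) that had already passed the operator before the failure.
replayed = [("record", 5), ("watermark", 10), ("record", 12)]

without_state = classify_late(replayed)
with_state = classify_late(replayed, initial_watermark=10)
```

In this model, the record with timestamp 5 is not flagged as late when the operator starts from an empty watermark, but it is flagged once the watermark is restored from state, which matches the workaround of storing the watermark in operator state.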

Comment on lines 132 to 133
We flagged unaligned checkpoints as experimental as it currently has the
following limitations:
Contributor

I would also mention that flatMap operators can lead to unbounded spilled data.

Arvid Heise added 5 commits June 24, 2020 20:49
It's split into 3 parts to simulate the description of aligned checkpointing:
- It's added on conceptual level in stateful-stream-processing.md with new/revised pics. It's written in a way that it could survive 1.12 without change.
- A small change to dev/stream/state/checkpointing.md to show how it is enabled programmatically in Java/Scala/Python. Might need to be extended for 1.12 when new options become available (depending whether they can be programmatically changed or not).
- A larger discussion in ops/state/checkpoints.md which includes the current limitations and a small glimpse into the next steps (will be in much more detail in blog post). This part needs to be largely rewritten for 1.12+ to reflect the new options.
@zhijiangW
Contributor

Since there are no pending requests and @pnowojski and @rkhachatryan have already approved, I will merge it now.

@zhijiangW zhijiangW merged commit 58c2047 into apache:master Jun 29, 2020