Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-5697] [kinesis] Add periodic per-shard watermark support #6980

Closed
wants to merge 9 commits into from

Conversation

Projects
None yet
6 participants
@tweise
Copy link
Contributor

commented Nov 1, 2018

What is the purpose of the change

Adds support for periodic per-shard watermarks to the Kinesis consumer. This functionality is off by default and can be enabled by setting an optional watermark assigner on the consumer. When enabled, the watermarking also optionally supports idle shard detection based on configurable interval of inactivity.

Brief change log

  • Add watermark assigner to consumer
  • Modify data fetcher to track watermark state per shard
  • Modify emitRecordAndUpdateState to extract timestamp and update watermark
  • Timer driven periodic watermark emit

Verifying this change

This change added tests and can be verified as follows:

Added a unit test and planning to add more test coverage with subsequent work for shared watermark state and emit queue as discussed on ML. This change is ported from Lyft internal codebase that is used in production.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
@tweise

This comment has been minimized.

Copy link
Contributor Author

commented Nov 1, 2018

@tweise tweise closed this Nov 1, 2018

@tweise tweise reopened this Nov 1, 2018

@EronWright

This comment has been minimized.

Copy link
Contributor

commented Nov 1, 2018

There is a caveat with this implementation that the docs should perhaps mention. The caveat is that it may produce spurious late events when processing a backlog of data.

Here's an example of when that may occur. Imagine that subtask 1 is processing shard A and subtask 2 is processing shard B. Shard A has reached 6:00 in event time (as per the assigner), and so the subtask emits the corresponding watermark. At this point, the subtask has made the irrevocable assertion that subsequent events will be past 6:00. Meanwhile, Shard B is at 5:30 and undergoes a split into C/D. If either shard is subsequently assigned to subtask 1, the events will be considered late due to the assertion made earlier.

@tweise

This comment has been minimized.

Copy link
Contributor Author

commented Nov 1, 2018

@EronWright that's correct and I will make sure to document this. Even our planned follow-up work won't be able to address such resharding scenario. I think we will only be able to address that with the new source design that is currently under discussion (which should permit centralized discovery and more sophisticated splitting/shard distribution).

@jgrier

This comment has been minimized.

Copy link
Member

commented Nov 5, 2018

Looks good @tweise.

@tweise

This comment has been minimized.

Copy link
Contributor Author

commented Nov 6, 2018

@tzulitai

This comment has been minimized.

Copy link
Contributor

commented Nov 6, 2018

@tweise will put this on my backlog and try to get to this as soon as I'm finished with 1.7 remaining work.

@mxm
Copy link
Contributor

left a comment

I think this looks good and almost ready to be merged. Couple of minor comments.

Show resolved Hide resolved .../org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java Outdated
Show resolved Hide resolved .../org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
Show resolved Hide resolved ...che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java Outdated
Show resolved Hide resolved ...che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java Outdated
Show resolved Hide resolved ...che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
* Return the current system time. Allow tests to override this to simulate progress for watermark
* logic.
*
* @return

This comment has been minimized.

Copy link
@mxm

mxm Nov 20, 2018

Contributor

Remove or document.

Show resolved Hide resolved ...che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
Show resolved Hide resolved ...che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
Show resolved Hide resolved pom.xml Outdated

tweise and others added some commits Nov 20, 2018

Update flink-connectors/flink-connector-kinesis/src/main/java/org/apa…
…che/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java

Co-Authored-By: tweise <tweise@users.noreply.github.com>
@mxm

mxm approved these changes Nov 21, 2018

@asfgit asfgit closed this in f75c33d Nov 21, 2018

@tweise

This comment has been minimized.

Copy link
Contributor Author

commented Nov 21, 2018

@mxm thanks for the review and merge!

xueyumusic added a commit to xueyumusic/flink that referenced this pull request Nov 24, 2018

[FLINK-5697] [kinesis] Add periodic per-shard watermark support
Adds support for periodic per-shard watermarks to the Kinesis consumer. This
functionality is off by default and can be enabled by setting an optional
watermark assigner on the consumer. When enabled, the watermarking also
optionally supports idle shard detection based on configurable interval of
inactivity.

- Add watermark assigner to consumer
- Modify data fetcher to track watermark state per shard
- Modify emitRecordAndUpdateState to extract timestamp and update watermark
- Timer driven periodic watermark emit

This closes apache#6980.

TisonKun added a commit to TisonKun/flink that referenced this pull request Jan 17, 2019

[FLINK-5697] [kinesis] Add periodic per-shard watermark support
Adds support for periodic per-shard watermarks to the Kinesis consumer. This
functionality is off by default and can be enabled by setting an optional
watermark assigner on the consumer. When enabled, the watermarking also
optionally supports idle shard detection based on configurable interval of
inactivity.

- Add watermark assigner to consumer
- Modify data fetcher to track watermark state per shard
- Modify emitRecordAndUpdateState to extract timestamp and update watermark
- Timer driven periodic watermark emit

This closes apache#6980.

tinder-dthomson added a commit to tinder-dthomson/flink that referenced this pull request Feb 1, 2019

[FLINK-5697] [kinesis] Add periodic per-shard watermark support
Adds support for periodic per-shard watermarks to the Kinesis consumer. This
functionality is off by default and can be enabled by setting an optional
watermark assigner on the consumer. When enabled, the watermarking also
optionally supports idle shard detection based on configurable interval of
inactivity.

- Add watermark assigner to consumer
- Modify data fetcher to track watermark state per shard
- Modify emitRecordAndUpdateState to extract timestamp and update watermark
- Timer driven periodic watermark emit

This closes apache#6980.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.