New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-591] KafkaIO : Improve watermarks and support server side timestamps #4680
Conversation
- Added TimestampPolicy that provides both record timestamps and watermarks. - built in policies for 'LogAppendTime' (server-time) ProcessingTime (default) - Ensure idle partitions don't hold watermark back - deprecated previous API to set functions for custom timestamps and watermarks.
R: @xumingmin |
Thanks @rangadi , is it ready for review? |
Yes. Please review when you get a chance. Thanks @xumingmin. We can hop a hangout chat if that is a faster way to explain some of the changes. A big chunk of the code is javadoc. |
Sounds good, I'm available tomorrow except 2-3pm and 4-5pm. |
Great. Let me know if you like to chat. Most of tomorrow is fine for me.
…On Tue, Feb 20, 2018 at 10:40 AM, XuMingmin ***@***.***> wrote:
Sounds good, I'm available tomorrow except 2-3pm and 4-5pm.
Will read the changes first today.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#4680 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AAeq-mmkSJs2P53KNv4Ue1IfaRF2SmxSks5tWxGUgaJpZM4SEies>
.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job to redesign the timestamp/watermark in KafkaIO, it's much easier to use now.
/** | ||
* A function to assign a timestamp to a record. Default is processing timestamp. | ||
* @deprecated as of version 2.4. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from which version will it be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. May be in a couple of months. It is easy to switch to new API.
} | ||
|
||
/* | ||
* TODO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this function which is based on records in last period( 99% or min for example), will you add it?
|
||
@Override | ||
public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) { | ||
if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do this check earlier, before job is actually running?
* seen in last 1 minute of wall clock time (this duration could be configurable). This is | ||
* similar to watermark set by PubsubIO. | ||
* | ||
* public static <K, V> TimestampPolicyFactory<K, V> withCreateTime() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any plan to add a TimestampPolicy
based on KafkaTimestampType.CREATE_TIME
?
Review status: 0 of 10 files reviewed at latest revision, 4 unresolved discussions, some commit checks failed. sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java, line 83 at r2 (raw file): Previously, XuMingmin wrote…
Yes. I plan to implement it, possibly sharing some of the code by PubsubIO. sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java, line 88 at r2 (raw file): Previously, XuMingmin wrote…
Yes. Will add after this. sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java, line 154 at r2 (raw file): Previously, XuMingmin wrote…
We have to read the record, not sure if there is a way to query if the topic has 'LogAppendTime' enabled. Usually better not to have to read the records from the job driver. It could be in a network that can't reach Kafka cluster. If this is a common error user's make we could consider. This exception should be visible to the user if it ever happens. Comments from Reviewable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM,
Feel free to create another PR for the 2 TimestampPolicy if available.
…tamps (apache#4680) * Redesign how timestamps and watermarks are handled in KafkaIO. - Added TimestampPolicy that provides both record timestamps and watermarks. - built in policies for 'LogAppendTime' (server-time) ProcessingTime (default) - Ensure idle partitions don't hold watermark back - deprecated previous API to set functions for custom timestamps and watermarks. * minor
Redesigns how timestamps and watermarks in KafkaIO.