[BEAM-158] add support for bounded sources in streaming #104
@@ -73,8 +69,9 @@ public void configure(Configuration configuration) {}

@Override
public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
  options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
Seems inefficient to decode pipeline options several times.
Is this to protect the user from mutating it?
Good point. This will be called on every input split. We can move the deserialization code to the configure method.
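The change discussed here can be sketched roughly as follows. This is a minimal illustration, not the actual SourceInputFormat: the class name `DecodeOnceInputFormat`, the `decodeCount` field, and the use of `java.util.Properties` as a stand-in for PipelineOptions are all assumptions made so the example runs without Flink or Jackson. It also assumes `configure` is invoked once before any split is opened.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical stand-in for an input format; not the real SourceInputFormat. */
class DecodeOnceInputFormat {
  // Encoded options, as they would be shipped to the workers.
  private final byte[] serializedOptions;
  // Decoded form; Properties is only a stand-in for PipelineOptions.
  private Properties options;
  // Exists only to make the number of decode calls observable.
  int decodeCount = 0;

  DecodeOnceInputFormat(Properties options) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      options.store(out, null);
      this.serializedOptions = out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Assumed to run once per task instance, so the decoding happens here. */
  void configure() {
    try {
      Properties decoded = new Properties();
      decoded.load(new ByteArrayInputStream(serializedOptions));
      options = decoded;
      decodeCount++;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Runs once per input split and only reuses the already decoded options. */
  String open(int splitIndex) {
    return options.getProperty("jobName") + "-split-" + splitIndex;
  }
}
```

With this shape, opening any number of splits leaves `decodeCount` at 1, whereas decoding inside `open` would repeat the work for every split.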
Incorporated the suggestions. Would like to merge later on.
    WindowedValue.of(value,
        BoundedWindow.TIMESTAMP_MIN_VALUE,
        GlobalWindow.INSTANCE,
        PaneInfo.NO_FIRING));
  }
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
I know nothing here, just want to confirm that it's okay to use an "IngestionTimeExtractor" for a collection where all elements have timestamps of MIN_VALUE.
LGTM. I left one comment, but merge as you see fit!
(And a gentle reminder to squash CLs as appropriate)
Hi @mxm, just a ping that (AFAIK) this is ready for you to merge.
Hi @mxm, just a ping that this is ready for you to rebase and merge.
Hi @dhalperi, I was completely knocked out for a week. Will merge this later on.
Rebased and merged accordingly.
Hope you're feeling better!
Thanks, much better :)
#! Add contact information to README.md
Apart from a few improvements, this PR introduces bounded sources in streaming. The BoundedSource wrapper (SourceInputFormat) is the same as for the batch part of the runner. The translator assigns ingestion-time watermarks and processing-time timestamps upon reading from the source. We could make watermark generation more flexible if we had an UnboundedSource wrapper for a BoundedSource.

Perhaps we could have a common utility for runners to deal with serialization of PipelineOptions. At some point, they have to be shipped. I had to change the serialization code because I was hitting a serialization bug that led to a serialization loop. Debugging this was almost impossible because the stack trace doesn't show all serialization calls due to some magic in the VM. I didn't find any cyclic references between the PipelineOptions and Flink components. I'm assuming this is a bug, and the workaround of serializing the options to a byte array is fair enough. See SourceInputFormat.
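The byte-array workaround described above could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in SourceInputFormat: `OptionsHolder` and `roundTrip` are hypothetical names, and `java.util.Properties` stands in for PipelineOptions so that the example needs only the JDK (the PR itself decodes with Jackson's `ObjectMapper`). The idea is that the holder's only serialized state is a `byte[]`, so Java serialization never walks the options object graph.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder that ships options as an opaque byte array. */
class OptionsHolder implements Serializable {
  private static final long serialVersionUID = 1L;

  // The only state Java serialization sees.
  private final byte[] serializedOptions;
  // Decoded lazily after shipping; transient, so it is never serialized.
  private transient Properties options;

  OptionsHolder(Properties options) {
    this.serializedOptions = encode(options);
  }

  private static byte[] encode(Properties options) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      options.store(out, null);
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes on first access and caches the result. */
  Properties getOptions() {
    if (options == null) {
      try {
        Properties decoded = new Properties();
        decoded.load(new ByteArrayInputStream(serializedOptions));
        options = decoded;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return options;
  }

  /** Simulates shipping the holder to a worker via Java serialization. */
  static OptionsHolder roundTrip(OptionsHolder holder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(holder);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OptionsHolder) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

A copy obtained through `roundTrip` returns the same option values from `getOptions()`, which is the property a shared utility for runners would need to preserve.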