[BEAM-3648] Support Splittable DoFn in Flink Batch Runner #4640
aljoscha wants to merge 2 commits into apache:master from
5ba84eb to 22bc9e8
Run Flink ValidatesRunner
22bc9e8 to 2456f68
2456f68 to a78bea0
Run Flink ValidatesRunner
jkff left a comment:
Awesome, thanks! Sorry for the delay - was preparing for the Strata talk...
}

private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
Does this duplicate code with the streaming codepath? If so, can it be deduplicated into a shared location?
There is no code duplication since the streaming codepath uses the Flink DataStream API while the batch codepath uses the DataSet API.
Flink is interesting in that the underlying execution engine is a streaming execution engine but there are two different APIs on top of that. Seems somewhat the opposite of what Dataflow was/is like.
Oh okay - then no objections :)
}
}

private static class SplittableParDoTranslatorBatch<
import org.joda.time.Instant;

/**
 * A {@link RichGroupReduceFunction} for splittable {@link DoFn} in Flink Batch Runner.
Clarify how it differs from the streaming one?
I'll add a comment.
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
    Executors.defaultThreadFactory());

((SplittableParDoViaKeyedWorkItems.ProcessFn) dofn)
Yes! I realised we also don't have the generic type info. I'll try and fix that.
},
sideInputReader,
executorService,
10000,
Is there any reason to limit the output and duration in batch? Maybe we can just run the whole SDF.
No reason. If there's work left, it will just be called again when the processing-time timer fires. I'll set these to Integer.MAX_VALUE and Duration.millis(Long.MAX_VALUE), though.
Yeah, that should be better in batch: we're optimizing for throughput rather than latency, so bigger bundles are better. Depending on how the Flink streaming runner works, it may be reasonable to choose different parameter values for the streaming runner too, but the default ones may also be good.
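To make the trade-off discussed above concrete, here is a minimal, self-contained sketch of how an output cap changes the number of invocations needed to finish one restriction. It is not Beam's ProcessFn or its invoker: the class BundleLimitSketch, the methods processOnce and runToCompletion, and the range sizes are all hypothetical, and the duration limit is left out for brevity. The loop in runToCompletion stands in for the processing-time timer that re-invokes processing while work remains.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a per-call output cap; not Beam's actual API. */
public class BundleLimitSketch {

  /**
   * Emits positions from [from, to) into sink, stopping early once maxOutputs
   * elements have been emitted. Returns the position to resume from, which
   * equals {@code to} when the range is exhausted.
   */
  public static long processOnce(long from, long to, int maxOutputs, List<Long> sink) {
    long position = from;
    long emitted = 0;
    while (position < to && emitted < maxOutputs) {
      sink.add(position);
      position++;
      emitted++;
    }
    return position;
  }

  /**
   * Re-invokes processOnce until the range is exhausted, standing in for a
   * timer that fires while work is left. Returns the number of invocations.
   */
  public static int runToCompletion(long from, long to, int maxOutputs, List<Long> sink) {
    int invocations = 0;
    long position = from;
    while (position < to) {
      position = processOnce(position, to, maxOutputs, sink);
      invocations++;
    }
    return invocations;
  }

  public static void main(String[] args) {
    List<Long> capped = new ArrayList<>();
    // Cap of 10000 outputs per call: 10000 + 10000 + 5000 elements.
    System.out.println(runToCompletion(0, 25_000, 10_000, capped)); // prints 3

    List<Long> uncapped = new ArrayList<>();
    // Effectively no cap: the whole range is processed in a single call.
    System.out.println(runToCompletion(0, 25_000, Integer.MAX_VALUE, uncapped)); // prints 1
  }
}
```

Both runs emit the same 25000 elements. The cap only changes how many times processing is resumed, which is why a large limit suits a throughput-oriented batch run.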
Please let me know when ready for another round.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This is built on top of #4639 so only the last commit is relevant here.