[BEAM-1815] Avoid shuffling twice in GABW.#2334
[BEAM-1815] Avoid shuffling twice in GABW.#2334amitsela wants to merge 1 commit intoapache:masterfrom
Conversation
|
R: @aviemzur |
|
Run Spark RunnableOnService |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
aviemzur
left a comment
There was a problem hiding this comment.
Awesome fix and improvement!
A few comments, nothing major.
A general idea: if possible, can write a util to convert a PairFunction to a FlatMapPairFunction
this could decrease the amount of changes in the code (and be useful in similar cases in the future).
| .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction()) | ||
| .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction()) | ||
| .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder)); | ||
| // Use the RDD partitioner, if exists. |
There was a problem hiding this comment.
Is this comment correct? Looks like we're creating a HashPartitioner not using the RDD partitioner.
There was a problem hiding this comment.
Leftovers.. I'll fix that.
| Tuple2<ByteArray, Iterable<byte[]>>, Tuple2<K, Iterable<V>>>() { | ||
| @Override | ||
| public Tuple2<K, Iterable<V>> apply(Tuple2<ByteArray, Iterable<byte[]>> t2) { | ||
| K k = fromByteArray(t2._1().getValue(), keyCoder); |
| } | ||
| })); | ||
| public Iterable<Tuple2<K, Iterable<V>>> call( | ||
| Iterator<Tuple2<ByteArray, Iterable<byte[]>>> itr) { |
|
@aviemzur PTAL |
|
Refer to this link for build results (access rights to CI server needed): Failed Tests: 2beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-examples-java: 1beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-runners-apex: 1--none-- |
|
|
|
Run Spark RunnableOnService |
|
Refer to this link for build results (access rights to CI server needed): |
aviemzur
left a comment
There was a problem hiding this comment.
LGTM
Minor comments re: Javadoc and method name suggestions.
|
|
||
| /** A pair to {@link KV} function . */ | ||
| /** {@link KV} to pair flatmap function. */ | ||
| public static <K, V> PairFlatMapFunction<Iterator<KV<K, V>>, K, V> toPairFlatMapFunction() { |
There was a problem hiding this comment.
Rename suggestion: flatMapKVToPair / kvToPairFlatMapFunction?
| toPairByKeyInWindowedValue() { | ||
| return new PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>() { | ||
| /** A pair to {@link KV} flatmap function . */ | ||
| static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, KV<K, V>> fromPairFlatMapFunction() { |
There was a problem hiding this comment.
Rename suggestion: flatMapPairToKV / pairToKVFlatMapFunction?
| }; | ||
| } | ||
|
|
||
| public static <T, K, V> PairFlatMapFunction<Iterator<T>, K, V> pairFunctionToPairFlatMapFunction( |
There was a problem hiding this comment.
This is a v. useful public function, we should probably have a Javadoc for it.
Same comment for functionToFlatMapFunction
…void unnecessary shuffles in the composite GBK implementation. Add Javadoc.
002989e to
9ab77be
Compare
|
Run Spark RunnableOnService |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull requestmvn clean verify. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.