
[BEAM-4783] Fix invalid parameter to set the partitioner in Spark GbK #6884

Merged

merged 1 commit into apache:master from iemejia:BEAM-4783-performance-fix on Oct 30, 2018

Conversation

@iemejia (Member) commented on Oct 30, 2018:

The code was also refactored to make it less error-prone.



Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch): [table of build-status badges omitted]
@iemejia (Member, Author) commented on Oct 30, 2018:

Run Spark ValidatesRunner

iemejia requested a review from timrobertson100 on Oct 30, 2018
Review comment on this diff fragment:

            .get()
            .as(SparkPipelineOptions.class)
            .getBundleSize()
            > 0);

@timrobertson100 (Contributor) commented on Oct 30, 2018:

For anyone observing: this is the real issue. The check should have been == 0, so that defaultParallelism == true was passed in when no explicit bundle size was set.

The refactor removes the likelihood of error by explicitly taking a Partitioner (which could potentially be a type other than a HashPartitioner). Nice.
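To make the inverted condition concrete, here is a minimal standalone sketch of the flag logic (the class and variable names are hypothetical; in the actual pre-refactor code the boolean was passed to GroupCombineFunctions.groupByKeyOnly):

    public class BundleSizeFlagSketch {
      public static void main(String[] args) {
        // 0 means no explicit bundleSize was configured in SparkPipelineOptions.
        long bundleSize = 0L;

        // What the code computed: false when bundleSize is unset, so default
        // parallelism was NOT requested exactly when it should have been.
        boolean buggyFlag = bundleSize > 0;

        // What it should have computed: request default parallelism only when
        // no explicit bundle size was set.
        boolean fixedFlag = bundleSize == 0;

        System.out.println("buggy=" + buggyFlag + ", fixed=" + fixedFlag);
      }
    }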

@timrobertson100 (Contributor) commented on Oct 30, 2018:

Code LGTM but Jenkins is going down

timrobertson100 merged commit cac06ce into apache:master on Oct 30, 2018
5 checks passed:

  • Apache Spark Runner ValidatesRunner Tests: SUCCESS
  • Go ("Run Go PreCommit"): SUCCESS
  • Java ("Run Java PreCommit"): SUCCESS
  • Python ("Run Python PreCommit"): SUCCESS
  • RAT ("Run RAT PreCommit"): SUCCESS
iemejia deleted the iemejia:BEAM-4783-performance-fix branch on Oct 30, 2018
@kyle-winkelman (Contributor) commented on Jan 8, 2019:

I believe this refactor actually does the opposite of what it was supposed to do. Previously the HashPartitioner was used in all cases. I wanted to get rid of it, but @iemejia was concerned it might bring back an old issue in which the SparkRunner, when in streaming mode, would shuffle the data twice. I therefore only removed the HashPartitioner in the case that bundleSize was specified. Can someone check whether a streaming workflow with a groupByKey has a double shuffle? If not, we can remove most of this code and always call rdd.groupByKey() without the HashPartitioner. If it does, we need to flip all of this to do the opposite.
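For anyone wanting to probe the double-shuffle question locally, here is a minimal standalone sketch using Spark's Java API (not the Beam runner code): when an RDD already carries the same partitioner, groupByKey reuses the existing layout instead of shuffling again, which is the behavior the explicit HashPartitioner was meant to exploit.

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class DoubleShuffleCheck {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("double-shuffle-check");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
              new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)));

          HashPartitioner partitioner = new HashPartitioner(sc.defaultParallelism());

          // partitionBy shuffles once; groupByKey with the *same* partitioner
          // then reuses the existing partitioning, so no second shuffle.
          JavaPairRDD<String, Integer> prePartitioned = pairs.partitionBy(partitioner);
          long keys = prePartitioned.groupByKey(partitioner).count();

          // Without a matching partitioner, groupByKey adds its own shuffle.
          long keysAgain = pairs.groupByKey().count();

          System.out.println(keys + " " + keysAgain);
        }
      }
    }

Whether the actual streaming GroupByKey path shuffles twice would still need to be confirmed, e.g. by inspecting the stage DAG in the Spark UI for a real pipeline.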

@@ -130,17 +132,14 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
              WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());

          // --- group by key only.
          Long bundleSize =
              context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
          Partitioner partitioner =

@kyle-winkelman (Contributor) commented on Jan 8, 2019:

It should be flipped to maintain old functionality:

        Partitioner partitioner =
            (bundleSize > 0)
                ? null
                : new HashPartitioner(context.getSparkContext().defaultParallelism());

@kyle-winkelman (Contributor) commented on Jan 8, 2019:

I also think it will have basically the same effect if we always use null in the batch context. If we split a source on bundleSize and it has n partitions, or split it on defaultParallelism and it has defaultParallelism partitions, either way we want to keep that many partitions going forward, which is exactly what omitting the HashPartitioner does. The only case where this is weird is if we try to split on defaultParallelism but the source doesn't support splitting that much, leaving fewer than defaultParallelism partitions.
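This partition-count behavior is easy to check with a standalone Spark sketch (again the plain Java API, not Beam code; note the caveat that with no explicit partitioner, Spark's defaultPartitioner keeps the largest upstream partition count only while spark.default.parallelism is unset):

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class PartitionCountSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("partition-count-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          // Pretend the source was split into 7 bundles, i.e. 7 partitions.
          JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
              Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)), 7);

          // No explicit partitioner: the shuffle keeps the upstream count (7).
          System.out.println(pairs.groupByKey().getNumPartitions());

          // Explicit HashPartitioner: the shuffle output is forced to
          // defaultParallelism partitions regardless of the upstream count.
          System.out.println(pairs.groupByKey(
              new HashPartitioner(sc.defaultParallelism())).getNumPartitions());
        }
      }
    }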

@@ -433,7 +432,7 @@ public String toNativeString() {
              WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder);

          JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD =
-             GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, true);
+             GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, null);

@kyle-winkelman (Contributor) commented on Jan 8, 2019:

I believe this is actually correct and I did it wrong originally.

@@ -301,7 +301,7 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
          JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKeyStream =
              dStream.transform(
                  rdd ->
-                     GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, true));
+                     GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, null));

@kyle-winkelman (Contributor) commented on Jan 8, 2019:

To maintain the old functionality:

        GroupCombineFunctions.groupByKeyOnly(
            rdd, coder.getKeyCoder(), wvCoder,
            new HashPartitioner(context.getSparkContext().defaultParallelism()));
@kyle-winkelman (Contributor) commented on Jan 14, 2019:

@iemejia Were you able to confirm whether this change fixed the performance issues seen in the Spark Runner? My code was poorly written, and I believe your version is much clearer, but if you take a look at the state before my PR you will see what I am talking about. Specifically this comment.

@kyle-winkelman (Contributor) commented on Jan 29, 2019:

@timrobertson100 @aaltay @kennknowles Can any of you confirm that the performance of the Spark Runner has gone back to the levels before PR #6181, and that it does so without reintroducing BEAM-1815? You can see my concerns in the comments above.

kyle-winkelman mentioned this pull request on Jan 31, 2019