Conversation


jhalaria commented Mar 12, 2019


The PR makes CBK consistent with GBK: we convert the keys to a ByteArray before calling the group/combine functions.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

[Build-status badge table: post-commit status of the Go, Java, and Python SDKs on the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners.]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

jhalaria force-pushed the SparkHashPartitioner branch from b0f74d0 to 86e9bad on March 12, 2019 at 22:07
jhalaria (Author) commented Mar 12, 2019

@iemejia - Please review.

jhalaria force-pushed the SparkHashPartitioner branch 5 times, most recently from 9094205 to a5dcf2c on March 14, 2019 at 18:48
(Review thread on the getPartition diff)

  return (bundleSize > 0)
-     ? null
-     : new HashPartitioner(context.getSparkContext().defaultParallelism());
+     ? CustomSparkHashPartitioner.of(context.getSparkContext().defaultParallelism())
jhalaria (Author) commented:

This logic seems to be accidentally flipped?

A member commented:

Agreed, this logic is odd; I don't think EvaluationContext is enough to properly resolve the parallelism for the shuffle operation.

In case bundleSize > 0, we should reuse the parallelism of the input RDD. This way we would get rid of the nullable case and we could simplify the GBK and CPK translations.

Spark's default partitioner does the same thing: org.apache.spark.Partitioner#defaultPartitioner
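A minimal sketch of that suggestion, assuming a Spark JavaPairRDD input (the class and method names are hypothetical, not the PR's code):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

// Size the shuffle partitioner from the input RDD rather than from
// defaultParallelism, similar in spirit to org.apache.spark.Partitioner#defaultPartitioner.
class InputSizedPartitioner {
  static <K, V> HashPartitioner forInput(JavaPairRDD<K, V> input) {
    // Reusing the upstream partition count lets the shuffle scale with the
    // input data and removes the need for a nullable partitioner.
    return new HashPartitioner(input.getNumPartitions());
  }
}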

jhalaria force-pushed the SparkHashPartitioner branch from a5dcf2c to 9254352 on March 14, 2019 at 19:22
jhalaria (Author) commented:

@iemejia: Please take a look at this when you get a chance. Thanks.

iemejia (Member) commented Mar 20, 2019

Sorry for the delay; I have not forgotten about this one, I'm just swamped with other PRs. @dmvk, could you help me review this one please?

iemejia requested a review from dmvk on March 20, 2019 at 14:34
dmvk (Member) commented Mar 20, 2019

Run Portable_Python PreCommit

dmvk (Member) commented Mar 20, 2019

Nice catch! This problem goes way deeper: we should never shuffle raw user data in the first place (we should always use Beam coders to serialize the data before partitioning even happens). If you take a look at the GBK implementation, it uses ByteArray instead of byte[], because ByteArray has a correct, content-based hashCode implementation.

I guess the tests did not catch this because they use primitive types (which Spark can serialize on its own) as keys. Also, the tests run in a single JVM, so the hashCode is stable.

The correct fix would be:

  • Change the CombinePerKey implementation to convert keys to ByteArray prior to calling combineByKey (the sketch after this list illustrates why the wrapping matters).
  • If we want to use a custom partitioner, we should check on the first getPartition call whether hashCode == systemIdentityHashCode and throw an exception if it does, or make sure that we shuffle only byte[] (which may be less expensive than a ByteArray object).
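To illustrate the hashCode problem described above, here is a minimal, self-contained sketch; ByteArrayKey is a hypothetical stand-in for the Spark runner's ByteArray wrapper, not its actual code:

import java.util.Arrays;

// Hypothetical stand-in for the runner's ByteArray: equality and hashCode are
// based on the array contents, not on object identity.
final class ByteArrayKey {
  private final byte[] value;

  ByteArrayKey(byte[] value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ByteArrayKey && Arrays.equals(value, ((ByteArrayKey) o).value);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(value);
  }
}

class HashCodeDemo {
  public static void main(String[] args) {
    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 3};
    // Raw byte[] keys fall back to identity hashCode, so equal key bytes can land
    // in different partitions and differ across JVMs.
    System.out.println(a.hashCode() == b.hashCode()); // almost always false
    // Content-based keys hash consistently, so a HashPartitioner routes them together.
    System.out.println(new ByteArrayKey(a).hashCode() == new ByteArrayKey(b).hashCode()); // true
  }
}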

dmvk (Member) left a comment:

This is a great start! It would be awesome if we could cover all cases by properly using Beam's coders for all shuffle operations as expected.

Thank you for the contribution! ;)

jhalaria force-pushed the SparkHashPartitioner branch from 9254352 to ace220a on March 20, 2019 at 19:00
jhalaria (Author) commented:

@dmvk - Thank you for looking at the PR. I removed the custom partitioner logic for now and made CPK similar to GBK, where we convert the key to a ByteArray. I will create another PR that adds the ability to plug in a custom partitioner.
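A rough sketch of the pattern described in this comment, not the PR's actual code: keys are encoded with their Beam coder and wrapped in a content-hashed key before combineByKey, then decoded afterwards. EncodedKey, encodeKeys, and decodeKeys are hypothetical names; CoderUtils is the Beam SDK helper.

import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Stand-in for the runner's ByteArray: content-based equals/hashCode over the encoded key.
final class EncodedKey implements Serializable {
  final byte[] bytes;

  EncodedKey(byte[] bytes) {
    this.bytes = bytes;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}

class CombinePerKeySketch {
  // Encode each key with its Beam coder before the shuffle so partitioning is
  // driven by the encoded bytes rather than the user type's hashCode.
  static <K, V> JavaPairRDD<EncodedKey, V> encodeKeys(JavaPairRDD<K, V> input, Coder<K> keyCoder) {
    return input.mapToPair(
        kv -> new Tuple2<>(new EncodedKey(CoderUtils.encodeToByteArray(keyCoder, kv._1())), kv._2()));
  }

  // After combineByKey has run, decode the keys back to the user type.
  static <K, A> JavaPairRDD<K, A> decodeKeys(JavaPairRDD<EncodedKey, A> combined, Coder<K> keyCoder) {
    return combined.mapToPair(
        kv -> new Tuple2<>(CoderUtils.decodeFromByteArray(keyCoder, kv._1().bytes), kv._2()));
  }
}

A caller would run encodeKeys(input, keyCoder).combineByKey(createFn, mergeValueFn, mergeCombinersFn) and then decodeKeys(...) on the result, so the shuffle only ever sees content-hashed encoded keys.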

jhalaria changed the title from "[BEAM-6812]: Create a custom hash partitioner that deals with arrays during spark combines" to "[BEAM-6812]: Convert keys to ByteArray in Combine.perKey to make sure hashCode is consistent" on Mar 20, 2019
dmvk (Member) left a comment:

I think we should rather keep the getPartition mechanism the way it was for now. Otherwise it is good to merge! Thanks ;)

(Review thread on the getPartition diff)

-     ? null
-     : new HashPartitioner(context.getSparkContext().defaultParallelism());
+     ? new HashPartitioner(context.getSparkContext().defaultParallelism())
+     : null;
A member commented:

On second thought, it was correct.

https://github.com/apache/beam/pull/6884/files#r246077919

This was in order to maintain the old functionality (bundleSize == 0, which basically means using a predefined parallelism).

I think the old functionality doesn't make much sense, as it doesn't scale with the input data. I guess someone may use it to re-scale a "downstream" stage, but there should be a better mechanism for that.

Any thoughts? @timrobertson100 @kyle-winkelman

jhalaria (Author) commented:

I see. I reverted the changes made to getPartition.

A contributor commented:

I agree that the old functionality seems strange, but I remember (when I had the logic backwards) that the performance tests for the Spark runner were impacted. I think the impact was in streaming mode, because if you don't use this HashPartitioner it actually does a double shuffle of the data. I tried to clean this up, but I never finished getting PR #6511 merged.

jhalaria force-pushed the SparkHashPartitioner branch from ace220a to 18820bb on March 20, 2019 at 19:48
dmvk (Member) commented Mar 20, 2019

Run Spark ValidatesRunner

dmvk merged commit 32bc6da into apache:master Mar 20, 2019
jhalaria deleted the SparkHashPartitioner branch March 21, 2019 16:31