[BEAM-8191] Fixes potentially large number of tasks on Spark after Flatten.pCollections() #9544

Closed
wants to merge 1 commit

Conversation

pbackx commented Sep 11, 2019

In the Spark runner (Beam 2.14.0 and 2.15.0), a Flatten.pCollections() PTransform is translated into a Spark union operation. The resulting RDD keeps every partition of every input, so it can end up with a very large number of partitions, which can overload the driver.

This PR adds a coalesce operation after the union, which reduces the number of partitions in the RDD at virtually no cost.

See the JIRA ticket for more detail:
https://issues.apache.org/jira/browse/BEAM-8191
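
For illustration, a minimal sketch of the idea in Spark's Java API (the class and method names here are illustrative only and are not the actual Beam Spark runner code; the coalesce target is assumed to be the default parallelism):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/** Illustrative sketch of the change; not the actual Beam Spark runner code. */
class FlattenSketch {
  @SafeVarargs
  static <T> JavaRDD<T> flattenAndCoalesce(JavaSparkContext sc, JavaRDD<T>... rdds) {
    // union() keeps every partition of every input, so the result can have
    // (number of inputs) x (partitions per input) partitions in total.
    JavaRDD<T> union = sc.union(rdds);
    // coalesce() merges partitions without a shuffle, bringing the count back
    // down to the default parallelism at virtually no cost.
    return union.coalesce(sc.defaultParallelism());
  }
}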


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch): build-status badge table omitted.

Pre-Commit Tests Status (on master branch): build-status badge table omitted.

See .test-infra/jenkins/README for the trigger phrase, status, and link of all Jenkins jobs.

pbackx commented Sep 11, 2019

@iemejia

iemejia commented Sep 24, 2019

Thanks for the contribution @pbackx, and sorry for the late answer. @RyanSkraba, can you please help me with the review?
R: @RyanSkraba

@@ -353,6 +353,11 @@ public void setName(String name) {
index++;
}
unionRDD = context.getSparkContext().union(rdds);

Partitioner partitioner = getPartitioner(context);

RyanSkraba commented Sep 24, 2019

In principle, I don't see any problem with coalescing partitions to avoid the overhead of too many partitions -- it's certainly something a developer would do when hand-coding Spark jobs.

In this case, I'm not sure about your logic here -- as far as I can tell, this will always coalesce to sc.defaultParallelism() (or not coalesce at all if bundleSize is set in the pipeline options).

As I understand it, bundleSize is only relevant for the sourceRDD, but the union causing the partition explosion can occur anywhere in the DAG.

How about checking the number of partitions in the unionRDD against a threshold, and doing the coalesce if there are "too many" instead? Something like:

if (unionRDD.getNumPartitions() > sc.defaultParallelism() * THRESHOLD) {
  coalesce(sc.defaultParallelism() * 2); // or * 3, according to the Spark tuning recommendations
}

Even setting the threshold to a large but non-negligible fixed value like 5-10K might be enough.

What do you think?
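
A minimal, self-contained sketch of that suggestion in Java (the helper class, the THRESHOLD constant, and its value are hypothetical and not part of the PR):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/** Hypothetical sketch of the threshold-based coalesce suggested above; not actual Beam code. */
class CoalesceThresholdSketch {
  // Illustrative "large but non-negligible" cut-off, as discussed above.
  private static final long THRESHOLD = 5_000L;

  static <T> JavaRDD<T> maybeCoalesce(JavaSparkContext sc, JavaRDD<T> unionRDD) {
    int defaultParallelism = sc.defaultParallelism();
    if (unionRDD.getNumPartitions() > defaultParallelism * THRESHOLD) {
      // coalesce() without a shuffle is cheap; 2x the default parallelism follows
      // the common Spark tuning recommendation of 2-3 tasks per core.
      return unionRDD.coalesce(defaultParallelism * 2);
    }
    return unionRDD;
  }
}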

pbackx commented Sep 25, 2019

Hi @RyanSkraba, thanks for reviewing.
The default parallelism is kind of arbitrary, I understand, and I also wanted a better option.

I'm not sure about the threshold being that large. On our 150-machine cluster with 13 usable cores per machine and 2 tasks per core, we have a parallelism of 150 × 13 × 2 = 3,900. Multiplying that by 5,000 is exactly what is causing us trouble at the moment.

Maybe this threshold could be a Spark pipeline option? Something like "spark.max.parallelism"?

Regarding the note about multiplying by 2 or 3: I already take that into account when setting the default parallelism, so I don't think it is needed here. Maybe this factor could be replaced by the max parallelism config option?

Are you open to adding an extra pipeline option? That could solve this problem:

  • If I want to keep the maximum parallelism at the default parallelism, I can set both values through the pipeline options.
  • The default value for the max parallelism would be positive infinity, in which case the behavior is exactly the same as it is today.

Does that sound like a good idea?
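
For illustration only, a hypothetical pipeline option along the lines proposed above; the interface and the option name (maxParallelism) are assumptions, not existing Beam API:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Hypothetical sketch only; not part of Beam's actual SparkPipelineOptions. */
public interface MaxParallelismOptions extends PipelineOptions {

  @Description(
      "Hypothetical upper bound on the number of partitions after a union/Flatten. "
          + "Defaults to Integer.MAX_VALUE ('positive infinity'), which keeps today's behavior.")
  @Default.Integer(Integer.MAX_VALUE)
  Integer getMaxParallelism();

  void setMaxParallelism(Integer maxParallelism);
}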

RyanSkraba commented Oct 17, 2019

This does sound like a good idea! I linked your JIRA to https://issues.apache.org/jira/browse/BEAM-8384. Before adding a new pipeline option, it would be great if there were a better "overall" view of how the SparkRunner is managing parallelism.

This seems like it would be a good area to collaborate.

I apologize for forgetting to add this comment to your PR earlier :/

pbackx commented Oct 24, 2019

Due to changes in priorities, this has moved a bit down my list of to-dos, hence the delay in getting back.

Yes, the way parallelism (and partitioning) works in the SparkRunner could use some improvement. The problem is that it is difficult to make this generic: in some parts of the pipeline you may want more parallelism, while in others you don't.

Having a maximum is just a first step. Ideally, I think you want some way to supply the Spark partitioner yourself.

I do need to study Spark partitioning more before I feel confident making a proper suggestion. But in the meantime, if you have ideas, I'm certainly not afraid of just trying things out.
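
For context, supplying a partitioner in plain Spark looks roughly like the sketch below (a generic Spark illustration, not a proposal for the Beam runner API):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

/** Generic Spark illustration; not a proposal for the Beam runner API. */
class PartitionerSketch {
  // partitionBy shuffles the data so that records with the same key end up in the
  // same partition, as decided by the supplied Partitioner.
  static <K, V> JavaPairRDD<K, V> repartitionByKey(JavaPairRDD<K, V> rdd, int numPartitions) {
    return rdd.partitionBy(new HashPartitioner(numPartitions));
  }
}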

stale bot commented Dec 23, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

stale bot added the stale label Dec 23, 2019

stale bot commented Dec 30, 2019

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

stale bot closed this Dec 30, 2019