
[BEAM-9434] Improve Spark runner reshuffle translation to maximize parallelism #11037

Merged: 1 commit merged into apache:master on Apr 7, 2020
Conversation

@ecapoccia (Contributor)

The implemented solution consists of an extension of the FileSystem classes for S3 that allows filtering of the matched objects. The filter is a mod-hash function of the filename.

This mechanism is exploited by a parallel, partitioned read of the files in the bucket, realised in the AvroIO and FileIO classes by means of the new hint withHintPartitionInto(int).

The net effect is that a controlled number of tasks is spawned across different executors, each with a different partition number; each executor reads the entire list of files in the bucket, but matches only those files whose mod-hash matches its partition number.

In this way, all the executors in the cluster read in parallel.
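
To make the mechanism concrete, here is a minimal, self-contained sketch of the mod-hash filtering idea (the class and method names are invented for illustration and are not part of the Beam FileSystem or FileIO API):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Beam code: each of N workers lists the same bucket but keeps only
// the filenames whose mod-hash falls into its own partition, so workers read disjoint files.
public class ModHashFileFilter {

  /** Returns true if the given filename is assigned to the given partition. */
  static boolean belongsToPartition(String filename, int partition, int numPartitions) {
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    return Math.floorMod(filename.hashCode(), numPartitions) == partition;
  }

  /** Keeps only the files assigned to the given partition. */
  static List<String> filterForPartition(List<String> filenames, int partition, int numPartitions) {
    return filenames.stream()
        .filter(f -> belongsToPartition(f, partition, numPartitions))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("a.avro", "b.avro", "c.avro", "d.avro");
    // With 2 partitions, the two filtered lists are disjoint and together cover every file.
    System.out.println(filterForPartition(files, 0, 2));
    System.out.println(filterForPartition(files, 1, 2));
  }
}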


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch): per-SDK (Go, Java, Python, XLang) and per-runner (Apex, Dataflow, Flink, Gearpump, Samza, Spark) build status badge table omitted.

Pre-Commit Tests Status (on master branch): Java, Python, Go and Website badge table omitted.

See .test-infra/jenkins/README for the trigger phrases, status, and links of all Jenkins jobs.

@ecapoccia (Contributor, Author)

@iemejia @andeb please review and let me have your comments. I have more evidence from the tests that I've carried out, and I'm happy with the performance gains. However, I'm keen to understand whether the approach is sound. I look forward to hearing from you.

@andeb (Contributor) commented Mar 4, 2020

Awesome work, it looks good to me

@ecapoccia (Contributor, Author)

R: @lukecwik do you mind having a look and giving me feedback on this PR? Thanks, I look forward to hearing from you.

@lukecwik (Member) commented Mar 6, 2020

Adding a filesystem API that all filesystems need to implement is going to raise some questions in the Apache Beam community. I have asked some follow-up questions on BEAM-9434 to see whether the issue is localized to Spark.

@ecapoccia (Contributor, Author)

Adding a filesystem API that all filesystems need to implement is going to raise some questions in the Apache Beam community. I have asked some follow-up questions on BEAM-9434 to see whether the issue is localized to Spark.

@lukecwik Ok I see.

Technically speaking, it is not mandatory to implement it in every filesystem, but in practice, properly supporting the hint in every filesystem effectively requires it.

I considered a few alternatives:

  • the current one: throwing an UnsupportedOperationException if a filesystem does not support it (sketched after this comment)
  • a default implementation that does wasteful filtering before returning the results (not scalable)
  • implementing it for all filesystems

The reality is, I don't have a means of testing the last option on anything other than S3; otherwise, the last option would be the best approach IMHO.

Let me know what your opinions are.

Also, it looks to me like the filesystem classes are internal to the framework and not supposed to be used directly by end users. In that case, another option may be viable: rename the new hint appropriately, and don't make it mandatory for the framework to consider the hint.

In other words, I'm saying that we can hint to use N partitions, but the runtime can just ignore the hint if that's not supported by the underlying filesystem.
I can modify the code in this way if that's viable.

Happy to hear back from you guys, and thanks for the feedback.
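
For reference, the first option in the list above could look roughly like the following sketch (the class and method names are invented here and do not match Beam's actual FileSystem API):

import java.io.IOException;
import java.util.List;

// Hypothetical shape of option 1: filesystems that cannot filter matches by partition simply
// refuse the operation, while a filesystem such as S3 would override matchPartition.
abstract class PartitionAwareFileSystem {

  /** Existing behaviour: return all object names matching the given specs. */
  abstract List<String> match(List<String> specs) throws IOException;

  /** Optional capability: return only the objects whose filename mod-hash equals the partition. */
  List<String> matchPartition(List<String> specs, int partition, int numPartitions)
      throws IOException {
    throw new UnsupportedOperationException(
        getClass().getSimpleName() + " does not support partitioned matching");
  }
}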

@lukecwik (Member) commented Mar 6, 2020

The expansion for withHintMatchesManyFiles uses a reshuffle between the match and the actual reading of the file. The reshuffle allows the runner to balance the amount of work across as many nodes as it wants. The only thing being reshuffled is file metadata, so after that reshuffle the file reading should be distributed across several nodes.

In your reference run, when you say that "the entire reading taking place in a single task/node", was it that the match all happened on a single node, or was it that the "read" happened all on a single node?
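
For context, the expansion being described can be approximated with the public FileIO API roughly as follows (a sketch only; the real withHintMatchesManyFiles() expansion differs in detail, and reading from S3 also requires the AWS filesystem module on the classpath):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ManyFilesExpansionSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // One element (the file pattern); matchAll expands it into one metadata element per file.
    PCollection<MatchResult.Metadata> metadata =
        p.apply(Create.of("s3://my-bucket-name/*.avro"))
            .apply(FileIO.matchAll());

    // The reshuffle is what should let the runner spread the per-file metadata across workers
    // before any file bytes are read; the actual reading happens downstream of it.
    PCollection<FileIO.ReadableFile> files =
        metadata
            .apply(Reshuffle.viaRandomKey())
            .apply(FileIO.readMatches());

    // ... AvroIO.readFiles(...) or a similar transform would consume `files` here ...
    p.run().waitUntilFinish();
  }
}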

@ecapoccia (Contributor, Author) commented Mar 7, 2020

The expansion for withHintMatchesManyFiles uses a reshuffle between the match and the actual reading of the file. The reshuffle allows the runner to balance the amount of work across as many nodes as it wants. The only thing being reshuffled is file metadata, so after that reshuffle the file reading should be distributed across several nodes.

In your reference run, when you say that "the entire reading taking place in a single task/node", was it that the match all happened on a single node, or was it that the "read" happened all on a single node?

Both.
Reading the metadata wouldn't be a problem; it also happens on every node in the proposed PR.
But the actual reading also happens on one node, with unacceptably high reading times.
Maybe what you say applies to the case of "bulky" files.
However, my solution particularly targets the case where there is a high number of tiny files (I think I explained this better in the Jira ticket).
In this latter case, the latency of reading each file from S3 dominates, but no chunking / shuffling happens with standard Beam.
When I look at the DAG in Spark, I can see only one task there, and if I look at the executors they are all idle, except the one where all the reading happens.
This is true both for the stage where you read the metadata and for the stage where you read the data.
With the proposed PR, instead, the number of tasks and parallel executors in the DAG is the one you pass in the hint.

@ecapoccia (Contributor, Author) commented Mar 7, 2020

[Screenshot 2020-02-28 at 12 06 47]

[Screenshot 2020-02-28 at 11 12 03]

This is the fundamental difference between the base case and the current PR. Notice that in the base case there are only 2 tasks (the entire job is a join of two independent readings), whereas when using 10 partitions there are 20 tasks doing the same work (the image is a detail of one of the two independent readings, showing its 10 parallel partitions).

This is reflected in the executors being used and in the time to complete (16 minutes with 2 tasks, 2.3 minutes with 20).
See below for a comparison of the execution data.

[Screenshot 2020-03-07 at 10 33 25]

[Screenshot 2020-02-28 at 11 15 12]

@lukecwik (Member) commented Mar 9, 2020

I understand what you're saying and how your solution resolves the problem.

I'm trying to say that the problem shouldn't occur in the first place, because AvroIO.read should expand to a MatchAll that contains a Reshuffle, followed by ReadMatches.

Reshuffle should ensure that there is a repartition between MatchAll and ReadMatches. Is it missing (it is difficult to tell from your screenshots)? If it isn't missing, then why is the following stage only executing on a single machine (since the repartition shouldn't be restricting output to only a single machine)?

@ecapoccia (Contributor, Author)

Unfortunately, the problem does happen for me; that is why this work started. Let's see if we can understand its root cause.

Reshuffle should ensure that there is a repartition between MatchAll and ReadMatches. Is it missing (it is difficult to tell from your screenshots)? If it isn't missing, then why is the following stage only executing on a single machine (since the repartition shouldn't be restricting output to only a single machine)?

It's clearly not missing, as in the base case I'm using withHintMatchesManyFiles().
Still, what happens is that the entire reading is on one machine (see the second-to-last screenshot, "summary metrics for 2 completed tasks"). The impression I have is that when the physical plan is created, there is only one task detected, which is bound to do the entire reading on one executor. Consider that I am doing something really plain: just reading from two buckets, joining the records, and writing them back to S3. Did you try this yourself to see if you can reproduce the issue?

I had a look at the code of Reshuffle.expand() and Reshuffle.ViaRandomKey, but I have some doubts about the expected behaviour in terms of machines / partitions.

How many partitions should Reshuffle create? Will there be 1 task per partition? And how are the tasks ultimately assigned to the executors?
Maybe you can help me understand the above or point me to the relevant documentation. That should hopefully help me troubleshoot this.

@ecapoccia (Contributor, Author)

Any thoughts on the above, @lukecwik @iemejia? I have not heard back from you in a couple of days.

@aaltay aaltay requested review from iemejia and lukecwik March 27, 2020 02:22
@lukecwik (Member)

Sorry about the long delay, but Reshuffle should produce as many partitions as the runner thinks is optimal. It is effectively a redistribute operation.

It looks like the Spark translation is copying the number of partitions from the upstream transform for the reshuffle translation, and in your case this is likely 1.
Translation:

private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {

Copying partitions:

@iemejia Shouldn't we be using a much larger value for partitions, e.g. the number of nodes?

@ecapoccia (Contributor, Author)

Sorry about the long delay

My turn to apologize, @lukecwik. I was busy with something else, as well as trying to settle into this new life of working from home due to the virus.

It looks like the Spark translation is copying the number of partitions from the upstream transform for the reshuffle translation, and in your case this is likely 1

Gotcha. And yes, I can confirm that the value for the partitions is 1, and not only in my case. As a matter of fact, the number of partitions is calculated (in a bizarre way) only for the root RDD (Create), which contains only the pattern for the S3 files -- a string like s3://my-bucket-name/*.avro. From that point onwards it is copied all the way through, so with one pattern it is always one.
This confirms my initial impression when I wrote:

The impression I have is that when the physical plan is created, there is only one task detected that is bound to do the entire reading on one executor

I have changed the PR, reverting the original change, and now - after your analysis - I am setting the number of partitions in the reshuffle transform translator.
I am using the value of the default parallelism for Spark, which is already available in the Spark configuration options for Beam.
So essentially, with this PR, the Spark configuration
--conf spark.default.parallelism=10 is the replacement for the hint I wrote initially.

I have tested this PR with the same configuration as the initial one, and the performance is identical. I can now see all the executors and nodes processing a partition of the read, as one would expect. I also did a back-to-back run with vanilla Beam and I can confirm the problem is still there.

I deem this implementation superior to the first one. Let me have your opinions on it. Also paging @iemejia.

I have seen the previous build failing; I think the failing tests were unrelated to the changes. I'm keen to see a new build with these code changes.
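
As a self-contained illustration of the partitioning rule the revised PR relies on, the following plain-Spark sketch (not Beam runner source; the values are illustrative) shows how repartitioning to max(defaultParallelism, current partitions) escapes the single upstream partition:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReshuffleParallelismSketch {

  // Redistribute an RDD over at least the cluster's default parallelism, instead of
  // inheriting the (possibly single) partition count of the upstream RDD.
  static <T> JavaRDD<T> reshuffle(JavaRDD<T> rdd) {
    int numPartitions =
        Math.max(rdd.context().defaultParallelism(), rdd.getNumPartitions());
    return rdd.repartition(numPartitions);
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("reshuffle-parallelism-sketch")
        .set("spark.default.parallelism", "10");  // the knob that now drives the reshuffle fan-out
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Mimics the Create of a single file pattern: a one-element, one-partition RDD.
      JavaRDD<String> singlePartition =
          sc.parallelize(Arrays.asList("s3://my-bucket-name/*.avro"), 1);
      System.out.println(reshuffle(singlePartition).getNumPartitions());  // prints 10
    }
  }
}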

@lukecwik (Member) commented Apr 6, 2020

Thanks for the follow-up. I have also been adjusting to the new WFH lifestyle.

Comment on lines 698 to 699
int numPartitions =
    Math.max(context.getSparkContext().defaultParallelism(), inRDD.getNumPartitions());
Member

I believe we should do the same thing in the streaming and batch portable pipeline translators as well.

@iemejia (Member) left a comment

Thanks a lot @ecapoccia for the investigation and fix! Can you please make the suggested changes, squash your commits into a single commit with the title [BEAM-9434] Improve Spark runner reshuffle translation to maximize parallelism, and do a git pull origin master --rebase to rebase against the latest master?
I will be glad to merge once it is ready and the tests are green.

@@ -184,11 +184,11 @@

 /** An implementation of {@link Reshuffle} for the Spark runner. */
 public static <T> JavaRDD<WindowedValue<T>> reshuffle(
-    JavaRDD<WindowedValue<T>> rdd, WindowedValueCoder<T> wvCoder) {
+    JavaRDD<WindowedValue<T>> rdd, WindowedValueCoder<T> wvCoder, int numPartitions) {
Member

Can you please leave this method as it was before and, better, do int numPartitions = Math.max(rdd.context().defaultParallelism(), rdd.getNumPartitions()); inside of it? That way you don't need to replicate the change in the different translators.

@ecapoccia (Contributor, Author)

@iemejia @lukecwik OK, I will make the changes. The only reason I opted to change only the batch pipeline for Spark is that it is the only one I am in a position to thoroughly test for real. However, analysing the code suggests that changing all the pipelines does no harm, so I'll go for the suggested changes.
I will submit the squashed PR soon. In the meantime, thanks for the review.

@iemejia (Member) commented Apr 6, 2020

Run Spark ValidatesRunner

@iemejia (Member) commented Apr 6, 2020

Run Spark Runner Nexmark Tests

@iemejia (Member) commented Apr 6, 2020

Run Java Spark PortableValidatesRunner Batch

@iemejia (Member) commented Apr 6, 2020

Run Python Spark ValidatesRunner

@iemejia (Member) commented Apr 6, 2020

Run Go Spark ValidatesRunner

@iemejia iemejia changed the title from [BEAM-9434] performance improvements reading many Avro files in S3 to [BEAM-9434] Improve Spark runner reshuffle translation to maximize parallelism on Apr 6, 2020
@ecapoccia ecapoccia requested a review from iemejia April 6, 2020 23:46
@iemejia (Member) left a comment

LGTM

@iemejia iemejia merged commit 5e6ba4f into apache:master Apr 7, 2020
@iemejia (Member) commented Apr 7, 2020

Thanks @ecapoccia nice work!

@ecapoccia (Contributor, Author)

Thanks @ecapoccia nice work!

Thank you guys @iemejia @lukecwik, it was good for me to learn a bit about the internals.
