[Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista #1805

Open
mingmwang opened this issue Feb 10, 2022 · 9 comments
Labels
enhancement New feature or request

Comments

@mingmwang
Contributor

mingmwang commented Feb 10, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

This is a new feature enhancement.

Describe the solution you'd like

Ballista's current shuffle implementation is very similar to Spark's early version: a hash-based shuffle where shuffle data is materialized to disk and each map task produces a separate file for each reduce task. A shuffle operation involving M map tasks and N reduce tasks generates M * N files, and too many tiny files cause performance, memory, and scalability issues. Later Spark versions introduced a sort-based shuffle, which became the default implementation. Sort-based shuffle does not generate M * N files; each map task sorts its records by partition id + key and produces a pair of files: all records are consolidated into a single data file, and an index file holds the byte-range metadata for each partition. In Spark 2.0, the hash-based shuffle code was removed. Spark also introduced an external shuffle service to serve materialized intermediate shuffle data, achieving better fault tolerance and performance isolation. The recent Spark 3.2 release introduced a push-based shuffle (SPARK-30602) to further improve shuffle stability and I/O performance: shuffle is performed at the end of the map tasks, and shuffle blocks are pre-merged and pushed to selected reducer nodes or uploaded to Spark external shuffle servers.
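To make the sort-based layout concrete, here is a minimal sketch in Rust (illustrative only, not Spark or Ballista code; the record encoding and function name are placeholders) of a map task writing one consolidated data file plus an index file of byte offsets:

```rust
// Illustrative sketch of the sort-based shuffle layout: one data file with
// records grouped by partition id, plus an index file of little-endian u64
// byte offsets, one per partition boundary. A real sort-based shuffle sorts
// by (partition id, key); partition id alone is enough for the layout here.
use std::io::Write;

fn write_sorted_shuffle_output(
    mut records: Vec<(u32, Vec<u8>)>, // (partition id, serialized record)
    data: &mut impl Write,
    index: &mut impl Write,
    num_partitions: u32,
) -> std::io::Result<()> {
    // Sort by partition id so each partition's records are contiguous.
    records.sort_by_key(|(pid, _)| *pid);

    let mut offset: u64 = 0;
    let mut iter = records.iter().peekable();
    for pid in 0..num_partitions {
        // Record where this partition's byte range starts.
        index.write_all(&offset.to_le_bytes())?;
        while let Some((p, rec)) = iter.peek() {
            if *p != pid {
                break;
            }
            data.write_all(rec)?;
            offset += rec.len() as u64;
            iter.next();
        }
    }
    // A final offset marks the end of the last partition's range, so the
    // range for reducer i is [index[i], index[i + 1]).
    index.write_all(&offset.to_le_bytes())?;
    Ok(())
}
```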

Other distributed compute engines like Flink and Presto also support the shuffle operation, but they do not materialize the shuffle data to disk. Instead, shuffle data is streamed into an in-memory buffer, and the reduce tasks poll the shuffle data from the map tasks' in-memory buffers to minimize end-to-end latency.
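A minimal sketch of that streaming idea, using a bounded tokio channel as a stand-in for the per-partition in-memory buffer (placeholder types and names; not actual Flink or Presto code):

```rust
// Sketch of a streaming shuffle buffer: the map side pushes serialized
// batches into a bounded in-memory channel; the reduce side consumes them
// as they arrive, never touching disk.
use tokio::sync::mpsc;

type Batch = Vec<u8>; // stand-in for a serialized record batch

async fn map_side(tx: mpsc::Sender<Batch>, batches: Vec<Batch>) {
    for batch in batches {
        // With a bounded channel this awaits when the buffer is full,
        // giving natural back-pressure against a slow reducer.
        tx.send(batch).await.expect("reduce side hung up");
    }
    // Dropping `tx` signals end-of-stream to the reduce side.
}

async fn reduce_side(mut rx: mpsc::Receiver<Batch>) {
    while let Some(batch) = rx.recv().await {
        // Process each batch as soon as it is produced to keep
        // end-to-end latency low.
        let _ = batch;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Batch>(16); // small bounded buffer
    let producer = tokio::spawn(map_side(tx, vec![vec![1, 2, 3]; 100]));
    let consumer = tokio::spawn(reduce_side(rx));
    let _ = tokio::join!(producer, consumer);
}
```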

Here, we propose a new streaming-style push-based shuffle solution for Ballista, where shuffle is performed at the end of the map tasks. Instead of materializing the intermediate shuffle data to disk and generating M * N files, shuffle data is pushed directly to the reduce tasks via Arrow Flight gRPC calls to achieve very low latency, which is important for low-latency queries. The corresponding stage scheduling will be enhanced to support all-at-once scheduling: all the stages of a SQL query/job are scheduled at almost the same time, and the distributed DAG of the query is fixed at the beginning, so the map tasks can stream shuffle data to the downstream reduce tasks as it is produced.
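As a rough illustration of the push path, the sketch below shows a map task encoding finished record batches as Flight IPC messages and streaming them to a reduce task's Flight endpoint via DoPut. It uses the arrow-flight crate's generated tonic client and the `flight_data_from_arrow_batch` helper available at the time (since deprecated); the endpoint address and the `shuffle/{partition}` descriptor path are hypothetical, not an agreed-upon protocol:

```rust
// Sketch of the proposed push path: a map task streams finished record
// batches to a reduce task's Arrow Flight endpoint via DoPut. The address
// and the "shuffle/{partition}" descriptor path are hypothetical.
use arrow::ipc::writer::IpcWriteOptions;
use arrow::record_batch::RecordBatch;
use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::utils::flight_data_from_arrow_batch;
use arrow_flight::{FlightData, FlightDescriptor, SchemaAsIpc};
use futures::stream;

async fn push_partition(
    reducer_addr: String, // placeholder, e.g. "http://reducer-host:50051"
    partition_id: usize,
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    if batches.is_empty() {
        return Ok(());
    }
    let mut client = FlightServiceClient::connect(reducer_addr).await?;
    let options = IpcWriteOptions::default();

    // The first message carries the schema plus a descriptor telling the
    // reduce task which shuffle partition this stream belongs to.
    let mut schema_msg: FlightData =
        SchemaAsIpc::new(batches[0].schema().as_ref(), &options).into();
    schema_msg.flight_descriptor = Some(FlightDescriptor {
        r#type: DescriptorType::Path as i32,
        path: vec![format!("shuffle/{}", partition_id)],
        ..Default::default()
    });

    let mut messages = vec![schema_msg];
    for batch in &batches {
        // Encode each batch (and any dictionaries) as Flight IPC messages.
        let (dicts, data) = flight_data_from_arrow_batch(batch, &options);
        messages.extend(dicts);
        messages.push(data);
    }

    // DoPut streams the batches straight to the reduce task; nothing is
    // materialized to disk on the map side.
    client.do_put(stream::iter(messages)).await?;
    Ok(())
}
```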

I will draft a detailed design doc to cover the proposed API changes later.


@mingmwang
Contributor Author

Added a design doc for further discussion.

https://docs.google.com/document/d/17J9H6gGBVktmRAFYNQu-v52QUUPlghRnVLIZC3mFYFY/edit?usp=sharing

@xudong963
Member

Hi @mingmwang, nice work!

You can open an RFC ticket if you are willing to; then we can discuss it on GitHub. After we reach the final version, we can merge it and keep the RFC in our codebase. FYI: https://github.com/apache/arrow-datafusion/tree/master/docs/source/specification/rfcs

@mingmwang
Contributor Author

> Hi @mingmwang, nice work!
>
> You can open an RFC ticket if you are willing to; then we can discuss it on GitHub. After we reach the final version, we can merge it and keep the RFC in our codebase. FYI: https://github.com/apache/arrow-datafusion/tree/master/docs/source/specification/rfcs

Can we discuss in the Google doc or in this thread directly? Everyone can comment on the Google doc.
I will open a PR to cover all the related code changes so that everyone who is interested can take a look and give me advice.

@xudong963
Member

> Can we discuss in the Google doc or in this thread directly? Everyone can comment on the Google doc.

Both are ok, just a suggestion.

> I will open a PR to cover all the related code changes so that everyone who is interested can take a look and give me advice.

👍

@thinkharderdev
Contributor

This sounds great!

@houqp
Member

houqp commented Feb 21, 2022

The design looks good to me, thanks for writing it up @mingmwang , left a minor question in the doc.

@EricJoy2048
Member

It's great!

@heroWang

heroWang commented Jul 6, 2023

Any progress on this new feature?

@mingmwang
Contributor Author

@heroWang
I will start working on this, maybe next month. Recently I've been busy with other things and don't have any bandwidth for DataFusion and Ballista.
