Support broadcast exchange #342

Open
Dandandan opened this issue Oct 11, 2022 · 6 comments
Labels
enhancement New feature or request performance

Comments

@Dandandan
Contributor

Dandandan commented Oct 11, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Broadcasting partitions helps when the build side of a join is small. In that case we can transform a partitioned join into a broadcast join.

Describe the solution you'd like
We should support broadcasts in the physical plan.

Broadcasting means copying the entire dataset to each worker.

This could be used in broadcast joins, i.e. by broadcasting smaller dataframes to every worker, which can provide big speedups as the other (big) side doesn't have to be shuffled.
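To make the idea concrete, here is a minimal single-process sketch of a broadcast hash join. It is not Ballista code: `Row`, `probe_partition`, and `broadcast_join` are hypothetical names, and cloning the hash table stands in for sending it to each worker over the network.

```rust
use std::collections::HashMap;

/// A row is a (join key, payload) pair; purely illustrative.
type Row = (u32, &'static str);

/// Join one partition of the big side against a local copy of the small side.
fn probe_partition(
    build: &HashMap<u32, Vec<&'static str>>,
    partition: &[Row],
) -> Vec<(u32, &'static str, &'static str)> {
    let mut out = Vec::new();
    for (key, big_payload) in partition {
        if let Some(small_payloads) = build.get(key) {
            for small_payload in small_payloads {
                out.push((*key, *small_payload, *big_payload));
            }
        }
    }
    out
}

/// Simulated broadcast join: every "worker" receives a full copy of the
/// small side and joins its own big-side partition, so the big side is
/// never repartitioned.
fn broadcast_join(
    small: &[Row],
    big_partitions: &[Vec<Row>],
) -> Vec<Vec<(u32, &'static str, &'static str)>> {
    // Build the hash table once from the small side.
    let mut build: HashMap<u32, Vec<&'static str>> = HashMap::new();
    for (key, payload) in small {
        build.entry(*key).or_default().push(*payload);
    }
    big_partitions
        .iter()
        .map(|partition| {
            // Assumption: the clone models shipping the table to a worker.
            let local_copy = build.clone();
            probe_partition(&local_copy, partition)
        })
        .collect()
}
```

Each big-side partition is joined where it already lives; only the small side is copied.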

Describe alternatives you've considered

Additional context

We can probably reuse some of Spark's heuristics for deciding when to broadcast one side of a join.
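As a rough illustration of such a heuristic, the sketch below picks a strategy from estimated input sizes. The 10 MB constant mirrors Spark's default for `spark.sql.autoBroadcastJoinThreshold`; `JoinStrategy` and `choose_join_strategy` are hypothetical names, not part of Ballista or DataFusion, and the sketch ignores join-type restrictions on which side may be broadcast.

```rust
/// Hypothetical join strategies; the names are illustrative only.
#[derive(Debug, PartialEq)]
enum JoinStrategy {
    BroadcastLeft,
    BroadcastRight,
    Partitioned,
}

/// Spark's default for `spark.sql.autoBroadcastJoinThreshold` is 10 MB.
const DEFAULT_BROADCAST_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024;

/// Choose a strategy from estimated sizes. `None` means the size is unknown,
/// in which case that side is never considered for broadcasting.
fn choose_join_strategy(
    left_bytes: Option<u64>,
    right_bytes: Option<u64>,
    threshold: u64,
) -> JoinStrategy {
    let fits = |size: Option<u64>| size.map_or(false, |bytes| bytes <= threshold);
    match (fits(left_bytes), fits(right_bytes)) {
        // Both sides are small: broadcast the smaller one.
        (true, true) => {
            if left_bytes <= right_bytes {
                JoinStrategy::BroadcastLeft
            } else {
                JoinStrategy::BroadcastRight
            }
        }
        (true, false) => JoinStrategy::BroadcastLeft,
        (false, true) => JoinStrategy::BroadcastRight,
        // Neither side is known to be small: fall back to a shuffle join.
        (false, false) => JoinStrategy::Partitioned,
    }
}
```

Treating unknown sizes as "too big" is the conservative choice: a wrongly broadcast large table costs far more than a missed optimization.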

@Dandandan Dandandan added the enhancement New feature or request label Oct 11, 2022
@Dandandan Dandandan changed the title Support broadcast (join/etc.) Support broadcast (join) Oct 11, 2022
@Dandandan Dandandan changed the title Support broadcast (join) Support broadcast exchange (and broadcast join) Oct 12, 2022
@mingmwang
Contributor

mingmwang commented Oct 12, 2022

One quick question regarding this: after those datasets are copied to each executor, should they be kept in memory or spilled to disk? If we keep them in memory for a while, memory usage might be a concern.

@Dandandan
Contributor Author

@mingmwang I think the same thing applies to a broadcast exchange as to normal exchanges: they are spilled to disk by default and might be kept in memory if the memory budget allows. I believe Ballista doesn't support the latter yet(?). A size limit may be chosen, like 100MB, so the data will fit in memory most of the time, but even when written to disk it might give impressive speedups for joins, as the other side of the join could be far larger.
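A minimal sketch of the "keep in memory if the budget allows, otherwise spill" decision might look like the following. `Placement` and `BroadcastBudget` are hypothetical names for illustration, not existing Ballista types, and the 100MB figure is just the example limit mentioned above.

```rust
/// Where a received broadcast ends up on an executor (illustrative).
#[derive(Debug, PartialEq, Clone, Copy)]
enum Placement {
    Memory,
    Disk,
}

/// Hypothetical per-executor budget for in-memory broadcast data.
struct BroadcastBudget {
    limit_bytes: u64,
    used_bytes: u64,
}

impl BroadcastBudget {
    fn new(limit_bytes: u64) -> Self {
        Self { limit_bytes, used_bytes: 0 }
    }

    /// Reserve memory if the broadcast fits in the remaining budget,
    /// otherwise report that it should be written to disk.
    fn place(&mut self, size_bytes: u64) -> Placement {
        // `used_bytes` never exceeds `limit_bytes`, so this cannot underflow.
        let remaining = self.limit_bytes - self.used_bytes;
        if size_bytes <= remaining {
            self.used_bytes += size_bytes;
            Placement::Memory
        } else {
            Placement::Disk
        }
    }

    /// Return memory to the budget once an in-memory broadcast is dropped.
    fn release(&mut self, size_bytes: u64) {
        self.used_bytes = self.used_bytes.saturating_sub(size_bytes);
    }
}
```

With a 100MB budget, a first 60MB broadcast would stay in memory, and a second 60MB broadcast would spill until the first is released.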

@mingmwang
Contributor

@Dandandan Sounds nice.
There are a couple of things we need to do to support the broadcast exchange.

  1. RPC protocols to broadcast efficiently, something similar to Spark's TorrentBroadcast.
  2. Executor memory management, as you just mentioned.
  3. Broadcast lifecycle management: after a SQL query finishes, its broadcasts should be cleaned up.
  4. The planner needs more accurate stats to choose between broadcast join, partitioned hash join, or even SortMergeJoin,
    and for build side selection etc. We need to enhance the stats collection.
  5. Broadcast exchange reuse within a SQL query?
  6. Broadcast reuse between SQL queries?
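Points 3 and 5 could be sketched together as a small registry keyed by job and broadcast id. This is only an assumption about one possible shape; `BroadcastRegistry` and its methods are hypothetical names, and the payload is a plain byte vector rather than real record batches.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical executor-side registry of broadcast payloads,
/// keyed by (job id, broadcast id).
struct BroadcastRegistry {
    entries: HashMap<(u64, u64), Arc<Vec<u8>>>,
}

impl BroadcastRegistry {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Reuse within a query: the payload is produced only on first request;
    /// later requests for the same key share the existing copy.
    fn get_or_insert_with<F>(&mut self, job: u64, id: u64, produce: F) -> Arc<Vec<u8>>
    where
        F: FnOnce() -> Vec<u8>,
    {
        let entry = self
            .entries
            .entry((job, id))
            .or_insert_with(|| Arc::new(produce()));
        entry.clone()
    }

    /// Lifecycle management: drop every broadcast owned by a finished job
    /// and return how many entries were removed.
    fn cleanup_job(&mut self, job: u64) -> usize {
        let before = self.entries.len();
        self.entries.retain(|key, _| key.0 != job);
        before - self.entries.len()
    }

    fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Reuse between queries (point 6) would need a key that outlives a single job, such as a plan fingerprint, which this sketch does not attempt.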

@mingmwang
Contributor

Today, for the partitioned hash join, DataFusion already supports the CollectLeft mode, which I think is similar to a broadcast hash join. I have not had a chance to test it on Ballista yet, but I think it should work in the distributed model. The downside is that the left side might cause a lot of duplicate re-computation.


    fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| {
                        (
                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
                        )
                    })
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
        }
    }

@Dandandan
Contributor Author

That's a good observation @mingmwang !
The difference is that CollectLeft collects the left side into one partition, whereas with broadcast we would send the output of the left side to each worker.

Indeed, I think the trade-off is that by doing a bit more work on the left side (i.e. building the hash table in each worker) we save the work on the right side (the shuffle).
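A back-of-the-envelope way to see this trade-off is to compare the bytes moved over the network. The model below is a deliberately crude assumption (a partitioned join shuffles both sides once, a broadcast sends the left side to every worker) and the function names are hypothetical; it ignores the repeated hash table builds and any data that is already local.

```rust
/// Bytes moved by a partitioned join under the simplifying assumption
/// that both sides are fully shuffled once.
fn shuffle_bytes(left_bytes: u64, right_bytes: u64) -> u64 {
    left_bytes.saturating_add(right_bytes)
}

/// Bytes moved by a broadcast join under the simplifying assumption
/// that the left side is sent in full to every worker.
fn broadcast_bytes(left_bytes: u64, workers: u64) -> u64 {
    left_bytes.saturating_mul(workers)
}

/// True when broadcasting the left side moves less data than shuffling both.
fn broadcast_moves_less(left_bytes: u64, right_bytes: u64, workers: u64) -> bool {
    broadcast_bytes(left_bytes, workers) < shuffle_bytes(left_bytes, right_bytes)
}
```

Under this model, broadcasting wins whenever the right side is larger than `(workers - 1)` copies of the left side, which is why a small build side matters so much.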

@Dandandan Dandandan changed the title Support broadcast exchange (and broadcast join) Support broadcast exchange Oct 14, 2022
@Dandandan
Contributor Author

I added some details for implementing a broadcast join optimization rule here: #348.


2 participants