Conversation

tejasapatil (Contributor) commented Oct 24, 2016

What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-18067 for the discussion. This PR is being put out to get feedback on the approach.

Assume there are two tables, each with columns key and value, both hash partitioned on key. Assume these are the partitions of the two join children:

partitions     child 1         child 2
partition 0    [0, 0, 0, 3]    [0, 0, 3, 3]
partition 1    [1, 4, 4]       [4]
partition 2    [2, 2]          [2, 5, 5, 5]

Since all rows with the same key end up in the same partition on both sides, we can evaluate other join predicates like (tableA.value = tableB.value) right there, without needing any shuffle.

What is done today, i.e. requiring HashPartitioning(key, value), expects rows with the same value of pmod(hash(key, value)) to be in the same partition, and does not take advantage of the fact that rows with the same key are already packed together.
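As a toy illustration only (it uses plain JVM hashing, not Spark's Murmur3-based hash expression, and the object and method names below are made up), partition assignment by pmod(hash(key)) keeps equal keys together, while pmod(hash(key, value)) does not:

// Illustrative only: plain JVM hashCode, not Spark's Murmur3 hash.
object PartitionAssignmentDemo {
  val numPartitions = 3

  // Non-negative modulo, i.e. pmod, as used for bucket / partition assignment.
  def pmod(h: Int, n: Int): Int = ((h % n) + n) % n

  // Partitioning on key alone: rows with equal keys always land together.
  def byKey(key: Int): Int = pmod(key.hashCode, numPartitions)

  // Partitioning on (key, value): rows with equal keys can land in different
  // partitions, so the existing layout by key cannot be reused.
  def byKeyAndValue(key: Int, value: Int): Int =
    pmod((key, value).hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    Seq((0, 1), (0, 2), (3, 7), (3, 9)).foreach { case (k, v) =>
      println(s"row ($k, $v): byKey=${byKey(k)}, byKeyAndValue=${byKeyAndValue(k, v)}")
    }
  }
}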

This PR uses PartitioningCollection instead of HashPartitioning as the expected partitioning.
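As a minimal sketch of that idea (against the catalyst physical-plan API as of Spark 2.x; the surrounding object, attribute setup, and numPartitions are only illustrative), the expected partitioning for join keys i and j would roughly go from a single strict HashPartitioning to a collection of acceptable ones:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.IntegerType

object ExpectedPartitioningSketch {
  val i = AttributeReference("i", IntegerType)()
  val j = AttributeReference("j", IntegerType)()
  val numPartitions = 8

  // What is expected today: one hash partitioning over all join keys.
  val strict = HashPartitioning(Seq(i, j), numPartitions)

  // With this PR: any one of these partitionings should be enough to avoid a shuffle.
  val relaxed = PartitioningCollection(Seq(
    HashPartitioning(Seq(i, j), numPartitions),
    HashPartitioning(Seq(i), numPartitions),
    HashPartitioning(Seq(j), numPartitions)))
}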

Query:

val df = (0 until 16).map(i => (i, i * 2)).toDF("i", "j").coalesce(1)
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "i").sortBy("i").saveAsTable("tableA")
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "i").sortBy("i").saveAsTable("tableB")

hc.sql("SELECT * FROM tableA a JOIN tableB b ON a.i=b.i AND a.j=b.j").explain(true)

Before:

*SortMergeJoin [i#38, j#39], [i#40, j#41], Inner
:- *Sort [i#38 ASC NULLS FIRST, j#39 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#38, j#39, 200)
:     +- *Project [i#38, j#39]
:        +- *Filter (isnotnull(i#38) && isnotnull(j#39))
:           +- *FileScan orc default.tablea[i#38,j#39] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark-2/spark/spark-warehouse/tablea], PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct<i:int,j:int>
+- *Sort [i#40 ASC NULLS FIRST, j#41 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#40, j#41, 200)
      +- *Project [i#40, j#41]
         +- *Filter (isnotnull(i#40) && isnotnull(j#41))
            +- *FileScan orc default.tableb[i#40,j#41] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark-2/spark/spark-warehouse/tableb], PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct<i:int,j:int>

After:

== Physical Plan ==
*SortMergeJoin [i#38, j#39], [i#40, j#41], Inner
:- *Sort [i#38 ASC NULLS FIRST, j#39 ASC NULLS FIRST], false, 0
:  +- *Project [i#38, j#39]
:     +- *Filter (isnotnull(j#39) && isnotnull(i#38))
:        +- *FileScan orc default.tablea[i#38,j#39] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/tablea], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(i)], ReadSchema: struct<i:int,j:int>
+- *Sort [i#40 ASC NULLS FIRST, j#41 ASC NULLS FIRST], false, 0
   +- *Project [i#40, j#41]
      +- *Filter (isnotnull(j#41) && isnotnull(i#40))
         +- *FileScan orc default.tableb[i#40,j#41] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/tableb], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(i)], ReadSchema: struct<i:int,j:int>

How was this patch tested?

WIP. I need to add tests for:

  • Check that the planner does not introduce an extra Shuffle for such queries.
  • Check that the compatibility between PartitioningCollection and HashPartitioning makes sense.

tejasapatil (Contributor, Author) commented Oct 24, 2016

cc @hvanhovell for feedback on the approach. I will polish this and add tests if you are fine with the approach.

SparkQA commented Oct 24, 2016

Test build #67431 has finished for PR 15605 at commit dac2f49.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

hvanhovell (Contributor) commented:

@tejasapatil this didn't work?

tejasapatil (Contributor, Author) commented:

@hvanhovell: It got closed accidentally. There are test failures that I still have to debug. Happy to hear any comments about the approach.

tejasapatil reopened this Oct 24, 2016
SparkQA commented Oct 24, 2016

Test build #67460 has finished for PR 15605 at commit dac2f49.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

tejasapatil (Contributor, Author) commented:

Most of the tests are failing because ShuffleExchange currently does not handle PartitioningCollection. That can be fixed, but as I looked at this more, I found a flaw in the current approach.

The objective of this PR is to make SortMergeJoin (SMJ) avoid a shuffle in the scenarios below:

  1. Every child's output is hash partitioned on all of the join keys. This already happens today.
  2. Every child's output is hash partitioned on the same single join column.

Fixing that needs a change which might alter some core behavior. I want to get feedback, or better alternatives, before jumping in and putting out a change for review.

Let's look at createPartitioning(), where ClusteredDistribution is mapped to HashPartitioning:

case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)

HashPartitioning(i, j, k) does satisfy ClusteredDistribution(i, j, k), but so does each of HashPartitioning(i), HashPartitioning(j), and HashPartitioning(k). So one would want every child of the SMJ operator to satisfy any one of [HashPartitioning(i, j, k), HashPartitioning(i), HashPartitioning(j), HashPartitioning(k)].
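Here is a minimal sketch of that observation against the catalyst API (Spark 2.x, current at the time of this PR; the object and value names are only illustrative). Every element of checks should evaluate to true:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

object SatisfiesSketch {
  val i = AttributeReference("i", IntegerType)()
  val j = AttributeReference("j", IntegerType)()
  val k = AttributeReference("k", IntegerType)()

  val required = ClusteredDistribution(Seq(i, j, k))

  // A hash partitioning on any subset of the clustering keys still co-locates
  // rows that agree on those keys, so all of these checks are true.
  val checks = Seq(
    HashPartitioning(Seq(i, j, k), 8).satisfies(required),
    HashPartitioning(Seq(i), 8).satisfies(required),
    HashPartitioning(Seq(j), 8).satisfies(required),
    HashPartitioning(Seq(k), 8).satisfies(required))
}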

PartitioningCollection provides a way to pack these several partitionings in a single place, but it does not track which exact partitioning is satisfied by a given child. E.g. table_A can be partitioned on hash(i) while table_B is partitioned on hash(j) or hash(i, j), where i and j are join keys. The current version of the PR would skip the shuffle over both children in such a case, which is wrong.

The fix would be to track which partitioning in the PartitioningCollection is satisfied by each child and make sure it is the same across all children. Based on that, also change the outputPartitioning of the SMJ node during plan generation. From what I have seen so far, manipulating the partitioning / distribution of a SparkPlan is not supported. That is probably for the better, as it would mutate properties of the plan tree (constants are good). If plan optimizations are done bottom-up, then this might not cause problems.
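Here is a hypothetical sketch of that missing check (this is not the PR's actual diff; commonPartitionings, satisfiedBy, and childrenOutput are made-up names, while guarantees is the existing Spark 2.x Partitioning method):

import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}

object CommonPartitioningSketch {
  // Members of `expected` that one child's output partitioning already guarantees.
  private def satisfiedBy(child: Partitioning, expected: PartitioningCollection): Seq[Partitioning] =
    expected.partitionings.filter(child.guarantees)

  // The shuffle can only be skipped when at least one member of the collection is
  // guaranteed by every child; otherwise table_A on hash(i) and table_B on hash(j)
  // would both be left unshuffled, which is wrong.
  def commonPartitionings(childrenOutput: Seq[Partitioning],
                          expected: PartitioningCollection): Seq[Partitioning] =
    childrenOutput.map(satisfiedBy(_, expected)).reduce(_ intersect _)
}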

I would like to hear opinions about this approach. If there are better alternatives, feel free to share them.

tejasapatil (Contributor, Author) commented:

@hvanhovell @cloud-fan @gatorsmile @yhuai: Any opinions about the approach described in my previous comment?

tejasapatil (Contributor, Author) commented:

This is superseded by #19054. Closing.

tejasapatil deleted the SPARK-18067_smb_join_pred_avoid_shuffle branch on August 25, 2017.