Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13136][SQL] Create a dedicated Broadcast exchange operator #11083

Closed
wants to merge 26 commits into from

Conversation

hvanhovell
Copy link
Contributor

Quite a few Spark SQL join operators broadcast one side of the join to all nodes. The are a few problems with this:

  • This conflates broadcasting (a data exchange) with joining. Data exchanges should be managed by a different operator.
  • All these nodes implement their own (duplicate) broadcasting logic.
  • Re-use of indices is quite hard.

This PR defines both a BroadcastDistribution and BroadcastPartitioning, these contain a BroadcastMode. The BroadcastMode defines the way in which we transform the Array of InternalRow's into an index. We currently support the following BroadcastMode's:

  • IdentityBroadcastMode: This broadcasts the rows in their original form.
  • HashSetBroadcastMode: This applies a projection to the input rows, deduplicates these rows and broadcasts the resulting Set.
  • HashedRelationBroadcastMode: This transforms the input rows into a HashedRelation, and broadcasts this index.

To match this distribution we implement a BroadcastExchange operator which will perform the broadcast for us, and have EnsureRequirements plan this operator. The old Exchange operator has been renamed into ShuffleExchange in order to clearly separate between Shuffled and Broadcasted exchanges. Finally the classes in Exchange.scala have been moved to a dedicated package.

cc @rxin @davies

@SparkQA
Copy link

SparkQA commented Feb 4, 2016

Test build #50775 has finished for PR 11083 at commit c2b7533.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Broadcast(

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@hvanhovell
Copy link
Contributor Author

Retest this please

@SparkQA
Copy link

SparkQA commented Feb 6, 2016

Test build #50879 has finished for PR 11083 at commit 02a61b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 6, 2016

Test build #50881 has finished for PR 11083 at commit 02a61b8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor Author

Retest this please

@SparkQA
Copy link

SparkQA commented Feb 7, 2016

Test build #50883 has finished for PR 11083 at commit 02a61b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell hvanhovell changed the title [SPARK-13136][SQL] Create a dedicated Broadcast exchange operator [WIP] [SPARK-13136][SQL] Create a dedicated Broadcast exchange operator Feb 7, 2016
@hvanhovell
Copy link
Contributor Author

This one is ready for review.

@SparkQA
Copy link

SparkQA commented Feb 7, 2016

Test build #50897 has finished for PR 11083 at commit c12c8e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Feb 7, 2016

Test build #50898 has finished for PR 11083 at commit c12c8e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Feb 7, 2016

Test build #50900 has finished for PR 11083 at commit c12c8e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor Author

retest this please

case class Broadcast(
f: Iterable[InternalRow] => Any,
child: SparkPlan)
extends UnaryNode with CodegenSupport {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we do include this in generated code of BroadcastHashJoin, I think it's better to not implement CodegenSupport, then we don't need the special case in CollapseCodegenStages

@SparkQA
Copy link

SparkQA commented Feb 8, 2016

Test build #50928 has finished for PR 11083 at commit c12c8e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 8, 2016

Test build #50942 has finished for PR 11083 at commit c7dd7ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Broadcast(f: Iterable[InternalRow] => Any, child: SparkPlan) extends UnaryNode

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@SparkQA
Copy link

SparkQA commented Feb 10, 2016

Test build #51039 has finished for PR 11083 at commit e847383.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
    • case class ReturnAnswer(child: LogicalPlan) extends UnaryNode
    • public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<InternalRow>
    • case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode
    • trait BaseLimit extends UnaryNode
    • case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit
    • case class GlobalLimit(limit: Int, child: SparkPlan) extends BaseLimit
    • case class TakeOrderedAndProject(
    • class FileStreamSource(
    • trait HadoopFsRelationProvider extends StreamSourceProvider

@rxin
Copy link
Contributor

rxin commented Feb 11, 2016

@yhuai if you have some time this wk, can you review this?

* Represents data where tuples are broadcasted to every node. It is quite common that the
* entire set of tuples is transformed into different data structure.
*/
case class BroadcastDistribution(f: Iterable[InternalRow] => Any = identity) extends Distribution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm thinking maybe it's better to just declare that we want a hashed broadcast distribution, and then don't take a closure. The reason it is bad to take a closure is that this won't work if we want to whole-stage codegen the building of the hash table, or if we want to change the internal engine to a push-based model.

@hvanhovell
Copy link
Contributor Author

Retest this please

@SparkQA
Copy link

SparkQA commented Feb 17, 2016

Test build #51418 has finished for PR 11083 at commit c7429bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@SparkQA
Copy link

SparkQA commented Feb 20, 2016

Test build #51596 has finished for PR 11083 at commit b12bbc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -29,22 +29,20 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
* for hash join.
*/
case class LeftSemiJoinBNL(
streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression])
left: SparkPlan, right: SparkPlan, condition: Option[Expression])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you do this change (streamed -> left, broadcast -> right)? this makes the variable name more confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll revert that.

@rxin
Copy link
Contributor

rxin commented Feb 20, 2016

I'm going to review this more carefully tonight.

@rxin
Copy link
Contributor

rxin commented Feb 20, 2016

@hvanhovell when you get a chance, please update the description if it merits any change.

@SparkQA
Copy link

SparkQA commented Feb 21, 2016

Test build #51605 has finished for PR 11083 at commit 54b558d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
* identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
*/
trait BroadcastMode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this and IdentityBroadcastMode into a new file.

@rxin
Copy link
Contributor

rxin commented Feb 21, 2016

This looks pretty good actually.

@hvanhovell
Copy link
Contributor Author

@rxin I agree that this is stretching the definitions of both Distribution and Partitioning. We should be able to define the form/shape in which a child node delivers it data to the current node. This would also allow us to pass ColumnarBatches, or could even be used to specify the row type passed. I have created https://issues.apache.org/jira/browse/SPARK-13421 to track this.

@SparkQA
Copy link

SparkQA commented Feb 21, 2016

Test build #51635 has finished for PR 11083 at commit 4b5978b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait BroadcastMode

@SparkQA
Copy link

SparkQA commented Feb 21, 2016

Test build #51637 has finished for PR 11083 at commit c8c175e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Feb 21, 2016

Thanks. I'm going to merge this.

@asfgit asfgit closed this in b6a873d Feb 21, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants