Skip to content
Permalink
Browse files

[SPARK-23192][SQL] Keep the Hint after Using Cached Data

## What changes were proposed in this pull request?

The hint of the plan segment is lost, if the plan segment is replaced by the cached data.

```Scala
      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
      df2.cache()
      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
```

This PR is to fix it.

## How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20365 from gatorsmile/fixBroadcastHintloss.

(cherry picked from commit 613c290)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information...
gatorsmile committed Jan 23, 2018
1 parent f8f522c commit 851c303867eb54405f6508919619debe84708933
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.storage.StorageLevel
@@ -170,9 +170,13 @@ class CacheManager extends Logging {
def useCachedData(plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transformDown {
case currentFragment =>
lookupCachedData(currentFragment)
.map(_.cachedRepresentation.withOutput(currentFragment.output))
.getOrElse(currentFragment)
lookupCachedData(currentFragment).map { cached =>
val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
currentFragment match {
case hint: ResolvedHint => ResolvedHint(cachedPlan, hint.hints)
case _ => cachedPlan
}
}.getOrElse(currentFragment)
}

newPlan transformAllExpressions {
@@ -109,6 +109,19 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
}

test("broadcast hint is retained after using the cached data") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
df2.cache()
val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
case b: BroadcastHashJoinExec => b
}.size
assert(numBroadCastHashJoin === 1)
}
}

test("broadcast hint isn't propagated after a join") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")

0 comments on commit 851c303

Please sign in to comment.
You can’t perform that action at this time.