Skip to content
Permalink
Browse files

[SPARK-27674][SQL] the hint should not be dropped after cache lookup

## What changes were proposed in this pull request?

This is a followup of #20365 .

#20365 fixed this problem when the hint node is a root node. This PR fixes this problem for all the cases.

## How was this patch tested?

a new test

Closes #24580 from cloud-fan/bug.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information...
cloud-fan authored and gatorsmile committed May 15, 2019
1 parent 02c3369 commit 3e30a988102e162f2702ae223312763a0bdc15eb
@@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
* in this method will be cleaned up later by this rule, and may emit warnings depending on the
* configurations.
*/
private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
plan match {
case h: ResolvedHint =>
val (plan, hints) = extractHintsFromPlan(h.child)
@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
@@ -212,17 +213,18 @@ class CacheManager extends Logging {
def useCachedData(plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transformDown {
case command: IgnoreCachedData => command
// Do not lookup the cache by hint node. Hint node is special, we should ignore it when
// canonicalizing plans, so that plans which are same except hint can hit the same cache.
// However, we also want to keep the hint info after cache lookup. Here we skip the hint
// node, so that the returned caching plan won't replace the hint node and drop the hint info
// from the original plan.
case hint: ResolvedHint => hint

case currentFragment =>
lookupCachedData(currentFragment)
.map(_.cachedRepresentation.withOutput(currentFragment.output))
.getOrElse(currentFragment)
lookupCachedData(currentFragment).map { cached =>
// After cache lookup, we should still keep the hints from the input plan.
val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2
val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
// The returned hint list is in top-down order, we should create the hint nodes from
// right to left.
hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
ResolvedHint(p, hint)
}
}.getOrElse(currentFragment)
}

newPlan transformAllExpressions {
@@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
}

test("Cache should respect the broadcast hint") {
val df = broadcast(spark.range(1000)).cache()
val df2 = spark.range(1000).cache()
df.count()
df2.count()
test("Cache should respect the hint") {
def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
val df2 = spark.range(2000).cache()
df2.count()

// Test the broadcast hint.
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
val hint = joinPlan.collect {
case Join(_, _, _, _, hint) => hint
def checkHintExists(): Unit = {
// Test the broadcast hint.
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
val joinHints = joinPlan.collect {
case Join(_, _, _, _, hint) => hint
}
assert(joinHints.size == 1)
assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
assert(joinHints(0).rightHint.isEmpty)
}

// Make sure the hint does exist when `df` is not cached.
checkHintExists()

df.cache()
try {
df.count()
// Make sure the hint still exists when `df` is cached.
checkHintExists()
} finally {
// Clean-up
df.unpersist()
}
}
assert(hint.size == 1)
assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
assert(hint(0).rightHint.isEmpty)

// Clean-up
df.unpersist()
// The hint is the root node
testHint(broadcast(spark.range(1000)), BROADCAST)
// The hint is under subquery alias
testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
// The hint is under filter
testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
// If there are 2 adjacent hints, the top one takes effect.
testHint(
spark.range(1000)
.hint("SHUFFLE_MERGE")
.hint("SHUFFLE_HASH")
.as("df"),
SHUFFLE_HASH)
}

test("analyzes column statistics in cached query") {

0 comments on commit 3e30a98

Please sign in to comment.
You can’t perform that action at this time.