Skip to content
Permalink
Browse files

Revert "[SPARK-23195][SQL] Keep the Hint of Cached Data"

This reverts commit a23f6b1.
  • Loading branch information...
gatorsmile committed Jan 24, 2018
1 parent 9cfe90e commit d656be74b87746efc020d5cae3bfa294f8f98594
@@ -63,7 +63,7 @@ case class InMemoryRelation(
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
statsOfPlanToCache: Statistics)
statsOfPlanToCache: Statistics = null)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -77,7 +77,7 @@ case class InMemoryRelation(
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
statsOfPlanToCache
} else {
Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
Statistics(sizeInBytes = batchStats.value.longValue)
}
}

@@ -139,22 +139,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
}

test("broadcast hint is retained in a cached plan") {
Seq(true, false).foreach { materialized =>
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
broadcast(df2).cache()
if (materialized) df2.collect()
val df3 = df1.join(df2, Seq("key"), "inner")
val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
case b: BroadcastHashJoinExec => b
}.size
assert(numBroadCastHashJoin === 1)
}
}
}

private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val joined = df1.join(df, Seq("key"), "inner")

0 comments on commit d656be7

Please sign in to comment.
You can’t perform that action at this time.