Skip to content

Commit

Permalink
[SPARK-35332][SQL] Make cache plan disable configs configurable
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add a new config that controls whether the configs normally force-disabled when building a cached plan are actually disabled.

### Why are the changes needed?

Some configs are force-disabled when building a cached plan in order to avoid performance regressions, but not every query becomes slower when AQE or bucketed scan is enabled. It is useful to add a new config so that users can decide whether those configs should be disabled while building the cached plan.

### Does this PR introduce _any_ user-facing change?

Yes, a new config.

### How was this patch tested?

Add test.

Closes #32482 from ulysses-you/SPARK-35332.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
ulysses-you authored and cloud-fan committed May 13, 2021
1 parent 02c99f1 commit 6f63057
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
Expand Up @@ -1090,6 +1090,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Internal boolean flag (default false, since 3.2.0) deciding whether the plan executed to
// build a cache may keep optimizations that alter its output partitioning. Per the doc text
// below, the affected settings are adaptive execution and auto bucketed scan, and turning the
// flag on can cost readers of the cached data an extra shuffle.
val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING =
buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning")
.internal()
.doc("Whether to forcibly enable some optimization rules that can change the output " +
"partitioning of a cached query when executing it for caching. If it is set to true, " +
"queries may need an extra shuffle to read the cached data. This configuration is " +
"disabled by default. Currently, the optimization rules enabled by this configuration " +
s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
.doc("When false, we will throw an error if a query contains a cartesian product without " +
Expand Down
Expand Up @@ -114,8 +114,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
spark, forceDisableConfigs)
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val inMemoryRelation = sessionWithConfigsOff.withActive {
val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
InMemoryRelation(
Expand Down Expand Up @@ -223,8 +222,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
}
needToRecache.foreach { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
spark, forceDisableConfigs)
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val newCache = sessionWithConfigsOff.withActive {
val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan)
InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
Expand Down Expand Up @@ -328,4 +326,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
if (needToRefresh) fileIndex.refresh()
needToRefresh
}

/**
 * Picks the session used to execute a plan that is being cached.
 *
 * When CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is enabled, the original session is returned
 * unchanged, so the plan is executed with the session's own settings. When it is disabled
 * (the default), the session is obtained from SparkSession.getOrCloneSessionWithConfigsOff
 * with `forceDisableConfigs`.
 *
 * @param session the session that requested the caching
 * @return the session the cached plan should be executed with
 */
private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
  // Name the flag so the branch reads in the same direction as the documentation above.
  val keepSessionConfigs =
    session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)
  if (keepSessionConfigs) {
    session
  } else {
    SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
  }
}
}
Expand Up @@ -1176,7 +1176,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}

test("cache supports for intervals") {
withTable("interval_cache") {
withTable("interval_cache", "t1") {
Seq((1, "1 second"), (2, "2 seconds"), (2, null))
.toDF("k", "v").write.saveAsTable("interval_cache")
sql("CACHE TABLE t1 AS SELECT k, cast(v as interval) FROM interval_cache")
Expand Down Expand Up @@ -1569,4 +1569,65 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(!spark.catalog.isCached(viewName))
}
}

// Verifies that CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING takes effect at the moment a table
// is cached: each cached view keeps the partition count it was built with, even after the
// flag is flipped for later statements.
test("SPARK-35332: Make cache plan disable configs configurable - check AQE") {
// Two shuffle partitions, AQE on, and coalescing allowed to go down to a single partition.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

withTempView("t1", "t2", "t3") {
// Flag off: the cached plan is expected to keep the 2 configured shuffle partitions
// (presumably because AQE is switched off for the session that builds the cache).
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)")
assert(spark.table("t1").rdd.partitions.length == 2)
}

// Flag on: the already cached t1 is unchanged, while the newly cached t2 ends up with a
// single partition (presumably coalesced by AQE, which is no longer switched off).
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)")
assert(spark.table("t2").rdd.partitions.length == 1)
}

// Flag off again: earlier caches keep their partition counts and a new cache is built
// with 2 partitions, as in the first step.
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)")
assert(spark.table("t3").rdd.partitions.length == 2)
}
}
}
}

// Same scenario as the AQE check, but observed through bucketed tables: the partition count
// of a cached bucketed table depends on the flag value at the time it was cached.
test("SPARK-35332: Make cache plan disable configs configurable - check bucket scan") {
withTable("t1", "t2", "t3") {
// Create three identical parquet tables, each bucketed into 2 buckets on `id`.
Seq(1, 2, 3).foreach { i =>
spark.range(1, 2)
.write
.format("parquet")
.bucketBy(2, "id")
.saveAsTable(s"t$i")
}

withCache("t1", "t2", "t3") {
// Outer scope: bucketing on, minimum file partition number 1, flag off.
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true",
SQLConf.FILES_MIN_PARTITION_NUM.key -> "1",
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
// Flag off: the cache is expected to have 2 partitions, matching the bucket count
// (presumably a bucketed scan is used because auto bucketed scan is switched off).
sql("CACHE TABLE t1")
assert(spark.table("t1").rdd.partitions.length == 2)

// Flag on (overrides the outer value): t1 is unchanged, while the newly cached t2 ends
// up with a single partition (presumably the scan is no longer forced to be bucketed).
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2")
assert(spark.table("t2").rdd.partitions.length == 1)
}

// Flag off again: earlier caches keep their partition counts and the new cache t3 is
// built with 2 partitions, as for t1.
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3")
assert(spark.table("t3").rdd.partitions.length == 2)
}
}
}
}
}
}

0 comments on commit 6f63057

Please sign in to comment.