Commit 7625677: refactor

ulysses-you committed May 10, 2021
1 parent 5b65d8a commit 7625677

Showing 3 changed files with 88 additions and 10 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -1090,6 +1090,17 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val CACHE_DISABLE_CONFIGS_ENABLED =
    buildConf("spark.sql.cache.disableConfigs.enabled")
      .internal()
      .doc(s"When true, some configs are disabled when executing the cache plan, to avoid " +
        s"performance regressions if other queries hit the cached plan. Currently, the " +
        s"disabled configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " +
        s"${AUTO_BUCKETED_SCAN_ENABLED.key}.")
      .version("3.2.0")
      .booleanConf
      .createWithDefault(true)

  val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
    .internal()
    .doc("When false, we will throw an error if a query contains a cartesian product without " +
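Though internal, the new flag behaves like any other SQLConf and can be toggled at runtime. A minimal usage sketch, not part of this commit (the table names are hypothetical, and an active SparkSession named spark is assumed):

import org.apache.spark.sql.internal.SQLConf

// Keep AQE and auto bucketed scan enabled for this cache plan by turning
// the new flag off before caching ("src" is a hypothetical source table).
spark.conf.set(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key, "false")
spark.sql("CACHE TABLE cached_src AS SELECT * FROM src")
// Restore the default (true), under which the listed configs are disabled
// while compiling cache plans.
spark.conf.unset(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key)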
23 changes: 14 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1069,21 +1069,26 @@ object SparkSession extends Logging {
}

  /**
-   * Returns a cloned SparkSession with all specified configurations disabled, or
-   * the original SparkSession if all configurations are already disabled.
+   * When CACHE_DISABLE_CONFIGS_ENABLED is enabled, returns a cloned SparkSession with all
+   * specified configurations disabled, or the original SparkSession if all configurations
+   * are already disabled.
   */
  private[sql] def getOrCloneSessionWithConfigsOff(
      session: SparkSession,
      configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
-    val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
-    if (configsEnabled.isEmpty) {
+    if (!session.sessionState.conf.getConf(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED)) {
      session
    } else {
-      val newSession = session.cloneSession()
-      configsEnabled.foreach(conf => {
-        newSession.sessionState.conf.setConf(conf, false)
-      })
-      newSession
+      val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
+      if (configsEnabled.isEmpty) {
+        session
+      } else {
+        val newSession = session.cloneSession()
+        configsEnabled.foreach(conf => {
+          newSession.sessionState.conf.setConf(conf, false)
+        })
+        newSession
+      }
    }
  }

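For orientation, a hedged sketch of a call site in the shape the refactor implies; the actual caller is not part of this diff, and the config list below simply mirrors the two keys named in the new flag's doc string. Because the helper is private[sql], this only compiles inside the org.apache.spark.sql package:

import org.apache.spark.sql.internal.SQLConf

// Given an existing SparkSession `session`: returns it untouched when the
// flag is off; otherwise returns a clone with whichever of the listed
// configs were enabled turned off.
val sessionForCachePlan = SparkSession.getOrCloneSessionWithConfigsOff(
  session,
  Seq(SQLConf.ADAPTIVE_EXECUTION_ENABLED, SQLConf.AUTO_BUCKETED_SCAN_ENABLED))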
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

@@ -1175,7 +1175,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}

test("cache supports for intervals") {
withTable("interval_cache") {
withTable("interval_cache", "t1") {
Seq((1, "1 second"), (2, "2 seconds"), (2, null))
.toDF("k", "v").write.saveAsTable("interval_cache")
sql("CACHE TABLE t1 AS SELECT k, cast(v as interval) FROM interval_cache")
@@ -1554,4 +1554,66 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
      assert(!spark.catalog.isCached(viewName))
    }
  }

  test("SPARK-35332: Make cache plan disable configs configurable - check AQE") {
    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

      withTempView("t1", "t2", "t3") {
        withCache("t1", "t2", "t3") {
          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
          sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)")
          assert(spark.table("t1").rdd.partitions.length == 2)

          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false")
          assert(spark.table("t1").rdd.partitions.length == 2)
          sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)")
          assert(spark.table("t2").rdd.partitions.length == 1)

          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
          assert(spark.table("t1").rdd.partitions.length == 2)
          assert(spark.table("t2").rdd.partitions.length == 1)
          sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)")
          assert(spark.table("t3").rdd.partitions.length == 2)

          sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}")
        }
      }
    }
  }
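What these assertions lean on: with spark.sql.shuffle.partitions = 2, AQE coalesces the one-row REPARTITION shuffle down to a single partition, so a cache built while the flag disables AQE keeps 2 partitions, and one built with the flag off keeps 1. A hedged standalone sketch of that effect, not part of this commit (assumes a local SparkSession named spark):

// AQE on: the tiny shuffle is coalesced to one partition.
spark.conf.set("spark.sql.shuffle.partitions", "2")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.adaptive.enabled", "true")
assert(spark.sql("SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
  .rdd.partitions.length == 1)

// AQE off: the REPARTITION shuffle keeps all 2 shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", "false")
assert(spark.sql("SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
  .rdd.partitions.length == 2)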

test("SPARK-35332: Make cache plan disable configs configurable - check bucket scan") {
withTable("t1", "t2", "t3") {
Seq(1, 2, 3).foreach { i =>
spark.range(1, 2)
.write
.format("parquet")
.bucketBy(2, "id")
.saveAsTable(s"t$i")
}

withCache("t1", "t2", "t3") {
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true",
SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
sql("CACHE TABLE t1")
assert(spark.table("t1").rdd.partitions.length == 2)

sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false")
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2")
assert(spark.table("t2").rdd.partitions.length == 1)

sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3")
assert(spark.table("t3").rdd.partitions.length == 2)

sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}")
}
}
}
}
}
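The bucket-scan assertions follow the same pattern: when auto bucketed scan is disabled (as it is for cache plans while the new flag is on), a bucketed table scans with one partition per bucket; when it is allowed to kick in, a plain scan gains nothing from bucketing, so the bucket layout is dropped and, with spark.sql.files.minPartitionNum = 1, the tiny files merge into a single partition. A hedged standalone sketch, not part of this commit (table name hypothetical, local SparkSession named spark assumed):

spark.range(1, 2).write.format("parquet").bucketBy(2, "id").saveAsTable("bt")

// Auto bucketed scan off: one partition per bucket.
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "false")
assert(spark.table("bt").rdd.partitions.length == 2)

// Auto bucketed scan on: bucket layout ignored, small files merged.
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
spark.conf.set("spark.sql.files.minPartitionNum", "1")
assert(spark.table("bt").rdd.partitions.length == 1)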
