[SPARK-35332][SQL] Make cache plan disable configs configurable #32482

Closed · wants to merge 9 commits

Changes from 6 commits
SQLConf.scala
@@ -1090,6 +1090,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING =
+    buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning")
+      .internal()
+      .doc(s"When false, some configs are disabled during executing cache plan that is to avoid " +
+        "performance regression if other queries hit the cached plan. Currently, the disabled " +
+        s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " +
+        s"${AUTO_BUCKETED_SCAN_ENABLED.key}.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)

Member (on the .doc text above):
How about this?

    Whether to forcibly enable some optimization rules that can change the output partitioning
    of a cached query when executing it for caching. If it is set to true, queries may need an
    extra shuffle to read the cached data. This configuration is disabled by default. Currently,
    the optimization rules enabled by this configuration are ${ADAPTIVE_EXECUTION_ENABLED.key}
    and ${AUTO_BUCKETED_SCAN_ENABLED.key}.

HyukjinKwon marked this conversation as resolved.

   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
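
As context for review (not part of this diff): a minimal sketch of how the new flag would be
exercised from user code, assuming a Spark build that includes this patch. The table name
"demo" is made up for illustration.

    // Sketch: with the flag at its default (false), AQE and auto bucketed scan are
    // forced off while the cache plan is compiled, so the cached partitioning stays
    // deterministic. Flipping it to true lets those rules change the partitioning.
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
    spark.sql("CACHE TABLE demo AS SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
    // Later reads reuse whatever partitioning was chosen at cache time, possibly
    // paying an extra shuffle if a consumer needs a different distribution.
    spark.table("demo").show()
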
CacheManager.scala
@@ -114,8 +114,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
-      val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
-        spark, forceDisableConfigs)
+      val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
         val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
         InMemoryRelation(
@@ -223,8 +222,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
-      val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
-        spark, forceDisableConfigs)
+      val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val newCache = sessionWithConfigsOff.withActive {
         val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan)
         InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
@@ -328,4 +326,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     if (needToRefresh) fileIndex.refresh()
     needToRefresh
   }

+  /**
+   * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled, just return original session.
+   */
+  private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
+    if (session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+      session
+    } else {
+      SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
+    }
+  }
 }

Member (on the Scaladoc above):
"If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled" -> do we mean enabled?

Contributor:
good catch!

Contributor Author:
Sorry, forgot to fix the comment; created followup #32543.
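
For readers unfamiliar with SparkSession.getOrCloneSessionWithConfigsOff: below is a rough,
illustrative sketch of the clone-with-configs-off pattern the helper relies on, not the
method's actual source. It assumes cloneSession() and SQLConf.setConf behave as in recent
Spark releases; the object and method names are made up. It sits in the org.apache.spark.sql
package only because cloneSession() is private[sql].

    package org.apache.spark.sql

    import org.apache.spark.internal.config.ConfigEntry

    object CloneWithConfigsOffSketch {
      // Clone the session and force the given boolean configs to false in the clone,
      // leaving the original session and its conf untouched.
      def cloneWithConfigsOff(
          session: SparkSession,
          configs: Seq[ConfigEntry[Boolean]]): SparkSession = {
        if (configs.isEmpty) {
          session
        } else {
          val cloned = session.cloneSession()
          configs.foreach(c => cloned.sessionState.conf.setConf(c, false))
          cloned
        }
      }
    }
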
CachedTableSuite.scala
@@ -1175,7 +1175,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }
 
   test("cache supports for intervals") {
-    withTable("interval_cache") {
+    withTable("interval_cache", "t1") {
       Seq((1, "1 second"), (2, "2 seconds"), (2, null))
         .toDF("k", "v").write.saveAsTable("interval_cache")
       sql("CACHE TABLE t1 AS SELECT k, cast(v as interval) FROM interval_cache")

Contributor Author (on the withTable change above):
Not related to this PR, but it affects the newly added test that uses t1.
@@ -1554,4 +1554,67 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.isCached(viewName))
     }
   }

test("SPARK-35332: Make cache plan disable configs configurable - check AQE") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

withTempView("t1", "t2", "t3") {
withCache("t1", "t2", "t3") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withTempView drops caches, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, removed it.

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)")
assert(spark.table("t1").rdd.partitions.length == 2)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)")
assert(spark.table("t2").rdd.partitions.length == 1)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)")
assert(spark.table("t3").rdd.partitions.length == 2)
}
}
}
}
}
}
}
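
A hedged reading of the partition counts asserted above (inferred from the test's configs,
not stated in the PR): with spark.sql.shuffle.partitions=2, the REPARTITION hint produces a
two-partition shuffle; when the new flag is true at cache time, AQE (with
coalescePartitions.minPartitionNum=1) can coalesce the one-row shuffle to a single partition,
and that partitioning is frozen into the cached relation regardless of the flag's value when
the cache is later read. In spark-shell terms (table names demo1/demo2 are made up):

    // Same configs as the test, set at the session level.
    spark.conf.set("spark.sql.shuffle.partitions", "2")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")

    // Flag off: the cache is built without AQE, so both shuffle partitions survive.
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
    spark.sql("CACHE TABLE demo1 AS SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
    assert(spark.table("demo1").rdd.partitions.length == 2)

    // Flag on: AQE coalesces the one-row shuffle down to a single partition.
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
    spark.sql("CACHE TABLE demo2 AS SELECT /*+ REPARTITION */ * FROM VALUES (2) AS t(c)")
    assert(spark.table("demo2").rdd.partitions.length == 1)
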

test("SPARK-35332: Make cache plan disable configs configurable - check bucket scan") {
withTable("t1", "t2", "t3") {
Seq(1, 2, 3).foreach { i =>
spark.range(1, 2)
.write
.format("parquet")
.bucketBy(2, "id")
.saveAsTable(s"t$i")
}

withCache("t1", "t2", "t3") {
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true",
SQLConf.FILES_MIN_PARTITION_NUM.key -> "1",
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
sql("CACHE TABLE t1")
assert(spark.table("t1").rdd.partitions.length == 2)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2")
assert(spark.table("t2").rdd.partitions.length == 1)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3")
assert(spark.table("t3").rdd.partitions.length == 2)
}
}
}
}
}
}
}
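
Similarly, a hedged reading of the bucket-scan assertions (inferred from the configs, not
from the PR text): each table is written with two buckets, so a bucketed scan yields one
partition per bucket. When the flag is true at cache time, the auto bucketed scan rule
(spark.sql.sources.bucketing.autoBucketedScan.enabled) may drop the unneeded bucketed layout
and read through a regular file scan, which, with spark.sql.files.minPartitionNum=1 and a
single tiny file, produces one partition. A sketch with a made-up table name:

    // Write a two-bucket table, then cache it with the flag off: the bucketed
    // layout is kept, giving one partition per bucket.
    spark.range(1, 2).write.format("parquet").bucketBy(2, "id").saveAsTable("demo_bucketed")
    spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
    spark.conf.set("spark.sql.files.minPartitionNum", "1")
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
    spark.sql("CACHE TABLE demo_bucketed")
    assert(spark.table("demo_bucketed").rdd.partitions.length == 2)
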