[SPARK-32406][SQL][FOLLOWUP] Make RESET fail against static and core configs

### What changes were proposed in this pull request?

This followup addresses review comments on #29202.

1. Make RESET fail for static SQL configs and Spark core configs, the same way SET does; a sketch of the effect follows this list. Note that core configs must be pre-registered; otherwise, they can still be SET and RESET.

2. Add test cases for configurations with optional default values.
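
As a sketch of item 1's effect (hypothetical standalone program; object name and `local[1]` master are illustrative, and the error text matches the updated tests below):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object ResetFailureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    try {
      // Before this PR this was a silent no-op; now it fails like SET does.
      spark.sql("RESET spark.sql.warehouse.dir") // a static SQL config
    } catch {
      case e: AnalysisException =>
        // Prints: Cannot modify the value of a static config: spark.sql.warehouse.dir;
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}
```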

### Why are the changes needed?

A behavior change, following suggestions from PMC members.

### Does this PR introduce _any_ user-facing change?

Yes. Before this PR, RESET against a static config was a silent no-op (static configs cannot change at runtime, so nothing happened); after it, RESET fails the same way SET does.

### How was this patch tested?

Added more tests.

Closes #29297 from yaooqinn/SPARK-32406-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
yaooqinn authored and HyukjinKwon committed Jul 31, 2020
1 parent 8014b0b commit f480040
Showing 2 changed files with 33 additions and 5 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala:

```diff
@@ -172,15 +172,16 @@ object SetCommand {
 case class ResetCommand(config: Option[String]) extends RunnableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val conf = sparkSession.sessionState.conf
     val defaults = sparkSession.sparkContext.conf
     config match {
       case Some(key) =>
-        conf.unsetConf(key)
-        defaults.getOption(key).foreach(conf.setConfString(key, _))
+        sparkSession.conf.unset(key)
+        defaults.getOption(key).foreach(sparkSession.conf.set(key, _))
       case None =>
-        conf.clear()
-        defaults.getAll.foreach { case (k, v) => conf.setConfString(k, v) }
+        sparkSession.sessionState.conf.clear()
+        defaults.getAll.foreach { case (k, v) =>
+          sparkSession.sessionState.conf.setConfString(k, v)
+        }
     }
     Seq.empty[Row]
   }
```
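
Why routing the single-key path through `sparkSession.conf` (the session's `RuntimeConfig`) changes behavior: unlike the raw `SQLConf` used before, `RuntimeConfig` validates a key against the static and registered core config registries before modifying it. A condensed sketch of that kind of guard (class and registry names here are invented for illustration, not Spark's actual internals):

```scala
import scala.collection.mutable

// Simplified stand-in for the check RuntimeConfig applies before touching
// session state. `staticConfKeys` and `coreConfKeys` are hypothetical
// placeholders for Spark's internal config registries.
class GuardedConf(staticConfKeys: Set[String], coreConfKeys: Set[String]) {
  private val settings = mutable.Map.empty[String, String]

  // Spark throws org.apache.spark.sql.AnalysisException with these messages;
  // a plain IllegalArgumentException keeps the sketch dependency-free.
  private def requireModifiable(key: String): Unit = {
    if (staticConfKeys.contains(key)) {
      throw new IllegalArgumentException(
        s"Cannot modify the value of a static config: $key;")
    }
    if (coreConfKeys.contains(key)) {
      throw new IllegalArgumentException(
        s"Cannot modify the value of a Spark config: $key;")
    }
  }

  def set(key: String, value: String): Unit = {
    requireModifiable(key)
    settings(key) = value
  }

  // RESET now routes through the same guard, which is why it fails like SET.
  def unset(key: String): Unit = {
    requireModifiable(key)
    settings.remove(key)
  }
}
```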
sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala:

```diff
@@ -142,9 +142,12 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
       sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
       assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
       assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
+      assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
       sql(s"reset")
       assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
       assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
+      assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+        Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
     } finally {
       sql(s"set ${SQLConf.GROUP_BY_ORDINAL}=$original")
     }
```
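
A note on the `ConvertToLocalRelation` assertion above: `OPTIMIZER_EXCLUDED_RULES` has no built-in default, but the shared test session sets it in the underlying `SparkConf`, and `ResetCommand` restores session state from `sparkContext.conf`. The same fallback can be reproduced outside the suite (a spark-shell sketch, assuming a fresh session where this key is supplied at startup):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  // Simulates what the shared test session does: a SparkConf-level value
  // for a SQL config that otherwise has no default.
  .config("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
  .getOrCreate()

spark.sql("SET spark.sql.optimizer.excludedRules=abc")
spark.sql("RESET spark.sql.optimizer.excludedRules")
// RESET falls back to the SparkConf value, not to "unset":
assert(spark.conf.get("spark.sql.optimizer.excludedRules") ==
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
```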
sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala (second hunk, same file):

```diff
@@ -182,18 +185,42 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-32406: reset - single configuration") {
+    spark.sessionState.conf.clear()
+    // spark core conf w/o entry registered
     val appId = spark.sparkContext.getConf.getAppId
     sql("RESET spark.app.id")
     assert(spark.conf.get("spark.app.id") === appId, "Should not change spark core ones")
+    // spark core conf w/ entry registered
+    val e1 = intercept[AnalysisException](sql("RESET spark.executor.cores"))
+    assert(e1.getMessage === "Cannot modify the value of a Spark config: spark.executor.cores;")
+
+    // user defined settings
     sql("SET spark.abc=xyz")
     assert(spark.conf.get("spark.abc") === "xyz")
     sql("RESET spark.abc")
     intercept[NoSuchElementException](spark.conf.get("spark.abc"))
     sql("RESET spark.abc") // ignore nonexistent keys
+
+    // runtime sql configs
     val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
     sql(s"SET ${SQLConf.GROUP_BY_ORDINAL.key}=false")
     sql(s"RESET ${SQLConf.GROUP_BY_ORDINAL.key}")
     assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === original)
+
+    // runtime sql configs with optional defaults
+    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+    sql(s"RESET ${SQLConf.OPTIMIZER_EXCLUDED_RULES.key}")
+    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+      Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
+    sql(s"SET ${SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key}=abc")
+    sql(s"RESET ${SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key}")
+    assert(spark.conf.get(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES).isEmpty)
+
+    // static sql configs
+    val e2 = intercept[AnalysisException](sql(s"RESET ${StaticSQLConf.WAREHOUSE_PATH.key}"))
+    assert(e2.getMessage ===
+      s"Cannot modify the value of a static config: ${StaticSQLConf.WAREHOUSE_PATH.key};")
+
   }
 
   test("invalid conf value") {
```
