From 4bdc9ad5506a6055b9bb0b82ad4cfb1af09708c0 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sat, 29 Jul 2017 18:48:02 +0800
Subject: [PATCH 1/6] Fix SET hive.exec.max.dynamic.partitions losing effect.

---
 .../sql/catalyst/catalog/ExternalCatalog.scala | 2 ++
 .../sql/catalyst/catalog/InMemoryCatalog.scala | 3 ++-
 .../spark/sql/execution/command/SetCommand.scala | 1 +
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 3 +++
 .../apache/spark/sql/hive/client/HiveClient.scala | 3 +++
 .../spark/sql/hive/client/HiveClientImpl.scala | 5 +++++
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 13 +++++++++++++
 7 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 6000d483db209..c559865db71aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -355,6 +355,8 @@ abstract class ExternalCatalog
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
+  def setConf(key: String, value: String): Unit
+
   override protected def doPostEvent(
       listener: ExternalCatalogEventListener,
       event: ExternalCatalogEvent): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 37e9eeadaad47..fa44a578d88cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -609,5 +609,6 @@ class InMemoryCatalog(
     requireDbExists(db)
     StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
-
+  override def setConf(key: String, value: String): Unit =
+    throw new UnsupportedOperationException("in-memory catalog does not support setConf.")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 5f12830ee621f..19aaeb54613cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -88,6 +88,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.conf.set(key, value)
+        sparkSession.sharedState.externalCatalog.setConf(key, value)
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 70d7dd23d908a..38d48a4704653 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1175,6 +1175,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.listFunctions(db, pattern)
   }
 
+  override def setConf(key: String, value: String): Unit = withClient {
+    client.setConf(key, value)
+  }
 }
 
 object HiveExternalCatalog {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9fff452..6d86965790d40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -38,6 +38,9 @@ private[hive] trait HiveClient {
   /** Returns the configuration for the given key in the current session. */
   def getConf(key: String, defaultValue: String): String
 
+  /** Set the given configuration property. */
+  def setConf(key: String, value: String): Unit
+
   /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
    * result in one string.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bde9a81c65a4e..c0d3559961be9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -215,6 +215,11 @@ private[hive] class HiveClientImpl(
     conf.get(key, defaultValue)
   }
 
+  override def setConf(key: String, value: String): Unit = {
+    clientLoader.cachedHive = null
+    conf.set(key, value)
+  }
+
   // We use hive's conf for compatibility.
   private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
   private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index cc80f2e481cbf..83d779a1bbdcf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -459,6 +459,19 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  test("set hive.exec.max.dynamic.partitions lose effect") {
+    val cnt = 1001
+    val sourceTable = "sourceTable"
+    val targetTable = "targetTable"
+    (0 until cnt).map(i => (i, i)).toDF("c1", "c2").createOrReplaceTempView(sourceTable)
+    sql(s"create table $targetTable(c1 int) PARTITIONED BY(c2 int)")
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    sql(s"set hive.exec.max.dynamic.partitions=$cnt")
+    sql(s"insert overwrite table $targetTable partition(c2) select * from $sourceTable")
+
+    checkAnswer(sql(s"SELECT count(*) FROM $targetTable"), Row(cnt))
+  }
+
   testPartitionedTable("insertInto() should reject missing columns") { tableName =>
     sql("CREATE TABLE t (a INT, b INT)")
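
Patch 1 threads a setConf call from SET key=value through ExternalCatalog and
HiveExternalCatalog down to HiveClientImpl, which drops the cached Hive
instance so the next metastore call picks up the new value. The failure mode
it targets looks roughly like the sketch below (table names are illustrative;
Hive's default for hive.exec.max.dynamic.partitions is 1000):

    // Sketch of the bug: SET updates Spark's SQLConf, but the Hive client was
    // created earlier and keeps the old limit, so an insert producing 1001
    // dynamic partitions still fails with "too many dynamic partitions".
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("SET hive.exec.max.dynamic.partitions=1001")
    spark.sql(
      "INSERT OVERWRITE TABLE targetTable PARTITION(c2) SELECT * FROM sourceTable")

The test added to InsertIntoHiveTableSuite above exercises exactly this
scenario with 1001 partitions, one over the default limit.
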
From cee6838e6f188c1d751ef6d3bc2a191057f45e34 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 30 Jul 2017 13:55:00 +0800
Subject: [PATCH 2/6] Set conf to catalog when it's hive.

---
 .../org/apache/spark/sql/execution/command/SetCommand.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 19aaeb54613cb..8e192f0296336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 
@@ -88,7 +89,9 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.conf.set(key, value)
-        sparkSession.sharedState.externalCatalog.setConf(key, value)
+        if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
+          sparkSession.sharedState.externalCatalog.setConf(key, value)
+        }
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)

From 506050aef9013c6452d11affbf5337bc60a65fb3 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 1 Aug 2017 16:52:01 +0800
Subject: [PATCH 3/6] Point users to setting Hive config before SparkSession is initialized.

---
 .../sql/catalyst/catalog/ExternalCatalog.scala | 2 --
 .../sql/catalyst/catalog/InMemoryCatalog.scala | 3 +--
 .../spark/sql/execution/command/SetCommand.scala | 8 +++++---
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 3 ---
 .../apache/spark/sql/hive/client/HiveClient.scala | 3 ---
 .../spark/sql/hive/client/HiveClientImpl.scala | 5 -----
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 13 -------------
 7 files changed, 6 insertions(+), 31 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index c559865db71aa..6000d483db209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -355,8 +355,6 @@ abstract class ExternalCatalog
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
-  def setConf(key: String, value: String): Unit
-
   override protected def doPostEvent(
       listener: ExternalCatalogEventListener,
       event: ExternalCatalogEvent): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index fa44a578d88cd..37e9eeadaad47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -609,6 +609,5 @@ class InMemoryCatalog(
     requireDbExists(db)
     StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
-  override def setConf(key: String, value: String): Unit =
-    throw new UnsupportedOperationException("in-memory catalog does not support setConf.")
+
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 8e192f0296336..14c80674df786 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -88,10 +88,12 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
-        sparkSession.conf.set(key, value)
-        if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
-          sparkSession.sharedState.externalCatalog.setConf(key, value)
+        if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")
+          && key.startsWith("hive.")) {
+          logWarning(s"Please set hive config through " +
+            s"--conf spark.hadoop.${key}=${value} before SparkSession is initialized.")
         }
+        sparkSession.conf.set(key, value)
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 38d48a4704653..70d7dd23d908a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1175,9 +1175,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.listFunctions(db, pattern)
   }
 
-  override def setConf(key: String, value: String): Unit = withClient {
-    client.setConf(key, value)
-  }
 }
 
 object HiveExternalCatalog {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6d86965790d40..16a80f9fff452 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -38,9 +38,6 @@ private[hive] trait HiveClient {
   /** Returns the configuration for the given key in the current session. */
   def getConf(key: String, defaultValue: String): String
 
-  /** Set the given configuration property. */
-  def setConf(key: String, value: String): Unit
-
   /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
    * result in one string.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c0d3559961be9..bde9a81c65a4e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -215,11 +215,6 @@ private[hive] class HiveClientImpl(
     conf.get(key, defaultValue)
   }
 
-  override def setConf(key: String, value: String): Unit = {
-    clientLoader.cachedHive = null
-    conf.set(key, value)
-  }
-
   // We use hive's conf for compatibility.
   private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
   private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 83d779a1bbdcf..cc80f2e481cbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -459,19 +459,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
-  test("set hive.exec.max.dynamic.partitions lose effect") {
-    val cnt = 1001
-    val sourceTable = "sourceTable"
-    val targetTable = "targetTable"
-    (0 until cnt).map(i => (i, i)).toDF("c1", "c2").createOrReplaceTempView(sourceTable)
-    sql(s"create table $targetTable(c1 int) PARTITIONED BY(c2 int)")
-    sql("set hive.exec.dynamic.partition.mode=nonstrict")
-    sql(s"set hive.exec.max.dynamic.partitions=$cnt")
-    sql(s"insert overwrite table $targetTable partition(c2) select * from $sourceTable")
-
-    checkAnswer(sql(s"SELECT count(*) FROM $targetTable"), Row(cnt))
-  }
-
   testPartitionedTable("insertInto() should reject missing columns") { tableName =>
     sql("CREATE TABLE t (a INT, b INT)")
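
Patches 2 and 3 reverse course: rather than pushing SET values into the Hive
client, patch 3 removes the plumbing again and instead warns that Hive
settings must be supplied before the SparkSession (and with it the Hive
client) is created. A minimal sketch of the workaround the warning
recommends, assuming an application built through SparkSession.builder (the
app name and value are illustrative):

    import org.apache.spark.sql.SparkSession

    // Properties with the spark.hadoop prefix are copied into the Hadoop
    // Configuration that backs the Hive client, so they take effect for the
    // whole lifetime of the application.
    val spark = SparkSession.builder()
      .appName("dynamic-partitions-example")
      .config("spark.hadoop.hive.exec.max.dynamic.partitions", "2000")
      .enableHiveSupport()
      .getOrCreate()

The same setting can be passed on the command line as
--conf spark.hadoop.hive.exec.max.dynamic.partitions=2000, which is the form
the warning text cites.
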
From 2eb39cb691a68500bd839f43f49346a5060a6103 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 2 Aug 2017 15:42:54 +0800
Subject: [PATCH 4/6] Improve log message

---
 .../org/apache/spark/sql/execution/command/SetCommand.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 14c80674df786..fa879ce96e297 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -90,8 +90,10 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       val runFunc = (sparkSession: SparkSession) => {
         if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")
           && key.startsWith("hive.")) {
-          logWarning(s"Please set hive config through " +
-            s"--conf spark.hadoop.${key}=${value} before SparkSession is initialized.")
+          logWarning(s"SET $key=$value doesn't work, " +
+            s"because Spark doesn't support set hive config dynamically. " +
+            s"Please set hive config through " +
+            s"--conf spark.hadoop.$key=$value before SparkSession is initialized.")
         }
         sparkSession.conf.set(key, value)
         Seq(Row(key, value))

From 8787cbc8fc25d7d3b13510ed8db631ee71c74b15 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 6 Aug 2017 14:24:10 +0800
Subject: [PATCH 5/6] Update log message

---
 .../spark/sql/execution/command/SetCommand.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index fa879ce96e297..77e3f9ea2ffc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -88,12 +88,13 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Configures a single property.
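
Patches 4 and 5 only reword the warning; the condition that triggers it
stays a two-part guard. Pulled out as a standalone sketch (the helper name
is hypothetical, with the session lookup stubbed out as a plain argument):

    // Hypothetical standalone version of the guard in SetCommand: warn only
    // when the session uses the Hive catalog and the key is a Hive config.
    def shouldWarn(catalogImplementation: String, key: String): Boolean =
      catalogImplementation == "hive" && key.startsWith("hive.")

    assert(shouldWarn("hive", "hive.exec.max.dynamic.partitions"))
    assert(!shouldWarn("in-memory", "hive.exec.max.dynamic.partitions"))
    assert(!shouldWarn("hive", "spark.sql.shuffle.partitions"))
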
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
-        if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")
-          && key.startsWith("hive.")) {
-          logWarning(s"SET $key=$value doesn't work, " +
-            s"because Spark doesn't support set hive config dynamically. " +
-            s"Please set hive config through " +
-            s"--conf spark.hadoop.$key=$value before SparkSession is initialized.")
+        if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") &&
+          key.startsWith("hive.")) {
+          logWarning(s"'SET $key=$value' might not work, since Spark doesn't support changing " +
+            s"the Hive config dynamically. Please passing the Hive-specific config by adding the " +
+            s"prefix spark.hadoop (e.g.,spark.hadoop.$key) when starting a Spark application. " +
+            s"For details, see the link: https://spark.apache.org/docs/latest/configuration.html#" +
+            s"dynamically-loading-spark-properties.")
         }
         sparkSession.conf.set(key, value)
         Seq(Row(key, value))

From 03f5753cd04705ae75129f47f125150428f12ec8 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 6 Aug 2017 14:33:26 +0800
Subject: [PATCH 6/6] Remove redundant s

---
 .../spark/sql/execution/command/SetCommand.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 77e3f9ea2ffc5..7477d025dfe89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -89,12 +89,12 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") &&
-          key.startsWith("hive.")) {
+            key.startsWith("hive.")) {
           logWarning(s"'SET $key=$value' might not work, since Spark doesn't support changing " +
-            s"the Hive config dynamically. Please passing the Hive-specific config by adding the " +
-            s"prefix spark.hadoop (e.g.,spark.hadoop.$key) when starting a Spark application. " +
-            s"For details, see the link: https://spark.apache.org/docs/latest/configuration.html#" +
-            s"dynamically-loading-spark-properties.")
+            "the Hive config dynamically. Please passing the Hive-specific config by adding the " +
+            s"prefix spark.hadoop (e.g., spark.hadoop.$key) when starting a Spark application. " +
+            "For details, see the link: https://spark.apache.org/docs/latest/configuration.html#" +
+            "dynamically-loading-spark-properties.")
         }
         sparkSession.conf.set(key, value)
         Seq(Row(key, value))
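
Patch 6 drops the s interpolator from string literals that contain no
$-expression; the prefix is only needed when something is actually
substituted into the string. A quick illustration of the distinction (the
variable names are illustrative):

    val key = "hive.exec.max.dynamic.partitions"
    // The interpolator is required here: $key is substituted into the string.
    val interpolated = s"prefix spark.hadoop (e.g., spark.hadoop.$key)"
    // No $-expression, so a plain literal suffices and an s prefix would be redundant.
    val plain = "dynamically-loading-spark-properties."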