From 3c6ec38e4ee4f1451edfe52250855d6686d1b9a4 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 15 May 2016 23:33:06 -0700
Subject: [PATCH 1/4] fix

---
 .../sql/execution/command/SetCommand.scala    |  4 ++-
 .../spark/sql/internal/SessionState.scala     |  4 +++
 .../spark/sql/internal/SQLConfSuite.scala     | 11 +++++++
 .../spark/sql/hive/HiveSessionState.scala     | 21 +++++++++++--
 .../sql/hive/execution/SQLQuerySuite.scala    | 31 ++++++++++++++++++-
 5 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 2409b5d203f40..c378cac747c39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -65,7 +65,9 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
-        sparkSession.conf.set(key, value)
+        // Use sessionState.setConfString so that Hive-related conf changes are
+        // also propagated to the Hive metastore.
+        sparkSession.sessionState.setConfString(key, value)
         Seq(Row(key, value))
       }
       (keyValueOutput, runFunc)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f0b8a83dee8ca..9f7ed45d33a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -170,6 +170,10 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
   }
 
+  def setConfString(key: String, value: String): Unit = {
+    conf.setConfString(key, value)
+  }
+
   def addJar(path: String): Unit = {
     sparkSession.sparkContext.addJar(path)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 7ead97bbf6937..0bb7ed199d7bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -126,6 +126,17 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     spark.wrapped.conf.clear()
   }
 
+  test("Change warehouse path at runtime") {
+    spark.wrapped.conf.clear()
+    val original = spark.conf.get(SQLConf.WAREHOUSE_PATH)
+    try {
+      sql(s"set ${SQLConf.WAREHOUSE_PATH.key}=/x/y/z/spark-warehouse")
+      assert(spark.conf.get(SQLConf.WAREHOUSE_PATH) === "/x/y/z/spark-warehouse")
+    } finally {
+      sql(s"set ${SQLConf.WAREHOUSE_PATH.key}=$original")
+    }
+  }
+
   test("SparkSession can access configs set in SparkConf") {
     try {
       sparkContext.conf.set("spark.to.be.or.not.to.be", "my love")
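Together, the SetCommand and SessionState changes above turn conf assignment
into an overridable hook: SET no longer writes to the conf directly but goes
through SessionState.setConfString, which a Hive-aware subclass can intercept.
A self-contained sketch of that dispatch pattern, using toy class names rather
than Spark's real API:

    object SetConfHookSketch extends App {
      class BaseSessionState {
        protected val conf = scala.collection.mutable.Map.empty[String, String]
        // The hook that SET goes through; base behavior is a plain conf write.
        def setConfString(key: String, value: String): Unit = conf(key) = value
        def getConfString(key: String): Option[String] = conf.get(key)
      }

      class HiveSessionStateSketch extends BaseSessionState {
        override def setConfString(key: String, value: String): Unit = {
          if (key.toLowerCase.startsWith("hive.")) {
            // Stand-in for metadataHive.runSqlHive(s"set $key=$value").
            println(s"forwarding to metastore: set $key=$value")
          }
          super.setConfString(key, value)
        }
      }

      val state: BaseSessionState = new HiveSessionStateSketch
      state.setConfString("hive.exec.scratchdir", "/tmp/scratch") // forwarded
      state.setConfString("spark.sql.shuffle.partitions", "10")   // local only
      assert(state.getConfString("hive.exec.scratchdir").contains("/tmp/scratch"))
    }

The HiveSessionState diff that follows is the real override this hook enables.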
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 46579ecd85caa..2d292380e869e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
  */
 private[hive] class HiveSessionState(sparkSession: SparkSession)
-  extends SessionState(sparkSession) {
+  extends SessionState(sparkSession) with Logging {
 
   self =>
 
@@ -105,6 +106,22 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   // Helper methods, partially leftover from pre-2.0 days
   // ------------------------------------------------------
 
+  override def setConfString(key: String, value: String): Unit = {
+    if (key.toLowerCase == SQLConf.WAREHOUSE_PATH.key ||
+        key.toLowerCase == "hive.metastore.warehouse.dir") {
+      metadataHive.runSqlHive(s"set hive.metastore.warehouse.dir=$value")
+      conf.setConfString(SQLConf.WAREHOUSE_PATH.key, value)
+      conf.setConfString("hive.metastore.warehouse.dir", value)
+      logInfo(s"Changing Hive metastore warehouse path to '$value'")
+    } else if (key.toLowerCase.startsWith("hive.")) {
+      // Replay the conf change as a Hive SQL command so it also reaches the metastore.
+      metadataHive.runSqlHive(s"set $key=$value")
+      conf.setConfString(key, value)
+    } else {
+      conf.setConfString(key, value)
+    }
+  }
+
   override def addJar(path: String): Unit = {
     metadataHive.addJar(path)
     super.addJar(path)
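With the override above, a runtime change to the warehouse location is meant to
keep three views consistent: the Spark SQL conf, the mirrored Hive conf key,
and the session's Hive metastore client. A usage sketch of the intended
behavior, assuming a Hive-enabled SparkSession bound to `spark` (the path is
illustrative):

    spark.sql("SET spark.sql.warehouse.dir=/tmp/my-warehouse")
    spark.conf.get("spark.sql.warehouse.dir")       // "/tmp/my-warehouse"
    spark.conf.get("hive.metastore.warehouse.dir")  // "/tmp/my-warehouse"
    // The Hive-side key is handled symmetrically by the same branch:
    spark.sql("SET hive.metastore.warehouse.dir=/tmp/my-warehouse")

The test added to SQLQuerySuite below asserts exactly this three-way
agreement, including on the metastore client via metadataHive.getConf.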
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ac9a3930fd21b..a3a011ee0dfc1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -1000,6 +1001,34 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("Change hive warehouse path at runtime") {
+    spark.wrapped.conf.clear()
+    val original = spark.conf.get(SQLConf.WAREHOUSE_PATH)
+    val sessionState = spark.sessionState.asInstanceOf[HiveSessionState]
+    assert(sessionState.metadataHive.getConf("hive.metastore.warehouse.dir", null)
+      != "/x/y/z/spark-warehouse")
+    try {
+      sql(s"set ${SQLConf.WAREHOUSE_PATH.key}=/x/y/z/spark-warehouse")
+      assert(spark.conf.get(SQLConf.WAREHOUSE_PATH) == "/x/y/z/spark-warehouse")
+      assert(spark.conf.get("hive.metastore.warehouse.dir") == "/x/y/z/spark-warehouse")
+      assert(sessionState.metadataHive.getConf("hive.metastore.warehouse.dir", null)
+        == "/x/y/z/spark-warehouse")
+    } finally {
+      sql(s"set ${SQLConf.WAREHOUSE_PATH.key}=$original")
+      sql(s"set hive.metastore.warehouse.dir=$original")
+    }
+    try {
+      sql(s"set hive.metastore.warehouse.dir=/x/y/z/spark-warehouse")
+      assert(spark.conf.get(SQLConf.WAREHOUSE_PATH) == "/x/y/z/spark-warehouse")
+      assert(spark.conf.get("hive.metastore.warehouse.dir") == "/x/y/z/spark-warehouse")
+      assert(sessionState.metadataHive.getConf("hive.metastore.warehouse.dir", null)
+        == "/x/y/z/spark-warehouse")
+    } finally {
+      sql(s"set ${SQLConf.WAREHOUSE_PATH.key}=$original")
+      sql(s"set hive.metastore.warehouse.dir=$original")
+    }
+  }
+
   test("SPARK-6785: HiveQuerySuite - Date comparison test 2") {
     checkAnswer(
       sql("SELECT CAST(CAST(0 AS timestamp) AS date) > CAST(0 AS timestamp) FROM src LIMIT 1"),

From d8d6ca32147174615fa51af97b294b70f919aec2 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 15 May 2016 23:54:59 -0700
Subject: [PATCH 2/4] fix

---
 .../main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 2d292380e869e..021076cf708ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -113,7 +113,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       conf.setConfString(SQLConf.WAREHOUSE_PATH.key, value)
       conf.setConfString("hive.metastore.warehouse.dir", value)
       logInfo(s"Changing Hive metastore warehouse path to '$value'")
-    } else if (key.toLowerCase.startsWith("hive.")) {
+    } else if (!key.toLowerCase.startsWith("spark.")) {
       // Replay the conf change as a Hive SQL command so it also reaches the metastore.
       metadataHive.runSqlHive(s"set $key=$value")
       conf.setConfString(key, value)

From 52161e4e402a99bad5cca8b42a804df9b5b80596 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 May 2016 19:40:53 -0700
Subject: [PATCH 3/4] fix

---
 .../src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2f6aa36f95827..009a51c3396c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -522,6 +522,7 @@ private[hive] class TestHiveSessionState(
       super.clear()
       TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
       setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
+      setConfString("spark.sql.warehouse.dir", self.warehousePath.toURI.toString)
     }
   }
 }
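Note the effect of patch 2 above: instead of forwarding only "hive."-prefixed
keys to the metastore client, HiveSessionState.setConfString now replays every
key that does not start with "spark.", which would also cover, for example,
Hadoop properties that Hive reads. A small sketch of the before/after
predicates (the sample keys are illustrative):

    object ForwardRuleSketch extends App {
      // Patch 1's rule: forward only keys in the "hive." namespace.
      val before: String => Boolean = key => key.toLowerCase.startsWith("hive.")
      // Patch 2's rule: forward everything that is not a "spark." key.
      val after: String => Boolean = key => !key.toLowerCase.startsWith("spark.")

      assert(before("hive.exec.scratchdir") && after("hive.exec.scratchdir"))
      assert(!before("mapreduce.job.reduces") && after("mapreduce.job.reduces"))
      assert(!before("spark.sql.shuffle.partitions") && !after("spark.sql.shuffle.partitions"))
    }

Patch 3 then keeps the test harness consistent with the new rule by setting
spark.sql.warehouse.dir alongside hive.metastore.warehouse.dir when the test
conf is cleared.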
From 372b492d3d96070f270f88b8304137a2ebd1a440 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 May 2016 21:13:04 -0700
Subject: [PATCH 4/4] one more try

---
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 009a51c3396c7..24c0d770a3d7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -502,6 +502,9 @@ private[hive] class TestHiveSharedState(
     metastoreTemporaryConf: Map[String, String])
   extends HiveSharedState(sc) {
 
+  // HiveSharedState sets this conf to the default value of SQLConf.WAREHOUSE_PATH.
+  sc.conf.set("hive.metastore.warehouse.dir", warehousePath.toURI.toString)
+
   override lazy val metadataHive: HiveClient = {
     TestHiveContext.newClientForMetadata(
       sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
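Patch 4 depends on initialization order: the sc.conf.set call sits in the body
of TestHiveSharedState, so it runs during construction, after HiveSharedState
has written the default warehouse path and before the lazy metadataHive client
is first materialized, so the client is built against the test path. A toy
sketch of that ordering, with simplified stand-ins rather than Spark's classes:

    object InitOrderSketch extends App {
      val conf = scala.collection.mutable.Map.empty[String, String]

      class Shared {
        // Stand-in for HiveSharedState writing the default warehouse path.
        conf("hive.metastore.warehouse.dir") = "/default/spark-warehouse"
        // Stand-in for the lazy metadataHive client, which reads the conf on first use.
        lazy val client: String = "client(" + conf("hive.metastore.warehouse.dir") + ")"
      }

      class TestShared extends Shared {
        // Runs after the parent constructor but before `client` is forced.
        conf("hive.metastore.warehouse.dir") = "/test/warehouse"
      }

      val shared = new TestShared
      assert(shared.client == "client(/test/warehouse)") // lazy val sees the test path
    }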