[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one

## What changes were proposed in this pull request?

While running bin/spark-sql, we reuse the CliSessionState, but the HiveConf generated for it points to a dummy metastore rather than the actual one. In addition, the warehouse directory is determined later in SharedState, so the HiveClient should respect that configuration change in this case as well.
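As a rough illustration of the fix (not the patch itself; the helper name `refreshWarehouse` is made up for this sketch), the idea is to let an already-constructed Hive `SessionState` pick up the warehouse directory that `SharedState` resolves after construction:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState

// Hypothetical helper: reuse a live SessionState, but make its HiveConf
// follow the warehouse dir that was resolved later in the Hadoop configuration.
def refreshWarehouse(state: SessionState, hadoopConf: Configuration): SessionState = {
  Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
    state.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
  }
  state
}
```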

## How was this patch tested?
Existing unit tests.

cc cloud-fan jiangxb1987

Author: Kent Yao <yaooqinn@hotmail.com>

Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.
yaooqinn authored and cloud-fan committed Sep 19, 2017
1 parent 1bc17a6 commit 581200a
Showing 5 changed files with 29 additions and 10 deletions.
SparkSQLCLIDriver.scala

@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
 
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
@@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       System.exit(1)
     }
 
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
+
     val cliConf = new HiveConf(classOf[SessionState])
-    // Override the location of the metastore since this is only used for local execution.
-    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
-      case (key, value) => cliConf.set(key, value)
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
+      case (k, v) =>
+        cliConf.set(k, v)
     }
 
     val sessionState = new CliSessionState(cliConf)
 
     sessionState.in = System.in
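A side note on the merge in the hunk above: the three sources are applied in order, so Spark settings and the formatted time variables overwrite raw Hadoop entries for duplicate keys. A minimal, self-contained sketch of that last-write-wins behavior (the maps and values here are illustrative, not Spark's actual configuration):

```scala
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", "/tmp/default-warehouse")

// Illustrative stand-ins for sparkConf.getAll.toMap and extraConfigs.
val sparkSettings = Map("hive.metastore.warehouse.dir" -> "/user/hive/warehouse")
val extraConfigs  = Map("hive.metastore.client.connect.retry.delay" -> "1000")

val merged = new java.util.Properties()
(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
  ++ sparkSettings ++ extraConfigs).foreach { case (k, v) => merged.setProperty(k, v) }

// Later sources win for duplicate keys: prints /user/hive/warehouse
println(merged.getProperty("hive.metastore.warehouse.dir"))
```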
HiveUtils.scala

@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   /**
-   * Configurations needed to create a [[HiveClient]].
+   * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
    */
-  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
+  private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
     // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks
     // backwards-compatibility when users are trying to connect to a Hive metastore of a lower version,
@@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClient = {
-    val configurations = hiveClientConfigurations(hadoopConf)
+    val configurations = formatTimeVarsForHiveClient(hadoopConf)
     newClientForMetadata(conf, hadoopConf, configurations)
   }
 
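For intuition about the rename: Hive 0.14+ time `ConfVar`s accept unit suffixes that older metastore clients cannot parse, hence the normalization into a unified `Long`-style value. A standalone sketch of the idea, with a hand-rolled parser that is illustrative only and not Spark's actual implementation:

```scala
import java.util.concurrent.TimeUnit

// Illustrative only: convert a Hive-style time value ("30s", "2d", "500ms",
// or a bare number) into a unified millisecond count rendered as a Long string.
def toMillisString(value: String): String = {
  val trimmed = value.trim
  val millis = trimmed match {
    case s if s.endsWith("ms") => s.dropRight(2).trim.toLong
    case s if s.endsWith("s")  => TimeUnit.SECONDS.toMillis(s.dropRight(1).trim.toLong)
    case s if s.endsWith("d")  => TimeUnit.DAYS.toMillis(s.dropRight(1).trim.toLong)
    case s                     => s.toLong // already a bare number
  }
  millis.toString
}

// e.g. toMillisString("600s") == "600000"
```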
HiveClientImpl.scala

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
@@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
       // in hive jars, which will turn off isolation. If SessionState.detachSession is
       // called to remove the current state after that, the hive client created later will
       // initialize its own state by newState()
-      Option(SessionState.get).getOrElse(newState())
+      val ret = SessionState.get
+      if (ret != null) {
+        // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
+        // instance is constructed, so we need to follow that change here.
+        Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
+          ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
+        }
+        ret
+      } else {
+        newState()
+      }
     }
   }
 
   // Log the default warehouse location.
   logInfo(
     s"Warehouse location for Hive client " +
-      s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+      s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
     val hiveConf = new HiveConf(classOf[SessionState])
HiveVersionSuite.scala

@@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
     HiveClientBuilder
-      .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+      .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
   }
 
   override def suiteName: String = s"${super.suiteName}($version)"
VersionsSuite.scala

@@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
-    client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+    client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
     if (versionSpark != null) versionSpark.reset()
     versionSpark = TestHiveVersion(client)
     assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
