[SPARK-13403][SQL] Pass hadoopConfiguration to HiveConf constructors.
This commit updates HiveContext so that sc.hadoopConfiguration is used to instantiate its internal HiveConf instances, instead of building them from a fresh Configuration that drops any Hadoop settings supplied through Spark.

I tested this by overriding the S3 FileSystem implementation via the "spark.hadoop.fs.s3.impl" property in spark-defaults.conf (to avoid [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810)); the flow is sketched below.

Author: Ryan Blue <blue@apache.org>

Closes #11273 from rdblue/SPARK-13403-new-hive-conf-from-hadoop-conf.
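
To make the testing note concrete, here is a minimal sketch (not part of the commit; the FileSystem class name is made up) of how a spark.hadoop.* entry travels from spark-defaults.conf into sc.hadoopConfiguration and, with this change, into HiveConf:

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.spark.{SparkConf, SparkContext}

    // Equivalent to a "spark.hadoop.fs.s3.impl com.example.S3AltFileSystem"
    // line in spark-defaults.conf; the class name is hypothetical.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("SPARK-13403 sketch")
      .set("spark.hadoop.fs.s3.impl", "com.example.S3AltFileSystem")
    val sc = new SparkContext(conf)

    // SparkContext copies spark.hadoop.* entries into its Hadoop
    // configuration, dropping the prefix.
    assert(sc.hadoopConfiguration.get("fs.s3.impl") == "com.example.S3AltFileSystem")

    // After this commit, internal HiveConf instances are built from that
    // configuration, so the override is visible on the Hive side as well.
    val hiveConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
    assert(hiveConf.get("fs.s3.impl") == "com.example.S3AltFileSystem")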
rdblue authored and rxin committed on Mar 17, 2016 · 1 parent de1a84e · commit 5faba9f
Showing 5 changed files with 34 additions and 5 deletions.
@@ -210,6 +210,7 @@ class HiveContext private[hive](
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
       sparkConf = sc.conf,
       execJars = Seq(),
+      hadoopConf = sc.hadoopConfiguration,
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
       baseClassLoader = Utils.getContextOrSparkClassLoader)
@@ -239,7 +240,7 @@ class HiveContext private[hive](
 
     // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
     // into the isolated client loader
-    val metadataConf = new HiveConf()
+    val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
 
     val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
     logInfo("default warehouse location is " + defaultWarehouseLocation)
@@ -279,6 +280,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
@@ -291,6 +293,7 @@ class HiveContext private[hive](
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sc.conf,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -320,6 +323,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
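
The key change above is the two-argument HiveConf constructor: HiveConf(Configuration, Class) starts from a copy of the given Hadoop Configuration (Hive defaults and hive-site.xml are still layered on top), whereas the old no-argument form ignored sc.hadoopConfiguration entirely. A minimal sketch of the copy behavior, using the same hypothetical S3 override as above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.s3.impl", "com.example.S3AltFileSystem") // hypothetical

    // The two-argument constructor copies hadoopConf's entries, so
    // Hadoop-level settings survive into the resulting HiveConf.
    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
    assert(hiveConf.get("fs.s3.impl") == "com.example.S3AltFileSystem")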
@@ -22,6 +22,7 @@ import java.io.{File, PrintStream}
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
@@ -62,6 +63,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
+    hadoopConf: Configuration,
     config: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -115,7 +117,7 @@ private[hive] class HiveClientImpl(
       // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
       originalState
     } else {
-      val initialConf = new HiveConf(classOf[SessionState])
+      val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
       // HiveConf is a Hadoop Configuration, which has a field of classLoader and
       // the initial value will be the current thread's context class loader
       // (i.e. initClassLoader at here).
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
 import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkSubmitUtils
@@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveMetastoreVersion: String,
       hadoopVersion: String,
       sparkConf: SparkConf,
+      hadoopConf: Configuration,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
@@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveVersion(hiveMetastoreVersion),
       sparkConf,
       execJars = files,
+      hadoopConf = hadoopConf,
       config = config,
       sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
@@ -149,6 +152,7 @@
 private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
     val sparkConf: SparkConf,
+    val hadoopConf: Configuration,
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
@@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader(
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
+      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -249,7 +253,7 @@
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, sparkConf, config, classLoader, this)
+        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
@@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
-      sparkConf = new SparkConf()).createClient()
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }
 
   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -63,12 +64,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
+  test("hadoop configuration preserved") {
+    val hadoopConf = new Configuration();
+    hadoopConf.set("test", "success")
+    val client = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+    assert("success" === client.getConf("test", null))
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e
@@ -98,6 +113,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         hiveMetastoreVersion = "13",
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sparkConf,
+        hadoopConf = new Configuration(),
         config = buildConf(),
         ivyPath = ivyPath).createClient()
     }
@@ -118,6 +134,7 @@
         hiveMetastoreVersion = version,
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sparkConf,
+        hadoopConf = new Configuration(),
         config = buildConf(),
         ivyPath = ivyPath).createClient()
     }
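
Putting the pieces together, code inside the hive module can now hand a pre-configured Hadoop Configuration to the isolated loader and read the setting back through the resulting client. A rough usage sketch modeled on the tests above (the S3 value is again hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.util.VersionInfo
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.client.IsolatedClientLoader

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.s3.impl", "com.example.S3AltFileSystem")

    val client = IsolatedClientLoader.forVersion(
      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
      hadoopVersion = VersionInfo.getVersion,
      sparkConf = new SparkConf(),
      hadoopConf = hadoopConf).createClient()

    // The Hadoop-level setting is preserved, as the "hadoop configuration
    // preserved" test above verifies with its own "test" key.
    assert(client.getConf("fs.s3.impl", null) == "com.example.S3AltFileSystem")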
