From f0f6ceeaf8c5cfde83ce3c3067b9238c4af255d9 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Thu, 18 Feb 2016 17:02:23 -0800
Subject: [PATCH] SPARK-13403: Pass hadoopConfiguration to HiveConf constructors.

sc.hadoopConfiguration may contain Hadoop configuration properties that
SparkSQL's HiveContext currently ignores, because that Configuration is
never passed when constructing instances of HiveConf. This patch threads
sc.hadoopConfiguration through HiveContext, HiveClientImpl, and
IsolatedClientLoader so that each HiveConf is initialized from it, and adds
a regression test verifying that a property set on the Hadoop configuration
is visible through the Hive client.
---
 .../org/apache/spark/sql/hive/HiveContext.scala    |  6 +++++-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  4 +++-
 .../sql/hive/client/IsolatedClientLoader.scala     |  8 ++++++--
 .../spark/sql/hive/HiveCatalogSuite.scala          |  4 +++-
 .../spark/sql/hive/client/VersionsSuite.scala      | 17 +++++++++++++++++
 5 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 05fc569588658..4238ad1ad41fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
       sparkConf = sc.conf,
       execJars = Seq(),
+      hadoopConf = sc.hadoopConfiguration,
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
       baseClassLoader = Utils.getContextOrSparkClassLoader)
@@ -239,7 +240,7 @@ class HiveContext private[hive](
 
     // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
     // into the isolated client loader
-    val metadataConf = new HiveConf()
+    val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
 
     val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
     logInfo("default warehouse location is " + defaultWarehouseLocation)
@@ -279,6 +280,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
@@ -291,6 +293,7 @@ class HiveContext private[hive](
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = sc.conf,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -320,6 +323,7 @@ class HiveContext private[hive](
         version = metaVersion,
         sparkConf = sc.conf,
         execJars = jars.toSeq,
+        hadoopConf = sc.hadoopConfiguration,
         config = allConfig,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c108750c383cc..ac322657e5f79 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -22,6 +22,7 @@ import java.io.{File, PrintStream}
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
@@ -61,6 +62,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
+    hadoopConf: Configuration,
     config: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -114,7 +116,7 @@ private[hive] class HiveClientImpl(
       // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
       originalState
     } else {
-      val initialConf = new HiveConf(classOf[SessionState])
+      val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
       // HiveConf is a Hadoop Configuration, which has a field of classLoader and
       // the initial value will be the current thread's context class loader
       // (i.e. initClassLoader at here).
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 024f4dfeba9d8..932402a5f32f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
 import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkSubmitUtils
@@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveMetastoreVersion: String,
       hadoopVersion: String,
       sparkConf: SparkConf,
+      hadoopConf: Configuration,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
@@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       hiveVersion(hiveMetastoreVersion),
       sparkConf,
       execJars = files,
+      hadoopConf = hadoopConf,
       config = config,
       sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
@@ -149,6 +152,7 @@ private[hive] object IsolatedClientLoader extends Logging {
 private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
     val sparkConf: SparkConf,
+    val hadoopConf: Configuration,
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
@@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader(
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
+      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -249,7 +253,7 @@ private[hive] class IsolatedClientLoader(
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, sparkConf, config, classLoader, this)
+        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index 2809f9439b823..e87e3e84d6931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
@@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
-      sparkConf = new SparkConf()).createClient()
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }
 
   protected override val tableInputFormat: String =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6292f6c3af02b..81ffde6490cbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -62,12 +63,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
     val db = new CatalogDatabase("default", "desc", "loc", Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
 
+  test("hadoop configuration preserved") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("test", "success")
+    val client = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
+      hadoopConf = hadoopConf,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
+    assert("success" === client.getConf("test", null))
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e
@@ -97,6 +112,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = "13",
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
   }
@@ -117,6 +133,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       hiveMetastoreVersion = version,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
+      hadoopConf = new Configuration(),
       config = buildConf(),
       ivyPath = ivyPath).createClient()
    }
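
Usage sketch (not part of the patch): the snippet below mirrors the new
"hadoop configuration preserved" test to show the behavior this change
enables. It assumes code living under org.apache.spark.sql.hive.client,
since forVersion and createClient are private[hive]; the "test"/"success"
key and value are illustrative only, and config/ivyPath are left at the
defaults declared in forVersion (Map.empty and None).

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.util.VersionInfo
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.client.IsolatedClientLoader

    // Hadoop properties set on the caller's Configuration...
    val hadoopConf = new Configuration()
    hadoopConf.set("test", "success")  // illustrative key/value

    // ...are now carried into every HiveConf built for the Hive client.
    val client = IsolatedClientLoader.forVersion(
      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
      hadoopVersion = VersionInfo.getVersion,
      sparkConf = new SparkConf(),
      hadoopConf = hadoopConf).createClient()

    // Previously `new HiveConf()` ignored the caller's Configuration and
    // this lookup returned null; with the patch the value round-trips.
    assert("success" === client.getConf("test", null))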