Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[SPARK-17088][hive] Fix 'sharesHadoopClasses' option when creating client. #20169

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.hive.client

import java.io.{File, PrintStream}
import java.util.Locale
import java.lang.{Iterable => JIterable}
import java.util.{Locale, Map => JMap}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -82,8 +83,9 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
warehouseDir: Option[String],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add @param?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the semantics when warehouseDir is None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the code that existed before.

sparkConf: SparkConf,
hadoopConf: Configuration,
hadoopConf: JIterable[JMap.Entry[String, String]],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the bug fix... explained in the PR description.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case was added in this PR!

extraConfig: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
Expand Down Expand Up @@ -130,7 +132,7 @@ private[hive] class HiveClientImpl(
if (ret != null) {
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
// instance constructed, we need to follow that change here.
Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
warehouseDir.foreach { dir =>
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
}
ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
Expand All @@ -48,11 +49,12 @@ private[hive] object IsolatedClientLoader extends Logging {
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
barrierPrefixes: Seq[String] = Seq.empty,
sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(hiveMetastoreVersion)
// We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
// with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes.
var sharesHadoopClasses = true
var _sharesHadoopClasses = sharesHadoopClasses
val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
resolvedVersions((resolvedVersion, hadoopVersion))
} else {
Expand All @@ -68,7 +70,7 @@ private[hive] object IsolatedClientLoader extends Logging {
"Hadoop classes will not be shared between Spark and Hive metastore client. " +
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
_sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
Expand All @@ -81,7 +83,7 @@ private[hive] object IsolatedClientLoader extends Logging {
execJars = files,
hadoopConf = hadoopConf,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
sharesHadoopClasses = _sharesHadoopClasses,
sharedPrefixes = sharedPrefixes,
barrierPrefixes = barrierPrefixes)
}
Expand Down Expand Up @@ -249,8 +251,10 @@ private[hive] class IsolatedClientLoader(

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
if (!isolationOn) {
return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
Expand All @@ -261,7 +265,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
.newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ private[client] object HiveClientBuilder {
def buildClient(
version: String,
hadoopConf: Configuration,
extraConf: Map[String, String] = Map.empty): HiveClient = {
extraConf: Map[String, String] = Map.empty,
sharesHadoopClasses: Boolean = true): HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = new SparkConf(),
hadoopConf = hadoopConf,
config = buildConf(extraConf),
ivyPath = ivyPath).createClient()
ivyPath = ivyPath,
sharesHadoopClasses = sharesHadoopClasses).createClient()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class HiveClientSuite(version: String)
day1 :: day2 :: Nil)
}

// Regression test for SPARK-17088: building a metastore client with
// sharesHadoopClasses = false must succeed. Only client construction is
// exercised here — the test passes as long as buildClient does not throw.
// NOTE(review): `buildClient` is the HiveVersionSuite helper that forwards the
// flag through HiveClientBuilder to IsolatedClientLoader.forVersion — confirm
// against the enclosing suite, which is not fully visible in this diff.
test("create client with sharesHadoopClasses = false") {
buildClient(new Configuration(), sharesHadoopClasses = false)
}

private def testMetastorePartitionFiltering(
filterString: String,
expectedDs: Seq[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ import org.apache.spark.sql.hive.HiveUtils
private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
protected var client: HiveClient = null

protected def buildClient(hadoopConf: Configuration): HiveClient = {
protected def buildClient(
hadoopConf: Configuration,
sharesHadoopClasses: Boolean = true): HiveClient = {
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
HiveClientBuilder
.buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
HiveClientBuilder.buildClient(
version,
hadoopConf,
HiveUtils.formatTimeVarsForHiveClient(hadoopConf),
sharesHadoopClasses = sharesHadoopClasses)
}

override def suiteName: String = s"${super.suiteName}($version)"
Expand Down