Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client #40224

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,15 @@ private[spark] object TestUtils {
baseClass: String = null,
classpathUrls: Seq[URL] = Seq.empty,
implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
extraCodeBody: String = "",
packageName: Option[String] = None): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText =
"implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
val sourceFile = new JavaSourceFromString(className,
s"""
|$packageText
|public class $className $extendsText $implementsText {
| @Override public String toString() { return "$toStringValue"; }
|
Expand Down
37 changes: 4 additions & 33 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
package org.apache.spark.sql.hive

import java.io.File
import java.net.{URL, URLClassLoader}
import java.net.URL
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.Try

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
Expand All @@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.types._
import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
import org.apache.spark.util.Utils


private[spark] object HiveUtils extends Logging {
Expand Down Expand Up @@ -409,43 +408,15 @@ private[spark] object HiveUtils extends Logging {
s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
}

// We recursively find all jars in the class loader chain,
// starting from the given classLoader.
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
case null => Array.empty[URL]
case childFirst: ChildFirstURLClassLoader =>
childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
case urlClassLoader: URLClassLoader =>
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
case other => allJars(other.getParent)
}

val classLoader = Utils.getContextOrSparkClassLoader
val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
// so it won't match the case in allJars. It no longer exposes URLs of
// the system classpath
Array.empty[URL]
} else {
val loadedJars = allJars(classLoader)
// Verify at least one jar was found
if (loadedJars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
s"Please set ${HIVE_METASTORE_JARS.key}.")
}
loadedJars
}

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
isolationOn = !isCliSessionState(),
isolationOn = false,
sessionStateIsolationOverride = Some(!isCliSessionState()),
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private[hive] class HiveClientImpl(
// Create an internal session state for this HiveClientImpl.
val state: SessionState = {
val original = Thread.currentThread().getContextClassLoader
if (clientLoader.isolationOn) {
if (clientLoader.sessionStateIsolationOn) {
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging {
* @param config A set of options that will be added to the HiveConf of the constructed client.
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Spark's classloader.
* @param sessionStateIsolationOverride If present, this parameter will specify the value of
* `sessionStateIsolationOn`. If empty (the default), the
* value of `isolationOn` will be used.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*/
private[hive] class IsolatedClientLoader(
Expand All @@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader(
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
sessionStateIsolationOverride: Option[Boolean] = None,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
val sharedPrefixes: Seq[String] = Seq.empty,
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {

/**
* This controls whether the generated clients maintain an independent/isolated copy of the
* Hive `SessionState`. If false, the Hive will leverage the global/static copy of
* `SessionState`; if true, it will generate a new copy of the state internally.
*/
val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn)

/** All jars used by the hive specific classloader. */
protected def allJars = execJars.toArray

Expand Down Expand Up @@ -232,51 +243,46 @@ private[hive] class IsolatedClientLoader(
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
if (allJars.isEmpty) {
// See HiveUtils; this is the Java 9+ + builtin mode scenario
baseClassLoader
} else {
val rootClassLoader: ClassLoader =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
val rootClassLoader: ClassLoader =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
} else {
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package org.apache.spark.sql.hive

import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.ChildFirstURLClassLoader
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}

class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

Expand Down Expand Up @@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
withTempDir { tmpDir =>
val classFile = TestUtils.createCompiledClass(
"Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))

val jarFile = new File(tmpDir, "hive-fake.jar")
TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))

val conf = new SparkConf
val contextClassLoader = Thread.currentThread().getContextClassLoader
val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
try {
Thread.currentThread().setContextClassLoader(loader)
val client = HiveUtils.newClientForMetadata(
conf,
SparkHadoopUtil.newConfiguration(conf),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
client.createDatabase(
CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
ignoreIfExists = true)
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader)
}
}
}

test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
// Test default value
val defaultConf = new Configuration
Expand Down