Skip to content

Commit

Permalink
[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'b…
Browse files Browse the repository at this point in the history
…uiltin' Hive version for metadata client

### What changes were proposed in this pull request?
When using the 'builtin' Hive version for the Hive metadata client, do not create a separate classloader, and rather continue to use the overall user/application classloader (regardless of Java version). This standardizes the behavior for all Java versions with that of Java 9+. See SPARK-42539 for more details on why this approach was chosen.

### Why are the changes needed?
Please see a much more detailed description in SPARK-42539. The tl;dr is that user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over Spark/system JARs when constructing the classloader used by `IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause unexpected behavior and/or breakages. This violates the expectation that, unless user-first classloader mode is used, Spark JARs should be prioritized over user JARs. It also seems that this separate classloader was unnecessary from the start, since the intent of 'builtin' mode is to use the JARs already existing on the regular classloader (as alluded to [here](#24057 (comment))). The isolated clientloader was originally added in #5876 in 2015. This bit in the PR description is the only mention of the behavior for "builtin":
> attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive.

I can't follow the logic here; the user classloader clearly has all of the necessary Hive JARs, since that's where we're getting the JAR URLs from, so we could just use that directly instead of grabbing the URLs. When this was initially added, it only used the JARs from the user classloader, not any of its parents, which I suspect was the motivating factor (to try to avoid more Spark classes being duplicated inside of the isolated classloader, I guess). But that was changed a month later anyway in #6435 / #6459, so I think this may have basically been deadcode from the start. It has also caused at least one issue over the years, e.g. SPARK-21428, which disables the new-classloader behavior in the case of running inside of a CLI session.

### Does this PR introduce _any_ user-facing change?
No, except to protect Spark itself from potentially being broken by bad user JARs.

### How was this patch tested?
This includes a new unit test in `HiveUtilsSuite` which demonstrates the issue and shows that this approach resolves it. It has also been tested on a live cluster running Java 8 and Hive communication functionality continues to work as expected.

Closes #40144 from xkrogen/xkrogen/SPARK-42539/hive-isolatedclientloader-builtin-user-jar-conflict-fix/java9strategy.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
  • Loading branch information
xkrogen authored and sunchao committed Feb 27, 2023
1 parent fdb36df commit 27ad583
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 97 deletions.
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,15 @@ private[spark] object TestUtils {
baseClass: String = null,
classpathUrls: Seq[URL] = Seq.empty,
implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
extraCodeBody: String = "",
packageName: Option[String] = None): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText =
"implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
val sourceFile = new JavaSourceFromString(className,
s"""
|$packageText
|public class $className $extendsText $implementsText {
| @Override public String toString() { return "$toStringValue"; }
|
Expand Down
53 changes: 3 additions & 50 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
package org.apache.spark.sql.hive

import java.io.File
import java.net.{URL, URLClassLoader}
import java.net.URL
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.Try

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.util.VersionInfo
import org.apache.hive.common.util.HiveVersionInfo

Expand All @@ -46,7 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.types._
import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
import org.apache.spark.util.Utils


private[spark] object HiveUtils extends Logging {
Expand Down Expand Up @@ -321,22 +319,6 @@ private[spark] object HiveUtils extends Logging {
(commonTimeVars ++ hardcodingTimeVars).toMap
}

/**
* Check current Thread's SessionState type
* @return true when SessionState.get returns an instance of CliSessionState,
* false when it gets non-CliSessionState instance or null
*/
def isCliSessionState(): Boolean = {
val state = SessionState.get
var temp: Class[_] = if (state != null) state.getClass else null
var found = false
while (temp != null && !found) {
found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
temp = temp.getSuperclass
}
found
}

/**
* Create a [[HiveClient]] used for execution.
*
Expand Down Expand Up @@ -409,43 +391,14 @@ private[spark] object HiveUtils extends Logging {
s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
}

// We recursively find all jars in the class loader chain,
// starting from the given classLoader.
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
case null => Array.empty[URL]
case childFirst: ChildFirstURLClassLoader =>
childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
case urlClassLoader: URLClassLoader =>
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
case other => allJars(other.getParent)
}

val classLoader = Utils.getContextOrSparkClassLoader
val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
// so it won't match the case in allJars. It no longer exposes URLs of
// the system classpath
Array.empty[URL]
} else {
val loadedJars = allJars(classLoader)
// Verify at least one jar was found
if (loadedJars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
s"Please set ${HIVE_METASTORE_JARS.key}.")
}
loadedJars
}

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
isolationOn = !isCliSessionState(),
isolationOn = false,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,51 +232,46 @@ private[hive] class IsolatedClientLoader(
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
if (allJars.isEmpty) {
// See HiveUtils; this is the Java 9+ + builtin mode scenario
baseClassLoader
} else {
val rootClassLoader: ClassLoader =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
val rootClassLoader: ClassLoader =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
} else {
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package org.apache.spark.sql.hive

import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.ChildFirstURLClassLoader
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}

class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

Expand Down Expand Up @@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
withTempDir { tmpDir =>
val classFile = TestUtils.createCompiledClass(
"Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))

val jarFile = new File(tmpDir, "hive-fake.jar")
TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))

val conf = new SparkConf
val contextClassLoader = Thread.currentThread().getContextClassLoader
val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
try {
Thread.currentThread().setContextClassLoader(loader)
val client = HiveUtils.newClientForMetadata(
conf,
SparkHadoopUtil.newConfiguration(conf),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
client.createDatabase(
CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
ignoreIfExists = true)
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader)
}
}
}

test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
// Test default value
val defaultConf = new Configuration
Expand Down

0 comments on commit 27ad583

Please sign in to comment.