Skip to content

Commit

Permalink
Use a NonClosableMutableURLClassLoader as the classLoader in Isolated…
Browse files Browse the repository at this point in the history
…ClientLoader.
  • Loading branch information
yhuai committed Oct 23, 2015
1 parent a88c66c commit 65f0a4c
Showing 1 changed file with 51 additions and 28 deletions.
Expand Up @@ -30,7 +30,7 @@ import org.apache.commons.io.{FileUtils, IOUtils}

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.util.Utils
import org.apache.spark.util.{MutableURLClassLoader, Utils}

import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
Expand Down Expand Up @@ -148,39 +148,51 @@ private[hive] class IsolatedClientLoader(
protected def classToPath(name: String): String =
name.replaceAll("\\.", "/") + ".class"

/** The classloader that is used to load an isolated version of Hive. */
private[hive] var classLoader: ClassLoader = if (isolationOn) {
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader.
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
/**
* The classloader that is used to load an isolated version of Hive.
* This classloader is a special URLClassLoader that exposes the addURL method.
* So, when we add jar, we can add this new jar directly through the addURL method
* instead of stacking a new URLClassLoader on top of it.
*/
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader.
logDebug(s"shared class: $name")
baseClassLoader.loadClass(name)
}
}
}
} else {
baseClassLoader
}
}
} else {
baseClassLoader
// Right now, we create a URLClassLoader that gives preference to isolatedClassLoader
// over its own URLs when it loads classes and resources.
// We may want to use ChildFirstURLClassLoader based on
// the configuration of spark.executor.userClassPathFirst, which gives preference
// to its own URLs over the parent class loader (see Executor's createClassLoader method).
new NonClosableMutableURLClassLoader(isolatedClassLoader)
}

private[hive] def addJar(path: String): Unit = synchronized {
val jarURL = new java.io.File(path).toURI.toURL
// TODO: we should avoid of stacking classloaders (use a single URLClassLoader and add jars
// to that)
classLoader = new java.net.URLClassLoader(Array(jarURL), classLoader)
classLoader.addURL(jarURL)
}

/** The isolated client interface to Hive. */
Expand Down Expand Up @@ -221,3 +233,14 @@ private[hive] class IsolatedClientLoader(
*/
private[hive] var cachedHive: Any = null
}

/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
* This class loader cannot be closed (its `close` method is a no-op).
*/
private[sql] class NonClosableMutableURLClassLoader(
parent: ClassLoader)
extends MutableURLClassLoader(Array.empty, parent) {

override def close(): Unit = {}
}

0 comments on commit 65f0a4c

Please sign in to comment.