[KYUUBI-150][FOLLOWUP] Use the classLoader in IsolatedClassLoader (#166)

* reset class loader

* do not add jar twice in one create function cmd

* the classloader should be closeable

* the classloader should be closeable revert

* Revert "the classloader should be closeable revert"

This reverts commit 35602a0.

* add ut

* add ut

* add ut

* fix ut

fix #150
yaooqinn committed Mar 19, 2019
1 parent 13c68b5 commit ba9b424
Showing 9 changed files with 173 additions and 32 deletions.
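
To make the "do not add jar twice in one create function cmd" fix concrete, here is a rough, self-contained sketch of the idea. The object name and the helpers `isDownloadable` and `localize` are hypothetical stand-ins, not Kyuubi's actual methods: the jar of a CREATE FUNCTION statement is localized and added once, and the command's resource list is then cleared so Spark does not add the same jar again while registering the function.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{FunctionResource, JarResource}
import org.apache.spark.sql.execution.command.{AddJarCommand, CreateFunctionCommand}

object CreateFunctionRewriteSketch {
  // Sketch only: `isDownloadable` and `localize` stand in for Kyuubi's
  // resource check and HDFS-to-local copying.
  def rewriteCreateFunction(
      cmd: CreateFunctionCommand,
      spark: SparkSession,
      isDownloadable: String => Boolean,
      localize: String => String): CreateFunctionCommand = {
    cmd.resources.foreach {
      case FunctionResource(JarResource, uri) if isDownloadable(uri) =>
        // Localize the jar and add it to the session exactly once.
        AddJarCommand(localize(uri)).run(spark)
      case _ => // other resource types are handled elsewhere or rejected
    }
    // Hand Spark a command with an empty resource list, so CreateFunctionCommand
    // itself will not trigger a second ADD JAR for the same resource.
    cmd.copy(resources = Seq.empty[FunctionResource])
  }
}
```

The actual patch does the equivalent inside KyuubiOperation.transform (see the diff below), resetting the resources field through ReflectUtils rather than copying the command.
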
3 changes: 3 additions & 0 deletions .gitignore
@@ -22,6 +22,9 @@
.settings
build/apache-maven*
build/scala*
build/test
kyuubi-server/build
kyuubi-server/*example*
target/
dist/
kyuubi-*-bin-*
@@ -63,13 +63,24 @@ private[hive] class IsolatedClientLoader(
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)

/**
* The classloader that is used to load an isolated version of Hive.
* (Kent Yao) Unlike Spark's internals, which use an isolated classloader to support different
* Hive versions, Kyuubi believes that Hive 1.2.1 is capable of supporting Hive metastore
* servers of version 1.2 or higher, and that older Hive client versions are not worth supporting.
*
* Another reason the isolation is dropped here is that Spark does not expose the authorization
* functions in [[HiveClient]], so those methods cannot be invoked across different classloaders.
*
* Besides, [[HiveClient]] in a normal Spark application is a single global instance, so its
* classloader can/should be non-closeable. In Kyuubi, however, it is a session-level object
* associated with one KyuubiSession/SparkSession; thus, this classloader should be closeable to
* support class unloading.
*
* This classloader is a special URLClassLoader that exposes the addURL method.
* So, when we add a jar, we can add it directly through the addURL method
* instead of stacking a new URLClassLoader on top of it.
*/
private[hive] val classLoader: MutableURLClassLoader = {
new NonClosableMutableURLClassLoader(baseClassLoader)
new MutableURLClassLoader(Array.empty, baseClassLoader)
}
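
For illustration, a minimal sketch of the closeable, mutable loader described above. It deliberately avoids Kyuubi's and Spark's internal classes; the SessionJarClassLoader name and the jar path are hypothetical. One such loader exists per session, is extended in place via addURL, and is closed when the session goes away so its classes can be unloaded.

```scala
import java.net.{URL, URLClassLoader}

// Sketch: a per-session loader that exposes addURL, like MutableURLClassLoader,
// but stays closeable, unlike NonClosableMutableURLClassLoader.
class SessionJarClassLoader(parent: ClassLoader)
  extends URLClassLoader(Array.empty[URL], parent) {

  // ADD JAR extends this loader in place instead of stacking a new
  // URLClassLoader on top of it.
  def addJar(url: URL): Unit = super.addURL(url)
}

object SessionClassLoaderDemo extends App {
  val loader = new SessionJarClassLoader(getClass.getClassLoader)
  loader.addJar(new URL("file:///tmp/example-udf.jar")) // hypothetical jar path
  // ... resolve UDF classes through `loader` while the session is alive ...
  loader.close() // closing lets the loaded classes be unloaded with the session
}
```
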

private[hive] def addJar(path: URL): Unit = {
@@ -88,15 +99,15 @@
sparkConf,
hadoopConf,
config,
baseClassLoader,
classLoader,
this).asInstanceOf[HiveClientImpl]
} else {
ctor.newInstance(
version,
sparkConf,
hadoopConf,
config,
baseClassLoader,
classLoader,
this).asInstanceOf[HiveClientImpl]
}

@@ -35,9 +35,10 @@ import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.AddJarCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand}
import org.apache.spark.sql.types._

import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
@@ -308,7 +309,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}

private def localizeAndAndResource(path: String): Unit = try {
private def localizeAndAndResource(path: String): Option[String] = try {
if (isResourceDownloadable(path)) {
val src = new Path(path)
val destFileName = src.getName
@@ -317,12 +318,37 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
val fs = src.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
fs.copyToLocalFile(src, new Path(destFile))
FileUtil.chmod(destFile, "ugo+rx", true)
AddJarCommand(destFile).run(session.sparkSession)
Some(destFile)
} else {
None
}
} catch {
case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e)
}

private[operation] def transform(plan: LogicalPlan): LogicalPlan = plan match {
case c: CreateFunctionCommand =>
val resources =
ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
resources.foreach {
case FunctionResource(JarResource, uri) =>
localizeAndAndResource(uri).map(path => AddJarCommand(path).run(sparkSession))
case FunctionResource(FileResource, uri) =>
localizeAndAndResource(uri).map(path => AddFileCommand(path).run(sparkSession))
case o =>
throw new KyuubiSQLException(s"Resource Type '${o.resourceType}' is not supported.")
}
if (resources.isEmpty) {
c
} else {
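// The jar/file resources were already localized and added above; clear them so
// the same jar is not added a second time when the function is created.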
ReflectUtils.setFieldValue(c, "resources", Seq.empty[FunctionResource])
c
}
case a: AddJarCommand => localizeAndAndResource(a.path).map(AddJarCommand).getOrElse(a)
case a: AddFileCommand => localizeAndAndResource(a.path).map(AddFileCommand).getOrElse(a)
case _ => plan
}

private def execute(): Unit = {
try {
statementId = UUID.randomUUID().toString
@@ -344,19 +370,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext)

val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement)
parsedPlan match {
case c if c.nodeName == "CreateFunctionCommand" =>
val resources =
ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
resources.foreach { case FunctionResource(_, uri) =>
localizeAndAndResource(uri)
}
case a if a.nodeName == "AddJarCommand" =>
val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String]
localizeAndAndResource(path)
case _ =>
}
result = SparkSQLUtils.toDataFrame(sparkSession, parsedPlan)
result = SparkSQLUtils.toDataFrame(sparkSession, transform(parsedPlan))
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())
}
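
The localization in localizeAndAndResource above is only attempted for resources that isResourceDownloadable accepts. Below is a minimal sketch of such a scheme check. It matches the assertions in the test suite further down (hdfs:// is downloadable, file:// and dfs:// are not), but it is not necessarily Kyuubi's exact implementation.

```scala
import org.apache.hadoop.fs.Path

object DownloadableCheckSketch {
  // Sketch: treat a resource as downloadable only when it lives on HDFS.
  def isResourceDownloadable(resource: String): Boolean = {
    val scheme = new Path(resource).toUri.getScheme
    scheme != null && scheme.equalsIgnoreCase("hdfs")
  }
}

// DownloadableCheckSketch.isResourceDownloadable("hdfs://a/b/c.jar") // true
// DownloadableCheckSketch.isResourceDownloadable("file://a/b/c.jar") // false
// DownloadableCheckSketch.isResourceDownloadable("dfs://a/b/c.jar")  // false
```
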
Binary file not shown.
@@ -26,7 +26,8 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf}
trait SecuredFunSuite {

var kdc: MiniKdc = null
val baseDir = KyuubiSparkUtil.createTempDir("kyuubi-kdc")
val baseDir = KyuubiSparkUtil.createTempDir(
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
try {
val kdcConf = MiniKdc.createConf()
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
@@ -23,14 +23,21 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.CreateFunctionCommand
import org.apache.spark.sql.internal.SQLConf
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation.FETCH_NEXT
import yaooqinn.kyuubi.schema.ColumnBasedSet
import yaooqinn.kyuubi.session.{KyuubiSession, SessionManager}
import yaooqinn.kyuubi.spark.SparkSessionWithUGI
import yaooqinn.kyuubi.utils.ReflectUtils

class KyuubiOperationSuite extends SparkFunSuite {
class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {

val conf = new SparkConf(loadDefaults = true).setAppName("operation test")
KyuubiSparkUtil.setupCommonConfig(conf)
@@ -151,4 +158,36 @@ class KyuubiOperationSuite extends SparkFunSuite {
assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar"))
assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar"))
}

test("transform plan") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)

val parser = new SparkSqlParser(new SQLConf)
val plan0 = parser.parsePlan("create temporary function a as 'a.b.c'")
assert(op.transform(plan0) === plan0)

val plan1 = parser.parsePlan(
"create temporary function a as 'a.b.c' using file 'hdfs://a/b/c.jar'")
val e1 = intercept[KyuubiSQLException](op.transform(plan1))
assert(e1.getMessage.startsWith("Failed to read external resource"))

val plan2 = parser.parsePlan(
"create temporary function a as 'a.b.c' using jar 'hdfs://a/b/c.jar'")
val e2 = intercept[KyuubiSQLException](op.transform(plan2))
assert(e2.getMessage.startsWith("Failed to read external resource"))

val resources = mock[Seq[FunctionResource]]
when(resources.isEmpty).thenReturn(false)

val command = plan2.asInstanceOf[CreateFunctionCommand].copy(resources = resources)
val plan4 = op.transform(command)
assert(plan4 === command)
assert(plan4.asInstanceOf[CreateFunctionCommand].resources !== resources)

val plan5 = parser.parsePlan(
"create temporary function a as 'a.b.c' using archive 'hdfs://a/b/c.jar'")

val e3 = intercept[KyuubiSQLException](op.transform(plan5))
assert(e3.getMessage.startsWith("Resource Type"))
}
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.operation

import java.io.File

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

import yaooqinn.kyuubi.utils.ReflectUtils

class KyuubiOperationWithHDFSSuite extends KyuubiOperationSuite {
val hdfsConf = new HdfsConfiguration
var cluster: MiniDFSCluster = new MiniDFSCluster.Builder(hdfsConf).build()
cluster.waitClusterUp()
val fs = cluster.getFileSystem
val homeDirectory: Path = fs.getHomeDirectory
private val fileName = "example-1.0.0-SNAPSHOT.jar"
private val remoteUDFFile = new Path(homeDirectory, fileName)

override def beforeAll(): Unit = {
val file = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation + fileName)
val localUDFFile = new Path(file.getPath)
fs.copyFromLocalFile(localUDFFile, remoteUDFFile)
super.beforeAll()
}

override def afterAll(): Unit = {
fs.delete(remoteUDFFile, true)
fs.close()
cluster.shutdown()
super.afterAll()
}

test("transform logical plan") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
val parser = new SparkSqlParser(new SQLConf)
val plan0 = parser.parsePlan(
s"create temporary function a as 'a.b.c' using file '$remoteUDFFile'")
val plan1 = op.transform(plan0)
assert(plan0 === plan1)
assert(
ReflectUtils.getFieldValue(plan1, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty)

val plan2 = parser.parsePlan(
s"create temporary function a as 'a.b.c' using jar '$remoteUDFFile'")
val plan3 = op.transform(plan2)
assert(plan3 === plan2)
assert(
ReflectUtils.getFieldValue(plan3, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty)
}

}
@@ -101,7 +101,8 @@ class KyuubiServerSuite extends SparkFunSuite with BeforeAndAfterEach {
test("disable fs caches for secured cluster") {

var kdc: MiniKdc = null
val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc")
val baseDir = KyuubiSparkUtil.createTempDir(
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
try {
val kdcConf = MiniKdc.createConf()
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
@@ -17,37 +17,38 @@

package yaooqinn.kyuubi.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.SparkFunSuite
import org.scalatest.BeforeAndAfterEach

class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {

private var cluster: MiniYARNCluster = _
private val cluster: MiniYARNCluster =
new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1)
private val yarnClient = YarnClient.createYarnClient()

override def beforeAll(): Unit = {
cluster = new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1)
val hadoopConf = new Configuration()
cluster.init(hadoopConf)
val yarnConf = new YarnConfiguration()
yarnConf.set(YarnConfiguration.IS_MINI_YARN_CLUSTER, "true")
cluster.init(yarnConf)
cluster.start()
yarnClient.init(hadoopConf)
yarnClient.init(yarnConf)
yarnClient.start()
super.beforeAll()
}

override def afterAll(): Unit = {
yarnClient.stop()
cluster.stop()
super.afterAll()
}

override def beforeEach(): Unit = {

super.beforeEach()
}

