Skip to content

Commit

Permalink
Just apply Spark ClassLoader to the test as only controlling thread c…
Browse files Browse the repository at this point in the history
…lassloader for tests in SQLQuerySuite didn't work
  • Loading branch information
HeartSaVioR committed Dec 28, 2019
1 parent 02188e0 commit b801ac5
Showing 1 changed file with 33 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
Expand Down Expand Up @@ -70,17 +71,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext._
import spark.implicits._

private var threadContextClassLoader: ClassLoader = _

override protected def beforeEach(): Unit = {
threadContextClassLoader = Thread.currentThread().getContextClassLoader
}

override protected def afterEach(): Unit = {
Thread.currentThread().setContextClassLoader(threadContextClassLoader)
}


test("query global temp view") {
val df = Seq(1).toDF("i1")
df.createGlobalTempView("tbl1")
Expand Down Expand Up @@ -2505,40 +2495,44 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " +
"current thread context classloader") {
withUserDefinedFunction("udtf_count3" -> false) {
val oldClassLoader = Thread.currentThread().getContextClassLoader
// force to use Spark classloader as other test (even in other test suites) may change the
// current thread's context classloader to jar classloader
Utils.withContextClassLoader(Utils.getSparkClassLoader) {
withUserDefinedFunction("udtf_count3" -> false) {
val sparkClassLoader = Thread.currentThread().getContextClassLoader

// This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
// modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
// three times.
val jarPath = "src/test/TestUDTF-dynamicload.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

// This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
// modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
// three times.
val jarPath = "src/test/TestUDTF-dynamicload.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

sql(
s"""
|CREATE FUNCTION udtf_count3
|AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
|USING JAR '$jarURL'
sql(
s"""
|CREATE FUNCTION udtf_count3
|AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
|USING JAR '$jarURL'
""".stripMargin)

assert(Thread.currentThread().getContextClassLoader eq oldClassLoader)
assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
// See SessionState.addJar for details.
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)
// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
// See SessionState.addJar for details.
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)

assert(Thread.currentThread().getContextClassLoader ne oldClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sqlContext.sharedState.jarClassLoader)
assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sqlContext.sharedState.jarClassLoader)

// Roll back to the original classloader and run query again.
Thread.currentThread().setContextClassLoader(oldClassLoader)
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)
// Roll back to the original classloader and run query again.
Thread.currentThread().setContextClassLoader(sparkClassLoader)
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)
}
}
}
}

0 comments on commit b801ac5

Please sign in to comment.