From 58509565f8ece90b3c915a9ebc8f220073c82426 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Fri, 19 Mar 2021 12:51:43 +0800
Subject: [PATCH] [SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should
 use Spark classloader instead of context

### What changes were proposed in this pull request?

Use the Spark classloader instead of the context classloader in `RebaseDateTime.loadRebaseRecords`.

### Why are the changes needed?

With a custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`, Spark uses the date formatter in `HiveShim` to convert a `date` to a `string`. If `spark.sql.legacy.timeParserPolicy=LEGACY` is set and the partition type is `date`, the `RebaseDateTime` code is invoked. If `RebaseDateTime` is being initialized for the first time at that moment, the context class loader is the `IsolatedClientLoader`, and errors like the following are thrown:

```
java.lang.IllegalArgumentException: argument "src" is null
  at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4413)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3157)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:187)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:186)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$$anon$1.readValue(RebaseDateTime.scala:267)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.loadRebaseRecords(RebaseDateTime.scala:269)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<init>(RebaseDateTime.scala:291)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<clinit>(RebaseDateTime.scala)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
```

```
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
  at scala.collection.immutable.Stream.flatMap(Stream.scala:493)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826)
  at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:749)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:747)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1273)
```

Steps to reproduce:
1. Set custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`.
2. `CREATE TABLE t (c int) PARTITIONED BY (p date)`
3. `SET spark.sql.legacy.timeParserPolicy=LEGACY`
4. `SELECT * FROM t WHERE p='2021-01-01'`

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Pass `org.apache.spark.sql.catalyst.util.RebaseDateTimeSuite` and add a new unit test to `HiveSparkSubmitSuite.scala`.

Closes #31864 from ulysses-you/SPARK-34772.

Authored-by: ulysses-you
Signed-off-by: Yuming Wang
---
 .../sql/catalyst/util/RebaseDateTime.scala    |  3 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala | 40 ++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
index 46860ae1771de..2999d475fc8f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.util.Utils
 
 /**
  * The collection of functions for rebasing days and microseconds from/to the hybrid calendar
@@ -263,7 +264,7 @@ object RebaseDateTime {
   // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
   // it is 2 times faster in DateTimeRebaseBenchmark.
   private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
-    val file = Thread.currentThread().getContextClassLoader.getResource(fileName)
+    val file = Utils.getSparkClassLoader.getResource(fileName)
     val mapper = new ObjectMapper() with ScalaObjectMapper
     mapper.registerModule(DefaultScalaModule)
     val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index af8a23d9a8b3f..242a86b1f5ff7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveContext}
-import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
+import org.apache.spark.sql.internal.SQLConf.{LEGACY_TIME_PARSER_POLICY, SHUFFLE_PARTITIONS}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.tags.{ExtendedHiveTest, SlowHiveTest}
@@ -338,6 +338,29 @@ class HiveSparkSubmitSuite
       unusedJar.toString)
     runSparkSubmit(argsForShowTables)
   }
+
+  test("SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader " +
+    "instead of context") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+    // We need to specify the metastore database location in case of conflict with other hive
+    // versions.
+    withTempDir { file =>
+      file.delete()
+      val metastore = s"jdbc:derby:;databaseName=${file.getAbsolutePath};create=true"
+
+      val args = Seq(
+        "--class", SPARK_34772.getClass.getName.stripSuffix("$"),
+        "--name", "SPARK-34772",
+        "--master", "local-cluster[2,1,1024]",
+        "--conf", s"${LEGACY_TIME_PARSER_POLICY.key}=LEGACY",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+        "--conf", s"spark.hadoop.javax.jdo.option.ConnectionURL=$metastore",
+        unusedJar.toString)
+      runSparkSubmit(args)
+    }
+  }
 }
 
 object SetMetastoreURLTest extends Logging {
@@ -847,3 +870,18 @@ object SPARK_18989_DESC_TABLE {
     }
   }
 }
+
+object SPARK_34772 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .config(UI_ENABLED.key, "false")
+      .enableHiveSupport()
+      .getOrCreate()
+    try {
+      spark.sql("CREATE TABLE t (c int) PARTITIONED BY (p date)")
+      spark.sql("SELECT * FROM t WHERE p='2021-01-01'").collect()
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS t")
+    }
+  }
+}
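As background for the one-line classloader change above, here is a minimal, standalone Scala sketch (not part of the patch) of the failure mode: a resource lookup through a context classloader that cannot see Spark's bundled rebase JSON files returns `null`, which Jackson then rejects with `argument "src" is null`, while a lookup through the classloader that owns the Spark classes succeeds. The resource name and the `IsolatedLoader` class below are illustrative stand-ins, not the exact names used by Spark.

```scala
import java.net.{URL, URLClassLoader}

// Illustrative sketch only: "some-rebase-records.json" stands in for the rebase JSON
// files shipped inside spark-catalyst, and IsolatedLoader stands in for Hive's
// IsolatedClientLoader, which cannot see Spark's own resources.
object ClassLoaderLookupSketch {
  // A loader whose parent is the bootstrap loader, so it has no visibility into the
  // application classpath and getResource cannot find Spark's bundled files.
  class IsolatedLoader extends URLClassLoader(Array.empty[URL], null: ClassLoader)

  def main(args: Array[String]): Unit = {
    val resource = "some-rebase-records.json" // hypothetical resource name

    val previous = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(new IsolatedLoader)
    try {
      // What the pre-fix code did: resolve against the thread's context classloader.
      val viaContext = Thread.currentThread().getContextClassLoader.getResource(resource)
      // What the fix does (via Utils.getSparkClassLoader): resolve against the loader
      // that loaded the Spark classes themselves.
      val viaSpark = getClass.getClassLoader.getResource(resource)
      println(s"context classloader found resource: ${viaContext != null}") // false here
      println(s"owning classloader found resource:  ${viaSpark != null}")   // true only if on the classpath
    } finally {
      Thread.currentThread().setContextClassLoader(previous)
    }
  }
}
```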