From 2b4aebcef5e9daabdcdf378ce44fb3bd7074ff4f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 19 Mar 2021 12:51:43 +0800 Subject: [PATCH] [SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should use Spark classloader instead of context Change context classloader to Spark classloader at `RebaseDateTime.loadRebaseRecords` With custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`. Spark would use date formatter in `HiveShim` that convert `date` to `string`, if we set `spark.sql.legacy.timeParserPolicy=LEGACY` and the partition type is `date` the `RebaseDateTime` code will be invoked. At that moment, if `RebaseDateTime` is initialized the first time then context class loader is `IsolatedClientLoader`. Such error msg would throw: ``` java.lang.IllegalArgumentException: argument "src" is null at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4413) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3157) at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:187) at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:186) at org.apache.spark.sql.catalyst.util.RebaseDateTime$$anon$1.readValue(RebaseDateTime.scala:267) at org.apache.spark.sql.catalyst.util.RebaseDateTime$.loadRebaseRecords(RebaseDateTime.scala:269) at org.apache.spark.sql.catalyst.util.RebaseDateTime$.(RebaseDateTime.scala:291) at org.apache.spark.sql.catalyst.util.RebaseDateTime$.(RebaseDateTime.scala) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109) at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95) at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94) at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138) at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661) at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785) at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826) ``` ``` java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$ at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109) at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95) at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94) at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138) at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661) at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785) at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826) at scala.collection.immutable.Stream.flatMap(Stream.scala:493) at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848) at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:749) at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273) at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:747) at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1273) ``` The reproduce steps: 1. `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`. 2. `CREATE TABLE t (c int) PARTITIONED BY (p date)` 3. `SET spark.sql.legacy.timeParserPolicy=LEGACY` 4. `SELECT * FROM t WHERE p='2021-01-01'` Yes, bug fix. pass `org.apache.spark.sql.catalyst.util.RebaseDateTimeSuite` and add new unit test to `HiveSparkSubmitSuite.scala`. Closes #31864 from ulysses-you/SPARK-34772. Authored-by: ulysses-you Signed-off-by: Yuming Wang (cherry picked from commit 58509565f8ece90b3c915a9ebc8f220073c82426) Signed-off-by: Yuming Wang --- .../sql/catalyst/util/RebaseDateTime.scala | 3 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 34 ++++++------------- 2 files changed, 12 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index 46860ae1771de..2999d475fc8f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.util.Utils /** * The collection of functions for rebasing days and microseconds from/to the hybrid calendar @@ -263,7 +264,7 @@ object RebaseDateTime { // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because // it is 2 times faster in DateTimeRebaseBenchmark. private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = { - val file = Thread.currentThread().getContextClassLoader.getResource(fileName) + val file = Utils.getSparkClassLoader.getResource(fileName) val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 02f5cde4f3e38..b395bd6006f4e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveContext} -import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS +import org.apache.spark.sql.internal.SQLConf.{LEGACY_TIME_PARSER_POLICY, SHUFFLE_PARTITIONS} import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.tags.{ExtendedHiveTest, SlowHiveTest} @@ -154,7 +154,7 @@ class HiveSparkSubmitSuite // For more detail, see sql/hive/src/test/resources/regression-test-SPARK-8489/*scala. // TODO: revisit for Scala 2.13 support val version = Properties.versionNumberString match { - case v if v.startsWith("2.12") || v.startsWith("2.13") => v.substring(0, 4) + case v if v.startsWith("2.12") => v.substring(0, 4) case x => throw new Exception(s"Unsupported Scala Version: $x") } val jarDir = getTestResourcePath("regression-test-SPARK-8489") @@ -433,11 +433,10 @@ object SetWarehouseLocationTest extends Logging { } - val qualifiedWHPath = FileUtils.makeQualified( - new Path(expectedWarehouseLocation), sparkSession.sparkContext.hadoopConfiguration).toString - if (sparkSession.conf.get(WAREHOUSE_PATH.key) != qualifiedWHPath) { + if (sparkSession.conf.get(WAREHOUSE_PATH.key) != expectedWarehouseLocation) { throw new Exception( - s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location $qualifiedWHPath.") + s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location " + + s"$expectedWarehouseLocation.") } val catalog = sparkSession.sessionState.catalog @@ -450,7 +449,7 @@ object SetWarehouseLocationTest extends Logging { val tableMetadata = catalog.getTableMetadata(TableIdentifier("testLocation", Some("default"))) val expectedLocation = - CatalogUtils.stringToURI(s"$qualifiedWHPath/testlocation") + CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation") val actualLocation = tableMetadata.location if (actualLocation != expectedLocation) { throw new Exception( @@ -466,7 +465,7 @@ object SetWarehouseLocationTest extends Logging { val tableMetadata = catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB"))) val expectedLocation = CatalogUtils.stringToURI( - s"$qualifiedWHPath/testlocationdb.db/testlocation") + s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation") val actualLocation = tableMetadata.location if (actualLocation != expectedLocation) { throw new Exception( @@ -737,7 +736,7 @@ object SPARK_9757 extends QueryTest { val df = hiveContext .range(10) - .select(call_udf("struct", ($"id" + 0.2) cast DecimalType(10, 3)) as "dec_struct") + .select(callUDF("struct", ($"id" + 0.2) cast DecimalType(10, 3)) as "dec_struct") df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t") checkAnswer(hiveContext.table("t"), df) } @@ -796,6 +795,8 @@ object SPARK_14244 extends QueryTest { val hiveContext = new TestHiveContext(sparkContext) spark = hiveContext.sparkSession + import hiveContext.implicits._ + try { val window = Window.orderBy("id") val df = spark.range(2).select(cume_dist().over(window).as("cdist")).orderBy("cdist") @@ -871,18 +872,3 @@ object SPARK_18989_DESC_TABLE { } } } - -object SPARK_34772 { - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder() - .config(UI_ENABLED.key, "false") - .enableHiveSupport() - .getOrCreate() - try { - spark.sql("CREATE TABLE t (c int) PARTITIONED BY (p date)") - spark.sql("SELECT * FROM t WHERE p='2021-01-01'").collect() - } finally { - spark.sql("DROP TABLE IF EXISTS t") - } - } -}