[SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should use Spark classloader instead of context

Change `RebaseDateTime.loadRebaseRecords` to resolve its resource through the Spark classloader instead of the thread's context classloader.

With a custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`, Spark uses the date formatter in `HiveShim` to convert `date` values to `string`. If `spark.sql.legacy.timeParserPolicy=LEGACY` is set and the partition column type is `date`, the `RebaseDateTime` code is invoked. If `RebaseDateTime` is being initialized for the first time at that moment, the thread's context classloader is the `IsolatedClientLoader`, and the following errors are thrown:

```
java.lang.IllegalArgumentException: argument "src" is null
  at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4413)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3157)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:187)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:186)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$$anon$1.readValue(RebaseDateTime.scala:267)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.loadRebaseRecords(RebaseDateTime.scala:269)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<init>(RebaseDateTime.scala:291)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<clinit>(RebaseDateTime.scala)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
```

```
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
  at scala.collection.immutable.Stream.flatMap(Stream.scala:493)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826)
  at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:749)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:747)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1273)
```
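
The failing path is visible in the first trace: `loadRebaseRecords` resolves its JSON resource through the thread's context classloader, `getResource` returns `null` under the isolated Hive client loader, and Jackson's `readValue` rejects the null source with `argument "src" is null`. Below is a minimal sketch of the two lookups; the resource name `rebase-records.json` is a placeholder, not the actual file shipped with Spark:

```scala
package org.apache.spark.sql.hive  // Utils is private[spark], so the sketch sits under org.apache.spark

import java.net.URL

import org.apache.spark.util.Utils

object ClassLoaderLookupSketch {
  def main(args: Array[String]): Unit = {
    // Under IsolatedClientLoader, the thread's context classloader may not see
    // resources bundled in Spark's own jars, so this lookup can return null ...
    val viaContext: URL =
      Thread.currentThread().getContextClassLoader.getResource("rebase-records.json")

    // ... while the classloader that loaded Spark's own classes resolves anything
    // packaged alongside Spark, independent of the calling thread.
    val viaSpark: URL = Utils.getSparkClassLoader.getResource("rebase-records.json")

    // Passing a null URL to Jackson's readValue is what produced the
    // IllegalArgumentException: argument "src" is null shown above.
    println(s"context loader -> $viaContext, spark loader -> $viaSpark")
  }
}
```

Loading through `Utils.getSparkClassLoader` sidesteps the problem because the rebase-record JSON files are packaged inside Spark's own jars, so the loader that defined Spark's classes can always see them, regardless of which context classloader the Hive client thread happens to carry.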

The steps to reproduce (a runnable sketch follows the list):
1. Set custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`.
2. `CREATE TABLE t (c int) PARTITIONED BY (p date)`
3. `SET spark.sql.legacy.timeParserPolicy=LEGACY`
4. `SELECT * FROM t WHERE p='2021-01-01'`
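
A self-contained reproduction sketch, in the same shape as the `SPARK_34772` object that appears in the diff below; the metastore version/jars values are placeholders for whatever custom metastore is being tested, not values taken from this patch:

```scala
import org.apache.spark.sql.SparkSession

object Reproduce34772 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      // Placeholder metastore settings -- substitute the custom version/jars under test.
      .config("spark.sql.hive.metastore.version", "2.1.1")
      .config("spark.sql.hive.metastore.jars", "maven")
      // The legacy parser policy is what routes dates through RebaseDateTime.
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .enableHiveSupport()
      .getOrCreate()
    try {
      spark.sql("CREATE TABLE t (c int) PARTITIONED BY (p date)")
      // Partition pruning on the date column goes through HiveShim's legacy date
      // formatter and, with it, RebaseDateTime initialization.
      spark.sql("SELECT * FROM t WHERE p='2021-01-01'").collect()
    } finally {
      spark.sql("DROP TABLE IF EXISTS t")
    }
  }
}
```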

Yes, this is a bug fix.

This patch passes `org.apache.spark.sql.catalyst.util.RebaseDateTimeSuite` and adds a new unit test to `HiveSparkSubmitSuite.scala`.

Closes apache#31864 from ulysses-you/SPARK-34772.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 5850956)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
ulysses-you authored and 839224346 committed Oct 27, 2021
1 parent 2389b0d commit 2b4aebc
Showing 2 changed files with 12 additions and 25 deletions.
`RebaseDateTime.scala`:

```diff
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.util.Utils
 
 /**
  * The collection of functions for rebasing days and microseconds from/to the hybrid calendar
@@ -263,7 +264,7 @@ object RebaseDateTime {
   // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
   // it is 2 times faster in DateTimeRebaseBenchmark.
   private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
-    val file = Thread.currentThread().getContextClassLoader.getResource(fileName)
+    val file = Utils.getSparkClassLoader.getResource(fileName)
     val mapper = new ObjectMapper() with ScalaObjectMapper
     mapper.registerModule(DefaultScalaModule)
     val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
```
`HiveSparkSubmitSuite.scala`:

```diff
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveContext}
-import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
+import org.apache.spark.sql.internal.SQLConf.{LEGACY_TIME_PARSER_POLICY, SHUFFLE_PARTITIONS}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.tags.{ExtendedHiveTest, SlowHiveTest}
@@ -154,7 +154,7 @@ class HiveSparkSubmitSuite
     // For more detail, see sql/hive/src/test/resources/regression-test-SPARK-8489/*scala.
     // TODO: revisit for Scala 2.13 support
     val version = Properties.versionNumberString match {
-      case v if v.startsWith("2.12") || v.startsWith("2.13") => v.substring(0, 4)
+      case v if v.startsWith("2.12") => v.substring(0, 4)
      case x => throw new Exception(s"Unsupported Scala Version: $x")
    }
    val jarDir = getTestResourcePath("regression-test-SPARK-8489")
@@ -433,11 +433,10 @@ object SetWarehouseLocationTest extends Logging {
 
    }
 
-    val qualifiedWHPath = FileUtils.makeQualified(
-      new Path(expectedWarehouseLocation), sparkSession.sparkContext.hadoopConfiguration).toString
-    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != qualifiedWHPath) {
+    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != expectedWarehouseLocation) {
       throw new Exception(
-        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location $qualifiedWHPath.")
+        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location " +
+          s"$expectedWarehouseLocation.")
     }
 
     val catalog = sparkSession.sessionState.catalog
@@ -450,7 +449,7 @@ object SetWarehouseLocationTest extends Logging {
      val tableMetadata =
        catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
      val expectedLocation =
-        CatalogUtils.stringToURI(s"$qualifiedWHPath/testlocation")
+        CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
      val actualLocation = tableMetadata.location
      if (actualLocation != expectedLocation) {
        throw new Exception(
@@ -466,7 +465,7 @@
      val tableMetadata =
        catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
      val expectedLocation = CatalogUtils.stringToURI(
-        s"$qualifiedWHPath/testlocationdb.db/testlocation")
+        s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
      val actualLocation = tableMetadata.location
      if (actualLocation != expectedLocation) {
        throw new Exception(
@@ -737,7 +736,7 @@ object SPARK_9757 extends QueryTest {
      val df =
        hiveContext
          .range(10)
-          .select(call_udf("struct", ($"id" + 0.2) cast DecimalType(10, 3)) as "dec_struct")
+          .select(callUDF("struct", ($"id" + 0.2) cast DecimalType(10, 3)) as "dec_struct")
      df.write.option("path", dir.getCanonicalPath).mode("overwrite").saveAsTable("t")
      checkAnswer(hiveContext.table("t"), df)
    }
@@ -796,6 +795,8 @@ object SPARK_14244 extends QueryTest {
    val hiveContext = new TestHiveContext(sparkContext)
    spark = hiveContext.sparkSession
 
+    import hiveContext.implicits._
+
    try {
      val window = Window.orderBy("id")
      val df = spark.range(2).select(cume_dist().over(window).as("cdist")).orderBy("cdist")
@@ -871,18 +872,3 @@ object SPARK_18989_DESC_TABLE {
    }
  }
 }
-
-object SPARK_34772 {
-  def main(args: Array[String]): Unit = {
-    val spark = SparkSession.builder()
-      .config(UI_ENABLED.key, "false")
-      .enableHiveSupport()
-      .getOrCreate()
-    try {
-      spark.sql("CREATE TABLE t (c int) PARTITIONED BY (p date)")
-      spark.sql("SELECT * FROM t WHERE p='2021-01-01'").collect()
-    } finally {
-      spark.sql("DROP TABLE IF EXISTS t")
-    }
-  }
-}
```