[SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should use Spark classloader instead of context

Switch `RebaseDateTime.loadRebaseRecords` from the thread's context classloader to the Spark classloader.

With a custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`, Spark uses the date formatter in `HiveShim` to convert a `date` to a `string`. If `spark.sql.legacy.timeParserPolicy=LEGACY` is set and the partition column type is `date`, the `RebaseDateTime` code is invoked. If `RebaseDateTime` happens to be initialized for the first time at that moment, the context classloader is the `IsolatedClientLoader`, and the following error is thrown:

```
java.lang.IllegalArgumentException: argument "src" is null
  at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4413)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3157)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:187)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:186)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$$anon$1.readValue(RebaseDateTime.scala:267)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.loadRebaseRecords(RebaseDateTime.scala:269)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<init>(RebaseDateTime.scala:291)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<clinit>(RebaseDateTime.scala)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
```
Once the static initializer of `RebaseDateTime$` has failed, every subsequent use of the class in the same JVM surfaces as a `NoClassDefFoundError` instead:

```
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
  at scala.collection.immutable.Stream.flatMap(Stream.scala:493)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826)
  at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:749)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:747)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1273)
```
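In short, `loadRebaseRecords` resolved its JSON resource through the thread's context classloader; under `HiveShim` that loader is the `IsolatedClientLoader`, `getResource` returns null, and Jackson rejects the null URL. A minimal spark-shell-style sketch of the failing lookup (the resource name below is an assumption based on `RebaseDateTime`, used only for illustration):

```scala
// Minimal sketch of the failure mode; not part of the patch.
// Assumed resource name -- RebaseDateTime loads rebase-record JSON files
// of this shape from Spark's own jars.
val fileName = "gregorian-julian-rebase-micros.json"

// Pre-fix lookup: when the context classloader is the IsolatedClientLoader,
// it cannot see resources packaged with Spark, so this returns null ...
val viaContext = Thread.currentThread().getContextClassLoader.getResource(fileName)

// ... and Jackson's readValue then rejects the null URL with
// IllegalArgumentException: argument "src" is null.
println(s"resource visible via context classloader: ${viaContext != null}")
```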

Steps to reproduce:
1. Configure custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`.
2. `CREATE TABLE t (c int) PARTITIONED BY (p date)`
3. `SET spark.sql.legacy.timeParserPolicy=LEGACY`
4. `SELECT * FROM t WHERE p='2021-01-01'`

User-facing change: yes, this is a bug fix.

Tested by passing `org.apache.spark.sql.catalyst.util.RebaseDateTimeSuite` and adding a new unit test to `HiveSparkSubmitSuite.scala`.

Closes apache#31864 from ulysses-you/SPARK-34772.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 5850956)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
ulysses-you authored and wangyum committed Mar 19, 2021
Commit 9918568 (parent c2629a7)
Showing 2 changed files with 41 additions and 2 deletions.
`sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala`:

```diff
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.util.Utils

 /**
  * The collection of functions for rebasing days and microseconds from/to the hybrid calendar
@@ -263,7 +264,7 @@ object RebaseDateTime {
   // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
   // it is 2 times faster in DateTimeRebaseBenchmark.
   private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
-    val file = Thread.currentThread().getContextClassLoader.getResource(fileName)
+    val file = Utils.getSparkClassLoader.getResource(fileName)
     val mapper = new ObjectMapper() with ScalaObjectMapper
     mapper.registerModule(DefaultScalaModule)
     val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
```
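For context, the replacement helper pins the lookup to the classloader that loaded Spark's own classes, independent of whatever the calling thread has installed. Roughly, as a simplified sketch of `org.apache.spark.util.Utils` (not the full class):

```scala
// Simplified sketch of the helper the fix switches to. The real method is
// org.apache.spark.util.Utils.getSparkClassLoader; it returns the loader
// that loaded Spark itself, not Thread.currentThread().getContextClassLoader.
object Utils {
  def getSparkClassLoader: ClassLoader = getClass.getClassLoader
}
```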
`sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala`:

```diff
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveContext}
-import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
+import org.apache.spark.sql.internal.SQLConf.{LEGACY_TIME_PARSER_POLICY, SHUFFLE_PARTITIONS}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.tags.{ExtendedHiveTest, SlowHiveTest}
@@ -337,6 +337,29 @@ class HiveSparkSubmitSuite
       unusedJar.toString)
     runSparkSubmit(argsForShowTables)
   }
+
+  test("SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader " +
+    "instead of context") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+    // We need to specify the metastore database location in case of conflict with other hive
+    // versions.
+    withTempDir { file =>
+      file.delete()
+      val metastore = s"jdbc:derby:;databaseName=${file.getAbsolutePath};create=true"
+
+      val args = Seq(
+        "--class", SPARK_34772.getClass.getName.stripSuffix("$"),
+        "--name", "SPARK-34772",
+        "--master", "local-cluster[2,1,1024]",
+        "--conf", s"${LEGACY_TIME_PARSER_POLICY.key}=LEGACY",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+        "--conf", s"spark.hadoop.javax.jdo.option.ConnectionURL=$metastore",
+        unusedJar.toString)
+      runSparkSubmit(args)
+    }
+  }
 }

 object SetMetastoreURLTest extends Logging {
@@ -845,3 +868,18 @@ object SPARK_18989_DESC_TABLE {
     }
   }
 }
+
+object SPARK_34772 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .config(UI_ENABLED.key, "false")
+      .enableHiveSupport()
+      .getOrCreate()
+    try {
+      spark.sql("CREATE TABLE t (c int) PARTITIONED BY (p date)")
+      spark.sql("SELECT * FROM t WHERE p='2021-01-01'").collect()
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS t")
+    }
+  }
+}
```
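Note the regression test deliberately goes through `spark-submit` with `HIVE_METASTORE_JARS=maven` and a custom `HIVE_METASTORE_VERSION`, so the Hive client really runs behind an `IsolatedClientLoader` and the classloader mismatch is reproduced end to end; the `SPARK_34772` driver object then replays the CREATE/SELECT steps from the description above.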
