[SPARK-38437][SQL] Lenient serialization of datetime from datasource#35756

Closed
MaxGekk wants to merge 5 commits intoapache:masterfrom
MaxGekk:dynamic-serializer-java-ts

Conversation

@MaxGekk
Member

@MaxGekk MaxGekk commented Mar 7, 2022

What changes were proposed in this pull request?

In this PR, I propose to support a lenient mode in the row serializer used by datasources to convert rows received from scans. Spark SQL will be able to accept:

  • java.time.Instant and java.sql.Timestamp for the TIMESTAMP type, and
  • java.time.LocalDate and java.sql.Date for the DATE type

independently from the current value of the SQL config spark.sql.datetime.java8API.enabled.
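The lenient conversion can be sketched in plain Scala (a minimal sketch, not the actual Catalyst implementation; `toMicros` is a hypothetical helper): instead of assuming a single external class, the serializer pattern-matches on the value's type.

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical sketch of a lenient external-to-Catalyst conversion: accept either
// external timestamp class and produce microseconds since the epoch.
def toMicros(obj: Any): Long = obj match {
  case i: Instant =>
    Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)
  case t: Timestamp =>
    // getTime already carries the millisecond part of getNanos
    Math.addExact(Math.multiplyExact(t.getTime, 1000L), ((t.getNanos / 1000) % 1000).toLong)
  case other =>
    throw new RuntimeException(
      s"${other.getClass.getName} is not a valid external type for schema of timestamp")
}
```

Either representation of the same point in time maps to the same Catalyst micros value, regardless of the `java8API.enabled` setting.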

Why are the changes needed?

A datasource might not be aware of the Spark SQL config spark.sql.datetime.java8API.enabled if it was developed before the config was introduced in Spark 3.0.0. In that case, it always returns "legacy" timestamps/dates of the types java.sql.Timestamp/java.sql.Date even if a user has enabled the Java 8 API. As Spark expects java.time.Instant or java.time.LocalDate but gets java.sql.Timestamp or java.sql.Date, the user observes the exception:

ERROR SparkExecuteStatementOperation: Error executing query with ac61b10a-486e-463b-8726-3b61da58582e, currentState RUNNING,  
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 8) (10.157.1.194 executor 0): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.sql.Timestamp is not a valid external type for schema of timestamp  
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, instantToMicros, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, loan_perf_date), TimestampType), true, false) AS loan_perf_date#1125  
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:239)  

This PR fixes the issue above. After the changes, users can use legacy datasource connectors with new Spark versions even when they need to enable the Java 8 API.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

By running the affected test suites:

$ build/sbt "test:testOnly *CodeGenerationSuite"
$ build/sbt "test:testOnly *ObjectExpressionsSuite"

and new tests:

$ build/sbt "test:testOnly *RowEncoderSuite"
$ build/sbt "test:testOnly *TableScanSuite"

@github-actions github-actions bot added the SQL label Mar 7, 2022
@xkrogen
Contributor

xkrogen commented Mar 7, 2022

This PR seems to me to be moving in the wrong direction. Previously we had compile/analysis-time checks that generated code specifically tailored to either SQL or Java-native types. After this PR, we would relax that compile-time check and instead perform per-row runtime checks on the object type. I would expect this to be detrimental to performance, and it generally contradicts the approach of performing more analysis at query compile time to avoid having to do checks at runtime. LMK if I'm missing anything.

@MaxGekk
Member Author

MaxGekk commented Mar 8, 2022

Previously we had compile/analysis-time checks that generated code specifically tailored to either SQL or Java-native types.

@xkrogen This PR doesn't weaken any compile/analysis-time checks. The goal is to improve the user experience with Spark SQL and make it more flexible with respect to user input. Currently, Spark supports two external Java types for Catalyst's timestamp type, java.sql.Timestamp and java.time.Instant; users and datasource connectors can use either, but Spark accepts only one of them based on its config spark.sql.datetime.java8API.enabled. Imagine a datasource connector being reused with a new Spark version in which the config has already been added and enabled. The datasource isn't aware of the new config and still pushes old java.sql.Timestamp values to Spark, but Spark rejects them even though it can handle them properly.

After this PR, we would relax that compile-time check and instead perform per-row runtime checks on the object type.

No, it doesn't relax any compile-time checks. We declare that Spark supports both Java types for timestamps, depending on the Spark SQL config. After the PR, Spark will accept both independently from the config.

cc @cloud-fan

@cloud-fan
Contributor

Can we narrow the scope down to only the data source scan? For Dataset, the type is known at compile time and we can still generate precise code to process either java.sql.Timestamp or java.time.Instant. It's only a problem in the data source scan, where we don't know the datetime Java type at compile time.

@MaxGekk
Member Author

MaxGekk commented Mar 8, 2022

Can we narrow the scope down to only data source scan?

Sure. I am trying this now.

@MaxGekk MaxGekk changed the title [WIP][SPARK-38437][SQL] Dynamic serialization of Java datetime objects to micros/days [WIP][SPARK-38437][SQL] Lenient serialization of datetime from datasource Mar 8, 2022
@MaxGekk MaxGekk marked this pull request as ready for review March 8, 2022 18:48
@MaxGekk MaxGekk changed the title [WIP][SPARK-38437][SQL] Lenient serialization of datetime from datasource [SPARK-38437][SQL] Lenient serialization of datetime from datasource Mar 8, 2022
@xkrogen
Contributor

xkrogen commented Mar 8, 2022

Perhaps I should have been more careful with my wording. I agree that no checks are weakened. However, we previously generated code that was specialized to a single type, either Java native or SQL type. Now, we perform a type-check at runtime, meaning we have to do an instanceof call for every single record. My understanding is that instanceof is pretty fast on modern JVMs, but still, it is additional overhead on a per-record basis, so there is a performance degradation.

I think the PR looks much more reasonable now that it is scoped to only the Datasource API, and I do understand the problem you're addressing, though it still imposes extra overhead for well-behaved datasources that obey the java8API.enabled config. I am wondering if this behavior should be configurable? That would enable performance-sensitive users to generate code which strictly obeys the java8API.enabled config, and users who need to accommodate legacy datasources (which don't obey the config) can accept the small performance hit. We would just need to change true in this line to a configurable value:

val toRow = RowEncoder(StructType.fromAttributes(output), lenient = true).createSerializer()

Not a strong concern, I think it is likely that the performance difference is small, but something to consider.
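The suggestion can be illustrated without Spark (a sketch under stated assumptions; the flag names and helpers are hypothetical, and the real change would thread a config value through `RowEncoder`): the converter is chosen once, at serializer-creation time, so only the lenient path pays a per-record type check.

```scala
import java.sql.Timestamp
import java.time.Instant

def instantToMicros(i: Instant): Long =
  Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)

def timestampToMicros(t: Timestamp): Long =
  Math.addExact(Math.multiplyExact(t.getTime, 1000L), ((t.getNanos / 1000) % 1000).toLong)

// Pick the converter once, up front; only the lenient one branches on the
// external type for every record.
def makeConverter(lenient: Boolean, java8Api: Boolean): Any => Long =
  if (lenient) {
    case i: Instant   => instantToMicros(i)
    case t: Timestamp => timestampToMicros(t)
  } else if (java8Api) {
    obj => instantToMicros(obj.asInstanceOf[Instant])     // strict: Java 8 type only
  } else {
    obj => timestampToMicros(obj.asInstanceOf[Timestamp]) // strict: legacy type only
  }
```

Performance-sensitive users would set the flag to false and keep the specialized code path; users with legacy connectors would accept the lenient one.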

@cloud-fan
Contributor

As long as the data source always returns one type of datetime class, branch prediction will work pretty well on the JVM and the per-record instanceof should have nearly zero overhead.

deserializer,
ClassTag(cls))
}
def apply(schema: StructType): ExpressionEncoder[Row] = {
Contributor


nit: put a blank line between methods.

@MaxGekk
Member Author

MaxGekk commented Mar 9, 2022

... the per record instanceof should have nearly 0 overhead ...

The datetime ops are pretty expensive, especially when they need to look up historical data to calculate time zone offsets. Such an op can take milliseconds (or hundreds of microseconds) compared to instanceof, which takes a few nanoseconds.
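The asymmetry can be seen on a plain JVM (an illustrative sketch, not a rigorous benchmark): a historical zone-offset lookup has to consult the tz transition rules, while the type check is a single pattern match. The instants below sit on either side of the US standard-time adoption on 1883-11-18, the same boundary used in the test data later in this thread:

```scala
import java.time.{Instant, ZoneId}

val rules = ZoneId.of("America/Los_Angeles").getRules

// Historical lookup: before 1883-11-18 20:00 UTC, Los Angeles kept local mean
// time rather than the -08:00 standard offset, so the rules engine must search
// the zone's transition history to answer.
val before = rules.getOffset(Instant.parse("1883-11-18T19:59:59Z"))
val after  = rules.getOffset(Instant.parse("1883-11-18T20:00:01Z"))
```

Resolving such offsets requires searching the zone's transition table, which is why it dwarfs a nanosecond-scale instanceof.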

@MaxGekk
Member Author

MaxGekk commented Mar 9, 2022

Merging to master. Thank you, @xkrogen and @cloud-fan for review.

@MaxGekk MaxGekk closed this in bd6a3b4 Mar 9, 2022
@xkrogen
Contributor

xkrogen commented Mar 9, 2022

Fair points from both of you on branch prediction and relative cost of instanceof vs. datetime ops. Thanks for taking the time to address my concerns.

LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request Mar 10, 2022
@klaus-xiong

@MaxGekk hello, I find that when the time zone is not UTC (GMT+0), inserting a timestamp like '1000-01-01 08:00:00' with timezone GMT+8 makes the SELECT command return '1000-01-01 00:00:00'; I see the code subtracts the timezone offset from the input timestamp. Which behavior do you think is correct for timestamp values in SQL? (a) insert '1000-01-01 08:00:00', return '1000-01-01 08:00:00'; (b) insert '1000-01-01 08:00:00', return '1000-01-01 00:00:00'.
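The shift can be reproduced on a plain JVM without Spark (an illustrative sketch): the same wall-clock string denotes different instants depending on the zone used to interpret it, so writing in one zone and reading back in another moves the displayed value by roughly the zone offset.

```scala
import java.time.{Duration, LocalDateTime, ZoneId, ZoneOffset}

// The same wall-clock value interpreted in two different zones yields two
// different instants.
val wallClock  = LocalDateTime.parse("1000-01-01T08:00:00")
val asShanghai = wallClock.atZone(ZoneId.of("Asia/Shanghai")).toInstant
val asUtc      = wallClock.atZone(ZoneOffset.UTC).toInstant

// The gap is the zone offset: about 8 hours (for the year 1000 the tz database
// uses Shanghai's local mean time, +08:05:43, so it is not exactly 8h).
val shift = Duration.between(asShanghai, asUtc)
```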

@cloud-fan
Contributor

What's the session timezone of Spark for your table insertion and reading? Can you give some SQL statements to demonstrate your issue?

@klaus-xiong

klaus-xiong commented Jul 26, 2022

      val zoneIdAndNames =
        Seq((UTC, "UTC"), (PST, "PST"), (getZoneId("Asia/Shanghai"), "SH"))
      zoneIdAndNames.foreach { case (zoneId, zoneName) =>
        withDefaultTimeZone(zoneId) {
          // In avro the date "0001-01-01 00:00:00" will be read as "0002-01-01 00:00:00"
          Seq("sequencefile", "textfile", "rcfile", "orc", "avro").foreach { format =>
            val tableName = "t_timestamp_" + format + "_" + zoneName
            withTable(tableName) {
              sql(s"CREATE TABLE $tableName(a TIMESTAMP) STORED AS $format")
              val timestamps = Seq(
                "1000-01-01 00:00:00.123",
                "1582-10-15 08:00:00.456",
                "1883-11-18 19:59:59.999",
                "1883-11-18 20:00:00.001",
                "1900-11-18 20:00:00.789",
                "1970-01-01 00:00:00")
              val tsValues = timestamps.map(ts => s"(TIMESTAMP('$ts'))").mkString(",")
              sql(s"INSERT INTO $tableName VALUES $tsValues")
              val df = sql(
                s"""
                   |SELECT
                   |CAST(a AS STRING) AS a
                   |FROM $tableName
                   |ORDER BY a
                 """.stripMargin)
              checkAnswer(df, timestamps.toDF("a"))
            }
          }
        }
      }
  }
I just tested with this unit test that I wrote.

@cloud-fan
Contributor

@MaxGekk can you take a look? This looks unexpected.

@klaus-xiong

What's the session timezone of Spark for your table insertion and reading? Can you give some SQL statements to demonstrate your issue?

== Results ==
!== Correct Answer - 6 ==    == Spark Answer - 6 ==
 struct<a:string>            struct<a:string>
![1000-01-01 00:00:00.123]   [1000-01-01 08:00:00.123]
![1582-10-15 08:00:00.456]   [1582-10-15 16:00:00.456]
![1883-11-18 19:59:59.999]   [1883-11-19 03:59:59.999]
![1883-11-18 20:00:00.001]   [1883-11-19 04:00:00.001]
![1900-11-18 20:00:00.789]   [1900-11-19 04:00:00.789]
![1970-01-01 00:00:00]       [1970-01-01 08:00:00]

org.scalatest.exceptions.TestFailedException: 
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="GMT-08:00",offset=-28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null

This timezone is PST.

@klaus-xiong

@MaxGekk can you take a look? This looks unexpected.

If this is an issue, I can fix it.

@MaxGekk
Member Author

MaxGekk commented Jul 26, 2022

// In avro the date "0001-01-01 00:00:00" will be read as "0002-01-01 00:00:00"

Is it avro specific? Do you observe the same w/ parquet? One more thing: the config AVRO_REBASE_MODE_IN_WRITE is set to EXCEPTION by default, so you should see the exception below since you are trying to write ancient timestamps:

org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.WRITE_ANCIENT_DATETIME] You may get a different result due to the upgrading to Spark >= 3.0:
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z
into Avro files can be dangerous, as the files may be read by Spark 2.x
or legacy versions of Hive later, which uses a legacy hybrid calendar that
is different from Spark 3.0+'s Proleptic Gregorian calendar. See more
details in SPARK-31404. You can set "spark.sql.avro.datetimeRebaseModeInWrite" to "LEGACY" to rebase the
datetime values w.r.t. the calendar difference during writing, to get maximum
interoperability. Or set the config to "CORRECTED" to write the datetime
values as it is, if you are sure that the written files will only be read by
Spark 3.0+ or other systems that use Proleptic Gregorian calendar.

I don't see that you modified spark.sql.avro.datetimeRebaseModeInWrite in the test. That's weird and should definitely be fixed.
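For reference, the rebase mode from the message above can be set on the session before writing ancient datetimes (a config sketch; `spark` is assumed to be an active SparkSession, and the choice between LEGACY and CORRECTED follows the guidance in the error text):

```scala
// Rebase datetime values w.r.t. the calendar difference on write
// (for interoperability with Spark 2.x and legacy Hive readers):
spark.conf.set("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY")

// Or write the values as-is, if the files will only ever be read by
// Proleptic Gregorian readers (Spark 3.0+):
// spark.conf.set("spark.sql.avro.datetimeRebaseModeInWrite", "CORRECTED")
```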

@klaus-xiong

Is it avro specific? Do you observe the same w/ parquet? ... I don't see that you modified spark.sql.avro.datetimeRebaseModeInWrite in the test.

OK, I will do more testing of the file formats and data, and of the avro and parquet configs.
One more thing: is the result below correct in SQL?

== Results ==
!== Correct Answer - 6 ==    == Spark Answer - 6 ==
 struct<a:string>            struct<a:string>
![1000-01-01 00:00:00.123]   [1000-01-01 08:00:00.123]
![1582-10-15 08:00:00.456]   [1582-10-15 16:00:00.456]
![1883-11-18 19:59:59.999]   [1883-11-19 03:59:59.999]
![1883-11-18 20:00:00.001]   [1883-11-19 04:00:00.001]
![1900-11-18 20:00:00.789]   [1900-11-19 04:00:00.789]
![1970-01-01 00:00:00]       [1970-01-01 08:00:00]

org.scalatest.exceptions.TestFailedException: 
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="GMT-08:00",offset=-28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null

@MaxGekk
Member Author

MaxGekk commented Jul 26, 2022

And one more thing, is the below result correct in SQL?

For sure, the loaded data must be the same as the saved data. We have many tests for saving/loading directly to datasources, but here you save/load via Hive.

@klaus-xiong

And one more thing, is the below result correct in SQL?

For sure, the loaded data must be the same as the saved data. We have many tests for saving/loading directly to datasources, but here you save/load via Hive.

Yeah, the datasource path is correct; the "// In avro the date "0001-01-01 00:00:00" will be read as "0002-01-01 00:00:00"" comment refers to an issue introduced by our own changes. I found that parquet/orc/avro/rcfile pass the test, while sequencefile/textfile may fail; I can fix it and cc you for review later.
