
[SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata #28137

Closed
wants to merge 2 commits

Conversation

cloud-fan
Contributor

cloud-fan commented Apr 6, 2020

What changes were proposed in this pull request?

This PR adds a new parquet/avro file metadata key: org.apache.spark.legacyDatetime. It indicates that the file was written with the "rebaseInWrite" config enabled, so Spark needs to rebase the values when reading it.

This lets Spark decide about rebasing more smartly (see the sketch after the list):

  1. If we don't know which Spark version wrote the file, rebase if the "rebaseInRead" config is true.
  2. If the file was written by Spark 2.4 or earlier, always rebase.
  3. If the file was written by Spark 3.0 or later, rebase only if org.apache.spark.legacyDatetime exists in the file metadata.
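
A minimal sketch of the resulting read-side decision (illustrative only, not the code in this PR; it assumes the writer version is stored under a key like org.apache.spark.version, and rebaseInRead stands for the value of the "rebaseInRead" config):

```scala
// Illustrative sketch: mirrors rules 1-3 above, not the actual Spark source.
object RebaseDecision {
  private val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
  private val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime"

  def needRebase(lookupFileMeta: String => String, rebaseInRead: Boolean): Boolean = {
    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)) match {
      // 1. Unknown writer (e.g. the file was not written by Spark): follow the read config.
      case None => rebaseInRead
      // 2. Written by Spark 2.4 or earlier: always rebase.
      case Some(version) if version < "3.0" => true
      // 3. Written by Spark 3.0 or later: rebase only if the legacy marker is present.
      case _ => lookupFileMeta(SPARK_LEGACY_DATETIME) != null
    }
  }
}
```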

Why are the changes needed?

It's very easy to end up with mixed-calendar parquet/avro files. For example, a user upgrades to Spark 3.0 and writes some parquet files to an existing directory, then realizes that the directory contains legacy datetime values before 1582. At that point it's too late: they have to find all the legacy files manually and read them separately.

To support mixed-calendar parquet/avro files, we need to decide to rebase or not based on the file metadata.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Updated test

@cloud-fan
Contributor Author

def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
  // If there is no version, we return None and let the caller decide.
  // Files written by Spark 3.0 and later follow the new calendar and don't need to rebase.
  Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map(_ < "3.0")
}
Member

Thank you so much, @cloud-fan. This is a much better solution.

Member

Files written by Spark 3.0 and later follow the new calendar and don't need to rebase.

This is an unexpected solution for me, and from my point of view it doesn't cover some use cases. I do believe the Spark versions stored in parquet/avro are not enough to make a decision about rebasing.

Contributor Author

I think the use case you pointed out (write with 3.0 and read with 2.4) is a bit artificial. If someone develops a data pipeline, it's very tricky if some parts of the pipeline use 3.0 and some parts use 2.4. IMO the more common problem is to deal with legacy data files.

BTW we can support this use case by creating a new metadata entry to record the calendar, but I think it's too complicated and not worth it.

Member

I don't think we guarantee forward compatibility - things from Spark 2.4 should work in Spark 3.0 but I don't think the opposite case is guaranteed.

@dongjoon-hyun
Member

Could you update the PR description? It should be >= 3.0 instead of > 3.0.

Do not rebase datetime values when reading parquet/avro, if we know the file written version and it's > "3.0".

@MaxGekk
Member

MaxGekk commented Apr 6, 2020

I haven't looked at the implementation yet, but the description looks dangerous to me:

always write the datetime values to parquet/avro without rebasing

@cloud-fan I think there are at least 2 cases:

  1. Spark 3.0 saves parquet files that are supposed to be loaded by Spark 3.0 - no need to rebase.
  2. Spark 3.0 saves parquet files that should be loaded by Spark 2.4, 2.3, etc. - need to rebase.

Do you think the second case is not possible?

Do not rebase datetime values when reading parquet/avro, if we know the file written version and it's > "3.0"

but you don't know the purpose of the written files. Maybe a user wants to save parquet files with Spark 3.0 and read them back with Spark 2.4? For example, Spark 3.0 and Spark 2.4 can be used in different clusters that prepare data for each other. And later, if the files are loaded by Spark 3.0, how should users read the already saved parquet files? For example, when they migrated a cluster from 2.4 to 3.0 but the files were written for 2.4.

@@ -26,17 +15,6 @@
before 1582, vec off, rebase on    13836    13872
before 1582, vec on,  rebase off    3664     3672     7    27.3    36.6    3.5X
before 1582, vec on,  rebase on     6049     6078    30    16.5    60.5    2.1X

OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save timestamps to parquet:    Best Time(ms)    Avg Time(ms)    Stdev(ms)    Rate(M/s)    Per Row(ns)    Relative
Member

Why do you delete it? How should I know the end-to-end write performance?

Member

If you decide to merge this PR, I would ask you to do that after the optimizations in #28119, otherwise I will not have a good baseline for end-to-end write performance.

Contributor Author

Yea we can wait for that. IIUC the optimization can also benefit rebasing when writing ORC files, right?

Member

Days rebasing in ORC will improve after the optimization because it uses DaysWritable, which is based on the functions touched in this PR.

Member

Timestamps rebasing in ORC won't get any benefit from the optimization because it uses to/fromJavaTimestamp.

Contributor Author

can't toJavaTimestamp benefit from your optimization?

Member

No, it does not use rebaseGregorianToJulianMicros, see:

def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
  val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
  Timestamp.valueOf(ldt)
}

We need to migrate it to that function, as I am going to do for days rebasing in #28091.

@SparkQA

SparkQA commented Apr 6, 2020

Test build #120876 has finished for PR 28137 at commit 8295549.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean)
  • class ParquetReadSupport(

@cloud-fan
Contributor Author

@MaxGekk It's always an option to ask users to upgrade software if some bugs are not fixed in earlier versions. For example, if Spark 3.0 writes Parquet files and Spark 2.4 may read wrong data, we can ask them to upgrade to Spark 3.0. Forward compatibility is not officially supported.

On the other hand, if Spark 2.4 writes Parquet files and Spark 3.0 may read wrong data, asking users to downgrade is not a good option. This PR tries to improve backward compatibility by rebasing datetime more smartly: don't rely on the config if the Spark version that wrote the file is known. I think that's more important than forward compatibility.

@MaxGekk
Member

MaxGekk commented Apr 7, 2020

For example, if Spark 3.0 writes Parquet files and Spark 2.4 may read wrong data, we can ask them to upgrade to Spark 3.0. Forward compatibility is not officially supported.

but before you removed the SQL config spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled, it was possible. After this PR, we force users to upgrade all their clusters to Spark 3.0, so partial migrations won't be possible.

@cloud-fan
Contributor Author

Partial upgrade is still possible, e.g. write files using 2.4 and read them using 3.0. It's not possible to do the opposite though, as forward compatibility is not guaranteed. And that config doesn't really help much with forward compatibility: it's hard for users to know when to enable it.

@MaxGekk
Member

MaxGekk commented Apr 7, 2020

It's not possible to do the opposite though, as forward compatibility is not guaranteed.

@cloud-fan I would propose to write the value of spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled to the Parquet/Avro metadata. Then Spark 3.0 could check the flag on read and decide whether to rebase or skip it. If the flag is not present in the metadata (files saved by Spark 2.4), just do rebasing.
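
For illustration, here is a minimal sketch of that idea at the plain Avro API level, outside Spark: stamp a marker key into the file metadata on write and check it on read. The key name follows this PR; the path, schema, and empty marker value are assumptions made up for the demo.

```scala
import java.io.File

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}

object LegacyDatetimeMetaDemo {
  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(
      """{"type":"record","name":"r","fields":
        |[{"name":"d","type":{"type":"int","logicalType":"date"}}]}""".stripMargin)
    val file = new File("/tmp/legacy-datetime-demo.avro") // illustrative path

    // Write side: set the marker only when the datetime values were written with rebasing.
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.setMeta("org.apache.spark.legacyDatetime", "") // presence, not the value, is what matters
    writer.create(schema, file)
    writer.close()

    // Read side: the presence of the key tells the reader to rebase.
    val reader = new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())
    val needRebase = reader.getMetaString("org.apache.spark.legacyDatetime") != null
    reader.close()
    println(s"need rebase on read: $needRebase")
  }
}
```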

@cloud-fan
Contributor Author

Please see #28137 (comment)

My point is: it's more of an artificial use case and I'm not sure it's worth introducing a new metadata key for it.

@@ -2522,19 +2522,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
Member

One thing I am concerned about, though: I suspect it won't be compatible with other external projects if we don't rebase? We have spark.sql.parquet.writeLegacyFormat in particular for compatibility with Apache Hive and Apache Impala, for example.

Contributor Author

People would assume storage systems use the ISO standard calendar (proleptic Gregorian calendar) by default. The Parquet spec doesn't define the calendar explicitly, but it refers to the Java 8 time API, which implicitly requires the proleptic Gregorian calendar.

Spark has this issue because we used the Java 7 time API before 3.0.

Hive switched to the proleptic Gregorian calendar in 3.1, and AFAIK this caused some trouble in other systems, which kind of proves that other systems are already using the proleptic Gregorian calendar.

Impala has added a config convert_legacy_hive_parquet_utc_timestamps to work around it: https://issues.apache.org/jira/browse/IMPALA-3933

Presto also hit issues when reading Hive tables with timestamp: prestodb/presto#12180

That's why I think it's better to write datetime values without rebasing, as that's the correct data.
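
To make the calendar gap concrete, here is a tiny standalone JVM sketch (not Spark code) comparing how the hybrid Julian+Gregorian calendar (java.util, what Spark 2.4 effectively relied on) and the proleptic Gregorian calendar (java.time, what Spark 3.0 uses) place the same date label:

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

object CalendarGapDemo {
  def main(args: Array[String]): Unit = {
    val utc = TimeZone.getTimeZone("UTC")

    // Hybrid calendar (Julian rules before the 1582 cutover): "1001-01-01" at midnight UTC.
    val cal = new GregorianCalendar(utc)
    cal.clear()
    cal.set(1001, Calendar.JANUARY, 1)
    val hybridEpochDay = Math.floorDiv(cal.getTimeInMillis, 86400000L)

    // Proleptic Gregorian calendar: "1001-01-01" as java.time sees it.
    val prolepticEpochDay = LocalDate.of(1001, 1, 1).toEpochDay

    // Expected to print 6: the same label points at days 6 apart, which is the
    // shift that rebasing corrects for dates before 1582.
    println(hybridEpochDay - prolepticEpochDay)
  }
}
```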

Member

Thanks for the investigation. SGTM.

cloud-fan changed the title [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file written Spark version to [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata Apr 15, 2020
@SparkQA

SparkQA commented Apr 15, 2020

Test build #121326 has finished for PR 28137 at commit 2e908c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean)
  • class AvroSerializer(
  • class ParquetReadSupport(

Member

@MaxGekk left a comment

LGTM, left minor comments

val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro")
val path3_0 = path.getCanonicalPath
val dateStr = "1001-01-01"
Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
Member

If you test these things together, I would also write with Spark 3.0 with rebase on.

Row(java.sql.Timestamp.valueOf(tsStr))))
}

withTempPath { path =>
Member

It seems possible to write a generic function which takes these as parameters: before_1582_ts_millis_v2_4.avro, timestamp-millis, 1001-01-01 01:02:03.124.
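
A hypothetical shape for such a helper, sketched here as a suggestion only (not the test code that was eventually committed); it assumes it lives inside the existing AvroSuite, so the suite's spark session, getResourceAvroFilePath, checkAnswer, and Row are already in scope:

```scala
// Sketch of one helper covering the per-file checks; parameter names mirror the
// suggestion above, and the logical-type argument is only used for labeling here.
private def checkAvroFileWrittenBy2_4(
    fileName2_4: String,          // e.g. "before_1582_ts_millis_v2_4.avro"
    logicalType: String,          // e.g. "timestamp-millis"
    expectedTs: String): Unit = { // e.g. "1001-01-01 01:02:03.124"
  withClue(s"logical type: $logicalType") {
    val df = spark.read.format("avro").load(getResourceAvroFilePath(fileName2_4))
    // The 2.4 file carries no Spark 3.0 metadata, so the values should come back rebased.
    checkAnswer(df, Row(java.sql.Timestamp.valueOf(expectedTs)))
  }
}
```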

@@ -31,16 +31,20 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
* @param convertTz the optional time zone to convert to for int96 data
Member

"to for": could some of them be removed?

* Parquet/Avro file metadata key to indicate that the file was written with legacy datetime
* values.
*/
private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime"
Member

Not important, but we use DateTime in other places. How about org.apache.spark.legacyDateTime?

}
}

withTempPath { path =>
Member

The same comment as for Avro. I think we could fold the tests

Member

@HyukjinKwon left a comment

Okay, looks good.

@SparkQA

SparkQA commented Apr 21, 2020

Test build #121576 has finished for PR 28137 at commit 6905247.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master and branch-3.0.

HyukjinKwon pushed a commit that referenced this pull request Apr 21, 2020
…metadata


Closes #28137 from cloud-fan/datetime.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit a5ebbac)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Oct 16, 2020
…gacyDateTime' in Avro/Parquet files

### What changes were proposed in this pull request?
Added a couple tests to `AvroSuite` and to `ParquetIOSuite` to check that the metadata key 'org.apache.spark.legacyDateTime' is written correctly depending on the SQL configs:
- spark.sql.legacy.avro.datetimeRebaseModeInWrite
- spark.sql.legacy.parquet.datetimeRebaseModeInWrite

This is a follow-up of #28137.

### Why are the changes needed?
1. To improve test coverage
2. To make sure that the metadata key is actually saved to Avro/Parquet files

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the added tests:
```
$ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV2Suite"
```

Closes #30061 from MaxGekk/parquet-test-metakey.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Oct 16, 2020
…gacyDateTime' in Avro/Parquet files

(cherry picked from commit 38c05af)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…gacyDateTime' in Avro/Parquet files