
[SPARK-29448][SQL] Support the INTERVAL type by Parquet datasource #26102

Closed
wants to merge 19 commits

Conversation

@MaxGekk (Member) commented Oct 12, 2019

What changes were proposed in this pull request?

In the PR, I propose to support Catalyst's CalendarIntervalType in the Parquet datasource. Interval values are saved as the Parquet INTERVAL logical type according to the format specification: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval

The Parquet format only allows storing intervals with millisecond precision. Because of this restriction, values of Spark's INTERVAL type have to be truncated to milliseconds before being stored in parquet files.
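
For illustration, here is a minimal sketch (not the PR's code; the helper names are hypothetical) of the 12-byte INTERVAL layout from the spec, three little-endian 4-byte integers holding months, days and milliseconds, and of the truncation to millisecond precision:

import java.nio.{ByteBuffer, ByteOrder}

// Pack (months, days, milliseconds) into the fixed 12-byte Parquet INTERVAL value.
def toParquetInterval(months: Int, days: Int, milliseconds: Int): Array[Byte] = {
  val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
  buf.putInt(months).putInt(days).putInt(milliseconds)
  buf.array()
}

// Truncation: microseconds below one millisecond are dropped before writing.
def truncateToMillis(microseconds: Long): Int =
  Math.toIntExact(microseconds / 1000L)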

Why are the changes needed?

  • Spark users will be able to load interval columns stored in parquet files by other systems
  • Datasets with interval columns can be stored in parquet files for future processing

Does this PR introduce any user-facing change?

Yes. Before, writing to parquet files fails with the error:

scala> spark.range(1).selectExpr("interval 10 year as i").write.parquet("intervals")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.;

After:

scala> spark.range(1).selectExpr("interval 10 year as i").write.parquet("intervals")
scala> spark.read.parquet("intervals").show(false)
+-----------------+
|i                |
+-----------------+
|interval 10 years|
+-----------------+

How was this patch tested?

  • Added tests to ParquetSchemaSuite and ParquetIOSuite
  • Added an end-to-end test in ParquetQuerySuite which writes intervals and reads them back

@SparkQA commented Oct 12, 2019

Test build #111969 has finished for PR 26102 at commit 4563a4e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2019

Test build #111972 has finished for PR 26102 at commit 83b4bf2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2019

Test build #111973 has finished for PR 26102 at commit 81ef7be.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 12, 2019

Test build #111975 has finished for PR 26102 at commit de0872e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Oct 13, 2019

@cloud-fan @HyukjinKwon @dongjoon-hyun @srowen Could you take a look at the PR, please?

@srowen (Member) left a comment:

If Parquet and Spark support an interval type, it makes some sense to support reading/writing. My only concern is loss of precision, but what can you do, I suppose, besides not support it? I suppose it could issue a warning but that gets noisy, fast.

@@ -535,8 +535,10 @@ case class DataSource(
   * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
   */
  def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
      if (providingClass != classOf[ParquetFileFormat]) {
Member:

I suppose there's no cleaner way to do this than an 'instanceof'-style check? it's done a few other places here, so maybe.

@MaxGekk (Member, Author) commented Oct 13, 2019

My only concern is loss of precision, but what can you do, I suppose, besides not support it?

Just in case: when spark.sql.parquet.outputTimestampType is set to TIMESTAMP_MILLIS, Spark also loses precision:

"Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
"means Spark has to truncate the microsecond portion of its timestamp value.")

This is not the default, but anyway.
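
For reference, a minimal usage sketch of the setting being discussed, assuming an active SparkSession named spark:

// Opting in to millisecond-precision parquet timestamps (not the default),
// which implies truncating the microsecond part.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")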

@SparkQA commented Oct 14, 2019

Test build #112007 has finished for PR 26102 at commit c260622.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val milliseconds = buf.getInt
var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS
val days = buf.getInt
val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY)
Contributor:

Parquet stores the number of days as a separate field because one logical day interval can be 23, 24, or 25 hours in the case of daylight saving. If we convert a parquet interval to a Spark interval, it's not truncation but a loss of information.

Member Author:

This can be fixed only if we change the structure of CalendarInterval, but such modifications are almost orthogonal to this PR.

Member:

If we don't change CalendarInterval, hm, how can we handle the different structure of a Parquet interval without getting it wrong in some cases?

As in the other PR, another option is to refuse to read/write intervals that are longer than a day, I guess?

Member Author:

  1. According to the SQL standard, hours must be in the range of 0-23.
  2. We already lose information while converting an interval string to a CalendarInterval value:
spark-sql> select interval 1 day 25 hours;
interval 2 days 1 hours

Member:

I don't think that's quite the issue. If a Parquet INTERVAL of 1 day is stored as "1 day", then adding it to a date will always produce the same time the next day. If we don't represent days separately in CalendarInterval, the 1 day is stored as "86400000000 µs" (right?) Adding that will usually, but not always, produce the same time the next day.

Member Author:

I don't want to defend the other side :-) but a consequence of storing days separately is that hours become unbounded. In this way, interval 1 day 25 hours and interval 2 days 1 hour are represented differently in parquet: (0, 1, 90000000) and (0, 2, 3600000). As @cloud-fan wrote above, this can lead to different results when adding those intervals to 2 November 2019: 2019-11-02 + interval 1 day 25 hours = 2019-11-04 00:00:00, but 2019-11-02 + interval 2 days 1 hour = 2019-11-04 01:00:00.
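
A minimal java.time sketch of the arithmetic above, assuming the America/Los_Angeles time zone (the comment does not name one), where DST ends on 2019-11-03 and that calendar day has 25 hours:

import java.time.{LocalDate, ZoneId}

val zone  = ZoneId.of("America/Los_Angeles")
val start = LocalDate.of(2019, 11, 2).atStartOfDay(zone)

// (months, days, millis) = (0, 1, 90000000): one calendar day, then 25 hours
val a = start.plusDays(1).plusHours(25)
// (months, days, millis) = (0, 2, 3600000): two calendar days, then 1 hour
val b = start.plusDays(2).plusHours(1)

println(a)  // 2019-11-04T00:00-08:00[America/Los_Angeles]
println(b)  // 2019-11-04T01:00-08:00[America/Los_Angeles]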

Member:

Yeah, it's complicated. Those are actually semantically different intervals, so I don't think it's a problem if they produce different results or are represented differently.

@MaxGekk (Member, Author) commented Oct 15, 2019

In general, are you ok with the proposed changes?

@cloud-fan (Contributor):

let's wait for #26134 and revisit this later.

@SparkQA commented Oct 25, 2019

Test build #112674 has finished for PR 26102 at commit 8fd037b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 25, 2019

Test build #112681 has finished for PR 26102 at commit 032a2ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 1, 2019

Test build #113093 has finished for PR 26102 at commit cf8023a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Nov 1, 2019

let's wait for #26134 and revisit this later.

@cloud-fan @srowen Could you take a look at the PR one more time, since it has recently been unblocked by #26134?

@srowen (Member) left a comment:

It looks plausible, I just don't know this code very well. So there should no longer be a difference in reading/writing "1 day" as an interval after the previous change?

@MaxGekk (Member, Author) commented Nov 3, 2019

So there should no longer be a difference in reading/writing "1 day" as an interval after the previous change?

There should be no difference, since days are written to and read back from a separate field.

@@ -498,10 +498,8 @@ case class DataSource(
      outputColumnNames: Seq[String],
      physicalPlan: SparkPlan): BaseRelation = {
    val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
Contributor:

Let's do this change after we officially make CalendarIntervalType public, i.e. move it to a public package.

Member Author:

Just wondering, what's the relation between this PR and opening up CalendarIntervalType? An INTERVAL column could appear as the result of subtracting two datetime columns, and a user may want to store it to a file system.

@cloud-fan (Contributor) commented Nov 11, 2019:

The interval type is kind of an internal type for now. Whether we can read/write it from/to data sources is a big decision.

Member:

And Python and R need a proper conversion for both reads and writes as well.

Member Author:

How are Python and R involved in reading/writing parquet?

Member:

For instance, if the Scala API saves interval types:

df.write.parquet("...")

and Python reads them back:

spark.read.parquet("...").collect()

there's no way to map them on the Python side via collect. In the case of DateType, values are mapped to datetime.date instances in Python.

Member:

We gotta make it all supported before exposing all the related interval features (see #25022 (comment)).

    val interval = row.getInterval(ordinal)
    val buf = ByteBuffer.wrap(reusableBuffer)
    buf.order(ByteOrder.LITTLE_ENDIAN)
      .putInt((interval.milliseconds()).toInt)
Member:

@MaxGekk the doc (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval) says:

three little-endian unsigned integers

What happens if we set negative values for some parts of the interval and those negative values are written here?

Member Author:

Spark will read them back as negative values: https://github.com/apache/spark/pull/26102/files#diff-35a70bb270f17ea3a1d964c4bec0e0a2R912 . I don't know about other systems.
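
A minimal sketch of the bit-level issue (not code from the PR): a negative part round-trips if both sides read it as signed, but a reader that follows the spec's "unsigned" wording sees a huge positive value instead:

import java.nio.{ByteBuffer, ByteOrder}

val buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)
buf.putInt(-1)   // write a negative value into an "unsigned" slot
buf.flip()

val signed   = buf.getInt                                  // -1, what Spark reads back
val unsigned = java.lang.Integer.toUnsignedLong(signed)    // 4294967295, what a strict reader sees
println(s"signed=$signed unsigned=$unsigned")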

Member:

Hm, shouldn't we maybe add an assert to reject negative parts for now? It seems they don't comply with the parquet format. I'm just worried about the case where we have to explain this to users multiple times later, like https://issues.apache.org/jira/browse/SPARK-20937 and https://issues.apache.org/jira/browse/SPARK-20297.

Do you think it is common to use negative parts? If not, let's just disallow them.

Contributor:

We should use a strict toInt so that we can fail earlier if the value is out of range.
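
A minimal sketch of the difference, with Math.toIntExact standing in for the strict conversion (the exact helper used in the PR is not shown in this thread):

val tooBig = 4294967296L                   // 2^32 milliseconds does not fit in an Int
println(tooBig.toInt)                      // 0 -- silent truncation
try {
  Math.toIntExact(tooBig)                  // fails fast instead
} catch {
  case e: ArithmeticException => println(s"rejected: ${e.getMessage}")
}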

Member Author:

fixed

@SparkQA commented Dec 11, 2019

Test build #115128 has finished for PR 26102 at commit 9041110.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member, Author) commented Jan 14, 2020

@cloud-fan Should I close this PR?

@MaxGekk MaxGekk closed this Jan 15, 2020
@MaxGekk MaxGekk deleted the parquet-interval branch June 5, 2020 19:41