
[SPARK-28973][SQL] Add TimeType and support java.time.LocalTime as its external type. #25678

Closed
wants to merge 18 commits

Conversation

MaxGekk (Member) commented Sep 4, 2019

What changes were proposed in this pull request?

This PR proposes a new type for Catalyst's type system to represent local time. According to the SQL standard, a value of data type TIME comprises values of the datetime fields HOUR, MINUTE and SECOND. It is always a valid time of day.

  • HOUR - hour within day, between 00 and 23
  • MINUTE - minute within hour, between 00 and 59
  • SECOND - second, possibly with a fraction of a second, within minute; valid range is 00 to 59.999999

Internally, the TIME type is implemented as Catalyst TimeType and stores the number of microseconds since midnight (00:00:00.000000).

The Java class java.time.LocalTime is supported as the external type for TimeType. Instances of java.time.LocalTime can be parallelized and converted to TimeType values, and collected back as LocalTime instances. Spark also accepts TimeType literals built from LocalTime values.

Created an encoder that serializes instances of the java.time.LocalTime class to the internal representation of Catalyst's nullable TimeType.
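A minimal sketch of the LocalTime <-> microseconds-since-midnight conversion this relies on; the helper names are illustrative, not the PR's API:

import java.time.LocalTime
import java.util.concurrent.TimeUnit

// Illustrative helpers only; the PR implements this conversion inside Catalyst.
def localTimeToMicros(t: LocalTime): Long =
  TimeUnit.NANOSECONDS.toMicros(t.toNanoOfDay)

def microsToLocalTime(micros: Long): LocalTime =
  LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(micros))

// Round trip: 20:38:10 -> 74290000000 microseconds -> 20:38:10
assert(microsToLocalTime(localTimeToMicros(LocalTime.of(20, 38, 10))) == LocalTime.of(20, 38, 10))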

Why are the changes needed?

Does this PR introduce any user-facing change?

The PR extends existing functionality, so users can parallelize instances of the java.time.LocalTime class and collect them back:

scala> val ds = Seq(java.time.LocalTime.of(20, 38, 10)).toDS
ds: org.apache.spark.sql.Dataset[java.time.LocalTime] = [value: time]
scala> ds.collect
res5: Array[java.time.LocalTime] = Array(20:38:10)

How was this patch tested?

  • Added tests to CatalystTypeConvertersSuite to check conversion from/to java.time.LocalTime.
  • Checked row encoding with new tests in RowEncoderSuite.
  • Tested creating literals of TimeType in LiteralExpressionSuite.
  • Checked collecting in DatasetSuite and JavaDatasetSuite.

SparkQA commented Sep 4, 2019

Test build #110123 has finished for PR 25678 at commit a8adade.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

MaxGekk changed the title from [WIP][SPARK-28973][SQL] Add TimeType and support java.time.LocalTime as its external type. to [SPARK-28973][SQL] Add TimeType and support java.time.LocalTime as its external type. on Sep 4, 2019
MaxGekk (Member, Author) commented Sep 4, 2019

@cloud-fan @dongjoon-hyun @wangyum May I ask you to review this PR?

SparkQA commented Sep 4, 2019

Test build #110125 has finished for PR 25678 at commit bd6b976.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

MaxGekk (Member, Author) commented Sep 8, 2019

@gatorsmile @rxin @HyukjinKwon Do you have any objections for the new type?

MaxGekk (Member, Author) commented Sep 10, 2019

@srowen WDYT of the PR?

srowen (Member) commented Sep 10, 2019

It looks pretty thorough to me. I don't see a problem with supporting LocalTime as microseconds since midnight.

You're right that the more important question is whether introducing a Catalyst TimeType causes any subtler problems. For example, what if I write this type as Parquet? Do I get a long type out? Are there any related types in Parquet or elsewhere whose behavior we need to consider, and is this consistent with those? I don't know; those are just the questions I'd have to answer to decide if there is any issue here.

MaxGekk (Member, Author) commented Sep 10, 2019

@srowen Thank you for your quick response.

For example, what if I write this type as Parquet? Do I get a long type out? Are there any related types in Parquet or elsewhere whose behavior we need to consider, and is this consistent with those?

Parquet has an appropriate logical type for TIME, see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#time : "TIME with precision MICROS is used for microsecond precision. It must annotate an int64 that stores the number of microseconds after midnight." This type maps directly to the new Catalyst type.
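For reference, a minimal sketch of how such a field can be declared with parquet-mr's schema builder; the field name "t" and the OriginalType-based API are assumptions for brevity:

import org.apache.parquet.schema.{OriginalType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
import org.apache.parquet.schema.Type.Repetition

// An int64 annotated as TIME_MICROS: the stored value is the number of microseconds after midnight.
val timeField = Types
  .primitive(INT64, Repetition.OPTIONAL)
  .as(OriginalType.TIME_MICROS)
  .named("t")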

srowen (Member) commented Sep 10, 2019

This is just for my education, but how is TimeType mapped to Parquet TIME here? Yes, that's good confirmation that this is a common type and we are using the same semantics as, at least, Parquet.

Let's say I wrote to some other system that didn't have such a type, like an RDBMS. It would end up as a long? That wouldn't cause any particular problem, right?

MaxGekk (Member, Author) commented Sep 10, 2019

how is TimeType mapped to Parquet TIME here?

I think in the same way as TIMESTAMP. We should copy the micros from a row and store them as INT64, as is done here:

case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
  (row: SpecializedGetters, ordinal: Int) =>
    recordConsumer.addLong(row.getLong(ordinal))

The physical Parquet type will be INT64, and the logical type will be TIME_MICROS, like:

case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
  Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)

Let's say I wrote to some other system that didn't have such a type, like an RDBMS. It would end up as a long? That wouldn't cause any particular problem, right?

TIME is a standard SQL type. If an RDBMS does not support it (which should be a rare case), we will store our TIME value as a long.

srowen (Member) commented Sep 10, 2019

Just to be clear, does ParquetSchemaConverter need a new case to handle the new type then, to and from Parquet, or is it already handled? Likewise, I'm just trying to figure out how, for example, our JDBC support translates this type for an RDBMS that does support TIME, because I don't see any change to the JDBC support. How does this happen? I guess the same question applies to ORC, etc.

MaxGekk (Member, Author) commented Sep 10, 2019

Does ParquetSchemaConverter need a new case to handle the new type then, to and from Parquet, or is it already handled?

We will need to support the new type in all data sources. In particular, ParquetSchemaConverter must have a special case for TIME.

Likewise, I'm just trying to figure out how, for example, our JDBC support translates this type for an RDBMS that does support TIME, because I don't see any change to the JDBC support.

In this PR, I just introduce the new type and its external type. To support it in the JDBC data source, we would need to add separate cases to JdbcUtils like:

case TimestampType =>
  (stmt: PreparedStatement, row: Row, pos: Int) =>
    stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))

and

case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
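For illustration, a minimal sketch of what the analogous TIME handling amounts to at the plain-JDBC level; the connection URL, credentials, and table are assumptions, and a TimeType case in JdbcUtils would wrap roughly this conversion:

import java.sql.{DriverManager, Time}
import java.time.LocalTime

// Hypothetical URL, credentials, and table; any RDBMS with a TIME column behaves the same way.
val conn = DriverManager.getConnection("jdbc:postgresql://localhost/test", "user", "password")
try {
  val stmt = conn.prepareStatement("INSERT INTO events (event_time) VALUES (?)")
  // java.sql.Time.valueOf converts a LocalTime into the JDBC TIME representation.
  stmt.setTime(1, Time.valueOf(LocalTime.of(20, 38, 10)))
  stmt.executeUpdate()
} finally {
  conn.close()
}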

srowen (Member) commented Sep 10, 2019

OK, so we need to add those kinds of translations or else this won't properly convert to types in external systems, and the idea is to follow up with that in another PR? I think that could be fine.

rxin (Contributor) commented Sep 10, 2019 via email:

Why do we need a new data type?

HyukjinKwon (Member) commented:

Why do we need a new data type?

Seems like that's answered in the PR description. It aims to follow ANSI, I guess:

According to the SQL standard, a value of data type TIME comprises values of the datetime fields HOUR, MINUTE and SECOND. It is always a valid time of day.

MaxGekk (Member, Author) commented Sep 11, 2019

Why do we need a new data type?

@rxin, I see the following reasons to add the new TIME type:

  1. To be compliant with the SQL standard.
  2. To maintain feature parity with PostgreSQL, which supports the type. For example, we have to comment out the TIME constructor in our tests for now:
    -- select make_time(8, 20, 0.0);
  3. To be able to read TIME columns and, especially, write such columns from/to Parquet files (and other formats as well). Imagine you have a Parquet file with TIME columns (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#time) and want to update the file; you cannot do that with Spark for now because Spark cannot write TIME values.
  4. A similar reason, but related to JDBC: to work with modern RDBMSs, which very likely support the TIME type. If a user needs to write a TIME column, she/he cannot do that with Spark without UDFs.
  5. To make Catalyst's time-related types more consistent. For now you can make a TIMESTAMP from a DATE by casting it, which is effectively DATE + 00:00:00.000000 + TimeZone = TIMESTAMP (see the sketch after this list). You cannot add an arbitrary time to a date to get a timestamp. Supporting a TIME type would give users more flexibility and make Spark's type system more consistent: DATE + TIME + TimeZone = TIMESTAMP.
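A minimal sketch of the current cast behavior mentioned in point 5, assuming an active SparkSession named spark and the default session time zone:

spark.sql("SELECT CAST(CAST('2019-09-11' AS DATE) AS TIMESTAMP) AS ts").show(false)
// ts comes back as 2019-09-11 00:00:00: the cast always assumes midnight in the session time zone.
// A TIME type would let users build DATE + TIME (+ time zone) => TIMESTAMP instead.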

gatorsmile (Member) commented:

Supporting the Time type is pretty complex and the sizing is pretty big. Can we delay it and do it in the future?

rxin (Contributor) commented Sep 13, 2019

I haven't had a chance but I'd push back against most of the functionalities in the umbrella ticket (Postgres compatibility). While I think it's a good idea in general to align with some existing standard when a functionality exists, a blanket "implement some other database's functionality" is a dangerous umbrella, because the context and use cases are very different between Spark and "some other databases".

A few reasons:

  1. Different systems evolve differently, and systems that have been around longer have a lot of legacy that even the maintainers of such systems regret. For example, maybe nobody (or worse, somebody but less than 0.01% so you can't get rid of it) uses a particular feature.

  2. Database systems have a very different architecture from Spark's more modern big data world. One constraint is that SQL is often the only way in, and SQL is the only way out. UDFs are very difficult to do. Think recursive subquery as an example ... in Spark you can easily accomplish a similar thing by just using the programming language's capabilities, which is far more flexible than SQL's recursion.

  3. There's a cost to adding some new code or feature to the codebase. In the case of a new data type, it has high overhead to the end user and library developer as well, because now they need to handle this data type in their code. It's one thing if a lot of users are asking for this functionality; it's a very different thing if the whole motivation is to add it because we want to have parity with some database that we don't even know if any of the Spark use cases need.

MaxGekk (Member, Author) commented Sep 13, 2019

@rxin, how about these real use cases:

  1. Updating data in Parquet files containing TIME columns. For example, Snowflake supports the TIME type, and so do Presto, BigQuery, and Apache Flink. Would it be useful for Spark users to be able to read/write/update Parquet files written by other systems?

  2. Updating tables with TIME columns via JDBC. Most modern relational DBMSs follow the SQL standard and support the TIME type. Would it be useful for Spark users to be able to update such tables via JDBC?

Another argument for the TIME type: we cannot say that Spark SQL is compliant with the ANSI SQL standard without supporting one of its basic types. Is Spark SQL going to be compatible with the SQL standard?

In the case of a new data type, it has high overhead to the end user and library developer as well, because now they need to handle this data type in their code.

This argument blocks any extension of Spark SQL's type system, effectively forever. It seems that implementing the INTERVAL type according to the SQL standard has no chance of being merged upstream either: SPARK-27793: Support SQL day-time INTERVAL type and SPARK-27791: Support SQL year-month INTERVAL type. Right?

marmbrus (Contributor) commented:

I tend to agree with @rxin here. This is a lot of long term cost without good user justification.

I would be much more sympathetic to an argument that showed lots of use of the TIME data type by potential users of Spark, rather than just pointers to other systems that have this data type.

MaxGekk (Member, Author) commented Sep 15, 2019

I am closing this PR. @srowen @rxin @marmbrus @gatorsmile @HyukjinKwon Thank you for your comments.

MaxGekk closed this Sep 15, 2019
rxin (Contributor) commented Sep 15, 2019

Thanks @MaxGekk. In case somebody else wonders in the future, there's a lot more complexity to adding a new type than what's shown here. For example, we need to add support for Python and R. We also need to support expressions. Those are much larger than this PR itself.

hurelhuyag commented:

I'm currently using timestamp as a time type to create data for a rush-hour graphic; the date part is always the epoch (1970-01-01), because Spark doesn't support TimeType.

My point is that micros are not important. We can implement it just like timestamp internally, with the date part always at the epoch. The only requirement is that toString() should print only the time part (HH:mm:ss.SSS) and that it be mappable to java.sql.Time and java.time.LocalTime. This is enough. Maybe we can even use timestamp directly; only the printable representation must be restricted to the time part.
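A minimal sketch of this workaround, assuming an active SparkSession named spark: keep the value as a timestamp anchored at the epoch date and format only the time part.

import java.sql.Timestamp
import org.apache.spark.sql.functions.date_format
import spark.implicits._

// 20:38:10 stored as the timestamp 1970-01-01 20:38:10 (the date part is never surfaced).
val df = Seq(Timestamp.valueOf("1970-01-01 20:38:10")).toDF("t")

// Show only the time-of-day part; prints 20:38:10.000.
df.select(date_format($"t", "HH:mm:ss.SSS").as("time_of_day")).show()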

melin commented Jul 6, 2022

@MaxGekk Would you consider supporting the time type again? Iceberg and Parquet support the time type:
https://iceberg.apache.org/docs/latest/schemas/

@rxin Spark 3.2 supports the INTERVAL type; would you consider supporting the Time type?

redblackcoder commented:

@MaxGekk @rxin I am also looking into this issue of the missing Time type in Spark. I am working with Iceberg tables built on top of Parquet files, which have Time data type support; Spark cannot read or write such tables.
Many times these tables come from other systems like Snowflake, Flink, Google BigQuery, etc., and the inability to read them using Spark limits the capability to support such tables.

zeddit commented Jan 18, 2024

Looking forward to this feature because my Iceberg table needs the time data type.
In the Iceberg ecosystem, I found that PyArrow, Parquet, Flink, Trino, and StarRocks support the time type, while Spark and ClickHouse seem not to.
And it's really hard for some strongly typed systems like Trino to cast a time type to a timestamp by adding a '1970-01-01' prefix.
