[SPARK-36182][SQL] Support TimestampNTZ type in Parquet data source #34495

Closed
gengliangwang wants to merge 5 commits into apache:master from gengliangwang:parquetTSNTZ

Conversation

@gengliangwang
Member

@gengliangwang gengliangwang commented Nov 5, 2021

What changes were proposed in this pull request?

Support TimestampNTZ type in the Parquet data source. In this PR, Parquet's timestamp types are mapped to Spark's two timestamp types as follows:

Parquet type     | Logical annotation                      | Spark Catalyst type
INT64 Timestamp  | isAdjustedToUTC = false, unit = MILLIS  | TimestampNTZType
INT64 Timestamp  | isAdjustedToUTC = false, unit = MICROS  | TimestampNTZType
INT64 Timestamp  | isAdjustedToUTC = true,  unit = MILLIS  | TimestampType
INT64 Timestamp  | isAdjustedToUTC = true,  unit = MICROS  | TimestampType
INT96 Timestamp  | -                                       | TimestampType
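The mapping in the table can be sketched as follows. This is a hypothetical illustration of the rule, not Spark's actual API; the enum and method names are invented for clarity.

```java
// Illustrative sketch of the Parquet-to-Catalyst timestamp mapping (names are not Spark's API).
enum CatalystType { TIMESTAMP_LTZ, TIMESTAMP_NTZ }
enum ParquetTimeUnit { MILLIS, MICROS }

class TimestampMapping {
    // INT64 timestamps carry isAdjustedToUTC and a time unit.
    // MILLIS and MICROS map the same way; only the UTC flag decides NTZ vs LTZ.
    static CatalystType forInt64(boolean isAdjustedToUTC, ParquetTimeUnit unit) {
        return isAdjustedToUTC ? CatalystType.TIMESTAMP_LTZ : CatalystType.TIMESTAMP_NTZ;
    }

    // INT96 timestamps have no logical annotation and are always read as TimestampType (LTZ).
    static CatalystType forInt96() {
        return CatalystType.TIMESTAMP_LTZ;
    }
}
```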

Parquet writer

For TIMESTAMP_NTZ columns, the writer follows the Parquet Logical Type Definitions and sets the field isAdjustedToUTC to false. The output type is always INT64 in the MICROS time unit. Parquet's timestamp logical annotation can only be applied to INT64, so writing TIMESTAMP_NTZ as INT96 is not supported; otherwise, it would be hard to determine the timestamp type on the reader side.
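The serialized value for a TIMESTAMP_NTZ column can be sketched like this: the local date-time is encoded as microseconds since 1970-01-01T00:00:00, taking the local value as-is with no time-zone adjustment, which is what isAdjustedToUTC = false means. This is a minimal sketch, not Spark's actual writer code; the class and method names are invented.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Sketch only: encode a timezone-less timestamp as epoch microseconds.
class NtzWriterSketch {
    static long toMicros(LocalDateTime ldt) {
        // ZoneOffset.UTC serves only as a fixed anchor for the epoch arithmetic;
        // the local value itself is never shifted.
        long seconds = ldt.toEpochSecond(ZoneOffset.UTC);
        return seconds * 1_000_000L + ldt.getNano() / 1_000L;
    }
}
```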

Parquet reader

  • INT96 columns: The reader behavior is the same with Spark 3.2 or prior.
  • Schema inference for INT64 Timestamp columns: when inferring the schema of a file, Spark infers TIMESTAMP_NTZ or TIMESTAMP_LTZ according to the annotation flag isAdjustedToUTC.
  • Row converter for INT64 Timestamp columns during reading
    • Given a TIMESTAMP_NTZ Parquet column and a catalyst schema of TIMESTAMP_LTZ type, Spark allows the read operation since TIMESTAMP_LTZ is the "wider" type.
    • Given a TIMESTAMP_LTZ Parquet column and a catalyst schema of TIMESTAMP_NTZ type, Spark disallows the read operation since TIMESTAMP_NTZ is the "narrower" type.
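The compatibility rule above reduces to one disallowed combination, which can be sketched as follows (the class and method names are illustrative, not Spark's API):

```java
// Sketch of the read-compatibility rule: reading NTZ data as LTZ is allowed (widening);
// reading LTZ data as NTZ is rejected (narrowing).
class ReadCompatibilitySketch {
    static boolean canRead(boolean parquetIsAdjustedToUTC, boolean catalystIsNtz) {
        // Only the LTZ-file / NTZ-schema combination is disallowed.
        return !(parquetIsAdjustedToUTC && catalystIsNtz);
    }
}
```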

Why are the changes needed?

Support TimestampNTZ type in Parquet data source

Does this PR introduce any user-facing change?

Yes, support TimestampNTZ type in Parquet data source

How was this patch tested?

New unit tests.

@github-actions github-actions bot added the SQL label Nov 5, 2021
@gengliangwang
Member Author

cc @sadikovi @cloud-fan @beliefer


void validateTimestampType(DataType sparkType) {
  assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
  // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
Member Author

Here only reading TimestampLTZ as TimestampNTZ is disallowed. Suggestions are welcome.

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49404/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49405/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49404/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49405/

@SparkQA

SparkQA commented Nov 5, 2021

Test build #144933 has finished for PR 34495 at commit b8e1e02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// For a TimestampNTZType column, Spark always outputs INT64 with the Timestamp annotation
// in the MICROS time unit.
(row: SpecializedGetters, ordinal: Int) =>
  recordConsumer.addLong(row.getLong(ordinal))
Contributor

@beliefer beliefer Nov 7, 2021

(row: SpecializedGetters, ordinal: Int) => recordConsumer.addLong(row.getLong(ordinal))

}
}

void converterErrorForTimestampNTZ(String parquetType) {
Contributor

nit: convertErrorForTimestampNTZ?

Member Author

Thanks, updated

}
}

test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
Contributor

Can we add a test for reading TimestampNTZ as TimestampLTZ?

Member Author

Thanks, updated


void convertErrorForTimestampNTZ(String parquetType) {
  throw new RuntimeException("Unable to create Parquet converter for data type " +
      DataTypes.TimestampNTZType.json() + " whose Parquet type is " + parquetType);
Contributor

can we use the SQL format? TIMESTAMP WITHOUT TIMEZONE

Member Author

Both are "timestamp_ntz"

Contributor

I was not referring to .sql, I mean "TIMESTAMP WITHOUT TIMEZONE".

Member Author

I prefer keeping "timestamp_ntz", which is the keyword we used in DDL/literals..

Contributor

nvm, timestamp_ntz is kind of an alias of TIMESTAMP WITHOUT TIMEZONE in Spark today, and a shorter name is better here.

!parquetType.getLogicalTypeAnnotation
  .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC &&
parquetType.getLogicalTypeAnnotation
  .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
Contributor

shall we have a method to return an optional timestamp unit? then we can shorten this very long condition.

Member Author

ok, updated


test("SPARK-36182: writing and reading TimestampNTZType column") {
  withTable("ts") {
    sql("create table ts (c1 timestamp_ntz) using parquet")
Contributor

can we test that we can not insert ltz into a ntz column?

Member Author

@gengliangwang gengliangwang Nov 10, 2021

We should forbid it in the store assignment rule, which is independent of data sources. I will raise another PR for that.

@SparkQA

SparkQA commented Nov 10, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49522/

@SparkQA

SparkQA commented Nov 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49531/

@SparkQA

SparkQA commented Nov 10, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49531/

@SparkQA

SparkQA commented Nov 10, 2021

Test build #145052 has finished for PR 34495 at commit 8a71793.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 10, 2021

Test build #145061 has finished for PR 34495 at commit 760f5b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in ef5278f Nov 11, 2021