[Protocol, Spark] UTC normalize timestamp partition values #3378
Conversation
@@ -377,6 +378,13 @@ object DeltaFileFormatWriter extends LoggingShims {
    }
  }

class PartitionedTaskAttemptContextImpl(conf: Configuration,
Needed a way to actually send the types of the partition columns as part of the task context; that's important for distinguishing the timestamp vs. timestamp_ntz cases in this change.
If there's another way to get the partition column type details without this addition, that'd be awesome, but I didn't see anywhere else to get these details.
@@ -67,15 +67,15 @@ sealed trait TimestampFormatter extends Serializable {

class Iso8601TimestampFormatter(
    pattern: String,
    timeZone: TimeZone,
    timeZone: ZoneId,
I don't think changing this poses a compatibility concern as long as the various `apply` methods remain compatible (my Scala is a bit rusty, so let me know, but I'm unable to initialize this from outside anyway)?
As for why the change:
Spark SQL supports setting any zone ID, like "UTC-08:00" etc. However, if one tries to convert this into the legacy Java `TimeZone` type via `TimeZone.getTimeZone(ZoneId.of("UTC-08:00"))`, then the time zone actually ends up being just "UTC", because the legacy `TimeZone` API can't handle the offset -08:00. So passing a `ZoneId` avoids a correctness issue there.
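The offset-dropping behavior described here can be reproduced with plain JDK APIs, independent of the Spark/Delta code (a standalone sketch; the class name is illustrative):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.TimeZone;

public class ZoneIdVsTimeZone {
    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC-08:00");
        // The ZoneId keeps the -08:00 offset in its rules.
        System.out.println(zone.getRules().getOffset(Instant.EPOCH)); // -08:00

        // Converting to the legacy TimeZone API silently falls back to GMT,
        // because "UTC-08:00" is not a valid legacy custom ID ("GMT-08:00" is).
        TimeZone tz = TimeZone.getTimeZone(zone);
        System.out.println(tz.getID() + " " + tz.getRawOffset()); // GMT 0
    }
}
```

The legacy API documents this fallback: an ID it cannot understand yields the GMT zone, so the -08:00 offset is lost rather than rejected.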
Force-pushed from 80125b9 to 8abff59
@@ -117,7 +117,8 @@ object PartitionSpec {

private[delta] object PartitionUtils {

  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
  lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
  lazy val utcFormatter = TimestampFormatter("yyyy-MM-dd HH:mm:ss.SSSSSSz", ZoneId.of("UTC"))
Normalizes to microseconds like in Delta kernel:
delta/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/PartitionUtils.java
Line 45 in 573a57f
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
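As a sketch of what that kernel pattern produces, the fixed six-digit fraction always emits microsecond precision, zero-padded (plain `java.time`; the class name is illustrative):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class MicrosecondFormat {
    public static void main(String[] args) {
        // Six fixed fractional digits: sub-second values are zero-padded,
        // so the serialized form always carries microsecond precision.
        DateTimeFormatter micros = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        System.out.println(micros.format(LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123_456_000)));
        // 1970-01-01 00:00:00.123456
        System.out.println(micros.format(LocalDateTime.of(1970, 1, 1, 0, 0, 0)));
        // 1970-01-01 00:00:00.000000
    }
}
```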
Force-pushed from 954568b to 40c9bf4
PROTOCOL.md
Outdated
Note: A `timestamp` value in a partition value may be stored in one of the following ways:
1. Without a timezone, where the timestamp should be interpreted using the time zone of the system which wrote to the table.
2. Adjusted to UTC, where the partition value must have the suffix "UTC".

It is highly recommended that modern writers adjust the timestamp value to UTC as outlined in 2.
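A minimal sketch of option 2, assuming a `java.time`-style formatter: the `z` pattern letter renders the short zone name, which for the `UTC` zone region is the literal string `UTC`, matching the suffixed examples in the protocol table (illustrative class name):

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class UtcSuffixFormat {
    public static void main(String[] args) {
        // 'z' appends the short zone name; for the "UTC" zone region that
        // is the literal "UTC", giving the suffix required by option 2.
        DateTimeFormatter f =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSz", Locale.ROOT);
        ZonedDateTime t =
            ZonedDateTime.of(2024, 6, 14, 4, 0, 0, 123_456_000, ZoneId.of("UTC"));
        System.out.println(f.format(t)); // 2024-06-14 04:00:00.123456UTC
    }
}
```

Note this relies on using `ZoneId.of("UTC")` rather than `ZoneOffset.UTC`, which would render as `Z` instead.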
If we'd like to do the protocol changes separately, happy to do so, just let me know @scottsand-db @allisonport-db @vkorukanti @lzlfred
@@ -67,7 +70,7 @@ class DelayedCommitProtocol(
  // since there's no guarantee the stats will exist.
  @transient val addedStatuses = new ArrayBuffer[AddFile]

  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"
This also addresses an issue with microsecond partitions; not that I'd recommend identity partitioning on microsecond-precision timestamps, given the extremely small files it produces, but it is technically supported by the Delta protocol.
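Spark's `TimestampFormatter` is backed by `java.time` patterns under the hood, so the optional-section behavior of `[.SSSSSS][.S]` can be sketched with a plain `DateTimeFormatter` (illustrative only, not the actual Delta code path):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class OptionalFractionParse {
    public static void main(String[] args) {
        // Bracketed sections are optional: exactly six fractional digits,
        // exactly one digit, or no fraction at all will parse.
        DateTimeFormatter f =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]");
        System.out.println(LocalDateTime.parse("2024-06-14 04:00:00.123456", f).getNano()); // 123456000
        System.out.println(LocalDateTime.parse("2024-06-14 04:00:00.1", f).getNano());      // 100000000
        System.out.println(LocalDateTime.parse("2024-06-14 04:00:00", f).getNano());        // 0
    }
}
```

Each `S` run is fixed-width, so the first optional section only matches a full six-digit microsecond fraction; `[.S]` preserves the old single-digit behavior.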
Let's make sure this is explicitly mentioned in the PR description.
@@ -164,7 +183,8 @@ class DelayedCommitProtocol(
   */
  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val partitionValues = dir.map(dir => parsePartitions(dir, taskContext))
Scala esthetics nit. You could use currying:
val partitionValues = dir.map(dir => parsePartitions(dir, taskContext))
val partitionValues = dir.map(parsePartitions(taskContext))
And, then the signature of the parsePartitions would look like:
protected def parsePartitions(taskContext: TaskAttemptContext)(dir: String): Map[String, String] = {
You could, but then we'd have to adjust the signature of the called function just to support currying...
  .map(l => Cast(l, StringType).eval())
  .map(Option(_).map(_.toString).orNull))
  .toMap
  .columnNames
I think we want to minimize the number of unrelated changes
@@ -67,7 +70,7 @@ class DelayedCommitProtocol(
  // since there's no guarantee the stats will exist.
  @transient val addedStatuses = new ArrayBuffer[AddFile]

  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"
Let's make sure this is explicitly mentioned in the PR description.
// TODO: enable validatePartitionColumns?
val utcTimestampPartitionValues = taskContext.getConfiguration.getBoolean(
This sounds like it's the actual values. Let's prefix with "use" to make it clear that it's a boolean.
val utcTimestampPartitionValues = taskContext.getConfiguration.getBoolean(
val useUtcTimestampPartitionValues = taskContext.getConfiguration.getBoolean(
val dateFormatter = DateFormatter()
val timestampFormatter =
  TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
  TimestampFormatter(timestampPartitionPattern, sessionTimeZone)
This is actually a behavior change as well. The JVM timezone doesn't need to be the same as the session timezone. Given that the old behavior is bad with or without this change, I would suggest leaving it as-is. Then at least the fallback (disabling the config) will work to get back to exactly the old bad behavior, instead of some subtly different bad behavior. :)
That's a great point. I'll leave it as is. I think arguably it should've been session timezone to begin with, but again since it's already released like this for a while, we shouldn't introduce any new behavior.
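To illustrate why the JVM default zone and the session zone can disagree, here is a standalone `java.time` sketch (the "session" zone below is an arbitrary example, not read from any Spark config):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;

public class JvmVsSessionZone {
    public static void main(String[] args) {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        Instant instant = Instant.parse("2024-06-14T04:00:00Z");

        // The JVM default zone and a Spark session zone (spark.sql.session.timeZone)
        // are configured independently, so these two strings need not agree.
        String viaJvmDefault = f.withZone(TimeZone.getDefault().toZoneId()).format(instant);
        String viaSessionZone = f.withZone(ZoneId.of("America/Los_Angeles")).format(instant);

        System.out.println(viaJvmDefault);   // depends on the machine's default zone
        System.out.println(viaSessionZone);  // 2024-06-13 21:00:00 (PDT is UTC-7 in June)
    }
}
```

Keeping `TimeZone.getDefault` here preserves the exact legacy output, which is what the fallback config is meant to restore.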
if (validatePartitionColumns && columnValue != null && castedValue == null) {
  throw DeltaErrors.partitionColumnCastFailed(
    columnValue.toString, dataType.toString, columnName)
if (dataType == DataTypes.TimestampType && utcNormalizeTimestamp) {
Why don't we just pass the `utcFormatter` into `inferPartitionColumnValue`? Heck, why are we passing a `timestampFormatter` and a boolean here when we could just pass in the UTC formatter through `timestampFormatter` from the outside? (If there is a fundamental reason for this, then it needs at least a code comment that explains why we need this extra complexity.)
I think this ended up being answered in #3378 (comment), but let me know if this part is still unclear. Essentially, the `timestampFormatter` here is for reading the partition string values, and the boolean specifies how to format the output partition values.
@@ -164,7 +183,8 @@ class DelayedCommitProtocol(
   */
  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val partitionValues = dir.map(dir => parsePartitions(dir, taskContext))
You could, but then we'd have to adjust the signature of the called function just to support currying...
val parsedPartition =
  PartitionUtils
    .parsePartition(
      new Path(dir),
      typeInference = false,
      Set.empty,
      Map.empty,
      partitionCols,
This is an excellent situation where named arguments help with readability. E.g. in the line above (`Set.empty`) I have no idea what we're opting out of. Even here, I'm not sure what `partitionCols` does. (It's actually `userSpecifiedDataTypes`.) It would also help to rename `partitionCols` to `partitionColTypes`, to make it clear what it represents.

We could also use a comment that explains why we need to override the types here. I'm starting to think that this PartitionUtils is really weird because it tries to infer the column type from the data. This code seems to have been borrowed from Spark, where the inference might have served a purpose because the values are parsed from the path. But in Delta we always know the exact data type...

Oh boy. I did some further digging. It seems that we're actually using this only for `newTaskTempFile`, which is used to generate the file name for a new file. And that one gets a path that Spark already pre-generated that includes the partition key values, which we then reverse-engineer and reconstitute to create the real path. And then we also store those values into the partitions that we record in the Delta log, via `addedFiles`.

It would be really useful to add that context to the code here as a comment. It's totally unclear that this is the key point where we determine what the partition keys are for a file...

(FWIW, given that context, it would make sense to me to always pass in the partition columns explicitly, so that we no longer rely on the weird inference logic. At least then this change makes some sort of sense. It would be great if we did not rely on that inference logic anywhere. But I think that's a larger change that we shouldn't do in the same PR.)
+100. I'll add some comments explaining all of this, since it's definitely not an intuitive API to work with, and in the end we probably want to move away from using it, since we know the exact partition column data types in Delta.
val parsedPartition =
  PartitionUtils
    .parsePartition(
      new Path(dir),
      typeInference = false,
      Set.empty,
      Map.empty,
      partitionCols,
      validatePartitionColumns = false,
      java.util.TimeZone.getDefault,
Interestingly, this is the timezone that is to be used to parse the input. It is worth a comment that this is different from the output formatting, and that we expect Spark to produce the partitions using the JVM default timezone. (This probably means that Spark has the same issue with partition columns on any other table.)
I've commented this section in more detail!
PROTOCOL.md
Outdated
@@ -1754,13 +1754,16 @@ Type | Serialization Format
string | No translation required
numeric types | The string representation of the number
date | Encoded as `{year}-{month}-{day}`. For example, `1970-01-01`
timestamp | Encoded as `{year}-{month}-{day} {hour}:{minute}:{second}` or `{year}-{month}-{day} {hour}:{minute}:{second}.{microsecond}` For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456`
timestamp | Encoded as `{year}-{month}-{day} {hour}:{minute}:{second}` or `{year}-{month}-{day} {hour}:{minute}:{second}.{microsecond}` For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456` or `2024-06-14 04:00:00UTC` or `2024-06-14 04:00:00.123456UTC`
plz add the Z postfix to the protocol.
@@ -19,21 +19,24 @@ package org.apache.spark.sql.delta.files
// scalastyle:off import.ordering.noEmptyLine
import java.net.URI
import java.util.UUID
I would like to ask to revert all the import ordering changes as those are not related to this feature.
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StringType, TimestampType}

import java.time.ZoneId
java import should go to very top.
protected def parsePartitions(dir: String): Map[String, String] = {
// TODO: timezones?
protected def parsePartitions(dir: String,
    taskContext: TaskAttemptContext): Map[String, String] = {
plz make the indentation 4 spaces.
val utcTimestampPartitionValues = taskContext.getConfiguration.getBoolean(
  DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key, true) &&
  taskContext.isInstanceOf[PartitionedTaskAttemptContextImpl] &&
  taskContext.asInstanceOf[PartitionedTaskAttemptContextImpl]
plz avoid as/isInstanceOf; in almost all cases they can be replaced by pattern matching.
val utcTimestampPartitionValues = taskContext match {
  case context: PartitionedTaskAttemptContextImpl if DeltaSQLConf.... =>
  case _ => Map.empty
}
@@ -377,6 +378,13 @@ object DeltaFileFormatWriter extends LoggingShims {
    }
  }

class PartitionedTaskAttemptContextImpl(conf: Configuration,
    taskId: TaskAttemptID,
indentation.
@@ -17,7 +17,6 @@
package org.apache.spark.sql.delta.files

import scala.collection.mutable.ListBuffer
revert changes in this file.
Force-pushed from faf26d9 to 78f1a38
Force-pushed from 78f1a38 to a181752
This change depends on the protocol changes from #3398.
Force-pushed from 61b5669 to ae1cd95
…to UTC and Spark writes these adjusted values
Force-pushed from ae1cd95 to 4028268
LGTM !!!
Which Delta project/connector is this regarding?
Description
Currently, in the Delta protocol, timestamps are not stored with their time zone. This leads to unexpected behavior when querying across systems configured with different time zones (e.g. different Spark sessions). In Spark, for instance, the timestamp value is adjusted to the Spark session time zone and written to the Delta log partition values without any time zone marker. If someone then queries the same timestamp from a session with a different time zone, partition pruning can fail to surface results.
What this change proposes for the Delta Lake protocol is to allow timestamp partition values to be adjusted to UTC and explicitly stored in partition values with a "UTC" suffix. The original approach is still supported for compatibility, but it is recommended that newer writers write with the UTC suffix.
This is also important for Iceberg Uniform conversion, because Iceberg timestamps must be UTC-adjusted. Now that we have a well-defined format for UTC in Delta, we can convert string partition values to Iceberg longs so that Uniform conversion succeeds.
This change updates the Spark-Delta integration to write out the UTC-adjusted values for timestamp types.
This also addresses an issue with microsecond partitions, where previously microsecond-precision partitioning (not recommended, but technically allowed) would not work and values would be truncated to seconds.
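To make the pruning issue concrete, here is a hypothetical sketch (plain `java.time`, no Delta code) showing how the same instant serializes differently per writer session zone under the old scheme, but identically under UTC normalization:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class UtcNormalizationSketch {
    public static void main(String[] args) {
        Instant written = Instant.parse("2024-06-14T04:00:00Z");
        DateTimeFormatter legacy = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        // Old scheme: the partition string depends on the writer's session zone,
        // so a reader in another zone can prune away files that actually match.
        String fromTokyo = legacy.withZone(ZoneId.of("Asia/Tokyo")).format(written);
        String fromLosAngeles = legacy.withZone(ZoneId.of("America/Los_Angeles")).format(written);
        System.out.println(fromTokyo);       // 2024-06-14 13:00:00
        System.out.println(fromLosAngeles);  // 2024-06-13 21:00:00

        // New scheme: normalize to UTC with an explicit "UTC" suffix, so every
        // session serializes the same instant to the same partition value.
        DateTimeFormatter utc = DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSz", Locale.ROOT)
            .withZone(ZoneId.of("UTC"));
        System.out.println(utc.format(written)); // 2024-06-14 04:00:00.000000UTC
    }
}
```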
How was this patch tested?
Added unit tests for the following cases:
1.) UTC timestamp partition values round trip across different session TZ
2.) A delta log with a mix of Non-UTC and UTC partition values round trip across the same session TZ
3.) Timestamp without timezone (timestamp_ntz) round trips across timezones (kind of a tautology, but important to make sure that timestamp_ntz does not unintentionally get written as a UTC timestamp)
4.) Timestamp round trips across same session time zone: UTC normalized
5.) Timestamp round trips across same session time zone: session time normalized (this case worked before this change, so it's important that it keeps working after this change)
6.) Mix of microsecond/second-level precision and dates before the epoch (to test that everything works with negative values)
Does this PR introduce any user-facing changes?
Yes, in the sense that new timestamp partition values will be written as normalized UTC values.