fix(table): normalize timestamp units for partitioned writes#1112
fix(table): normalize timestamp units for partitioned writes#1112fallintoplace wants to merge 2 commits into
Conversation
| case arrow.Microsecond: | ||
| return value, nil | ||
| case arrow.Nanosecond: | ||
| return value / 1_000, nil |
There was a problem hiding this comment.
Go's / truncates toward zero, but unit downconversion should floor. For pre-epoch values this rounds the wrong way: ns=-1500 gives -1 here, but the correct μs bin is -2 ([-2000, -1000)). Same class of bug this PR is fixing, wrong partition routing for negative timestamps.
Try
case arrow.Nanosecond:
return math.FloorDiv(value, 1_000), nil
Can you please add a regression test with a negative ns value (e.g. one second before epoch with a sub-μs offset) asserting the partition path.
|
Out of scope: Time64 (line ~381) has the same unit-vs-iceberg.Time bug — follow-up PR. |
9ad9bb9 to
217aa9f
Compare
217aa9f to
c873da9
Compare
| func floorDivInt64(a, b int64) int64 { | ||
| d := a / b | ||
| if (a^b) < 0 && d*b != a { | ||
| d-- | ||
| } | ||
|
|
||
| return d | ||
| } |
There was a problem hiding this comment.
this already exists in the root transforms.go file, we should probably just move the version in transforms.go:579 into an internal/utils.go file and then use that in both places rather than duplicate this function.
| if (value > 0 && value > math.MaxInt64/factor) || | ||
| (value < 0 && value < math.MinInt64/factor) { | ||
| return 0, fmt.Errorf("arrow timestamp value %d overflows int64 when scaled by %d", value, factor) | ||
| } |
There was a problem hiding this comment.
can you add a test that covers this? I don't think it's covered by the current tests
| case iceberg.TimestampType, iceberg.TimestampTzType: | ||
| micros, err := arrowTimestampToMicros(value, timestampType.Unit) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil | ||
| case iceberg.TimestampNsType, iceberg.TimestampTzNsType: |
There was a problem hiding this comment.
the Tz variants don't seem to get tested, can you add cases that have TimeZone: "UTC" so we hit this case?
| return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID()) | ||
| } | ||
| partitionColumns[i] = record.Column(colIndices[0]) | ||
| partitionFieldsInfo[i] = partitionFieldInfo{&sourceField, sourceField.FieldID, sourceType} |
There was a problem hiding this comment.
| partitionFieldsInfo[i] = partitionFieldInfo{&sourceField, sourceField.FieldID, sourceType} | |
| partitionFieldsInfo[i] = partitionFieldInfo{ | |
| sourceField: &sourceField, | |
| fieldID: sourceField.FieldID, | |
| sourceType: sourceType, | |
| } |
just so we don't accidentally misorder things
| } | ||
|
|
||
| type partitionFieldInfo struct { | ||
| sourceField *iceberg.PartitionField |
There was a problem hiding this comment.
PartitionField is a small struct, why use a pointer here instead of just using it by value?
| } | ||
| sourceType, ok := schema.FindTypeByID(sourceField.SourceID()) | ||
| if !ok { | ||
| return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID()) |
There was a problem hiding this comment.
can we use something like "failed to find type for source field ID" to distinguish this error from the above identical one?
a138222 to
a138e3d
Compare
64f42d6 to
f0ee48e
Compare
Summary
Why
Partitioned writes compute partition keys before
ToRequestedSchemanormalizes Arrow timestamp arrays to the table schema. The old partition path cast raw Arrow timestamp values directly to Iceberg microsecond timestamps, sotimestamp[s]andtimestamp[ms]inputs could be routed to the wrong day/hour partition even though the data values were later written with normalized units.Fixes #1111.
Testing
go test ./table -run TestFanoutWriter -count=1git diff --check