
NIFI-15568 Fix Timestamp Partitioning in PutIcebergRecord #10996

Merged
pvillard31 merged 2 commits into apache:main from exceptionfactory:NIFI-15568
Mar 12, 2026

Conversation

@exceptionfactory
Contributor

Summary

NIFI-15568 Corrects partitioning by Timestamp, Date, and Time fields in the PutIcebergRecord Processor and the ParquetIcebergWriter Controller Service.

Based on the initial approach in #10877, changes include selective conversion of fields with java.sql types to corresponding java.time types in the DelegatedRecord class. The conversion process evaluates the record schema to determine whether any field types require conversion, avoiding unnecessary object creation.
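A minimal sketch of the kind of conversion described, using only JDK types (the class and method names here are hypothetical; the actual DelegatedRecord implementation is schema-driven):

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

public class SqlToJavaTime {
    // Convert java.sql temporal values to their java.time equivalents;
    // other values pass through unchanged.
    public static Object convert(Object value) {
        if (value instanceof Timestamp timestamp) {
            return timestamp.toLocalDateTime();
        }
        if (value instanceof Date date) {
            return date.toLocalDate();
        }
        if (value instanceof Time time) {
            return time.toLocalTime();
        }
        return value;
    }
}
```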

Instead of adding the iceberg-data library, which brings in additional transitive dependencies, changes to the ParquetIcebergWriter include a PartitionKeyRecord for wrapping input Iceberg Record objects and returning primitive partition keys following the pattern of the Iceberg InternalRecordWrapper.
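The wrapping approach can be sketched with plain JDK types (all names here are hypothetical; the real PartitionKeyRecord operates on Iceberg Record and StructLike objects):

```java
import java.time.LocalDate;
import java.util.List;
import java.util.function.Function;

// Sketch of the wrapper pattern described above: hold one converter per
// field position and apply it lazily when a partition key value is read,
// mirroring the approach of Iceberg's InternalRecordWrapper.
public class PartitionKeyWrapperSketch {
    private final List<Function<Object, Object>> converters;
    private Object[] wrapped;

    public PartitionKeyWrapperSketch(List<Function<Object, Object>> converters) {
        this.converters = converters;
    }

    public PartitionKeyWrapperSketch wrap(Object[] record) {
        this.wrapped = record;
        return this;
    }

    // Return the primitive representation expected by partition transforms,
    // e.g. a LocalDate becomes days since the epoch.
    public Object get(int pos) {
        Object value = wrapped[pos];
        Function<Object, Object> converter = converters.get(pos);
        return converter == null || value == null ? value : converter.apply(value);
    }

    public static int daysFromDate(LocalDate date) {
        return (int) date.toEpochDay();
    }
}
```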

New unit tests verify the behavior of the DelegatedRecord with Timestamp, Date, and Time fields, and also verify partition key handling for ParquetIcebergWriter.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Comment on lines +99 to +104
    final Types.TimestampType timestampType = (Types.TimestampType) fieldType;
    if (timestampType.shouldAdjustToUTC()) {
        converter = dateTime -> DateTimeUtil.nanosFromTimestamptz((OffsetDateTime) dateTime);
    } else {
        converter = dateTime -> DateTimeUtil.nanosFromTimestamp((LocalDateTime) dateTime);
    }
Contributor


Copy/paste I guess?

Suggested change

        final Types.TimestampType timestampType = (Types.TimestampType) fieldType;
        if (timestampType.shouldAdjustToUTC()) {
    -       converter = dateTime -> DateTimeUtil.nanosFromTimestamptz((OffsetDateTime) dateTime);
    +       converter = dateTime -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) dateTime);
        } else {
    -       converter = dateTime -> DateTimeUtil.nanosFromTimestamp((LocalDateTime) dateTime);
    +       converter = dateTime -> DateTimeUtil.microsFromTimestamp((LocalDateTime) dateTime);
        }

The existing test testWriteDataFilesPartitionedTimestamp does not catch this because it only verifies that dataFiles has length 1 and a recordCount() of 1; it never asserts on the actual partition values of the output DataFile.
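The practical impact can be illustrated with plain JDK arithmetic (hypothetical helper class): Iceberg stores microsecond-precision timestamps, so a partition key computed in nanoseconds is a factor of 1000 too large and the record lands in the wrong partition.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Illustration of the unit mismatch between nanoseconds and the
// microseconds that Iceberg timestamp partitioning expects.
public class EpochUnits {
    private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

    public static long microsSinceEpoch(LocalDateTime dateTime) {
        return ChronoUnit.MICROS.between(EPOCH, dateTime);
    }

    public static long nanosSinceEpoch(LocalDateTime dateTime) {
        return ChronoUnit.NANOS.between(EPOCH, dateTime);
    }
}
```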

Contributor Author


Good catch, I will adjust and add some testing for this behavior.


private StructLike wrapped = null;

@SuppressWarnings("unchecked")
Contributor


Do we want to keep those?

Contributor Author


Yes, this is needed for casting the array of functional converters.
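For context, Java forbids creating an array of a parameterized type directly, so building an array of functional converters requires an unchecked cast from a raw array (a minimal hypothetical example, not the project's actual code):

```java
import java.util.function.Function;

public class ConverterArrayFactory {
    // new Function<Object, Object>[size] is a compile error in Java;
    // the unchecked cast from a raw Function[] is the standard workaround,
    // hence the @SuppressWarnings("unchecked") annotation.
    @SuppressWarnings("unchecked")
    public static Function<Object, Object>[] newConverterArray(int size) {
        return (Function<Object, Object>[]) new Function[size];
    }
}
```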

/**
* Record Converter handles translating field values to types compatible with Apache Iceberg Records
*/
class RecordConverter {
Contributor


We do not support nested records here. The PartitionKeyRecord does handle nested structs via its STRUCT converter, but this RecordConverter does not. This might be an acceptable limitation for now, but I wanted to point it out.

Contributor Author


Thanks for noting this detail. Yes, the PartitionKeyRecord follows the pattern of the Apache Iceberg implementation, but this RecordConverter is more narrowly scoped. The difference in capability is acceptable for now, and I will track it for a subsequent improvement adding more general support for nested structures.

- Added conditional conversion of partition key fields for ParquetIcebergWriter
- Added conditional conversion of java.sql types to java.time types for PutIcebergRecord
@exceptionfactory
Contributor Author

Thanks for the review @pvillard31, I pushed a commit to use the micros conversion method for the Timestamp Type, and added a unit test for non-nested field types of PartitionKeyRecord.

pvillard31 merged commit 2dd3059 into apache:main on Mar 12, 2026
7 checks passed