feat(flink): collect event time in HoodieRowDataCreateHandle for min/max event time metrics#18250
Conversation
…max event time metrics - Add event time field index from hoodie.payload.event.time.field (DOUBLE, epoch seconds) - Extract event time in write() when !skipMetadataWrite and pass to WriteStatus.markSuccess - Enables Flink ingestion min/max event time in commit stats for latency metrics - Add TestHoodieRowDataCreateHandle for event time field index and extraction Made-with: Cursor
| for (int i = 0; i < rowType.getFieldCount(); i++) { | ||
| if (rowType.getFieldNames().get(i).equals(eventTimeField)) { | ||
| RowType.RowField field = rowType.getFields().get(i); | ||
| if (field.getType().getTypeRoot() == LogicalTypeRoot.DOUBLE) { |
There was a problem hiding this comment.
why double instead of long, seems long type milliseconds is most widely used.
There was a problem hiding this comment.
Change to support both double and long.
| return Option.empty(); | ||
| } | ||
| double eventTimeSeconds = rowData.getDouble(eventTimeFieldIndex); | ||
| long eventTimeMillis = (long) (eventTimeSeconds * 1000); |
There was a problem hiding this comment.
how are we assured for the time unit?
There was a problem hiding this comment.
You are correct. Searched the code and could not find any good way to tell the time unit here. WriteStatus later uses a heauristic from digit length. The easiest here is not do the conversion and continue to let WriteStatus handle that. So if it is double type of seconds, we lose a bit of precision (of milliseconds data), that seems acceptable. Changed the implementation to skip conversion here, and suppors both double/long types.
…ateHandle - Use RowData.FieldGetter for event time field (hoodie.payload.event.time.field) - Support both DOUBLE and BIGINT; getter is null when field not configured or unsupported type - Extract value as long then string (no unit interpretation; WriteStatus infers by length) - Add defensive try-catch in extractEventTimeMetadata for schema mismatch edge cases - Add testEventTimeExtractionWithBigintMillis for BIGINT (millis) coverage Made-with: Cursor
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18250 +/- ##
============================================
- Coverage 57.30% 57.29% -0.01%
Complexity 18561 18561
============================================
Files 1945 1946 +1
Lines 106256 106319 +63
Branches 13131 13140 +9
============================================
+ Hits 60885 60914 +29
- Misses 39648 39677 +29
- Partials 5723 5728 +5
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
…max event time metrics (apache#18250) * feat(flink): collect event time in HoodieRowDataCreateHandle for min/max event time metrics - Add event time field index from hoodie.payload.event.time.field (DOUBLE, epoch seconds) - Extract event time in write() when !skipMetadataWrite and pass to WriteStatus.markSuccess - Enables Flink ingestion min/max event time in commit stats for latency metrics - Add TestHoodieRowDataCreateHandle for event time field index and extraction Made-with: Cursor * feat(flink): support DOUBLE and BIGINT event time in HoodieRowDataCreateHandle - Use RowData.FieldGetter for event time field (hoodie.payload.event.time.field) - Support both DOUBLE and BIGINT; getter is null when field not configured or unsupported type - Extract value as long then string (no unit interpretation; WriteStatus infers by length) - Add defensive try-catch in extractEventTimeMetadata for schema mismatch edge cases - Add testEventTimeExtractionWithBigintMillis for BIGINT (millis) coverage Made-with: Cursor
Describe the issue this Pull Request addresses
Flink bulk insert does not populate min/max event time in WriteStatus. Use cases (e.g. metrics) that rely on event time stats do not work. This PR adds event time collection in
HoodieRowDataCreateHandlewhenhoodie.payload.event.time.fieldis discovered, so Flink WriteStatus can report min/max event time.Summary and Changelog
hoodie.payload.event.time.fieldis configured,HoodieRowDataCreateHandlereads the event time from each record and passes it toWriteStatus.markSuccessas record metadata. WriteStatus then updates min/max event time for the file. No time unit is interpreted in the handle; the value is converted to long then string. WriteStatus infers unit by string length (e.g. 10 digits vs 13) as today.RowData.FieldGetter(Flink API) to read the field by name/type; getter is null when the field is not configured, not found, or unsupported.Impact
hoodie.payload.event.time.field(field name only) is used. When set and the field is DOUBLE or BIGINT, Flink bulk insert will populate min/max event time in WriteStatus. No new config; no change when the config is unset or the field is missing/unsupported.Risk Level
Low. Change is scoped to Flink
HoodieRowDataCreateHandleand its tests. Event time is optional and only applied when the configured field exists and is DOUBLE or BIGINT. Defensive try-catch avoids failing the write on read errors.Documentation Update
None. The config key is existing; behavior is an extension of its use for Flink (min/max event time in WriteStatus). No new configs or defaults.
Contributor's checklist