Skip to content

Flink Sink: Add WriteObserver plugin interface for per-record metadata#15784

Open
herbherbherb wants to merge 6 commits intoapache:mainfrom
herbherbherb:flink-sink-write-observer
Open

Flink Sink: Add WriteObserver plugin interface for per-record metadata#15784
herbherbherb wants to merge 6 commits intoapache:mainfrom
herbherbherb:flink-sink-write-observer

Conversation

@herbherbherb
Copy link
Copy Markdown

Closes #15783

Summary

Adds a WriteObserver plugin interface to IcebergSink (Sink V2) that observes each record written and produces per-checkpoint metadata that flows through the entire sink pipeline (writer → serializer → aggregator → committer) to the Iceberg snapshot summary.

Motivation

Users need to extract per-record metadata (e.g., watermark timestamps, data quality scores) at the writer level and attach it to the committed Iceberg snapshot. Currently there is no way to propagate custom metadata from the writer through the aggregator to the committer without subclassing multiple internal classes.

Changes

  • New: WriteObserver.java -- interface with observe(RowData, SinkWriter.Context) and default Map<String, String> snapshotMetadata()
  • New: WriteObserverMetadataHolder.java -- ThreadLocal holder for passing metadata through serialization boundaries
  • Modified: IcebergSinkWriter -- calls observer per-record, collects metadata at checkpoint
  • Modified: WriteResultSerializer -- v2 format carries observer metadata alongside WriteResult
  • Modified: IcebergWriteAggregator -- merges metadata from writer subtasks
  • Modified: IcebergCommittable + IcebergCommittableSerializer -- v2 format with observer metadata (backward-compatible with v1)
  • Modified: IcebergCommitter -- applies observer metadata as snapshot properties
  • Modified: IcebergSink.Builder -- new writeObserver() method

Compatibility

  • No behavioral change when the observer is not set (null default)
  • Serializers v2 can deserialize v1 payloads (backward compatible)
  • No changes to public API signatures of existing methods

@github-actions github-actions bot added the flink label Mar 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flink Sink: Add WriteObserver plugin interface for per-record metadata

1 participant