Skip to content

Flink Sink: Add WriteObserver plugin interface for per-record metadata #15783

@herbherbherb

Description

@herbherbherb

Query engine

Flink

Feature Request / Improvement

Add a WriteObserver plugin interface to IcebergSink (Sink V2) that observes each record written and produces per-checkpoint metadata that flows through the entire sink pipeline to the Iceberg snapshot summary.

Motivation

Users need to extract per-record metadata (e.g., watermark timestamps, data quality scores) at the writer level and attach it to the committed Iceberg snapshot. Currently there is no way to propagate custom metadata from the writer through the aggregator to the committer without subclassing multiple internal classes (IcebergSinkWriter, IcebergWriteAggregator, IcebergCommittable, IcebergCommitter).

This PR adds a WriteObserver that is called for each record in IcebergSinkWriter.write(). At checkpoint time, the observer's accumulated metadata is carried through the serialization boundary via a ThreadLocal holder, serialized alongside WriteResult and IcebergCommittable, merged across parallel writer subtasks in the aggregator, and applied as additional Iceberg snapshot properties in the committer.

Changes

  • New: WriteObserver.java -- interface with observe(RowData, SinkWriter.Context) and default Map<String, String> snapshotMetadata()
  • New: WriteObserverMetadataHolder.java -- ThreadLocal holder for passing metadata through serialization boundaries
  • Modified: IcebergSinkWriter -- calls observer per-record, collects metadata at checkpoint via prepareCommit()
  • Modified: WriteResultSerializer -- v2 format carries observer metadata alongside WriteResult bytes
  • Modified: IcebergWriteAggregator -- reads metadata from deserialized WriteResults, merges across writer subtasks, passes to IcebergCommittable
  • Modified: IcebergCommittable + IcebergCommittableSerializer -- new observerMetadata field with v2 serialization format (backward-compatible with v1)
  • Modified: IcebergCommitter -- merges observer metadata from committables and applies as snapshot properties
  • Modified: IcebergSink.Builder -- new writeObserver() method

Compatibility

  • No behavioral change when the observer is not set (null default)
  • IcebergCommittableSerializer v2 can deserialize v1 payloads (backward compatible)
  • WriteResultSerializer v2 can deserialize v1 payloads (backward compatible)
  • No changes to public API signatures of existing methods

Use cases

  • Per-record watermark extraction for downstream freshness tracking
  • Data quality score aggregation per checkpoint
  • Custom metadata that should appear in Iceberg snapshot summaries

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions