Flink: Expose scan planning metrics on ContinuousIcebergEnumerator (#16589) #16590

Open
2dmurali wants to merge 2 commits into apache:main from 2dmurali:flink-scan-planning-metrics

@2dmurali

Closes #16589

Summary

  • Wire metricsReporter() into BaseIncrementalScan.planFiles() so that incremental scans report metrics the same way batch scans (SnapshotScan) already do.
  • Expose all ScanMetricsResult fields as Flink gauges on ContinuousIcebergEnumerator. Each gauge holds the value from the most recent scan (last-value semantics) and is registered on the coordinator metric group.
  • The gauges are available through Flink metric reporters (Prometheus, Datadog, JMX, Slf4j) at the path coordinator.enumerator.IcebergSourceEnumerator.table.<tableName>.<metric>

Changes

Core:

  • BaseIncrementalScan.planFiles() now emits a ScanReport via metricsReporter() (same pattern as SnapshotScan)
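
As a rough illustration of the core change, the sketch below shows a plan that emits exactly one report when the caller closes it. It is not the Iceberg code: SketchReporter, SketchScanReport, ReportingPlan and SketchIncrementalScan are hypothetical stand-ins, and reporting on close is an assumption about how the SnapshotScan pattern behaves.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a metrics reporter; not the Iceberg interface.
interface SketchReporter {
  void report(SketchScanReport report);
}

// Hypothetical stand-in for a scan report carrying a single planning metric.
final class SketchScanReport {
  final String tableName;
  final long resultDataFiles;

  SketchScanReport(String tableName, long resultDataFiles) {
    this.tableName = tableName;
    this.resultDataFiles = resultDataFiles;
  }
}

// Planned files that run a completion callback once, on the first close().
final class ReportingPlan implements Iterable<String>, AutoCloseable {
  private final List<String> files;
  private final Runnable onComplete;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ReportingPlan(List<String> files, Runnable onComplete) {
    this.files = files;
    this.onComplete = onComplete;
  }

  @Override
  public Iterator<String> iterator() {
    return files.iterator();
  }

  @Override
  public void close() {
    // Guard against double reporting if the caller closes twice.
    if (closed.compareAndSet(false, true)) {
      onComplete.run();
    }
  }
}

// Hypothetical incremental scan: planning wires the reporter into the plan.
final class SketchIncrementalScan {
  private final String tableName;
  private final List<String> newFiles;
  private final SketchReporter reporter;

  SketchIncrementalScan(String tableName, List<String> newFiles, SketchReporter reporter) {
    this.tableName = tableName;
    this.newFiles = newFiles;
    this.reporter = reporter;
  }

  ReportingPlan planFiles() {
    List<String> planned = new ArrayList<>(newFiles);
    return new ReportingPlan(
        planned, () -> reporter.report(new SketchScanReport(tableName, planned.size())));
  }
}
```

In this sketch nothing is reported until the plan is closed, so a caller that never closes the plan would never see a report.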

Flink (v1.20, v2.0, v2.1):

  • New IcebergSourceEnumeratorMetrics class with 17 AtomicLong-backed gauges
  • ContinuousSplitPlannerImpl creates an InMemoryMetricsReporter per scan cycle and attaches the resulting ScanReport to ContinuousEnumerationResult
  • ContinuousIcebergEnumerator updates the metrics after each scan discovery
  • FlinkSplitPlanner accepts an optional MetricsReporter parameter
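
The per-cycle flow described above can be sketched roughly as follows. All types here (CycleReport, CapturingReporter, CycleScan, EnumerationResult, SketchSplitPlanner) are hypothetical stand-ins rather than the classes touched by this PR; the only point is that a fresh reporter is created for each cycle and whatever it captured travels with the result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-scan report with a single metric.
final class CycleReport {
  final long resultDataFiles;

  CycleReport(long resultDataFiles) {
    this.resultDataFiles = resultDataFiles;
  }
}

// Hypothetical in-memory reporter: remembers only the last report it received.
final class CapturingReporter {
  private CycleReport last;

  void report(CycleReport report) {
    this.last = report;
  }

  CycleReport lastReport() {
    return last;
  }
}

// Hypothetical scan that plans splits and may report metrics along the way.
interface CycleScan {
  List<String> plan(CapturingReporter reporter);
}

// Hypothetical enumeration result; report is null if the scan reported nothing.
final class EnumerationResult {
  final List<String> splits;
  final CycleReport report;

  EnumerationResult(List<String> splits, CycleReport report) {
    this.splits = new ArrayList<>(splits);
    this.report = report;
  }
}

final class SketchSplitPlanner {
  private final CycleScan scan;

  SketchSplitPlanner(CycleScan scan) {
    this.scan = scan;
  }

  EnumerationResult planSplits() {
    // A new reporter per cycle, so nothing accumulates between cycles.
    CapturingReporter reporter = new CapturingReporter();
    List<String> splits = scan.plan(reporter);
    return new EnumerationResult(splits, reporter.lastReport());
  }
}
```

Because the reporter is discarded after each call, a cycle in which the scan reports nothing yields a null report instead of a stale one, and the consumer has to handle that case.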

Design decisions

  • Gauges (not counters): Each gauge holds a per-scan snapshot, which matches Spark's reporting model and lets operators see spikes directly without applying rate().
  • AtomicLong: Gauges are read by metric reporter threads and written by the coordinator thread, so each value must be safely visible across threads.
  • InMemoryMetricsReporter per cycle: Lightweight, no accumulation across cycles, and follows the existing Spark pattern.
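
A minimal sketch of the gauge design above, assuming each metric is one AtomicLong that is overwritten on every scan. SketchEnumeratorMetrics and its methods are hypothetical, the Flink registration of each supplier as a gauge is omitted, and treating a null report as "leave values unchanged" is an assumption, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical holder of last-value gauges keyed by metric name.
final class SketchEnumeratorMetrics {
  // Assumed to be fully populated before any reader thread starts.
  private final Map<String, AtomicLong> values = new HashMap<>();

  // Returns the read side of a gauge; a metric reporter thread would poll this.
  LongSupplier register(String name) {
    AtomicLong value = new AtomicLong();
    values.put(name, value);
    return value::get;
  }

  // Called by the single writer thread after each scan.
  void update(Map<String, Long> report) {
    if (report == null) {
      return; // Assumption: no report means the previous values stay in place.
    }
    for (Map.Entry<String, AtomicLong> gauge : values.entrySet()) {
      Long latest = report.get(gauge.getKey());
      if (latest != null) {
        // Overwrite, never add: the gauge shows the latest scan only.
        gauge.getValue().set(latest);
      }
    }
  }
}
```

With set() instead of addAndGet(), a scan that finds 5 files followed by one that finds 3 leaves the gauge at 3, which is the last-value behavior described above.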

Testing

  • Unit tests: TestIcebergSourceEnumeratorMetrics (all 17 gauges, null handling, last-value semantics)
  • Wiring test: TestContinuousIcebergEnumerator.testEnumeratorMetricsUpdatedFromScanReport
  • Integration test: TestIcebergSourceContinuous (MiniCluster + InMemoryReporter)
  • Core test: TestIncrementalScanPlanningAndReporting
  • Verified end-to-end on a standalone Flink 2.0 cluster

Future work

  • Batch metrics (StaticIcebergEnumerator): these require a different design because of the serialization boundary and are planned as a follow-up.

@2dmurali

Hi @pvary, could you review this when you have a chance? I see you're actively reviewing on the Flink side. TIA
