Search before asking
Fluss version
0.9.0 (latest release)
Please describe the bug 🐞
When a Flink source subscribes to multiple partitions of a table (or multiple buckets) whose consumption progress differs significantly, the reported currentFetchEventTimeLag metric is much smaller than the actual currentEmitEventTimeLag. This makes the metric misleading for monitoring and alerting, because:
- currentFetchEventTimeLag stays near 0, suggesting the source has caught up.
- currentEmitEventTimeLag reports a large lag (e.g. 2 hours), suggesting records are piling up.
- Yet the Flink job has no backpressure, so the gap cannot be explained by downstream slowness.
Root Cause
In FlinkSourceSplitReader#forLogRecords:
    maxConsumerRecordTimestampInFetch =
            Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());
    ...
    flinkSourceReaderMetrics.reportRecordEventTime(
            fetchTimestamp - maxConsumerRecordTimestampInFetch);
The aggregated timestamp is the MAX across all buckets in the fetch, so the reported lag is effectively the MIN lag across buckets.
Example scenario
- Partition A is 2 hours behind → its last record timestamp is 2 hours old.
- Partition B is reading the latest data → its last record timestamp ≈ now.
Every call to logScanner.poll(POLL_TIMEOUT) tends to return partition B's fresh data (since it is always locally ready as a CompletedFetch). The Math.max across buckets then picks partition B's near-now timestamp, so:
fetchTimestamp - maxTimestamp ≈ 0 → currentFetchEventTimeLag reports ~0
Meanwhile, emit lag is computed per record and aggregated as max, so partition A's 2-hour-old records correctly inflate currentEmitEventTimeLag. This is why the two metrics diverge dramatically even though there is no backpressure.
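The arithmetic of this scenario can be reproduced with a small stand-alone sketch. The class and method names below are hypothetical and are not taken from the Fluss codebase, and the timestamps are fixed illustrative values rather than real clock readings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical demo of the aggregation described above; not Fluss code. */
class FetchLagScenario {

    static final long TWO_HOURS_MS = 2L * 60 * 60 * 1000;

    /**
     * Mirrors the current behavior: the lag is measured against the newest
     * last-record timestamp in the fetch. Assumes at least one entry.
     */
    static long lagFromMaxTimestamp(long fetchTimestamp, Map<String, Long> lastRecordTimestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : lastRecordTimestamps.values()) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return fetchTimestamp - maxTimestamp;
    }

    /** Lag of each partition, computed independently of the others. */
    static Map<String, Long> lagPerPartition(long fetchTimestamp, Map<String, Long> lastRecordTimestamps) {
        Map<String, Long> lags = new LinkedHashMap<>();
        lastRecordTimestamps.forEach((name, ts) -> lags.put(name, fetchTimestamp - ts));
        return lags;
    }

    /** The two-partition scenario: A is two hours behind, B is caught up. */
    static Map<String, Long> scenario(long fetchTimestamp) {
        Map<String, Long> last = new LinkedHashMap<>();
        last.put("A", fetchTimestamp - TWO_HOURS_MS);
        last.put("B", fetchTimestamp);
        return last;
    }

    public static void main(String[] args) {
        long fetchTimestamp = 1_700_000_000_000L; // fixed value for determinism
        Map<String, Long> last = scenario(fetchTimestamp);
        System.out.println("aggregated lag (ms): " + lagFromMaxTimestamp(fetchTimestamp, last));
        System.out.println("per-partition lag (ms): " + lagPerPartition(fetchTimestamp, last));
    }
}
```

With these inputs the aggregated value is 0 ms while partition A's own lag is 7,200,000 ms (2 hours), which is the divergence described above.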
Solution
- Align fetch lag semantics with emit lag (max-lag across buckets):
  Change the aggregation in forLogRecords from Math.max to Math.min on record timestamp (equivalent to max on lag), so currentFetchEventTimeLag reports the worst-case lag across buckets in the fetch.
- Expose per-bucket / per-split fetch lag:
  Add a new gauge currentFetchEventTimeLag under the existing per-bucket metric group (next to currentOffset):
  - Non-partitioned: fluss.reader.bucket.{bucket_id}.currentFetchEventTimeLag
  - Partitioned: fluss.reader.partition.{partition_id}.bucket.{bucket_id}.currentFetchEventTimeLag
This allows users to observe exactly which partition/bucket is lagging, making diagnosis like the scenario above straightforward.
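Both proposals could be combined roughly as in the sketch below. It is a minimal illustration under stated assumptions, not the actual implementation: PerBucketFetchLagTracker, reportFetch, gaugeFor, and the UNKNOWN sentinel are hypothetical names, bucket keys are plain strings, and a LongSupplier stands in for whatever gauge type the connector would register. One detail worth noting is that switching to Math.min means the running value has to start at Long.MAX_VALUE, and an empty fetch needs separate handling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical sketch; names are illustrative and not part of Fluss or Flink. */
class PerBucketFetchLagTracker {

    /** Sentinel for "no record seen yet" (a simplification for this sketch). */
    static final long UNKNOWN = -1L;

    private final Map<String, Long> lagByBucket = new ConcurrentHashMap<>();

    /**
     * Processes one fetch: stores the lag of every bucket in the fetch and
     * returns the worst-case lag, derived from the MIN record timestamp.
     */
    long reportFetch(long fetchTimestamp, Map<String, Long> lastRecordTimestamps) {
        long minTimestamp = Long.MAX_VALUE; // identity element for Math.min
        for (Map.Entry<String, Long> entry : lastRecordTimestamps.entrySet()) {
            long ts = entry.getValue();
            lagByBucket.put(entry.getKey(), fetchTimestamp - ts);
            minTimestamp = Math.min(minTimestamp, ts);
        }
        // An empty fetch carries no timestamp, so no lag can be derived from it.
        return minTimestamp == Long.MAX_VALUE ? UNKNOWN : fetchTimestamp - minTimestamp;
    }

    /** Supplier that a per-bucket gauge could read its value from. */
    LongSupplier gaugeFor(String bucketKey) {
        return () -> lagByBucket.getOrDefault(bucketKey, UNKNOWN);
    }
}
```

In this sketch a bucket that does not appear in a fetch keeps its last stored value, so a real implementation would need to decide whether such values should be refreshed or expired.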
Are you willing to submit a PR?