[improve](streaming-job) add per-job lag metric to streaming insert jobs#63194
/review |
Pull request overview
This PR extends the existing per-job Prometheus metrics for streaming insert jobs by adding a per-job lag gauge (streaming_job_per_job_lag, seconds). The lag value is sourced from the CDC offset provider and is only emitted when meaningful (CDC incremental/binlog/WAL phase), aligning Prometheus exposure with the existing SQL Lag column added previously.
Changes:
- Add new per-job metric name constant and register/unregister logic for `streaming_job_per_job_lag` in FE metrics export.
- Add `StreamingInsertJob#getLag()` wrapper to expose offset-provider lag to the metrics layer.
- Extend MySQL streaming job regression test to (a) drive the job into incremental phase and (b) validate the new per-job lag metric appears in `/metrics?type=json`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy | Ensures lag becomes available after snapshot and checks the new per-job lag metric is emitted. |
| fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java | Adds per-job lag metric registration and cleanup as part of per-request per-job metric regeneration. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Adds a small wrapper method to fetch lag from the offset provider for metric consumption. |
Review result: no blocking issues found.
Critical checkpoint conclusions:
- Goal and tests: The change adds `streaming_job_per_job_lag` for streaming insert jobs and extends the MySQL CDC metrics regression test to wait for a numeric Lag and then assert the metric appears.
- Scope: The actual GitHub PR diff is focused on `StreamingInsertJob`, `MetricRepo`, and the CDC metrics regression test.
- Concurrency and locking: No new locking or shared mutable structures are introduced; per-job metric refresh remains inside the existing synchronized metric collection flow.
- Lifecycle: No new persisted state or replay behavior is added; lag is read from the existing offset provider.
- Configuration and compatibility: No new config, thrift, storage format, or rolling-upgrade-sensitive protocol changes.
- Parallel paths: Existing per-job streaming metrics are extended consistently with remove/re-register behavior; empty/non-applicable lag is skipped as intended.
- Error handling: Invalid or non-numeric lag strings are ignored rather than breaking `/metrics`, matching the defensive behavior expected for observability code.
- Test coverage: The regression test covers CDC lag becoming available and verifies the Prometheus JSON metric label appears. I did not run the external MySQL regression locally in this runner.
- Observability: Metric name, unit, and labels follow the adjacent per-job streaming metrics.
- User focus: No additional user-provided focus points were present.
…ult to -1 for N/A
|
/review |
|
run buildall |
    GaugeMetric<Long> lag = new GaugeMetric<Long>(
            STREAMING_JOB_PER_JOB_LAG, MetricUnit.SECONDS,
            "per job lag in seconds of streaming job, -1 means N/A") {
        @Override
        public Long getValue() {
            return sJob.getLagSeconds();
        }
    };
    lag.addLabel(new MetricLabel("job_id", jobId))
            .addLabel(new MetricLabel("job_name", jobName));
    DORIS_METRIC_REGISTER.addMetrics(lag);
I found one blocking test-coverage issue: the added regression check only verifies that the new lag metric exists, not that it exposes a real CDC lag value. Because the implementation emits -1 for N/A/snapshot/unparseable lag, this test would still pass if the job never reaches the binlog/WAL phase or if getLag() always returns an empty/unparseable value.
Critical checkpoint conclusions:
- Goal/test proof: The code adds a per-job lag metric, but the regression test does not prove the CDC lag value is actually surfaced.
- Scope/focus: The production change is small and localized.
- Concurrency/lifecycle: Metrics are updated from the existing synchronized metrics path; no new lock-order or lifecycle issue found.
- Configuration/compatibility: No new configs or storage/protocol compatibility concerns found.
- Parallel paths: The metric is added to the existing streaming per-job metrics path.
- Tests: The new assertion needs to validate the metric value is numeric and non-negative after the job has entered CDC/binlog phase, instead of accepting the -1 N/A fallback.
- Observability/performance: The new metric is lightweight and uses existing labels; no additional observability issue found.
User focus: No additional user-provided review focus was present.
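A minimal sketch of the stronger check requested above, written in Java for illustration rather than in the Groovy of the actual regression suite. It assumes the Prometheus text exposition format (the regression test itself reads `/metrics?type=json`) and makes no assumption about the exact metric-name prefix; the class and method names `LagMetricCheckSketch`, `findLag`, and `hasRealLag` are hypothetical and do not appear in the PR.

```java
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LagMetricCheckSketch {
    // Matches one text-format sample line whose metric name ends with
    // streaming_job_per_job_lag, capturing the label set and the value.
    // Any name prefix is accepted because the real prefix is an assumption.
    private static final Pattern SAMPLE = Pattern.compile(
            "^\\S*streaming_job_per_job_lag\\{([^}]*)\\}\\s+(\\S+)\\s*$");

    /** Returns the lag sample for the given job name, if one is present and numeric. */
    static OptionalDouble findLag(String metricsText, String jobName) {
        String wantedLabel = "job_name=\"" + jobName + "\"";
        for (String line : metricsText.split("\n")) {
            Matcher m = SAMPLE.matcher(line.trim());
            if (m.matches() && m.group(1).contains(wantedLabel)) {
                try {
                    return OptionalDouble.of(Double.parseDouble(m.group(2)));
                } catch (NumberFormatException e) {
                    return OptionalDouble.empty();
                }
            }
        }
        return OptionalDouble.empty();
    }

    /** True only when the series exists AND carries a non-negative value (not the -1 N/A sentinel). */
    static boolean hasRealLag(String metricsText, String jobName) {
        OptionalDouble lag = findLag(metricsText, jobName);
        return lag.isPresent() && lag.getAsDouble() >= 0;
    }

    public static void main(String[] args) {
        // Hypothetical scrape output; "example_" is a made-up prefix.
        String scrape =
                "example_streaming_job_per_job_lag{job_id=\"1\",job_name=\"snapshot_job\"} -1\n"
              + "example_streaming_job_per_job_lag{job_id=\"2\",job_name=\"binlog_job\"} 3\n";
        System.out.println(hasRealLag(scrape, "snapshot_job"));
        System.out.println(hasRealLag(scrape, "binlog_job"));
    }
}
```

A presence-only assertion would accept both series in the sample scrape; requiring `value >= 0` rejects the job that is still reporting the N/A sentinel.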
|
/review |
Review summary:
No additional distinct inline issues found beyond the existing review threads.
Critical checkpoint conclusions:
- Goal and proof: The PR adds a per-job streaming lag metric from the existing `Lag` value. The implementation exposes the metric, but the existing thread correctly notes the regression test only proves the series exists and does not prove a real non-N/A CDC lag value is exported.
- Scope: The code change is small and focused on one metric plus its regression coverage.
- Concurrency and locking: No new metadata locks or multi-lock paths are introduced. The metric reads existing job/offset state during `/metrics` collection, matching the existing metrics pattern.
- Lifecycle/static initialization: No new static initialization dependency or non-obvious lifecycle ownership issue found.
- Configuration/compatibility: No new config, persistence format, FE-BE protocol, or rolling-upgrade compatibility issue found.
- Parallel paths: The new metric is added to the existing per-job streaming metric registration/removal path. No separate cloud/shared-nothing path requiring another change was found.
- Conditional checks/error handling: The `-1` N/A sentinel behavior is intentional per the current PR description; existing review discussion already covers the semantic tradeoff and test implications.
- Test coverage: Existing review threads cover the main gap: the regression test should verify the lag value semantics, not just metric presence.
- Observability/performance: The metric improves observability and uses the existing metric refresh path; no additional hot-path or excessive-allocation concern found.
- Transaction/data correctness: No transaction visibility, persistence, or data write path changes are introduced.
User focus: No additional user-provided review focus was specified.
TPC-H: Total hot run time: 29725 ms |
TPC-DS: Total hot run time: 171883 ms |
FE Regression Coverage Report
Increment line coverage |
|
PR approved by at least one committer and no changes requested. |
…obs (#63194) ### What problem does this PR solve? Related PR: #62224 (per-job metrics), #62269 (Lag column) Problem Summary: #62224 introduced per-job metrics (`streaming_job_per_job_scanned_rows`, `_load_bytes`, `_filtered_rows`, `_succeed_task_count`, `_failed_task_count`) for streaming insert jobs, exposed via `/metrics` with `job_id`/`job_name` labels for Prometheus. #62269 later added a `Lag` column to `SHOW JOBS` / `jobs()` TVF that reports end-to-end CDC delay in seconds, but the value was only exposed through SQL — there was no corresponding Prometheus metric, so dashboards/alerting on lag was not possible. This PR adds `streaming_job_per_job_lag` (unit: `SECONDS`) to the existing per-job metric set.
### What problem does this PR solve?

Related PR: #62224 (per-job metrics), #62269 (Lag column)

Problem Summary:

#62224 introduced per-job metrics (`streaming_job_per_job_scanned_rows`, `_load_bytes`, `_filtered_rows`, `_succeed_task_count`, `_failed_task_count`) for streaming insert jobs, exposed via `/metrics` with `job_id`/`job_name` labels for Prometheus.

#62269 later added a `Lag` column to `SHOW JOBS` / `jobs()` TVF that reports end-to-end CDC delay in seconds, but the value was only exposed through SQL; there was no corresponding Prometheus metric, so dashboards/alerting on lag was not possible.

This PR adds `streaming_job_per_job_lag` (unit: `SECONDS`) to the existing per-job metric set.

Implementation:

- `StreamingInsertJob#getLagSeconds()` parses the existing `offsetProvider.getLag()` string into a long. Returns `-1` when lag is not applicable (S3, snapshot phase) or unparseable, so dashboards can filter N/A jobs via `lag >= 0` and distinguish them from "CDC caught up (lag = 0)".
- `MetricRepo.updateStreamingJobPerJobMetrics()` registers the gauge for every streaming insert job (no skip), matching the always-present behavior of the other per-job metrics.

Semantics summary:

| `Lag` (SQL column) | `streaming_job_per_job_lag` |
|---|---|
| `""` | `-1` |
| `"0"` | `0` |
| `"N"` | `N` |

### Release note

Add `streaming_job_per_job_lag` Prometheus metric (unit: seconds, `-1` means N/A) for streaming insert jobs.

### Check List (For Author)

- Test
- Behavior changed:
- Does this need documentation?
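
The parsing semantics described under "Implementation" can be sketched as follows. This is an illustrative stand-alone version, not the code merged in the PR: the class `LagSecondsSketch`, the method `parseLagSeconds`, and the constant `LAG_NOT_AVAILABLE` are hypothetical names, and the input strings stand in for whatever `offsetProvider.getLag()` returns.

```java
import java.util.List;

public class LagSecondsSketch {
    /** Sentinel exported when lag is not applicable or cannot be parsed. */
    static final long LAG_NOT_AVAILABLE = -1L;

    /**
     * Converts a lag string (as shown in the SQL Lag column) into seconds.
     * Empty, null, or non-numeric input maps to the -1 sentinel so that the
     * metrics endpoint never fails because of a malformed value.
     */
    static long parseLagSeconds(String lag) {
        if (lag == null || lag.trim().isEmpty()) {
            return LAG_NOT_AVAILABLE;
        }
        try {
            return Long.parseLong(lag.trim());
        } catch (NumberFormatException e) {
            return LAG_NOT_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        // Sample inputs: N/A, caught up, lagging, and an unparseable string.
        for (String lag : List.of("", "0", "42", "n/a")) {
            System.out.println("\"" + lag + "\" -> " + parseLagSeconds(lag));
        }
    }
}
```

Keeping `0` distinct from `-1` is what lets a dashboard filter on `lag >= 0` without hiding jobs that have fully caught up.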