From caaf3ba98942e15c8449d7efe23a006f55875dc1 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 12 May 2026 14:28:28 +0800 Subject: [PATCH] [fix](streaming-job) start counting task max interval after the first record is received (#63141) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Problem Summary: For PG CDC streaming jobs, when a task creates a fresh logical replication connection, the walsender must re-decode the WAL region from `slot.restart_lsn` up to the client-supplied `startLsn` before any event can be emitted. On high RTT networks (e.g. cross-region Aurora) this catch-up alone can take several seconds. Under the previous behavior the task's `maxInterval` window started the moment the task entered `writeRecords`, so the time spent on WAL position lookup + walsender catch-up was charged against the interval. With `max_interval=10s` this consistently caused tasks to finish with `0 records, 0 heartbeats`, the slot's `confirmed_flush_lsn` never advanced, and every following task repeated the same catch-up — the slot was effectively stuck and replication lag grew indefinitely. This PR delays the start of the `maxInterval` countdown until the first record (or heartbeat) is actually received from the source reader, so the per-task interval governs the real streaming window rather than being consumed by setup. The FE-side `streaming_task_timeout_multiplier * maxIntervalSec` still acts as the hard ceiling. --- .../service/PipelineCoordinator.java | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 614c506619fa8d..003ab813829387 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -248,14 +248,16 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit()); long startTime = System.currentTimeMillis(); + long streamingStartTime = -1; long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000; boolean shouldStop = false; boolean lastMessageIsHeartbeat = false; LOG.info( - "Start polling records for jobId={} taskId={}, isSnapshotSplit={}", + "Start polling records for jobId={} taskId={}, isSnapshotSplit={}, maxIntervalMillis={}", writeRecordRequest.getJobId(), writeRecordRequest.getTaskId(), - isSnapshotSplit); + isSnapshotSplit, + maxIntervalMillis); // 2. poll record while (!shouldStop) { @@ -265,9 +267,14 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception Thread.sleep(100); // Check if should stop - long elapsedTime = System.currentTimeMillis() - startTime; + long elapsedTime = + streamingStartTime > 0 + ? System.currentTimeMillis() - streamingStartTime + : 0; boolean timeoutReached = - maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis; + streamingStartTime > 0 + && maxIntervalMillis > 0 + && elapsedTime >= maxIntervalMillis; if (shouldStop( isSnapshotSplit, @@ -281,6 +288,15 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception continue; } + if (streamingStartTime < 0) { + streamingStartTime = System.currentTimeMillis(); + LOG.info( + "Streaming phase started after {} ms setup for jobId={} taskId={}", + streamingStartTime - startTime, + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId()); + } + while (recordIterator.hasNext()) { SourceRecord element = recordIterator.next(); @@ -294,9 +310,11 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } // If already timeout, stop immediately when heartbeat received - long elapsedTime = System.currentTimeMillis() - startTime; + long elapsedTime = System.currentTimeMillis() - streamingStartTime; boolean timeoutReached = - maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis; + streamingStartTime > 0 + && maxIntervalMillis > 0 + && elapsedTime >= maxIntervalMillis; if (!isSnapshotSplit && timeoutReached) { LOG.info(