diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 614c506619fa8d..003ab813829387 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -248,14 +248,16 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit()); long startTime = System.currentTimeMillis(); + long streamingStartTime = -1; long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000; boolean shouldStop = false; boolean lastMessageIsHeartbeat = false; LOG.info( - "Start polling records for jobId={} taskId={}, isSnapshotSplit={}", + "Start polling records for jobId={} taskId={}, isSnapshotSplit={}, maxIntervalMillis={}", writeRecordRequest.getJobId(), writeRecordRequest.getTaskId(), - isSnapshotSplit); + isSnapshotSplit, + maxIntervalMillis); // 2. poll record while (!shouldStop) { @@ -265,9 +267,14 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception Thread.sleep(100); // Check if should stop - long elapsedTime = System.currentTimeMillis() - startTime; + long elapsedTime = + streamingStartTime > 0 + ? System.currentTimeMillis() - streamingStartTime + : 0; boolean timeoutReached = - maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis; + streamingStartTime > 0 + && maxIntervalMillis > 0 + && elapsedTime >= maxIntervalMillis; if (shouldStop( isSnapshotSplit, @@ -281,6 +288,15 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception continue; } + if (streamingStartTime < 0) { + streamingStartTime = System.currentTimeMillis(); + LOG.info( + "Streaming phase started after {} ms setup for jobId={} taskId={}", + streamingStartTime - startTime, + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId()); + } + while (recordIterator.hasNext()) { SourceRecord element = recordIterator.next(); @@ -294,9 +310,11 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } // If already timeout, stop immediately when heartbeat received - long elapsedTime = System.currentTimeMillis() - startTime; + long elapsedTime = System.currentTimeMillis() - streamingStartTime; boolean timeoutReached = - maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis; + streamingStartTime > 0 + && maxIntervalMillis > 0 + && elapsedTime >= maxIntervalMillis; if (!isSnapshotSplit && timeoutReached) { LOG.info(