Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,16 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception

boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long streamingStartTime = -1;
long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
LOG.info(
"Start polling records for jobId={} taskId={}, isSnapshotSplit={}",
"Start polling records for jobId={} taskId={}, isSnapshotSplit={}, maxIntervalMillis={}",
writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId(),
isSnapshotSplit);
isSnapshotSplit,
maxIntervalMillis);

// 2. poll record
while (!shouldStop) {
Expand All @@ -265,9 +267,14 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
Thread.sleep(100);

// Check if should stop
long elapsedTime = System.currentTimeMillis() - startTime;
long elapsedTime =
streamingStartTime > 0
? System.currentTimeMillis() - streamingStartTime
: 0;
boolean timeoutReached =
maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis;
streamingStartTime > 0
&& maxIntervalMillis > 0
&& elapsedTime >= maxIntervalMillis;

if (shouldStop(
isSnapshotSplit,
Expand All @@ -281,6 +288,15 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
continue;
}

if (streamingStartTime < 0) {
streamingStartTime = System.currentTimeMillis();
LOG.info(
"Streaming phase started after {} ms setup for jobId={} taskId={}",
streamingStartTime - startTime,
writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId());
}

while (recordIterator.hasNext()) {
SourceRecord element = recordIterator.next();

Expand All @@ -294,9 +310,11 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
}

// If already timeout, stop immediately when heartbeat received
long elapsedTime = System.currentTimeMillis() - startTime;
long elapsedTime = System.currentTimeMillis() - streamingStartTime;
boolean timeoutReached =
maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis;
streamingStartTime > 0
&& maxIntervalMillis > 0
&& elapsedTime >= maxIntervalMillis;

if (!isSnapshotSplit && timeoutReached) {
LOG.info(
Expand Down
Loading