
Commit 0e4190a

happyboy1024 and a co-author authored
[Improve][CDC] support exactly-once of cdc and fix the BinlogOffset comparing bug (#5057)
* [Improve][CDC] support exactly-once of cdc, fix the BinlogOffset comparing bug
* [Improve][CDC] adjust code style
* [Improve][CDC] fix ci error

Co-authored-by: happyboy1024 <296442618@qq.com>
1 parent 9821dcf commit 0e4190a

3 files changed: +84 −10 lines changed


seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 3 additions & 6 deletions

@@ -223,14 +223,11 @@ public void close() {
 
     private boolean isChangeRecordInChunkRange(SourceRecord record) {
         if (taskContext.isDataChangeRecord(record)) {
+            // fix the between condition
             return taskContext.isRecordBetween(
                     record,
-                    null == currentSnapshotSplit.getSplitStart()
-                            ? null
-                            : new Object[] {currentSnapshotSplit.getSplitStart()},
-                    null == currentSnapshotSplit.getSplitEnd()
-                            ? null
-                            : new Object[] {currentSnapshotSplit.getSplitEnd()});
+                    currentSnapshotSplit.getSplitStart(),
+                    currentSnapshotSplit.getSplitEnd());
         }
         return false;
     }
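
The old code wrapped currentSnapshotSplit.getSplitStart() and getSplitEnd(), which already return the boundary as an Object[], in an additional Object[], so the between check compared the record key against a one-element array that contains an array rather than against the key fields themselves. Below is a minimal, self-contained sketch of the element-wise boundary comparison the corrected call relies on; the helper names and the single numeric key column are illustrative assumptions, not the connector's actual isRecordBetween implementation.

import java.util.Arrays;

public class ChunkRangeCheckSketch {

    // element-wise comparison of a record key against a split boundary;
    // a null boundary means the chunk is unbounded on that side
    static boolean isBetween(Object[] key, Object[] splitStart, Object[] splitEnd) {
        boolean afterStart = splitStart == null || compare(key, splitStart) >= 0;
        boolean beforeEnd = splitEnd == null || compare(key, splitEnd) < 0;
        return afterStart && beforeEnd;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compare(Object[] a, Object[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = ((Comparable) a[i]).compareTo(b[i]);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        Object[] key = {150L};        // key of the incoming change record
        Object[] splitStart = {100L}; // chunk lower bound
        Object[] splitEnd = {200L};   // chunk upper bound

        // passing the boundaries directly compares key field by key field
        System.out.println(isBetween(key, splitStart, splitEnd)); // true

        // the removed wrapping turned the bound into a one-element array holding an array,
        // so the first "field" to compare was an Object[] rather than a key value
        Object[] wrappedStart = new Object[] {splitStart};
        System.out.println(Arrays.deepToString(wrappedStart)); // [[100]]
    }
}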

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 74 additions & 3 deletions

@@ -19,6 +19,7 @@
 
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -32,8 +33,12 @@
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -49,6 +54,8 @@
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
+    // tables that have entered the pure binlog read phase
+    private final Set<TableId> pureBinlogPhaseTables;
     private volatile ChangeEventQueue<DataChangeEvent> queue;
     private volatile Throwable readException;
 
@@ -58,13 +65,19 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
 
     private Offset splitStartWatermark;
 
+    // maximum high watermark for each table
+    private Map<TableId, Offset> maxSplitHighWatermarkMap;
+    // finished split info per table
+    private Map<TableId, List<CompletedSnapshotSplitInfo>> finishedSplitsInfo;
+
     private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
 
     public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
         this.taskContext = taskContext;
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+        this.pureBinlogPhaseTables = new HashSet<>();
     }
 
     @Override
@@ -157,14 +170,72 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
                         tableId);
                 return position.isAfter(splitStartWatermark);
             }
-            // TODO only the table who captured snapshot splits need to filter( Used to support
-            // Exactly-Once )
-            return position.isAfter(splitStartWatermark);
+            // check whether the table has entered the pure binlog phase
+            if (hasEnterPureBinlogPhase(tableId, position)) {
+                return true;
+            }
+            // the table has not entered the pure binlog phase; check whether the current
+            // record meets the emitting conditions
+            if (finishedSplitsInfo.containsKey(tableId)) {
+                for (CompletedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
+                    if (taskContext.isRecordBetween(
+                                    sourceRecord,
+                                    splitInfo.getSplitStart(),
+                                    splitInfo.getSplitEnd())
+                            && position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
+                        return true;
+                    }
+                }
+            }
+            return false;
         }
         return true;
     }
 
+    private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
+        // only tables that captured snapshot splits need to be filtered
+        if (pureBinlogPhaseTables.contains(tableId)) {
+            return true;
+        }
+        // tables that have finished snapshot reading
+        if (maxSplitHighWatermarkMap.containsKey(tableId)
+                && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
+            pureBinlogPhaseTables.add(tableId);
+            return true;
+        }
+        return false;
+    }
+
     private void configureFilter() {
         splitStartWatermark = currentIncrementalSplit.getStartupOffset();
+        Map<TableId, List<CompletedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
+        Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
+        List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
+                currentIncrementalSplit.getCompletedSnapshotSplitInfos();
+
+        // latest-offset mode
+        if (completedSnapshotSplitInfos.isEmpty()) {
+            for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+                tableIdBinlogPositionMap.put(tableId, currentIncrementalSplit.getStartupOffset());
+            }
+        }
+
+        // calculate the max high watermark of every table
+        for (CompletedSnapshotSplitInfo finishedSplitInfo : completedSnapshotSplitInfos) {
+            TableId tableId = finishedSplitInfo.getTableId();
+            List<CompletedSnapshotSplitInfo> list =
+                    splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
+            list.add(finishedSplitInfo);
+            splitsInfoMap.put(tableId, list);
+
+            Offset highWatermark = finishedSplitInfo.getWatermark().getHighWatermark();
+            Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
+            if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
+                tableIdBinlogPositionMap.put(tableId, highWatermark);
+            }
+        }
+        this.finishedSplitsInfo = splitsInfoMap;
+        this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
+        this.pureBinlogPhaseTables.clear();
     }
 }
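
Taken together, the new fields and the reworked shouldEmit implement the exactly-once filter for the incremental (binlog) phase: a change event is forwarded once its table has entered the pure binlog phase, or when it falls inside a finished snapshot split's key range and sits after that split's high watermark, so rows already emitted while the snapshot split was being read are not emitted a second time. The following standalone sketch condenses that decision; the stand-in types, String table ids, and long keys/offsets are simplifying assumptions, not the connector's real classes.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class EmitFilterSketch {

    // hypothetical stand-in for CompletedSnapshotSplitInfo: key range plus high watermark
    static class FinishedSplit {
        final long keyStart;
        final long keyEnd;
        final long highWatermark;

        FinishedSplit(long keyStart, long keyEnd, long highWatermark) {
            this.keyStart = keyStart;
            this.keyEnd = keyEnd;
            this.highWatermark = highWatermark;
        }
    }

    private final Map<String, List<FinishedSplit>> finishedSplitsInfo = new HashMap<>();
    private final Map<String, Long> maxSplitHighWatermark = new HashMap<>();
    private final Set<String> pureBinlogPhaseTables = new HashSet<>();

    boolean shouldEmit(String tableId, long key, long position) {
        // pure binlog phase: the position is past the table's max high watermark,
        // so every later record can be emitted without further range checks
        if (pureBinlogPhaseTables.contains(tableId)) {
            return true;
        }
        Long maxHighWatermark = maxSplitHighWatermark.get(tableId);
        if (maxHighWatermark != null && position >= maxHighWatermark) {
            pureBinlogPhaseTables.add(tableId);
            return true;
        }
        // otherwise emit only records that fall inside a finished split's key range
        // and sit after that split's high watermark (anything at or before the
        // watermark was already emitted during the snapshot read)
        for (FinishedSplit split :
                finishedSplitsInfo.getOrDefault(tableId, Collections.emptyList())) {
            if (key >= split.keyStart && key < split.keyEnd && position > split.highWatermark) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        EmitFilterSketch filter = new EmitFilterSketch();
        filter.finishedSplitsInfo.put(
                "db.orders",
                java.util.Arrays.asList(
                        new FinishedSplit(0, 1_000, 500), new FinishedSplit(1_000, 2_000, 800)));
        filter.maxSplitHighWatermark.put("db.orders", 800L);

        System.out.println(filter.shouldEmit("db.orders", 1_500, 600)); // false: not past split watermark
        System.out.println(filter.shouldEmit("db.orders", 500, 600));   // true: in first split, after its watermark
        System.out.println(filter.shouldEmit("db.orders", 1_500, 900)); // true: past max watermark, pure binlog phase
    }
}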

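configureFilter builds the two lookup structures this filter needs: the finished snapshot splits grouped per table, and each table's maximum high watermark, falling back to the incremental split's startup offset when no snapshot splits were read (latest-offset startup mode). Below is a self-contained sketch of that bookkeeping under the same simplifying assumptions as the sketch above (offsets as plain longs; the names are illustrative, not the connector's API).

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MaxWatermarkSketch {

    // returns each captured table's maximum snapshot-split high watermark; when no snapshot
    // splits were read (latest-offset mode), every table starts at the startup offset
    static Map<String, Long> maxHighWatermarkPerTable(
            Map<String, List<Long>> splitHighWatermarksPerTable,
            List<String> capturedTables,
            long startupOffset) {
        Map<String, Long> maxPerTable = new HashMap<>();
        if (splitHighWatermarksPerTable.isEmpty()) {
            for (String tableId : capturedTables) {
                maxPerTable.put(tableId, startupOffset);
            }
            return maxPerTable;
        }
        for (Map.Entry<String, List<Long>> entry : splitHighWatermarksPerTable.entrySet()) {
            long max = Long.MIN_VALUE;
            for (long watermark : entry.getValue()) {
                max = Math.max(max, watermark);
            }
            maxPerTable.put(entry.getKey(), max);
        }
        return maxPerTable;
    }
}

Once a table's binlog position reaches this per-table maximum, the fetcher marks it as having entered the pure binlog phase and skips the per-split range checks for all later records of that table.
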
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java

Lines changed: 7 additions & 1 deletion

@@ -176,7 +176,13 @@ public int compareTo(Offset offset) {
         // compared ...
         long timestamp = this.getTimestamp();
         long targetTimestamp = that.getTimestamp();
-        return Long.compare(timestamp, targetTimestamp);
+        // Only compare by timestamp when both offsets actually carry one,
+        // because low watermark and high watermark offsets have no timestamp.
+        // Without this check, comparing a real binlog offset against a watermark
+        // would always be decided by the missing timestamp.
+        if (timestamp != 0 && targetTimestamp != 0) {
+            return Long.compare(timestamp, targetTimestamp);
+        }
     }
 
     // First compare the MySQL binlog filenames

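The guard matters because watermark offsets are written without a timestamp (it stays 0), so unconditionally comparing timestamps made any real binlog event look newer than a watermark. The sketch below shows the restored comparison order in a trimmed-down, hypothetical form, with the offset fields passed explicitly rather than through the real BinlogOffset class.

final class BinlogOffsetCompareSketch {

    static int compare(
            long tsA, String fileA, long posA,
            long tsB, String fileB, long posB) {
        // only trust timestamps when both offsets actually have one
        if (tsA != 0 && tsB != 0) {
            return Long.compare(tsA, tsB);
        }
        // fall back to the binlog file name, then the position within the file
        int byFile = fileA.compareTo(fileB);
        if (byFile != 0) {
            return byFile;
        }
        return Long.compare(posA, posB);
    }

    public static void main(String[] args) {
        // a real binlog event (with timestamp) vs. a high-watermark offset (timestamp 0):
        // the decision is made on file/position instead of the missing timestamp
        System.out.println(
                compare(1_690_000_000L, "mysql-bin.000003", 4_521L,
                        0L, "mysql-bin.000003", 7_800L)); // negative: event is before the watermark
    }
}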