Skip to content

Commit 29b04e2

Browse files
authored
[Bug][CDC] Fix concurrent modify of splits (#3937)
1 parent f74ed56 commit 29b04e2

File tree

1 file changed

+3
-3
lines changed
  • seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator

1 file changed

+3
-3
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ public void open() {
6767
}
6868

6969
@Override
70-
public void run() throws Exception {
70+
public synchronized void run() throws Exception {
7171
this.running = true;
7272
assignSplits();
7373
}
7474

7575
@Override
76-
public void handleSplitRequest(int subtaskId) {
76+
public synchronized void handleSplitRequest(int subtaskId) {
7777
if (!context.registeredReaders().contains(subtaskId)) {
7878
// reader failed between sending the request and now. skip this request.
7979
return;
@@ -128,7 +128,7 @@ public PendingSplitsState snapshotState(long checkpointId) {
128128
}
129129

130130
@Override
131-
public void notifyCheckpointComplete(long checkpointId) {
131+
public synchronized void notifyCheckpointComplete(long checkpointId) {
132132
splitAssigner.notifyCheckpointComplete(checkpointId);
133133
// incremental split may be available after checkpoint complete
134134
assignSplits();

0 commit comments

Comments
 (0)