Skip to content

Commit c13c031

Browse files
authored
[Hotfix][CDC] Fix chunk start/end parameter type error (#4777)
Incorrect wrapping as Array<Array> types, but only Array type required
1 parent 40707a3 commit c13c031

File tree

6 files changed

+18
-14
lines changed

6 files changed

+18
-14
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,16 @@ public class CompletedSnapshotSplitInfo implements Serializable {
3030
private final String splitId;
3131
private final TableId tableId;
3232
private final SeaTunnelRowType splitKeyType;
33-
private final Object splitStart;
34-
private final Object splitEnd;
33+
private final Object[] splitStart;
34+
private final Object[] splitEnd;
3535
private final Offset watermark;
3636

3737
public CompletedSnapshotSplitInfo(
3838
String splitId,
3939
TableId tableId,
4040
SeaTunnelRowType splitKeyType,
41-
Object splitStart,
42-
Object splitEnd,
41+
Object[] splitStart,
42+
Object[] splitEnd,
4343
Offset watermark) {
4444
this.splitId = splitId;
4545
this.tableId = tableId;

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ public class SnapshotSplit extends SourceSplitBase {
2828
private static final long serialVersionUID = 1L;
2929
private final TableId tableId;
3030
private final SeaTunnelRowType splitKeyType;
31-
private final Object splitStart;
32-
private final Object splitEnd;
31+
private final Object[] splitStart;
32+
private final Object[] splitEnd;
3333

3434
private final Offset highWatermark;
3535

3636
public SnapshotSplit(
3737
String splitId,
3838
TableId tableId,
3939
SeaTunnelRowType splitKeyType,
40-
Object splitStart,
41-
Object splitEnd,
40+
Object[] splitStart,
41+
Object[] splitEnd,
4242
Offset highWatermark) {
4343
super(splitId);
4444
this.tableId = tableId;

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,10 @@ private SnapshotSplit createSnapshotSplit(
295295
Object chunkStart,
296296
Object chunkEnd) {
297297
// currently, we only support single split column
298+
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
299+
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
298300
return new SnapshotSplit(
299-
splitId(tableId, chunkId), tableId, splitKeyType, chunkStart, chunkEnd, null);
301+
splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null);
300302
}
301303

302304
// ------------------------------------------------------------------------------------------

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ private void createDataEventsForTable(
205205
selectSql,
206206
snapshotSplit.getSplitStart() == null,
207207
snapshotSplit.getSplitEnd() == null,
208-
new Object[] {snapshotSplit.getSplitStart()},
209-
new Object[] {snapshotSplit.getSplitEnd()},
208+
snapshotSplit.getSplitStart(),
209+
snapshotSplit.getSplitEnd(),
210210
snapshotSplit.getSplitKeyType().getTotalFields(),
211211
connectorConfig.getSnapshotFetchSize());
212212
ResultSet rs = selectStatement.executeQuery()) {

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,10 @@ private SnapshotSplit createSnapshotSplit(
292292
Object chunkStart,
293293
Object chunkEnd) {
294294
// currently, we only support single split column
295+
Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
296+
Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
295297
return new SnapshotSplit(
296-
splitId(tableId, chunkId), tableId, splitKeyType, chunkStart, chunkEnd, null);
298+
splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd, null);
297299
}
298300

299301
// ------------------------------------------------------------------------------------------

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ private void createDataEventsForTable(
195195
selectSql,
196196
snapshotSplit.getSplitStart() == null,
197197
snapshotSplit.getSplitEnd() == null,
198-
new Object[] {snapshotSplit.getSplitStart()},
199-
new Object[] {snapshotSplit.getSplitEnd()},
198+
snapshotSplit.getSplitStart(),
199+
snapshotSplit.getSplitEnd(),
200200
snapshotSplit.getSplitKeyType().getTotalFields(),
201201
connectorConfig.getSnapshotFetchSize());
202202
ResultSet rs = selectStatement.executeQuery()) {

0 commit comments

Comments
 (0)