Skip to content

Commit fbb60ce

Browse files
authored
[Improve][CDC] Optimize jdbc fetch-size options (#4352)
1 parent b71d873 commit fbb60ce

File tree

3 files changed

+5
-3
lines changed

3 files changed

+5
-3
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ private void createDataEventsForTable(
206206
new Object[] {snapshotSplit.getSplitStart()},
207207
new Object[] {snapshotSplit.getSplitEnd()},
208208
snapshotSplit.getSplitKeyType().getTotalFields(),
209-
connectorConfig.getQueryFetchSize());
209+
connectorConfig.getSnapshotFetchSize());
210210
ResultSet rs = selectStatement.executeQuery()) {
211211

212212
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,10 @@ private static PreparedStatement initStatement(JdbcConnection jdbc, String sql,
344344
final PreparedStatement statement =
345345
connection.prepareStatement(
346346
sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
347-
if (fetchSize == 0) {
347+
if (fetchSize <= 0) {
348348
statement.setFetchSize(Integer.MIN_VALUE);
349+
} else {
350+
statement.setFetchSize(fetchSize);
349351
}
350352
return statement;
351353
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ private void createDataEventsForTable(
197197
new Object[] {snapshotSplit.getSplitStart()},
198198
new Object[] {snapshotSplit.getSplitEnd()},
199199
snapshotSplit.getSplitKeyType().getTotalFields(),
200-
connectorConfig.getQueryFetchSize());
200+
connectorConfig.getSnapshotFetchSize());
201201
ResultSet rs = selectStatement.executeQuery()) {
202202

203203
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);

0 commit comments

Comments
 (0)