Skip to content

Commit 4856645

Browse files
authored
[Improve][CDC] Optimize memory allocation for snapshot split reading (#6281)
1 parent 9009b2a commit 4856645

File tree

7 files changed

+31
-5
lines changed

7 files changed

+31
-5
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
8888
executorService.submit(
8989
() -> {
9090
try {
91+
log.info(
92+
"Start snapshot read task for snapshot split: {} exactly-once: {}",
93+
currentSnapshotSplit,
94+
taskContext.isExactlyOnce());
9195
snapshotSplitReadTask.execute(taskContext);
9296
} catch (Exception e) {
9397
log.error(

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
9999
executorService.submit(
100100
() -> {
101101
try {
102+
log.info(
103+
"Start incremental read task for incremental split: {} exactly-once: {}",
104+
currentIncrementalSplit,
105+
taskContext.isExactlyOnce());
102106
streamFetchTask.execute(taskContext);
103107
} catch (Exception e) {
104108
log.error(

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,13 @@ public MongodbFetchTaskContext(
8989
}
9090

9191
public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
92+
// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
93+
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
94+
// use the configuration queue size.
9295
final int queueSize =
93-
sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE : sourceConfig.getBatchSize();
96+
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
97+
? Integer.MAX_VALUE
98+
: sourceConfig.getBatchSize();
9499
this.changeEventQueue =
95100
new ChangeEventQueue.Builder<DataChangeEvent>()
96101
.pollInterval(Duration.ofMillis(sourceConfig.getPollAwaitTimeMillis()))

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,12 @@ public void configure(SourceSplitBase sourceSplitBase) {
122122

123123
this.taskContext =
124124
new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);
125+
126+
// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
127+
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
128+
// use the configuration queue size.
125129
final int queueSize =
126-
sourceSplitBase.isSnapshotSplit()
130+
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
127131
? Integer.MAX_VALUE
128132
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
129133
this.queue =

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@ public void configure(SourceSplitBase sourceSplitBase) {
119119

120120
this.taskContext = new OracleTaskContext(connectorConfig, databaseSchema);
121121

122+
// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
123+
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
124+
// use the configuration queue size.
122125
final int queueSize =
123-
sourceSplitBase.isSnapshotSplit()
126+
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
124127
? Integer.MAX_VALUE
125128
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
126129
this.queue =

seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,11 @@ public void configure(SourceSplitBase sourceSplitBase) {
145145
loadStartingOffsetState(
146146
new PostgresOffsetContext.Loader(connectorConfig), sourceSplitBase);
147147

148+
// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
149+
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
150+
// use the configuration queue size.
148151
final int queueSize =
149-
sourceSplitBase.isSnapshotSplit()
152+
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
150153
? Integer.MAX_VALUE
151154
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
152155

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,11 @@ public void configure(SourceSplitBase sourceSplitBase) {
122122

123123
this.taskContext = new SqlServerTaskContext(connectorConfig, databaseSchema);
124124

125+
// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
126+
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
127+
// use the configuration queue size.
125128
final int queueSize =
126-
sourceSplitBase.isSnapshotSplit()
129+
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
127130
? Integer.MAX_VALUE
128131
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
129132

0 commit comments

Comments
 (0)