Skip to content

Commit

Permalink
try decrease ChangeEventQueue size.
Browse files Browse the repository at this point in the history
  • Loading branch information
CheneyYin committed Mar 5, 2024
1 parent 5ed3e02 commit f9e9d1e
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
? 4096
: sourceConfig.getBatchSize();
this.changeEventQueue =
new ChangeEventQueue.Builder<DataChangeEvent>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
? 4096
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
? 4096
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
? 4096
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();

LoggingContext.PreviousContext previousContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
? 4096
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();

this.queue =
Expand Down

0 comments on commit f9e9d1e

Please sign in to comment.