Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP-Improve][CDC] Upgrade debezium to 1.9.8.Final #6288

Closed
wants to merge 3 commits into from

Conversation

CheneyYin
Copy link
Contributor

Purpose of this pull request

Upgrade debezium to 1.9.8.Final

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@CheneyYin CheneyYin force-pushed the upgrade-debezium branch 12 times, most recently from a772b0b to 0e081ae Compare February 3, 2024 09:55
@hailin0 hailin0 added cdc debezium core SeaTunnel core module labels Feb 18, 2024
@CheneyYin CheneyYin force-pushed the upgrade-debezium branch 3 times, most recently from 81aef5e to 1541b2d Compare March 3, 2024 13:45
@CheneyYin
Copy link
Contributor Author

// debezium 1.6.4.Final

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier,
                             long maxQueueSizeInBytes, boolean buffering) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
        this.maxQueueSizeInBytes = maxQueueSizeInBytes;
        this.buffering = buffering;
    }

https://github.com/debezium/debezium/blob/c6c40201d2a3cf70832525d77f87fc3270af145f/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java#L87-L98

// debezium 1.9.8.Final

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier,
                             long maxQueueSizeInBytes, boolean buffering) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new ArrayDeque<>(maxQueueSize);
        this.loggingContextSupplier = loggingContextSupplier;
        this.sizeInBytesQueue = new ArrayDeque<>(maxQueueSize);
        this.maxQueueSizeInBytes = maxQueueSizeInBytes;
        this.buffering = buffering;
    }

https://github.com/debezium/debezium/blob/393291bba54edc745f45a679ffd7b2aa7110f454/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java#L84-L94

Some JdbcSourceFetchTaskContext class (like MySqlSourceFetchTaskContext) set size of ChangeEventQueue on Integer.MAX_VALUE. It will lead OOM because ArrayQueue preallocated memory after upgrade debezium to 1.9.8.Final.

// If in the snapshot read phase and enable exactly-once, the queue needs to be set to a
// maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise,
// use the configuration queue size.
final int queueSize =
sourceSplitBase.isSnapshotSplit() && isExactlyOnce()
? Integer.MAX_VALUE
: getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(queueSize)
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(
() ->
taskContext.configureLoggingContext(
"mysql-cdc-connector-task"))
// do not buffer any element, we use signal event
// .buffering()
.build();

@CheneyYin
Copy link
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cdc core SeaTunnel core module
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants