[FLINK-39315][mysql] Unregister listeners of BinaryLogClient to prevent snapshot reader hang during backfill #4334

Open
chengcongchina wants to merge 3 commits into apache:master from chengcongchina:FLINK-39315

Conversation

@chengcongchina
Contributor

This closes FLINK-39315.

What is the purpose of the change

This PR fixes a MySQL CDC source hang in the snapshot backfill phase when processing multiple snapshot splits sequentially while reusing the same BinaryLogClient.

SnapshotSplitReader.pollWithBuffer() waits for the BINLOG_END watermark to finish a split. However, BinaryLogClient was reused across split executions and MySqlStreamingChangeEventSource.execute() registered multiple event/lifecycle listeners on each execution without unregistering them. As a result, listeners from previous splits could still receive binlog events during later splits and:

  • stop the shared ChangeEventSourceContext prematurely (causing the current split’s backfill to exit early), and/or
  • dispatch BINLOG_END via a stale SignalEventDispatcher into a stale queue (so the current pollWithBuffer() never sees BINLOG_END).

This could leave the queue empty while the backfill thread has already stopped, causing the reader to hang indefinitely.
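
The accumulation described above can be illustrated with a minimal sketch. It assumes a hypothetical `FakeBinlogClient` that only models listener fan-out; it is not the real `BinaryLogClient`, and `runSplit` is an illustrative stand-in for one split execution, not the connector's code. It shows only that a listener left registered by an earlier split keeps receiving events meant for a later one, not the hang itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a long-lived client that fans events out to listeners.
// It is NOT the real BinaryLogClient; only register/unregister is modeled.
class FakeBinlogClient {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void registerEventListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void unregisterEventListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    void emit(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    int listenerCount() {
        return listeners.size();
    }
}

class ListenerAccumulationDemo {
    /** Runs one "split": registers a listener that writes into this split's own queue. */
    static List<String> runSplit(FakeBinlogClient client, String splitName, boolean cleanUp) {
        List<String> queue = new ArrayList<>();
        Consumer<String> listener = event -> queue.add(splitName + ":" + event);
        client.registerEventListener(listener);
        client.emit("BINLOG_END");
        if (cleanUp) {
            // The fix in spirit: remove what this execution registered.
            client.unregisterEventListener(listener);
        }
        return queue;
    }

    public static void main(String[] args) {
        FakeBinlogClient leaky = new FakeBinlogClient();
        List<String> firstQueue = runSplit(leaky, "split-1", false);
        runSplit(leaky, "split-2", false);
        // split-1's stale listener also received the event emitted during split-2.
        System.out.println(firstQueue.size());     // 2
        System.out.println(leaky.listenerCount()); // 2

        FakeBinlogClient clean = new FakeBinlogClient();
        List<String> cleanQueue = runSplit(clean, "split-1", true);
        runSplit(clean, "split-2", true);
        System.out.println(cleanQueue.size());     // 1
        System.out.println(clean.listenerCount()); // 0
    }
}
```

In the leaky run, the first split's queue grows during the second split, which is the cross-split interference the PR description attributes to stale listeners.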

Brief change log

  • Unregister BinaryLogClient event and lifecycle listeners in MySqlStreamingChangeEventSource.execute() after each execution to avoid listener accumulation and cross-split interference.
  • Ensure the unregister/cleanup logic is executed deterministically in the cleanup path (fail-fast behavior if cleanup fails).
  • Add/extend unit test coverage in SnapshotSplitReaderTest (based on testMultipleSplitsWithBackfill) to validate multiple snapshot splits with a forced backfill phase can finish and produce the expected output.

Verifying this change

This change is verified by unit tests:

  • SnapshotSplitReaderTest#testMultipleSplitsWithBackfill

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Copilot AI left a comment

Pull request overview

Fixes a MySQL CDC snapshot backfill hang when reusing a BinaryLogClient across multiple snapshot splits by ensuring listeners registered during streaming execution are cleaned up deterministically.

Changes:

  • Unregister BinaryLogClient event and lifecycle listeners after each MySqlStreamingChangeEventSource.execute() run to prevent cross-split interference.
  • Add a stress-style test adjustment to generate more binlog activity during sequential split backfills.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| .../SnapshotSplitReaderTest.java | Extends an existing test to generate enough binlog events to reproduce/guard against the hang scenario. |
| .../MySqlStreamingChangeEventSource.java | Tracks registered listeners and unregisters them in cleanup to avoid listener accumulation with a reused BinaryLogClient. |

// Only when we reach the first BEGIN event will we start to skip events ...
skipEvent = false;

Throwable executionError = null;

Copilot AI Mar 24, 2026

If an exception occurs in the main try and any unregister* call throws in finally, the cleanup exception will replace the original failure, making the root cause harder to diagnose. Consider using the executionError variable to preserve the primary exception: record the original throwable, then wrap unregister* in a try/catch and attach cleanup failures via addSuppressed (or only throw cleanup failures when there was no primary error).
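
A minimal sketch of the `addSuppressed` approach mentioned here, assuming a hypothetical helper `runThenCleanup` (the name and the `Runnable` parameters are illustrative and not taken from the PR). `Throwable.addSuppressed` is standard Java; everything else is an assumption made for the example.

```java
// Hypothetical helper; names are illustrative and not taken from the PR.
class CleanupWithSuppressedDemo {
    /** Runs body, then cleanup; a cleanup failure never replaces a failure from body. */
    static void runThenCleanup(Runnable body, Runnable cleanup) {
        RuntimeException primary = null;
        try {
            body.run();
        } catch (RuntimeException e) {
            primary = e; // remember the root cause
        }
        try {
            cleanup.run();
        } catch (RuntimeException e) {
            if (primary == null) {
                primary = e; // cleanup was the only failure
            } else {
                primary.addSuppressed(e); // attach, do not mask
            }
        }
        if (primary != null) {
            throw primary;
        }
    }

    public static void main(String[] args) {
        try {
            runThenCleanup(
                    () -> { throw new IllegalStateException("binlog read failed"); },
                    () -> { throw new IllegalArgumentException("unregister failed"); });
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());                    // binlog read failed
            System.out.println(e.getSuppressed()[0].getMessage()); // unregister failed
        }
    }
}
```

The caller still sees the original failure first, and the cleanup failure remains visible in the stack trace as a suppressed exception.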

Contributor Author

Useless, deleted.

Comment on lines +1276 to +1280
client.unregisterEventListener(eventListener);
client.unregisterEventListener(metricsEventListener);
client.unregisterLifecycleListener(lifecycleListener);
if (logEventListener != null) {
    client.unregisterEventListener(logEventListener);

Copilot AI Mar 24, 2026

If an exception occurs in the main try and any unregister* call throws in finally, the cleanup exception will replace the original failure, making the root cause harder to diagnose. Consider using the executionError variable to preserve the primary exception: record the original throwable, then wrap unregister* in a try/catch and attach cleanup failures via addSuppressed (or only throw cleanup failures when there was no primary error).

Suggested change
- client.unregisterEventListener(eventListener);
- client.unregisterEventListener(metricsEventListener);
- client.unregisterLifecycleListener(lifecycleListener);
- if (logEventListener != null) {
-     client.unregisterEventListener(logEventListener);
+ try {
+     client.unregisterEventListener(eventListener);
+ } catch (Exception e) {
+     LOGGER.info("Exception while unregistering event listener", e);
+ }
+ try {
+     client.unregisterEventListener(metricsEventListener);
+ } catch (Exception e) {
+     LOGGER.info("Exception while unregistering metrics event listener", e);
+ }
+ try {
+     client.unregisterLifecycleListener(lifecycleListener);
+ } catch (Exception e) {
+     LOGGER.info("Exception while unregistering lifecycle listener", e);
+ }
+ if (logEventListener != null) {
+     try {
+         client.unregisterEventListener(logEventListener);
+     } catch (Exception e) {
+         LOGGER.info("Exception while unregistering log event listener", e);
+     }

Contributor Author

Thanks for the review. I agree that if unregister*() throws in finally, it can mask the original exception and make the root cause harder to diagnose.

After reconsideration, I moved the listener unregistration to the end of the normal execution path instead of the finally block. The reason is that the problematic case we want to avoid is cross-split reuse when the execution finishes normally; if an exception happens and we exit early, the task will fail and the BinaryLogClient will be recreated on recovery, so the listener accumulation issue should not be hit in that path.

Comment on lines +589 to +603
// To verify that FLINK-39315 is fixed, generate sufficient binlog events,
// so that the MySqlBinlogSplitReadTask runs long enough to exercise the
// context-running checks in binlog reading backfill phase.
for (int i = 0; i < 1000; i++) {
    mySqlConnection.execute(
            "UPDATE "
                    + tableId
                    + " SET address = 'Beijing' WHERE id = 106");
    mySqlConnection.commit();
    mySqlConnection.execute(
            "UPDATE "
                    + tableId
                    + " SET address = 'Shanghai' WHERE id = 106");
    mySqlConnection.commit();
}

Copilot AI Mar 24, 2026

This adds 2000 updates with 2000 commits, which can significantly slow down the unit test and increase flakiness on slower CI environments. Consider batching commits (e.g., commit every N updates or run the loop in a single transaction), reducing iterations to the minimum that still reproduces the issue, or deriving the iteration count from a constant/system property so CI can tune it if needed.
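
One way the batching and tunable-count suggestions could look, as a sketch only. The `SqlSession` interface is a hypothetical stand-in for the test's connection object, and the system property name `cdc.test.backfill.iterations` is invented for illustration. Whether batched commits still produce enough binlog activity to reproduce the hang is untested here and would need to be checked against the real test.

```java
// Hypothetical abstraction over the test's connection; not the real test API.
interface SqlSession {
    void execute(String sql);

    void commit();
}

class BatchedUpdatesDemo {
    /** Iteration count from a hypothetical system property, defaulting to 100. */
    static int iterations() {
        return Integer.getInteger("cdc.test.backfill.iterations", 100);
    }

    /** Issues two updates per iteration and commits once every commitEvery iterations. */
    static void generateUpdates(
            SqlSession session, String tableId, int iterations, int commitEvery) {
        for (int i = 1; i <= iterations; i++) {
            session.execute("UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
            session.execute("UPDATE " + tableId + " SET address = 'Shanghai' WHERE id = 106");
            // Commit at each batch boundary, and once more at the end for a partial batch.
            if (i % commitEvery == 0 || i == iterations) {
                session.commit();
            }
        }
    }
}
```

With 100 iterations and `commitEvery` set to 25, this issues 200 updates and 4 commits instead of 200 commits. Fewer, larger transactions change the shape of the binlog, so the batch size is a trade-off between test speed and how closely the workload resembles the original reproduction.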

Contributor Author

@chengcongchina chengcongchina Mar 25, 2026

I reduced the loop to 100 iterations, which is still sufficient to reproduce the issue when the fix is commented out. This should make the unit test significantly faster and less flaky on CI.

2 participants