Skip to content

Commit 3287b1d

Browse files
authored
[Bugfix][connector-cdc-mysql] Fix listener not released when BinlogClient reuse (#5011)
1 parent 3cff923 commit 3287b1d

File tree

1 file changed

+17
-1
lines changed
  • seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch

1 file changed

+17
-1
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
1919

2020
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
21+
import org.apache.seatunnel.common.utils.ReflectionUtils;
2122
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
2223
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
2324
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
@@ -70,6 +71,7 @@
7071
import java.util.ArrayList;
7172
import java.util.List;
7273
import java.util.Map;
74+
import java.util.Optional;
7375

7476
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
7577
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
@@ -326,13 +328,27 @@ public MySqlTaskContextImpl(
326328
MySqlDatabaseSchema schema,
327329
BinaryLogClient reusedBinaryLogClient) {
328330
super(config, schema);
329-
this.reusedBinaryLogClient = reusedBinaryLogClient;
331+
this.reusedBinaryLogClient = resetBinaryLogClient(reusedBinaryLogClient);
330332
}
331333

332334
@Override
333335
public BinaryLogClient getBinaryLogClient() {
334336
return reusedBinaryLogClient;
335337
}
338+
339+
/** reset the listener of binaryLogClient before fetch task start. */
340+
private BinaryLogClient resetBinaryLogClient(BinaryLogClient binaryLogClient) {
341+
Optional<Object> eventListenersField =
342+
ReflectionUtils.getField(
343+
binaryLogClient, BinaryLogClient.class, "eventListeners");
344+
eventListenersField.ifPresent(o -> ((List<BinaryLogClient.EventListener>) o).clear());
345+
Optional<Object> lifecycleListeners =
346+
ReflectionUtils.getField(
347+
binaryLogClient, BinaryLogClient.class, "lifecycleListeners");
348+
lifecycleListeners.ifPresent(
349+
o -> ((List<BinaryLogClient.LifecycleListener>) o).clear());
350+
return binaryLogClient;
351+
}
336352
}
337353

338354
/** Copied from debezium for accessing here. */

0 commit comments

Comments
 (0)