diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 36111d0fbf4c0c..c43595826e34ec 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -336,6 +336,18 @@ private SplitReadResult prepareStreamSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { Tuple2 splitFlag = createStreamSplit(offsetMeta, baseReq); this.streamSplit = splitFlag.f0.asStreamSplit(); + + // Close previous stream reader to release resources (e.g. PG replication slot) + // before creating a new one. This prevents connection leaks when a cancelled + // task's reader is still active while a new task arrives. + if (this.streamReader != null) { + LOG.info( + "Closing previous stream reader before creating new one for job {}", + baseReq.getJobId()); + closeReaderInternal(this.streamReader); + this.streamReader = null; + } + this.streamReader = getBinlogSplitReader(baseReq); LOG.info("Prepare stream split: {}", this.streamSplit.toString()); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 83c9b349e4e5ad..5675b3dd835b5b 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -341,6 +341,18 @@ private SplitReadResult prepareBinlogSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { Tuple2 splitFlag = createBinlogSplit(offsetMeta, baseReq); this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0; + + // Close previous binlog reader to release resources before creating a new one. + // This prevents connection leaks when a cancelled task's reader is still active + // while a new task arrives. + if (this.binlogReader != null) { + LOG.info( + "Closing previous binlog reader before creating new one for job {}", + baseReq.getJobId()); + this.binlogReader.close(); + this.binlogReader = null; + } + this.binlogReader = getBinlogSplitReader(baseReq); LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());