Skip to content

Commit ac61409

Browse files
ic4yTaoZex
andauthored
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
--------- Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com>
1 parent c6b633f commit ac61409

File tree

12 files changed

+86
-33
lines changed

12 files changed

+86
-33
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public interface FetchTask<Split> {
4141
/** Returns current task is running or not. */
4242
boolean isRunning();
4343

44+
/** Close this task */
45+
void shutdown();
46+
4447
/** Returns the split that the task used. */
4548
Split getSplit();
4649

@@ -63,5 +66,7 @@ interface Context {
6366
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);
6467

6568
List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);
69+
70+
void close();
6671
}
6772
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,20 @@ private void checkReadException() {
192192
@Override
193193
public void close() {
194194
try {
195+
if (taskContext != null) {
196+
taskContext.close();
197+
}
198+
if (snapshotSplitReadTask != null) {
199+
snapshotSplitReadTask.shutdown();
200+
}
195201
if (executorService != null) {
196202
executorService.shutdown();
197-
if (executorService.awaitTermination(
203+
if (!executorService.awaitTermination(
198204
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
199205
log.warn(
200-
"Failed to close the scan fetcher in {} seconds.",
206+
"Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
201207
READER_CLOSE_TIMEOUT_SECONDS);
208+
executorService.shutdownNow();
202209
}
203210
}
204211
} catch (Exception e) {

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,20 @@ private void checkReadException() {
121121
@Override
122122
public void close() {
123123
try {
124+
if (taskContext != null) {
125+
taskContext.close();
126+
}
127+
if (streamFetchTask != null) {
128+
streamFetchTask.shutdown();
129+
}
124130
if (executorService != null) {
125131
executorService.shutdown();
126-
if (executorService.awaitTermination(
132+
if (!executorService.awaitTermination(
127133
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
128134
log.warn(
129-
"Failed to close the stream fetcher in {} seconds.",
135+
"Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
130136
READER_CLOSE_TIMEOUT_SECONDS);
137+
executorService.shutdownNow();
131138
}
132139
}
133140
} catch (Exception e) {

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,13 @@
3333
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
3434
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
3535

36-
import com.github.shyiko.mysql.binlog.BinaryLogClient;
37-
import io.debezium.connector.mysql.MySqlConnection;
3836
import io.debezium.jdbc.JdbcConnection;
3937
import io.debezium.relational.TableId;
4038
import io.debezium.relational.history.TableChanges;
4139

4240
import java.sql.SQLException;
4341
import java.util.List;
4442

45-
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
46-
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
4743
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
4844

4945
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
104100
@Override
105101
public MySqlSourceFetchTaskContext createFetchTaskContext(
106102
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
107-
final MySqlConnection jdbcConnection =
108-
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
109-
final BinaryLogClient binaryLogClient =
110-
createBinaryClient(taskSourceConfig.getDbzConfiguration());
111-
return new MySqlSourceFetchTaskContext(
112-
taskSourceConfig, this, jdbcConnection, binaryLogClient);
103+
return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
113104
}
114105

115106
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,21 @@
6262
import io.debezium.schema.DataCollectionId;
6363
import io.debezium.schema.TopicSelector;
6464
import io.debezium.util.Collect;
65+
import lombok.extern.slf4j.Slf4j;
6566

67+
import java.io.IOException;
68+
import java.sql.SQLException;
6669
import java.time.Instant;
6770
import java.util.ArrayList;
6871
import java.util.List;
6972
import java.util.Map;
7073

7174
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
75+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
76+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
7277

7378
/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
79+
@Slf4j
7480
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
7581

7682
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
@@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
8995
private MySqlErrorHandler errorHandler;
9096

9197
public MySqlSourceFetchTaskContext(
92-
JdbcSourceConfig sourceConfig,
93-
JdbcDataSourceDialect dataSourceDialect,
94-
MySqlConnection connection,
95-
BinaryLogClient binaryLogClient) {
98+
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
9699
super(sourceConfig, dataSourceDialect);
97-
this.connection = connection;
98-
this.binaryLogClient = binaryLogClient;
100+
this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
101+
this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
99102
this.metadataProvider = new MySqlEventMetadataProvider();
100103
}
101104

@@ -159,6 +162,18 @@ public void configure(SourceSplitBase sourceSplitBase) {
159162
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
160163
}
161164

165+
@Override
166+
public void close() {
167+
try {
168+
this.connection.close();
169+
this.binaryLogClient.disconnect();
170+
} catch (SQLException e) {
171+
log.warn("Failed to close connection", e);
172+
} catch (IOException e) {
173+
log.warn("Failed to close binaryLogClient", e);
174+
}
175+
}
176+
162177
@Override
163178
public MySqlSourceConfig getSourceConfig() {
164179
return (MySqlSourceConfig) sourceConfig;

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public boolean isRunning() {
8181
return taskRunning;
8282
}
8383

84+
@Override
85+
public void shutdown() {
86+
taskRunning = false;
87+
}
88+
8489
@Override
8590
public SourceSplitBase getSplit() {
8691
return split;

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ public boolean isRunning() {
155155
return taskRunning;
156156
}
157157

158+
@Override
159+
public void shutdown() {
160+
taskRunning = false;
161+
}
162+
158163
@Override
159164
public SourceSplitBase getSplit() {
160165
return split;

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ public static String quote(TableId tableId) {
339339
private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
340340
throws SQLException {
341341
final Connection connection = jdbc.connection();
342+
// Add MySQL metadata locks to prevent modification of table structure.
342343
connection.setAutoCommit(false);
343344
final PreparedStatement statement =
344345
connection.prepareStatement(

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
3535
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;
3636

37-
import io.debezium.connector.sqlserver.SqlServerConnection;
3837
import io.debezium.jdbc.JdbcConnection;
3938
import io.debezium.relational.TableId;
4039
import io.debezium.relational.history.TableChanges;
@@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
104103
@Override
105104
public SqlServerSourceFetchTaskContext createFetchTaskContext(
106105
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
107-
final SqlServerConnection jdbcConnection =
108-
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
109-
final SqlServerConnection metaDataConnection =
110-
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
111106

112-
return new SqlServerSourceFetchTaskContext(
113-
taskSourceConfig, this, jdbcConnection, metaDataConnection);
107+
return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
114108
}
115109

116110
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,18 @@
5858
import io.debezium.schema.DataCollectionId;
5959
import io.debezium.schema.TopicSelector;
6060
import io.debezium.util.Collect;
61+
import lombok.extern.slf4j.Slf4j;
6162

63+
import java.sql.SQLException;
6264
import java.time.Instant;
6365
import java.util.ArrayList;
6466
import java.util.List;
6567
import java.util.Map;
6668

69+
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
70+
6771
/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
72+
@Slf4j
6873
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
6974

7075
private final SqlServerConnection dataConnection;
@@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
8388
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
8489

8590
public SqlServerSourceFetchTaskContext(
86-
JdbcSourceConfig sourceConfig,
87-
JdbcDataSourceDialect dataSourceDialect,
88-
SqlServerConnection dataConnection,
89-
SqlServerConnection metadataConnection) {
91+
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
9092
super(sourceConfig, dataSourceDialect);
91-
this.dataConnection = dataConnection;
92-
this.metadataConnection = metadataConnection;
93+
94+
this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
95+
this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
9396
this.metadataProvider = new SqlServerEventMetadataProvider();
9497
}
9598

@@ -158,6 +161,16 @@ public void configure(SourceSplitBase sourceSplitBase) {
158161
this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
159162
}
160163

164+
@Override
165+
public void close() {
166+
try {
167+
this.dataConnection.close();
168+
this.metadataConnection.close();
169+
} catch (SQLException e) {
170+
log.warn("Failed to close connection", e);
171+
}
172+
}
173+
161174
@Override
162175
public SqlServerSourceConfig getSourceConfig() {
163176
return (SqlServerSourceConfig) sourceConfig;

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ public boolean isRunning() {
166166
return taskRunning;
167167
}
168168

169+
@Override
170+
public void shutdown() {
171+
taskRunning = false;
172+
}
173+
169174
@Override
170175
public SourceSplitBase getSplit() {
171176
return split;

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public boolean isRunning() {
7878
return taskRunning;
7979
}
8080

81+
@Override
82+
public void shutdown() {
83+
taskRunning = false;
84+
}
85+
8186
@Override
8287
public SourceSplitBase getSplit() {
8388
return split;

0 commit comments

Comments
 (0)