From afefe40944e61487da12fac75c833fc9c477e62f Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 13 Jun 2023 18:01:43 +0800 Subject: [PATCH] [cdc-base] Close idle readers when snapshot finished (#2202) * [cdc-base] Close idle readers when snapshot finished --- .../base/config/BaseSourceConfig.java | 8 ++ .../base/config/JdbcSourceConfig.java | 2 + .../base/config/JdbcSourceConfigFactory.java | 16 +++ .../connectors/base/config/SourceConfig.java | 2 + .../base/options/SourceOptions.java | 11 ++ .../source/assigner/HybridSplitAssigner.java | 5 + .../base/source/assigner/SplitAssigner.java | 5 + .../source/assigner/StreamSplitAssigner.java | 5 + .../IncrementalSourceEnumerator.java | 8 ++ .../base/utils/EnvironmentUtils.java | 43 +++++++ .../base/utils/VersionComparable.java | 107 ++++++++++++++++++ .../base/experimental/MySqlSourceBuilder.java | 15 +++ .../config/MySqlSourceConfig.java | 2 + .../config/MySqlSourceConfigFactory.java | 3 + .../cdc/connectors/tests/MongoE2eITCase.java | 7 +- .../cdc/connectors/tests/MySqlE2eITCase.java | 8 +- .../mongodb/source/MongoDBSourceBuilder.java | 18 ++- .../source/config/MongoDBSourceConfig.java | 14 ++- .../config/MongoDBSourceConfigFactory.java | 24 +++- .../mongodb/table/MongoDBTableSource.java | 15 ++- .../table/MongoDBTableSourceFactory.java | 6 +- .../MongoDBParallelSourceExampleTest.java | 8 +- .../table/MongoDBTableFactoryTest.java | 13 ++- .../mysql/source/MySqlSourceBuilder.java | 15 +++ .../assigners/MySqlHybridSplitAssigner.java | 5 + .../source/assigners/MySqlSplitAssigner.java | 5 + .../source/config/MySqlSourceConfig.java | 7 ++ .../config/MySqlSourceConfigFactory.java | 19 ++++ .../source/config/MySqlSourceOptions.java | 10 ++ .../enumerator/MySqlSourceEnumerator.java | 8 ++ .../mysql/source/utils/EnvironmentUtils.java | 43 +++++++ .../mysql/source/utils/VersionComparable.java | 107 ++++++++++++++++++ .../mysql/table/MySqlTableSource.java | 7 ++ .../mysql/table/MySqlTableSourceFactory.java | 5 + .../table/MySqlTableSourceFactoryTest.java | 12 ++ .../oracle/source/OracleSourceBuilder.java | 15 +++ .../source/config/OracleSourceConfig.java | 2 + .../config/OracleSourceConfigFactory.java | 3 + .../oracle/table/OracleTableSource.java | 15 ++- .../table/OracleTableSourceFactory.java | 7 +- .../table/OracleTableSourceFactoryTest.java | 24 ++-- .../source/config/SqlServerSourceConfig.java | 2 + .../config/SqlServerSourceConfigFactory.java | 3 + 43 files changed, 628 insertions(+), 31 deletions(-) create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/EnvironmentUtils.java create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/VersionComparable.java diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java index 824dac83a7..46b95b39ce 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java @@ -33,6 +33,7 @@ public abstract class BaseSourceConfig implements SourceConfig { protected final double distributionFactorUpper; protected final double distributionFactorLower; protected final boolean includeSchemaChanges; + protected final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -47,6 +48,7 @@ public BaseSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration) { this.startupOptions = startupOptions; @@ -55,6 +57,7 @@ public BaseSourceConfig( this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.closeIdleReaders = closeIdleReaders; this.dbzProperties = dbzProperties; this.dbzConfiguration = dbzConfiguration; } @@ -87,6 +90,11 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + @Override + public boolean isCloseIdleReaders() { + return closeIdleReaders; + } + public Properties getDbzProperties() { return dbzProperties; } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java index 3b5fa905c0..27dbf95452 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java @@ -53,6 +53,7 @@ public JdbcSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -73,6 +74,7 @@ public JdbcSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration); this.driverClassName = driverClassName; diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java index d7f1cd1e34..455721d2f0 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java @@ -42,6 +42,7 @@ public abstract class JdbcSourceConfigFactory implements Factory tableList; protected StartupOptions startupOptions = StartupOptions.initial(); protected boolean includeSchemaChanges = false; + protected boolean closeIdleReaders = false; protected double distributionFactorUpper = SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); protected double distributionFactorLower = @@ -209,6 +210,21 @@ public JdbcSourceConfigFactory startupOptions(StartupOptions startupOptions) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public JdbcSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { + this.closeIdleReaders = closeIdleReaders; + return this; + } + @Override public abstract JdbcSourceConfig create(int subtask); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java index 5dcd2b5328..be5accb5ec 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java @@ -34,6 +34,8 @@ public interface SourceConfig extends Serializable { boolean isIncludeSchemaChanges(); + boolean isCloseIdleReaders(); + /** Factory for the {@code SourceConfig}. */ @FunctionalInterface interface Factory extends Serializable { diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java index 4c4ed87106..ac0f8ad5fa 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.base.options; +import org.apache.flink.annotation.Experimental; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -111,4 +112,14 @@ public class SourceOptions { + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14 when enabling this feature."); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java index f73897d494..d47184d975 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java @@ -170,6 +170,11 @@ public void notifyCheckpointComplete(long checkpointId) { snapshotSplitAssigner.notifyCheckpointComplete(checkpointId); } + @Override + public boolean isStreamSplitAssigned() { + return isStreamSplitAssigned; + } + @Override public void close() { snapshotSplitAssigner.close(); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java index d0a72fdbf2..26f88e9497 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java @@ -55,6 +55,11 @@ public interface SplitAssigner { */ boolean waitingForFinishedSplits(); + /** Whether the split assigner is finished stream split assigning. */ + default boolean isStreamSplitAssigned() { + throw new UnsupportedOperationException("Not support to assigning StreamSplit."); + } + /** * Gets the finished splits' information. This is useful metadata to generate a stream split * that considering finished snapshot splits. diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java index 02406cd10a..7cec9ee6b2 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java @@ -117,6 +117,11 @@ public void notifyCheckpointComplete(long checkpointId) { // nothing to do } + @Override + public boolean isStreamSplitAssigned() { + return isStreamSplitAssigned; + } + @Override public void close() {} diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index c8cf10c9fb..4fa7abc473 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -163,6 +163,14 @@ private void assignSplits() { continue; } + if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) { + // close idle readers when snapshot phase finished. + context.signalNoMoreSplits(nextAwaiting); + awaitingReader.remove(); + LOG.info("Close idle reader of subtask {}", nextAwaiting); + continue; + } + Optional split = splitAssigner.getNext(); if (split.isPresent()) { final SourceSplitBase sourceSplit = split.get(); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java new file mode 100644 index 0000000000..970012e14e --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.utils; + +import org.apache.flink.runtime.util.EnvironmentInformation; + +/** Utilities for environment information at runtime. */ +public class EnvironmentUtils { + + private EnvironmentUtils() {} + + private static final VersionComparable FLINK_1_14 = VersionComparable.fromVersionString("1.14"); + + public static VersionComparable runtimeFlinkVersion() { + return VersionComparable.fromVersionString(EnvironmentInformation.getVersion()); + } + + public static boolean supportCheckpointsAfterTasksFinished() { + return runtimeFlinkVersion().newerThanOrEqualTo(FLINK_1_14); + } + + public static void checkSupportCheckpointsAfterTasksFinished(boolean closeIdleReaders) { + if (closeIdleReaders && !supportCheckpointsAfterTasksFinished()) { + throw new UnsupportedOperationException( + "The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. But the current version is " + + runtimeFlinkVersion()); + } + } +} diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java new file mode 100644 index 0000000000..01218942ae --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.utils; + +/** Used to compare version numbers at runtime. */ +public class VersionComparable implements Comparable { + + private int majorVersion; + private int minorVersion; + private int patchVersion; + private String versionString; + + private VersionComparable(String versionString) { + this.versionString = versionString; + try { + int pos = versionString.indexOf('-'); + String numberPart = versionString; + if (pos > 0) { + numberPart = versionString.substring(0, pos); + } + + String[] versions = numberPart.split("\\."); + + this.majorVersion = Integer.parseInt(versions[0]); + this.minorVersion = Integer.parseInt(versions[1]); + if (versions.length == 3) { + this.patchVersion = Integer.parseInt(versions[2]); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Can not recognize version %s.", versionString)); + } + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } + + public int getPatchVersion() { + return patchVersion; + } + + public static VersionComparable fromVersionString(String versionString) { + return new VersionComparable(versionString); + } + + @Override + public int compareTo(VersionComparable version) { + if (equalTo(version)) { + return 0; + } else if (newerThan(version)) { + return 1; + } else { + return -1; + } + } + + public boolean equalTo(VersionComparable version) { + return majorVersion == version.majorVersion + && minorVersion == version.minorVersion + && patchVersion == version.patchVersion; + } + + public boolean newerThan(VersionComparable version) { + if (majorVersion <= version.majorVersion) { + if (majorVersion < version.majorVersion) { + return false; + } else { + if (minorVersion <= version.minorVersion) { + if (minorVersion < version.patchVersion) { + return false; + } else { + return patchVersion > version.patchVersion; + } + } + } + } + return true; + } + + public boolean newerThanOrEqualTo(VersionComparable version) { + return newerThan(version) || equalTo(version); + } + + @Override + public String toString() { + return versionString; + } +} diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java index 187dea6cd6..8933a25ca6 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java @@ -205,6 +205,21 @@ public MySqlSourceBuilder debeziumProperties(Properties properties) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MySqlSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java index dcd6fcaa8c..4e79dd0c62 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java @@ -45,6 +45,7 @@ public MySqlSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -66,6 +67,7 @@ public MySqlSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration, driverClassName, diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java index db291aab91..f97439ddc6 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java @@ -24,6 +24,7 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory to initialize {@link MySqlSourceConfig}. */ @@ -47,6 +48,7 @@ public MySqlSourceConfigFactory serverId(String serverId) { /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */ public MySqlSourceConfig create(int subtaskId) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); Properties props = new Properties(); // hard code server name, because we don't need to distinguish it, docs: // Logical name that identifies and provides a namespace for the particular @@ -113,6 +115,7 @@ public MySqlSourceConfig create(int subtaskId) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, props, dbzConfiguration, driverClassName, diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java index e416525ccd..1fa89260bb 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java @@ -46,6 +46,7 @@ import java.util.Random; import java.util.stream.Stream; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT; @@ -141,6 +142,7 @@ public void testMongoDbCDC() throws Exception { List sqlLines = Arrays.asList( "SET 'execution.checkpointing.interval' = '3s';", + "SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';", "CREATE TABLE products_source (", " _id STRING NOT NULL,", " name STRING,", @@ -156,7 +158,10 @@ public void testMongoDbCDC() throws Exception { " 'password' = '" + FLINK_USER_PASSWORD + "',", " 'collection' = 'products',", " 'heartbeat.interval.ms' = '1000',", - " 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "'", + " 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "',", + " 'scan.incremental.close-idle-reader.enabled' = '" + + supportCheckpointsAfterTasksFinished() + + "'", ");", "CREATE TABLE mongodb_products_sink (", " `id` STRING NOT NULL,", diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java index f35cc45dc3..c565513c86 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MySqlE2eITCase.java @@ -32,6 +32,8 @@ import java.util.Arrays; import java.util.List; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished; + /** End-to-end tests for mysql-cdc connector uber jar. */ public class MySqlE2eITCase extends FlinkContainerTestEnvironment { @@ -44,6 +46,7 @@ public void testMySqlCDC() throws Exception { List sqlLines = Arrays.asList( "SET 'execution.checkpointing.interval' = '3s';", + "SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';", "CREATE TABLE products_source (", " `id` INT NOT NULL,", " name STRING,", @@ -63,7 +66,10 @@ public void testMySqlCDC() throws Exception { " 'table-name' = 'products_source',", " 'server-time-zone' = 'UTC',", " 'server-id' = '5800-5900',", - " 'scan.incremental.snapshot.chunk.size' = '4'", + " 'scan.incremental.snapshot.chunk.size' = '4',", + " 'scan.incremental.close-idle-reader.enabled' = '" + + supportCheckpointsAfterTasksFinished() + + "'", ");", "CREATE TABLE products_sink (", " `id` INT NOT NULL,", diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index 8a2f76b6af..d0bda19c20 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -183,6 +183,23 @@ public MongoDBSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) { return this; } + /** + * scan.incremental.close-idle-reader.enabled + * + *

Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MongoDBSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. @@ -198,7 +215,6 @@ public MongoDBSourceBuilder deserializer(DebeziumDeserializationSchema des * @return a MongoDBParallelSource with the settings made for this builder. */ public MongoDBSource build() { - configFactory.validate(); return new MongoDBSource<>(configFactory, checkNotNull(deserializer)); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java index 9f0e43dbcf..cc082a322d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java @@ -48,6 +48,7 @@ public class MongoDBSourceConfig implements SourceConfig { private final int heartbeatIntervalMillis; private final int splitMetaGroupSize; private final int splitSizeMB; + private final boolean closeIdleReaders; MongoDBSourceConfig( String scheme, @@ -64,7 +65,8 @@ public class MongoDBSourceConfig implements SourceConfig { StartupOptions startupOptions, int heartbeatIntervalMillis, int splitMetaGroupSize, - int splitSizeMB) { + int splitSizeMB, + boolean closeIdleReaders) { this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); this.username = username; @@ -81,6 +83,7 @@ public class MongoDBSourceConfig implements SourceConfig { this.heartbeatIntervalMillis = heartbeatIntervalMillis; this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; + this.closeIdleReaders = closeIdleReaders; } public String getScheme() { @@ -155,6 +158,11 @@ public boolean isIncludeSchemaChanges() { return false; } + @Override + public boolean isCloseIdleReaders() { + return closeIdleReaders; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -172,6 +180,7 @@ public boolean equals(Object o) { && heartbeatIntervalMillis == that.heartbeatIntervalMillis && splitMetaGroupSize == that.splitMetaGroupSize && splitSizeMB == that.splitSizeMB + && closeIdleReaders == that.closeIdleReaders && Objects.equals(scheme, that.scheme) && Objects.equals(hosts, that.hosts) && Objects.equals(username, that.username) @@ -198,6 +207,7 @@ public int hashCode() { startupOptions, heartbeatIntervalMillis, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdleReaders); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index 9ff96a6e09..70034c78a0 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -25,6 +25,7 @@ import java.util.List; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; @@ -52,11 +53,12 @@ public class MongoDBSourceConfigFactory implements Factory private Integer batchSize = BATCH_SIZE.defaultValue(); private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue(); private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue(); - private Boolean updateLookup = true; + private boolean updateLookup = true; private StartupOptions startupOptions = StartupOptions.initial(); private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue(); private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); + private boolean closeIdleReaders = false; /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ public MongoDBSourceConfigFactory scheme(String scheme) { @@ -202,14 +204,25 @@ public MongoDBSourceConfigFactory splitMetaGroupSize(int splitMetaGroupSize) { return this; } - /** Validate required options. */ - public void validate() { - checkNotNull(hosts, "hosts must be provided"); + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MongoDBSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { + this.closeIdleReaders = closeIdleReaders; + return this; } /** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */ @Override public MongoDBSourceConfig create(int subtaskId) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); return new MongoDBSourceConfig( scheme, hosts, @@ -225,6 +238,7 @@ public MongoDBSourceConfig create(int subtaskId) { startupOptions, heartbeatIntervalMillis, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdleReaders); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index cfce1b8227..a29a3a384b 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -79,6 +79,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final boolean enableParallelRead; private final Integer splitMetaGroupSize; private final Integer splitSizeMB; + private final boolean closeIdlerReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -108,7 +109,8 @@ public MongoDBTableSource( ZoneId localTimeZone, boolean enableParallelRead, @Nullable Integer splitMetaGroupSize, - @Nullable Integer splitSizeMB) { + @Nullable Integer splitSizeMB, + boolean closeIdlerReaders) { this.physicalSchema = physicalSchema; this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); @@ -129,6 +131,7 @@ public MongoDBTableSource( this.enableParallelRead = enableParallelRead; this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; + this.closeIdlerReaders = closeIdlerReaders; } @Override @@ -178,6 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { MongoDBSource.builder() .scheme(scheme) .hosts(hosts) + .closeIdleReaders(closeIdlerReaders) .deserializer(deserializer); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); @@ -271,7 +275,8 @@ public DynamicTableSource copy() { localTimeZone, enableParallelRead, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdlerReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -305,7 +310,8 @@ public boolean equals(Object o) { && Objects.equals(splitMetaGroupSize, that.splitMetaGroupSize) && Objects.equals(splitSizeMB, that.splitSizeMB) && Objects.equals(producedDataType, that.producedDataType) - && Objects.equals(metadataKeys, that.metadataKeys); + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(closeIdlerReaders, that.closeIdlerReaders); } @Override @@ -330,7 +336,8 @@ public int hashCode() { splitMetaGroupSize, splitSizeMB, producedDataType, - metadataKeys); + metadataKeys, + closeIdlerReaders); } @Override diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index fd48a20005..f3180f573f 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -33,6 +33,7 @@ import java.util.Set; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS; @@ -92,6 +93,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { : ZoneId.of(zoneId); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); @@ -121,7 +123,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { localTimeZone, enableParallelRead, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + enableCloseIdleReaders); } private void checkPrimaryKey(UniqueConstraint pk, String message) { @@ -160,6 +163,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); options.add(CHUNK_META_GROUP_SIZE); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } } diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java index 2b2cdbb890..aed4a29526 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mongodb.source; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; @@ -25,6 +26,7 @@ import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH; /** Example Tests for {@link MongoDBSource}. */ public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase { @@ -42,9 +44,13 @@ public void testMongoDBExampleSource() throws Exception { .username(FLINK_USER) .password(FLINK_USER_PASSWORD) .deserializer(new JsonDebeziumDeserializationSchema()) + .closeIdleReaders(true) .build(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration config = new Configuration(); + config.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); // enable checkpoint env.enableCheckpointing(3000); // set the source parallelism to 2 diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index f7b84e193e..9c3d49c00d 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -45,6 +45,7 @@ import java.util.Map; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; @@ -103,6 +104,8 @@ public class MongoDBTableFactoryTest { private static final int SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); private static final int CHUNK_META_GROUP_SIZE_DEFAULT = CHUNK_META_GROUP_SIZE.defaultValue(); + private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT = + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(); @Test public void testCommonProperties() { @@ -129,7 +132,8 @@ public void testCommonProperties() { LOCAL_TIME_ZONE, SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT, CHUNK_META_GROUP_SIZE_DEFAULT, - SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT); + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); } @@ -147,6 +151,7 @@ public void testOptionalProperties() { options.put("scan.incremental.snapshot.enabled", "true"); options.put("chunk-meta.group.size", "1001"); options.put("scan.incremental.snapshot.chunk.size.mb", "10"); + options.put("scan.incremental.close-idle-reader.enabled", "true"); DynamicTableSource actualSource = createTableSource(SCHEMA, options); MongoDBTableSource expectedSource = @@ -168,7 +173,8 @@ public void testOptionalProperties() { LOCAL_TIME_ZONE, true, 1001, - 10); + 10, + true); assertEquals(expectedSource, actualSource); } @@ -203,7 +209,8 @@ public void testMetadataColumns() { LOCAL_TIME_ZONE, SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT, CHUNK_META_GROUP_SIZE_DEFAULT, - SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT); + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 35b8645560..53b5e963ce 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -237,6 +237,21 @@ public MySqlSourceBuilder heartbeatInterval(Duration heartbeatInterval) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MySqlSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index e92a35257c..780d9d531d 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -129,6 +129,11 @@ public boolean waitingForFinishedSplits() { return snapshotSplitAssigner.waitingForFinishedSplits(); } + @Override + public boolean isStreamSplitAssigned() { + return isBinlogSplitAssigned; + } + @Override public List getFinishedSplitInfos() { return snapshotSplitAssigner.getFinishedSplitInfos(); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java index b7032c88e9..9e9fe28541 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java @@ -53,6 +53,11 @@ public interface MySqlSplitAssigner { */ boolean waitingForFinishedSplits(); + /** Whether the split assigner is finished stream split assigning. */ + default boolean isStreamSplitAssigned() { + throw new UnsupportedOperationException("Not support to assigning StreamSplit."); + } + /** * Gets the finished splits' information. This is useful metadata to generate a binlog split * that considering finished snapshot splits. diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 35026bb39f..69157c9853 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -57,6 +57,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorLower; private final boolean includeSchemaChanges; private final boolean scanNewlyAddedTableEnabled; + private final boolean closeIdleReaders; private final Properties jdbcProperties; private final Map chunkKeyColumns; @@ -87,6 +88,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorLower, boolean includeSchemaChanges, boolean scanNewlyAddedTableEnabled, + boolean closeIdleReaders, Properties dbzProperties, Properties jdbcProperties, Map chunkKeyColumns) { @@ -109,6 +111,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; + this.closeIdleReaders = closeIdleReaders; this.dbzProperties = checkNotNull(dbzProperties); this.dbzConfiguration = Configuration.from(dbzProperties); this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration); @@ -193,6 +196,10 @@ public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; } + public boolean isCloseIdleReaders() { + return closeIdleReaders; + } + public Properties getDbzProperties() { return dbzProperties; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index cd5f4db7fa..843ce749e0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -42,6 +42,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.ververica.cdc.connectors.mysql.source.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory to construct {@link MySqlSourceConfig}. */ @@ -71,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; private boolean scanNewlyAddedTableEnabled = false; + private boolean closeIdleReaders = false; private Properties jdbcProperties; private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue(); private Properties dbzProperties; @@ -260,8 +262,24 @@ public MySqlSourceConfigFactory debeziumProperties(Properties properties) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MySqlSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { + this.closeIdleReaders = closeIdleReaders; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */ public MySqlSourceConfig createConfig(int subtaskId) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); Properties props = new Properties(); // hard code server name, because we don't need to distinguish it, docs: // Logical name that identifies and provides a namespace for the particular @@ -342,6 +360,7 @@ public MySqlSourceConfig createConfig(int subtaskId) { distributionFactorLower, includeSchemaChanges, scanNewlyAddedTableEnabled, + closeIdleReaders, props, jdbcProperties, chunkKeyColumns); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 3912f01def..11a8e15694 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -245,4 +245,14 @@ public class MySqlSourceOptions { "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + "By default, the chunk key is the first column of the primary key." + "This column must be a column of the primary key."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14 when enabling this feature."); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index a19bf713e7..06a5f7f7dd 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -187,6 +187,14 @@ private void assignSplits() { continue; } + if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) { + // close idle readers when snapshot phase finished. + context.signalNoMoreSplits(nextAwaiting); + awaitingReader.remove(); + LOG.info("Close idle reader of subtask {}", nextAwaiting); + continue; + } + Optional split = splitAssigner.getNext(); if (split.isPresent()) { final MySqlSplit mySqlSplit = split.get(); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/EnvironmentUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/EnvironmentUtils.java new file mode 100644 index 0000000000..fd4601fd95 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/EnvironmentUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.utils; + +import org.apache.flink.runtime.util.EnvironmentInformation; + +/** Utilities for environment information at runtime. */ +public class EnvironmentUtils { + + private EnvironmentUtils() {} + + private static final VersionComparable FLINK_1_14 = VersionComparable.fromVersionString("1.14"); + + public static VersionComparable runtimeFlinkVersion() { + return VersionComparable.fromVersionString(EnvironmentInformation.getVersion()); + } + + public static boolean supportCheckpointsAfterTasksFinished() { + return runtimeFlinkVersion().newerThanOrEqualTo(FLINK_1_14); + } + + public static void checkSupportCheckpointsAfterTasksFinished(boolean closeIdleReaders) { + if (closeIdleReaders && !supportCheckpointsAfterTasksFinished()) { + throw new UnsupportedOperationException( + "The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true. But the current version is " + + runtimeFlinkVersion()); + } + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/VersionComparable.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/VersionComparable.java new file mode 100644 index 0000000000..2dc008d79d --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/VersionComparable.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.utils; + +/** Used to compare version numbers at runtime. */ +public class VersionComparable implements Comparable { + + private int majorVersion; + private int minorVersion; + private int patchVersion; + private String versionString; + + private VersionComparable(String versionString) { + this.versionString = versionString; + try { + int pos = versionString.indexOf('-'); + String numberPart = versionString; + if (pos > 0) { + numberPart = versionString.substring(0, pos); + } + + String[] versions = numberPart.split("\\."); + + this.majorVersion = Integer.parseInt(versions[0]); + this.minorVersion = Integer.parseInt(versions[1]); + if (versions.length == 3) { + this.patchVersion = Integer.parseInt(versions[2]); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Can not recognize version %s.", versionString)); + } + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } + + public int getPatchVersion() { + return patchVersion; + } + + public static VersionComparable fromVersionString(String versionString) { + return new VersionComparable(versionString); + } + + @Override + public int compareTo(VersionComparable version) { + if (equalTo(version)) { + return 0; + } else if (newerThan(version)) { + return 1; + } else { + return -1; + } + } + + public boolean equalTo(VersionComparable version) { + return majorVersion == version.majorVersion + && minorVersion == version.minorVersion + && patchVersion == version.patchVersion; + } + + public boolean newerThan(VersionComparable version) { + if (majorVersion <= version.majorVersion) { + if (majorVersion < version.majorVersion) { + return false; + } else { + if (minorVersion <= version.minorVersion) { + if (minorVersion < version.patchVersion) { + return false; + } else { + return patchVersion > version.patchVersion; + } + } + } + } + return true; + } + + public boolean newerThanOrEqualTo(VersionComparable version) { + return newerThan(version) || equalTo(version); + } + + @Override + public String toString() { + return versionString; + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 8c90e6fbd6..095905c93e 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -77,6 +77,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final double distributionFactorLower; private final StartupOptions startupOptions; private final boolean scanNewlyAddedTableEnabled; + private final boolean closeIdleReaders; private final Properties jdbcProperties; private final Duration heartbeatInterval; private final String chunkKeyColumn; @@ -113,6 +114,7 @@ public MySqlTableSource( double distributionFactorLower, StartupOptions startupOptions, boolean scanNewlyAddedTableEnabled, + boolean closeIdleReaders, Properties jdbcProperties, Duration heartbeatInterval, @Nullable String chunkKeyColumn) { @@ -137,6 +139,7 @@ public MySqlTableSource( this.distributionFactorLower = distributionFactorLower; this.startupOptions = startupOptions; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; + this.closeIdleReaders = closeIdleReaders; this.jdbcProperties = jdbcProperties; // Mutable attributes this.producedDataType = physicalSchema.toPhysicalRowDataType(); @@ -190,6 +193,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .startupOptions(startupOptions) .deserializer(deserializer) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) + .closeIdleReaders(closeIdleReaders) .jdbcProperties(jdbcProperties) .heartbeatInterval(heartbeatInterval) .chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn) @@ -270,6 +274,7 @@ public DynamicTableSource copy() { distributionFactorLower, startupOptions, scanNewlyAddedTableEnabled, + closeIdleReaders, jdbcProperties, heartbeatInterval, chunkKeyColumn); @@ -295,6 +300,7 @@ public boolean equals(Object o) { && distributionFactorUpper == that.distributionFactorUpper && distributionFactorLower == that.distributionFactorLower && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled + && closeIdleReaders == that.closeIdleReaders && Objects.equals(physicalSchema, that.physicalSchema) && Objects.equals(hostname, that.hostname) && Objects.equals(database, that.database) @@ -341,6 +347,7 @@ public int hashCode() { producedDataType, metadataKeys, scanNewlyAddedTableEnabled, + closeIdleReaders, jdbcProperties, heartbeatInterval, chunkKeyColumn); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 15c25e87c2..b025835967 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -56,6 +56,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; @@ -119,6 +120,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -154,6 +157,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { distributionFactorLower, startupOptions, scanNewlyAddedTableEnabled, + closeIdleReaders, JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), heartbeatInterval, chunkKeyColumn); @@ -198,6 +202,7 @@ public Set> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(CONNECT_MAX_RETRIES); options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); options.add(HEARTBEAT_INTERVAL); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); return options; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index cf5ecaa09d..d001baea9b 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -121,6 +121,7 @@ public void testCommonProperties() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -165,6 +166,7 @@ public void testEnableParallelReadSource() { 0.01d, StartupOptions.initial(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), "testCol"); @@ -205,6 +207,7 @@ public void testEnableParallelReadSourceWithSingleServerId() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -243,6 +246,7 @@ public void testEnableParallelReadSourceLatestOffset() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -260,6 +264,7 @@ public void testOptionalProperties() { options.put("jdbc.properties.useSSL", "false"); options.put("heartbeat.interval", "15213ms"); options.put("scan.incremental.snapshot.chunk.key-column", "testCol"); + options.put("scan.incremental.close-idle-reader.enabled", "true"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -289,6 +294,7 @@ public void testOptionalProperties() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), true, + true, jdbcProperties, Duration.ofMillis(15213), "testCol"); @@ -333,6 +339,7 @@ public void testStartupFromSpecificOffset() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.specificOffset(offsetFile, offsetPos), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -369,6 +376,7 @@ public void testStartupFromInitial() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -406,6 +414,7 @@ public void testStartupFromEarliestOffset() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.earliest(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -444,6 +453,7 @@ public void testStartupFromSpecificTimestamp() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.timestamp(0L), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -480,6 +490,7 @@ public void testStartupFromLatestOffset() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); @@ -521,6 +532,7 @@ public void testMetadataColumns() { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, + false, new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null); diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java index 4d7b38156f..9517db29da 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -203,6 +203,21 @@ public OracleSourceBuilder debeziumProperties(Properties properties) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public OracleSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java index 1e97aee446..478611d8cc 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java @@ -47,6 +47,7 @@ public OracleSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -70,6 +71,7 @@ public OracleSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration, driverClassName, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java index d35016249f..c40686556f 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java @@ -28,6 +28,7 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory to initialize {@link OracleSourceConfig}. */ @@ -58,6 +59,7 @@ public JdbcSourceConfigFactory schemaList(String... schemaList) { /** Creates a new {@link OracleSourceConfig} for the given subtask {@code subtaskId}. */ public OracleSourceConfig create(int subtaskId) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); Properties props = new Properties(); props.setProperty("connector.class", OracleConnector.class.getCanonicalName()); // Logical name that identifies and provides a namespace for the particular Oracle @@ -114,6 +116,7 @@ public OracleSourceConfig create(int subtaskId) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, props, dbzConfiguration, DRIVER_ClASS_NAME, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index 0f5ccfe3a1..c0cdd162e1 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -77,6 +77,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada private final double distributionFactorUpper; private final double distributionFactorLower; private final String chunkKeyColumn; + private final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -109,7 +110,8 @@ public OracleTableSource( int connectionPoolSize, double distributionFactorUpper, double distributionFactorLower, - @Nullable String chunkKeyColumn) { + @Nullable String chunkKeyColumn, + boolean closeIdleReaders) { this.physicalSchema = physicalSchema; this.url = url; this.port = port; @@ -133,6 +135,7 @@ public OracleTableSource( this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.chunkKeyColumn = chunkKeyColumn; + this.closeIdleReaders = closeIdleReaders; } @Override @@ -178,6 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .connectMaxRetries(connectMaxRetries) .distributionFactorUpper(distributionFactorUpper) .distributionFactorLower(distributionFactorLower) + .closeIdleReaders(closeIdleReaders) .build(); return SourceProvider.of(oracleChangeEventSource); @@ -241,7 +245,8 @@ public DynamicTableSource copy() { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -278,7 +283,8 @@ public boolean equals(Object o) { && Objects.equals(connectionPoolSize, that.connectionPoolSize) && Objects.equals(distributionFactorUpper, that.distributionFactorUpper) && Objects.equals(distributionFactorLower, that.distributionFactorLower) - && Objects.equals(chunkKeyColumn, that.chunkKeyColumn); + && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) + && Objects.equals(closeIdleReaders, that.closeIdleReaders); } @Override @@ -306,7 +312,8 @@ public int hashCode() { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 879cecff9b..65d20a09a0 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -39,6 +39,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME; @@ -93,6 +94,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); String serverTimezone = config.get(SERVER_TIME_ZONE); + boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); @@ -126,7 +129,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdlerReaders); } @Override @@ -162,6 +166,7 @@ public Set> optionalOptions() { options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java index d0b9a6a858..4c95091e19 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java @@ -118,7 +118,9 @@ public void testRequiredProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED + .defaultValue()); assertEquals(expectedSource, actualSource); } @@ -152,7 +154,8 @@ public void testCommonProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -190,7 +193,8 @@ public void testOptionalProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -214,6 +218,8 @@ public void testScanIncrementalProperties() { String.valueOf(chunkSize)); options.put(SourceOptions.CHUNK_META_GROUP_SIZE.key(), String.valueOf(splitMetaGroupSize)); options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize)); + options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true"); + options.put( JdbcSourceOptions.CONNECT_TIMEOUT.key(), String.format("%ds", connectTimeout.getSeconds())); @@ -251,7 +257,8 @@ public void testScanIncrementalProperties() { connectPoolSize, distributionFactorUpper, distributionFactorLower, - null); + null, + true); assertEquals(expectedSource, actualSource); } @@ -286,7 +293,8 @@ public void testStartupFromInitial() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -321,7 +329,8 @@ public void testStartupFromLatestOffset() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -360,7 +369,8 @@ public void testMetadataColumns() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name", "schema_name"); diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java index fd842a4a6a..8ee666272d 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java @@ -41,6 +41,7 @@ public SqlServerSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -63,6 +64,7 @@ public SqlServerSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration, driverClassName, diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java index 41ebb93bbf..1acced0c49 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static org.apache.flink.util.Preconditions.checkNotNull; /** Factory for creating {@link SqlServerSourceConfig}. */ @@ -47,6 +48,7 @@ public JdbcSourceConfigFactory schemaList(String... schemaList) { @Override public SqlServerSourceConfig create(int subtask) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); Properties props = new Properties(); props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName()); @@ -98,6 +100,7 @@ public SqlServerSourceConfig create(int subtask) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, props, dbzConfiguration, DRIVER_ClASS_NAME,