diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index b1f4a945ef..9f9465ed71 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -275,6 +275,21 @@ pipeline: - `specific-offset`:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。 - `timestamp`:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。 +例如,可以在 YAML 配置文件中这样指定启动模式: + +```yaml +source: + type: mysql + scan.startup.mode: earliest-offset # Start from earliest offset + scan.startup.mode: latest-offset # Start from latest offset + scan.startup.mode: specific-offset # Start from specific offset + scan.startup.mode: timestamp # Start from timestamp + scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode + scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode + scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode + scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode + # ... +``` ## 数据类型映射 diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index b450c4017e..30feb47f74 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -284,6 +284,21 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c specified with binlog filename and position, or a GTID set if GTID is enabled on server. - `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp. +For example in YAML definition: + +```yaml +source: + type: mysql + scan.startup.mode: earliest-offset # Start from earliest offset + scan.startup.mode: latest-offset # Start from latest offset + scan.startup.mode: specific-offset # Start from specific offset + scan.startup.mode: timestamp # Start from timestamp + scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode + scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode + scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode + scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode + # ... 
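+  # NOTE: set scan.startup.mode exactly once in a real configuration; the repeated lines above only enumerate the alternatives.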
+``` ## Data Type Mapping diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java index 32e2508712..83ccb1cbec 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java @@ -83,7 +83,8 @@ void testGlobalPipelineConfigParsing() throws Exception { "--global-config", globalPipelineConfig()); assertThat(executor.getGlobalPipelineConfig().toMap().get("parallelism")).isEqualTo("1"); - assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar"); + assertThat(executor.getGlobalPipelineConfig().toMap().get("schema.change.behavior")) + .isEqualTo("ignore"); } @Test diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 863acbdd35..2d05bcbde3 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -77,7 +77,6 @@ void testOverridingGlobalConfig() throws Exception { Configuration.fromMap( ImmutableMap.builder() .put("parallelism", "1") - .put("foo", "bar") .build())); assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf); } @@ -224,7 +223,7 @@ void testInvalidTimeZone() throws Exception { ImmutableMap.builder() .put("name", "source-database-sync-pipe") .put("parallelism", "4") - .put("enable-schema-evolution", "false") + .put("schema.change.behavior", "evolve") .put("schema-operator.rpc-timeout", "1 h") .build())); @@ -285,9 +284,8 @@ void testInvalidTimeZone() throws Exception { ImmutableMap.builder() .put("name", "source-database-sync-pipe") .put("parallelism", "4") - .put("enable-schema-evolution", "false") + .put("schema.change.behavior", "evolve") .put("schema-operator.rpc-timeout", "1 h") - .put("foo", "bar") .build())); private final PipelineDef defWithOptional = @@ -327,5 +325,5 @@ void testInvalidTimeZone() throws Exception { new SinkDef("kafka", null, new Configuration()), Collections.emptyList(), Collections.emptyList(), - new Configuration()); + Configuration.fromMap(Collections.singletonMap("parallelism", "1"))); } diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml index e06ad904b9..b92e237d16 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml @@ -55,5 +55,5 @@ transform: pipeline: name: source-database-sync-pipe parallelism: 4 - enable-schema-evolution: false + schema.change.behavior: evolve schema-operator.rpc-timeout: 1 h diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml index 1ebeede7a5..20808e4728 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml @@ -19,3 +19,6 @@ source: sink: type: kafka + +pipeline: + parallelism: 1 \ No newline at end of file diff --git a/flink-cdc-cli/src/test/resources/global-config/global-config.yaml 
b/flink-cdc-cli/src/test/resources/global-config/global-config.yaml index 6c7928f903..86d9867b5f 100644 --- a/flink-cdc-cli/src/test/resources/global-config/global-config.yaml +++ b/flink-cdc-cli/src/test/resources/global-config/global-config.yaml @@ -15,4 +15,4 @@ # limitations under the License. ################################################################################ parallelism: 1 -foo: bar +schema.change.behavior: ignore \ No newline at end of file diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/Configuration.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/Configuration.java index e5ce2f2d5e..c1928191e1 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/Configuration.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/Configuration.java @@ -22,8 +22,10 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.BiFunction; import static org.apache.flink.cdc.common.configuration.ConfigurationUtils.canBePrefixMap; @@ -131,6 +133,17 @@ public Configuration set(ConfigOption option, T value) { return this; } + /** + * Returns the keys of all key/value pairs stored inside this configuration object. + * + * @return the keys of all key/value pairs stored inside this configuration object + */ + public Set keySet() { + synchronized (this.confData) { + return new HashSet<>(this.confData.keySet()); + } + } + public Map toMap() { synchronized (this.confData) { Map ret = new HashMap<>(this.confData.size()); @@ -247,6 +260,10 @@ private Optional applyWithOption( return Optional.empty(); } + public Set getKeys() { + return confData.keySet(); + } + @Override public int hashCode() { int hash = 0; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java index a3a9ae1ac5..43633382d5 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java @@ -18,12 +18,133 @@ package org.apache.flink.cdc.common.factories; import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.table.api.ValidationException; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** A helper for working with {@link Factory}. */ @PublicEvolving public class FactoryHelper { + private final Factory factory; + private final Factory.Context context; + + private FactoryHelper(Factory factory, Factory.Context context) { + this.factory = factory; + this.context = context; + } + + public static FactoryHelper createFactoryHelper(Factory factory, Factory.Context context) { + return new FactoryHelper(factory, context); + } + + /** + * Validates the required and optional {@link ConfigOption}s of a factory. + * + *
<p>
Note: It does not check for left-over options. + */ + public static void validateFactoryOptions(Factory factory, Configuration configuration) { + validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), configuration); + } + + /** + * Validates the required options and optional options. + * + *
<p>
Note: It does not check for left-over options. + */ + public static void validateFactoryOptions( + Set> requiredOptions, + Set> optionalOptions, + Configuration configuration) { + final List missingRequiredOptions = + requiredOptions.stream() + .filter(option -> configuration.get(option) == null) + .map(ConfigOption::key) + .sorted() + .collect(Collectors.toList()); + + if (!missingRequiredOptions.isEmpty()) { + throw new ValidationException( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + String.join("\n", missingRequiredOptions))); + } + + optionalOptions.forEach(configuration::getOptional); + } + + /** Validates unconsumed option keys. */ + public static void validateUnconsumedKeys( + String factoryIdentifier, Set allOptionKeys, Set consumedOptionKeys) { + final Set remainingOptionKeys = new HashSet<>(allOptionKeys); + remainingOptionKeys.removeAll(consumedOptionKeys); + if (!remainingOptionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Unsupported options found for '%s'.\n\n" + + "Unsupported options:\n\n" + + "%s\n\n" + + "Supported options:\n\n" + + "%s", + factoryIdentifier, + remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")), + String.join("\n", consumedOptionKeys))); + } + } + + /** Validates the options of the factory. It checks for unconsumed option keys. */ + public void validate() { + Set allOptionKeys = + Stream.concat( + factory.requiredOptions().stream().map(ConfigOption::key), + factory.optionalOptions().stream().map(ConfigOption::key)) + .collect(Collectors.toSet()); + + validateFactoryOptions(factory, context.getFactoryConfiguration()); + validateUnconsumedKeys( + factory.identifier(), context.getFactoryConfiguration().getKeys(), allOptionKeys); + } + + /** + * Validates the options of the factory. It checks for unconsumed option keys while ignoring the + * options with given prefixes. + * + *
<p>
The option keys that have given prefix {@code prefixToSkip} would just be skipped for + * validation. + * + * @param prefixesToSkip Set of option key prefixes to skip validation + */ + public void validateExcept(String... prefixesToSkip) { + Preconditions.checkArgument( + prefixesToSkip.length > 0, "Prefixes to skip can not be empty."); + + final List prefixesList = Arrays.asList(prefixesToSkip); + + Set allOptionKeys = + Stream.concat( + factory.requiredOptions().stream().map(ConfigOption::key), + factory.optionalOptions().stream().map(ConfigOption::key)) + .collect(Collectors.toSet()); + + Set filteredOptionKeys = + context.getFactoryConfiguration().getKeys().stream() + .filter(key -> prefixesList.stream().noneMatch(key::startsWith)) + .collect(Collectors.toSet()); + + validateFactoryOptions(factory, context.getFactoryConfiguration()); + validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, allOptionKeys); + } + /** Default implementation of {@link Factory.Context}. */ public static class DefaultContext implements Factory.Context { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java new file mode 100644 index 0000000000..861684c437 --- /dev/null +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/factories/FactoryHelperTests.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.factories; + +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** Tests for {@link FactoryHelper}. 
*/ +public class FactoryHelperTests { + + private Factory getDummyFactory() { + + return new Factory() { + @Override + public String identifier() { + return "dummy"; + } + + @Override + public Set> requiredOptions() { + return Sets.newHashSet( + ConfigOptions.key("id").intType().noDefaultValue(), + ConfigOptions.key("name").stringType().noDefaultValue(), + ConfigOptions.key("age").doubleType().noDefaultValue()); + } + + @Override + public Set> optionalOptions() { + return Sets.newHashSet( + ConfigOptions.key("hobby").stringType().noDefaultValue(), + ConfigOptions.key("location").stringType().defaultValue("Everywhere"), + ConfigOptions.key("misc") + .mapType() + .defaultValue(Collections.singletonMap("A", "Z"))); + } + }; + } + + @Test + void testCorrectConfigValidation() { + // This is a valid configuration. + Map configurations = new HashMap<>(); + configurations.put("id", "1"); + configurations.put("name", "Alice"); + configurations.put("age", "17"); + configurations.put("location", "Here"); + + FactoryHelper factoryHelper = + FactoryHelper.createFactoryHelper( + getDummyFactory(), + new FactoryHelper.DefaultContext( + Configuration.fromMap(configurations), null, null)); + + factoryHelper.validate(); + } + + @Test + void testMissingRequiredOptionConfigValidation() { + // This configuration doesn't provide all required options. + Map configurations = new HashMap<>(); + configurations.put("id", "1"); + configurations.put("age", "17"); + configurations.put("location", "Here"); + + FactoryHelper factoryHelper = + FactoryHelper.createFactoryHelper( + getDummyFactory(), + new FactoryHelper.DefaultContext( + Configuration.fromMap(configurations), null, null)); + + Assertions.assertThatThrownBy(factoryHelper::validate) + .isExactlyInstanceOf(ValidationException.class) + .hasMessageContaining("One or more required options are missing."); + } + + @Test + void testIncompatibleTypeValidation() { + // This configuration has an option with mismatched type. + Map configurations = new HashMap<>(); + configurations.put("id", "1"); + configurations.put("name", "Alice"); + configurations.put("age", "Not a number"); + configurations.put("location", "Here"); + + FactoryHelper factoryHelper = + FactoryHelper.createFactoryHelper( + getDummyFactory(), + new FactoryHelper.DefaultContext( + Configuration.fromMap(configurations), null, null)); + + Assertions.assertThatThrownBy(factoryHelper::validate) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Could not parse value 'Not a number' for key 'age'."); + } + + @Test + void testRedundantConfigValidation() { + // This configuration has redundant config options. + Map configurations = new HashMap<>(); + configurations.put("id", "1"); + configurations.put("name", "Alice"); + configurations.put("age", "17"); + configurations.put("what", "Not a valid configOption"); + + FactoryHelper factoryHelper = + FactoryHelper.createFactoryHelper( + getDummyFactory(), + new FactoryHelper.DefaultContext( + Configuration.fromMap(configurations), null, null)); + + Assertions.assertThatThrownBy(factoryHelper::validate) + .isExactlyInstanceOf(ValidationException.class) + .hasMessageContaining("Unsupported options found for 'dummy'."); + } + + @Test + void testAllowedPrefixConfigValidation() { + // This configuration has allowed prefix options. 
+ Map configurations = new HashMap<>(); + configurations.put("id", "1"); + configurations.put("name", "Alice"); + configurations.put("age", "17"); + configurations.put("debezium.foo", "Some debezium options"); + configurations.put("debezium.bar", "Another debezium options"); + configurations.put("canal.baz", "Yet another debezium options"); + + FactoryHelper factoryHelper = + FactoryHelper.createFactoryHelper( + getDummyFactory(), + new FactoryHelper.DefaultContext( + Configuration.fromMap(configurations), null, null)); + + Assertions.assertThatThrownBy(factoryHelper::validate) + .isExactlyInstanceOf(ValidationException.class) + .hasMessageContaining("Unsupported options found for 'dummy'."); + + Assertions.assertThatThrownBy(() -> factoryHelper.validateExcept("debezium.")) + .isExactlyInstanceOf(ValidationException.class) + .hasMessageContaining("Unsupported options found for 'dummy'."); + + Assertions.assertThatThrownBy(() -> factoryHelper.validateExcept("canal.")) + .isExactlyInstanceOf(ValidationException.class) + .hasMessageContaining("Unsupported options found for 'dummy'."); + + factoryHelper.validateExcept("debezium.", "canal."); + } +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java index 49e6a4a0c6..27d2bb83ec 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java @@ -18,15 +18,21 @@ package org.apache.flink.cdc.composer.definition; import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.table.api.ValidationException; import java.time.ZoneId; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.TimeZone; +import java.util.stream.Collectors; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; @@ -67,6 +73,8 @@ public PipelineDef( this.routes = routes; this.transforms = transforms; this.config = evaluatePipelineTimeZone(config); + + validatePipelineDefinition(this.config); } public SourceDef getSource() { @@ -130,6 +138,43 @@ public int hashCode() { // Utilities // ------------------------------------------------------------------------ + @VisibleForTesting + public static void validatePipelineDefinition(Configuration configuration) + throws ValidationException { + List> options = + Arrays.asList( + PipelineOptions.PIPELINE_NAME, + PipelineOptions.PIPELINE_PARALLELISM, + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, + PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, + PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID, + PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT); + + Set optionKeys = + options.stream().map(ConfigOption::key).collect(Collectors.toSet()); + + configuration + .getKeys() + .forEach( + key -> { + if (!optionKeys.contains(key)) { + throw new ValidationException( + String.format("Unknown configuration key `%s`", key)); + } + }); + + options.forEach( + option -> { + if 
(!configuration.getOptional(option).isPresent() + && !option.hasDefaultValue()) { + throw new ValidationException( + String.format( + "Configuration key `%s` is not specified, and no default value available.", + option.key())); + } + }); + } + /** * Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP * WITH LOCAL TIME ZONE}. diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java new file mode 100644 index 0000000000..f4a07d0311 --- /dev/null +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.definition; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** Tests for {@link PipelineDef} validation. */ +public class PipelineValidationTest { + + @Test + void testNormalConfigValidation() { + // A common configuration file + Map configurations = new HashMap<>(); + + configurations.put("parallelism", "1"); + configurations.put("name", "Pipeline Job"); + + PipelineDef.validatePipelineDefinition(Configuration.fromMap(configurations)); + } + + @Test + void testTypeMismatchValidation() { + Map configurations = new HashMap<>(); + + // option value with mismatched type. 
+ configurations.put("parallelism", "Not a Number"); + configurations.put("name", "Pipeline Job"); + + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> PipelineDef.validatePipelineDefinition(Configuration.fromMap(configurations)), + "Could not parse value 'Not a Number' for key 'parallelism'."); + } + + @Test + void testEmptyConfigValidation() { + + // An empty configuration should fail + Map configurations = new HashMap<>(); + + Assertions.assertThrowsExactly( + ValidationException.class, + () -> + PipelineDef.validatePipelineDefinition( + Configuration.fromMap(configurations))); + } + + @Test + void testUnknownConfigValidation() { + // An empty configuration should fail + Map configurations = new HashMap<>(); + + configurations.put("parallelism", "1"); + configurations.put("name", "Pipeline Job"); + configurations.put("unknown", "optionValue"); + + Assertions.assertThrowsExactly( + ValidationException.class, + () -> + PipelineDef.validatePipelineDefinition( + Configuration.fromMap(configurations))); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java index 8bd36f4394..d0567ab03d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.connectors.doris.sink.DorisDataSink; @@ -56,6 +57,7 @@ import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX; +import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; /** A dummy {@link DataSinkFactory} to create {@link DorisDataSink}. 
*/ @@ -63,6 +65,9 @@ public class DorisDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, STREAM_LOAD_PROP_PREFIX); + Configuration config = context.getFactoryConfiguration(); DorisOptions.Builder optionsBuilder = DorisOptions.builder(); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); @@ -134,11 +139,9 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> options = new HashSet<>(); - options.add(FENODES); options.add(BENODES); - options.add(USERNAME); - options.add(PASSWORD); options.add(JDBC_URL); + options.add(PASSWORD); options.add(AUTO_REDIRECT); options.add(SINK_CHECK_INTERVAL); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java index f993d63258..243991d1ae 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.connectors.kafka.json.ChangeLogJsonFormatFactory; @@ -37,6 +38,10 @@ import java.util.Set; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT; /** A dummy {@link DataSinkFactory} to create {@link KafkaDataSink}. 
*/ public class KafkaDataSinkFactory implements DataSinkFactory { @@ -45,6 +50,8 @@ public class KafkaDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context).validateExcept(PROPERTIES_PREFIX); + Configuration configuration = Configuration.fromMap(context.getFactoryConfiguration().toMap()); DeliveryGuarantee deliveryGuarantee = @@ -97,15 +104,17 @@ public String identifier() { @Override public Set> requiredOptions() { - return null; + return new HashSet<>(); } @Override public Set> optionalOptions() { Set> options = new HashSet<>(); + options.add(VALUE_FORMAT); + options.add(TOPIC); + options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED); + options.add(SINK_CUSTOM_HEADER); options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE); - options.add(KafkaDataSinkOptions.TOPIC); - options.add(KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java index 1f1957beab..f0736dd0b6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java @@ -22,26 +22,72 @@ import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.Assertions; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; /** Tests for {@link KafkaDataSinkFactory}. 
*/ public class KafkaDataSinkFactoryTest { @Test - public void testCreateDataSink() { + void testCreateDataSink() { DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class); - Assertions.assertTrue(sinkFactory instanceof KafkaDataSinkFactory); + Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class); Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext( conf, conf, Thread.currentThread().getContextClassLoader())); - Assertions.assertTrue(dataSink instanceof KafkaDataSink); + Assertions.assertThat(dataSink).isInstanceOf(KafkaDataSink.class); + } + + @Test + void testUnsupportedOption() { + + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put("unsupported_key", "unsupported_value") + .build()); + + Assertions.assertThatThrownBy( + () -> + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, + conf, + Thread.currentThread().getContextClassLoader()))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'kafka'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + @Test + void testPrefixRequireOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("kafka", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put("properties.compression.type", "none") + .build()); + DataSink dataSink = + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, conf, Thread.currentThread().getContextClassLoader())); + Assertions.assertThat(dataSink).isInstanceOf(KafkaDataSink.class); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index dc9972fe0f..b1d3e5966f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; @@ -75,7 +76,9 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; +import static 
org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.getJdbcProperties; import static org.apache.flink.util.Preconditions.checkState; @@ -89,6 +92,9 @@ public class MySqlDataSourceFactory implements DataSourceFactory { @Override public DataSource createDataSource(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(PROPERTIES_PREFIX, DEBEZIUM_OPTIONS_PREFIX); + final Configuration config = context.getFactoryConfiguration(); String hostname = config.get(HOSTNAME); int port = config.get(PORT); @@ -192,26 +198,28 @@ public Set> requiredOptions() { public Set> optionalOptions() { Set> options = new HashSet<>(); options.add(PORT); - options.add(SERVER_TIME_ZONE); + options.add(TABLES_EXCLUDE); + options.add(SCHEMA_CHANGE_ENABLED); options.add(SERVER_ID); + options.add(SERVER_TIME_ZONE); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + options.add(SCAN_SNAPSHOT_FETCH_SIZE); options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS); options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS); options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS); - options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); - options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); - options.add(CHUNK_META_GROUP_SIZE); - options.add(SCAN_SNAPSHOT_FETCH_SIZE); options.add(CONNECT_TIMEOUT); + options.add(CONNECT_MAX_RETRIES); options.add(CONNECTION_POOL_SIZE); + options.add(HEARTBEAT_INTERVAL); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + + options.add(CHUNK_META_GROUP_SIZE); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); - options.add(CONNECT_MAX_RETRIES); - options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); - options.add(HEARTBEAT_INTERVAL); - options.add(SCHEMA_CHANGE_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 68ea425a75..b1aab84b09 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -17,10 +17,12 @@ package org.apache.flink.cdc.connectors.mysql.source; +import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.factories.Factory; import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.table.api.ValidationException; import org.junit.Test; @@ -29,7 +31,9 @@ import java.sql.Statement; import java.util.Arrays; import 
java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; @@ -165,6 +169,76 @@ public void testDatabaseAndTableWithTheSameName() throws SQLException { + inventoryDatabase.getDatabaseName())); } + @Test + public void testLackRequireOption() { + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + List requireKeys = + factory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toList()); + for (String requireKey : requireKeys) { + Map remainingOptions = new HashMap<>(options); + remainingOptions.remove(requireKey); + Factory.Context context = new MockContext(Configuration.fromMap(remainingOptions)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + requireKey)); + } + } + + @Test + public void testUnsupportedOption() { + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put("unsupported_key", "unsupported_value"); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + assertThatThrownBy(() -> factory.createDataSource(context)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'mysql'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + @Test + public void testPrefixRequireOption() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); + options.put("jdbc.properties.requireSSL", "true"); + options.put("debezium.snapshot.mode", "initial"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + assertThat(dataSource.getSourceConfig().getTableList()) + .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products")); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index 3927fbc734..cf207325d7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.common.utils.Preconditions; @@ -40,6 +41,9 @@ import java.util.Objects; import java.util.Set; +import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES; +import static org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES; + /** A {@link DataSinkFactory} to create {@link PaimonDataSink}. */ public class PaimonDataSinkFactory implements DataSinkFactory { @@ -47,16 +51,16 @@ public class PaimonDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES); + Map allOptions = context.getFactoryConfiguration().toMap(); Map catalogOptions = new HashMap<>(); Map tableOptions = new HashMap<>(); allOptions.forEach( (key, value) -> { - if (key.startsWith(PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES)) { - tableOptions.put( - key.substring( - PaimonDataSinkOptions.PREFIX_TABLE_PROPERTIES.length()), - value); + if (key.startsWith(PREFIX_TABLE_PROPERTIES)) { + tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value); } else if (key.startsWith(PaimonDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) { catalogOptions.put( key.substring( @@ -118,9 +122,10 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> options = new HashSet<>(); - options.add(PaimonDataSinkOptions.URI); options.add(PaimonDataSinkOptions.WAREHOUSE); + options.add(PaimonDataSinkOptions.URI); options.add(PaimonDataSinkOptions.COMMIT_USER); + options.add(PaimonDataSinkOptions.PARTITION_KEY); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java index 018920d091..5f2712a93f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java @@ -47,7 +47,7 @@ public class PaimonDataSinkOptions { public static final ConfigOption METASTORE = key("catalog.properties.metastore") .stringType() - .defaultValue("filesystem") + .noDefaultValue() 
.withDescription("Metastore of paimon catalog, supports filesystem and hive."); public static final ConfigOption URI = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java index f2ab4a51bb..2a924e4f15 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactoryTest.java @@ -17,20 +17,26 @@ package org.apache.flink.cdc.connectors.paimon.sink; +import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.factories.DataSinkFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.Assertions; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; /** Tests for {@link PaimonDataSinkFactory}. 
*/ public class PaimonDataSinkFactoryTest { @@ -38,10 +44,10 @@ public class PaimonDataSinkFactoryTest { @TempDir public static java.nio.file.Path temporaryFolder; @Test - public void testCreateDataSink() { + void testCreateDataSink() { DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); - Assertions.assertInstanceOf(PaimonDataSinkFactory.class, sinkFactory); + Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class); Configuration conf = Configuration.fromMap( @@ -58,6 +64,104 @@ public void testCreateDataSink() { sinkFactory.createDataSink( new FactoryHelper.DefaultContext( conf, conf, Thread.currentThread().getContextClassLoader())); - Assertions.assertInstanceOf(PaimonDataSink.class, dataSink); + Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class); + } + + @Test + void testLackRequireOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class); + + Map options = new HashMap<>(); + options.put(PaimonDataSinkOptions.METASTORE.key(), "filesystem"); + options.put( + PaimonDataSinkOptions.WAREHOUSE.key(), + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + + List requireKeys = + sinkFactory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toList()); + for (String requireKey : requireKeys) { + Map remainingOptions = new HashMap<>(options); + remainingOptions.remove(requireKey); + Configuration conf = Configuration.fromMap(remainingOptions); + + Assertions.assertThatThrownBy( + () -> + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, + conf, + Thread.currentThread() + .getContextClassLoader()))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + requireKey)); + } + } + + @Test + void testUnsupportedOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put(PaimonDataSinkOptions.METASTORE.key(), "filesystem") + .put( + PaimonDataSinkOptions.WAREHOUSE.key(), + new File( + temporaryFolder.toFile(), + UUID.randomUUID().toString()) + .toString()) + .put("unsupported_key", "unsupported_value") + .build()); + + Assertions.assertThatThrownBy( + () -> + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, + conf, + Thread.currentThread().getContextClassLoader()))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'paimon'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + @Test + void testPrefixRequireOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class); + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put(PaimonDataSinkOptions.METASTORE.key(), "filesystem") + .put( + PaimonDataSinkOptions.WAREHOUSE.key(), + new File( + temporaryFolder.toFile(), + UUID.randomUUID().toString()) + .toString()) + .put("catalog.properties.uri", "") + .put("table.properties.bucket", "2") + .build()); 
+ + DataSink dataSink = + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, conf, Thread.currentThread().getContextClassLoader())); + Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java index f78659bbec..d1995fee66 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.factories.DataSinkFactory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.sink.DataSink; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; @@ -31,6 +32,8 @@ import java.util.stream.Collectors; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.SINK_PROPERTIES_PREFIX; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; /** A {@link DataSinkFactory} to create {@link StarRocksDataSink}. */ public class StarRocksDataSinkFactory implements DataSinkFactory { @@ -39,6 +42,9 @@ public class StarRocksDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context) + .validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, SINK_PROPERTIES_PREFIX); + StarRocksSinkOptions sinkOptions = buildSinkConnectorOptions(context.getFactoryConfiguration()); TableCreateConfig tableCreateConfig = @@ -124,8 +130,7 @@ private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) sinkConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once"); Map streamProperties = - getPrefixConfigs( - cdcConfig.toMap(), StarRocksDataSinkOptions.SINK_PROPERTIES_PREFIX); + getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX); // force to use json format for stream load to simplify the configuration, // such as there is no need to reconfigure the "columns" property after // schema change. 
csv format can be supported in the future if needed diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java index 8406ec772a..7f498beb81 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactoryTest.java @@ -17,26 +17,32 @@ package org.apache.flink.cdc.connectors.starrocks.sink; +import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.factories.DataSinkFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; -import org.junit.Test; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertTrue; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** Tests for {@link org.apache.flink.cdc.connectors.starrocks.sink.StarRocksDataSinkFactory}. */ public class StarRocksDataSinkFactoryTest { @Test - public void testCreateDataSink() { + void testCreateDataSink() { DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class); - assertTrue(sinkFactory instanceof StarRocksDataSinkFactory); + Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class); Configuration conf = Configuration.fromMap( @@ -50,6 +56,98 @@ public void testCreateDataSink() { sinkFactory.createDataSink( new FactoryHelper.DefaultContext( conf, conf, Thread.currentThread().getContextClassLoader())); - assertTrue(dataSink instanceof StarRocksDataSink); + Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class); + } + + @Test + void testLackRequireOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class); + + Map options = new HashMap<>(); + options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); + options.put("load-url", "127.0.0.1:8030"); + options.put("username", "root"); + options.put("password", ""); + + List requireKeys = + sinkFactory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toList()); + for (String requireKey : requireKeys) { + Map remainingOptions = new HashMap<>(options); + remainingOptions.remove(requireKey); + Configuration conf = Configuration.fromMap(remainingOptions); + + Assertions.assertThatThrownBy( + () -> + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, + conf, + Thread.currentThread() + .getContextClassLoader()))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + String.format( + "One or more required 
options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + requireKey)); + } + } + + @Test + void testUnsupportedOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030") + .put("load-url", "127.0.0.1:8030") + .put("username", "root") + .put("password", "") + .put("unsupported_key", "unsupported_value") + .build()); + + Assertions.assertThatThrownBy( + () -> + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, + conf, + Thread.currentThread().getContextClassLoader()))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Unsupported options found for 'starrocks'.\n\n" + + "Unsupported options:\n\n" + + "unsupported_key"); + } + + @Test + void testPrefixRequireOption() { + DataSinkFactory sinkFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier("starrocks", DataSinkFactory.class); + Assertions.assertThat(sinkFactory).isInstanceOf(StarRocksDataSinkFactory.class); + + Configuration conf = + Configuration.fromMap( + ImmutableMap.builder() + .put("jdbc-url", "jdbc:mysql://127.0.0.1:9030") + .put("load-url", "127.0.0.1:8030") + .put("username", "root") + .put("password", "") + .put("table.create.properties.replication_num", "1") + .put("sink.properties.format", "json") + .build()); + DataSink dataSink = + sinkFactory.createDataSink( + new FactoryHelper.DefaultContext( + conf, conf, Thread.currentThread().getContextClassLoader())); + Assertions.assertThat(dataSink).isInstanceOf(StarRocksDataSink.class); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java index ee8411d2ba..1b5983f1da 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.sink.DataSink; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink; @@ -41,6 +42,7 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory { @Override public DataSource createDataSource(Context context) { + FactoryHelper.createFactoryHelper(this, context).validate(); ValuesDataSourceHelper.EventSetId eventType = context.getFactoryConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID); int failAtPos = @@ -51,6 +53,7 @@ public DataSource createDataSource(Context context) { @Override public DataSink createDataSink(Context context) { + FactoryHelper.createFactoryHelper(this, context).validate(); return new ValuesDataSink( 
context.getFactoryConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY), context.getFactoryConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED), @@ -73,6 +76,8 @@ public Set> optionalOptions() { options.add(ValuesDataSourceOptions.EVENT_SET_ID); options.add(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX); options.add(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY); + options.add(ValuesDataSinkOptions.PRINT_ENABLED); + options.add(ValuesDataSinkOptions.SINK_API); return options; } }
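
For reference, a minimal sketch of how a pipeline connector factory is expected to wire in the new validation, in the spirit of the Doris/Kafka/Paimon/StarRocks changes above. The `DemoDataSinkFactory` class, its `demo` identifier, the `demo.url` option, and the `client.` pass-through prefix are hypothetical names invented for this illustration; only the `FactoryHelper` calls come from this patch.

```java
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;

import java.util.HashSet;
import java.util.Set;

/** Hypothetical sink factory showing the validation contract introduced by this patch. */
public class DemoDataSinkFactory implements DataSinkFactory {

    // A made-up required option, used only for this illustration.
    private static final ConfigOption<String> URL =
            ConfigOptions.key("demo.url").stringType().noDefaultValue();

    @Override
    public DataSink createDataSink(Context context) {
        // Fails fast on missing required options and on unknown keys, while
        // skipping validation for pass-through options under the "client." prefix.
        FactoryHelper.createFactoryHelper(this, context).validateExcept("client.");
        // Actual sink construction is omitted in this sketch.
        throw new UnsupportedOperationException("not implemented in this sketch");
    }

    @Override
    public String identifier() {
        return "demo";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }
}
```

A factory that accepts no prefixed pass-through options would call `validate()` instead, as `ValuesDataFactory` now does in this patch.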