[FLINK-35121][common] Adds validation for pipeline definition options
yuxiqian committed Jun 14, 2024
1 parent c958daf commit 2bd2e4c
Showing 23 changed files with 876 additions and 45 deletions.
15 changes: 15 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,21 @@ pipeline:
- `specific-offset`: Skip the snapshot phase and start reading from a specified binlog offset. The offset can be given as a binlog filename and position, or as a GTID set if GTID is enabled on the cluster.
- `timestamp`: Skip the snapshot phase and start reading binlog events from a specified timestamp.

For example, the startup mode can be specified in the YAML configuration file like this:

```yaml
source:
  type: mysql
  scan.startup.mode: earliest-offset # Start from earliest offset
  scan.startup.mode: latest-offset # Start from latest offset
  scan.startup.mode: specific-offset # Start from specific offset
  scan.startup.mode: timestamp # Start from timestamp
  scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
  # ...
```

## Data Type Mapping

15 changes: 15 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -284,6 +284,21 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c
specified with binlog filename and position, or a GTID set if GTID is enabled on server.
- `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp.

For example, in a YAML definition:

```yaml
source:
  type: mysql
  scan.startup.mode: earliest-offset # Start from earliest offset
  scan.startup.mode: latest-offset # Start from latest offset
  scan.startup.mode: specific-offset # Start from specific offset
  scan.startup.mode: timestamp # Start from timestamp
  scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
  # ...
```
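
Note that the four `scan.startup.mode` lines above enumerate mutually exclusive alternatives; an actual definition sets the option once. A minimal sketch for the `specific-offset` mode (the connection details are illustrative placeholders, not part of this commit):

```yaml
source:
  type: mysql
  hostname: localhost        # placeholder connection details
  port: 3306
  username: flink_cdc
  password: '***'
  tables: app_db.orders
  scan.startup.mode: specific-offset
  scan.startup.specific-offset.file: 'mysql-bin.000003'
  scan.startup.specific-offset.pos: 4
```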

## Data Type Mapping

@@ -83,7 +83,8 @@ void testGlobalPipelineConfigParsing() throws Exception {
"--global-config",
globalPipelineConfig());
assertThat(executor.getGlobalPipelineConfig().toMap().get("parallelism")).isEqualTo("1");
assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar");
assertThat(executor.getGlobalPipelineConfig().toMap().get("schema.change.behavior"))
.isEqualTo("ignore");
}

@Test
@@ -77,7 +77,6 @@ void testOverridingGlobalConfig() throws Exception {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.put("foo", "bar")
.build()));
assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf);
}
@@ -224,7 +223,7 @@ void testInvalidTimeZone() throws Exception {
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.build()));

@@ -285,9 +284,8 @@ void testInvalidTimeZone() throws Exception {
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
.put("parallelism", "4")
.put("enable-schema-evolution", "false")
.put("schema.change.behavior", "evolve")
.put("schema-operator.rpc-timeout", "1 h")
.put("foo", "bar")
.build()));

private final PipelineDef defWithOptional =
@@ -327,5 +325,5 @@ void testInvalidTimeZone() throws Exception {
new SinkDef("kafka", null, new Configuration()),
Collections.emptyList(),
Collections.emptyList(),
new Configuration());
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
}
@@ -55,5 +55,5 @@ transform:
pipeline:
name: source-database-sync-pipe
parallelism: 4
enable-schema-evolution: false
schema.change.behavior: evolve
schema-operator.rpc-timeout: 1 h
@@ -19,3 +19,6 @@ source:

sink:
type: kafka

pipeline:
parallelism: 1
@@ -15,4 +15,4 @@
# limitations under the License.
################################################################################
parallelism: 1
foo: bar
schema.change.behavior: ignore
@@ -22,8 +22,10 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

import static org.apache.flink.cdc.common.configuration.ConfigurationUtils.canBePrefixMap;
@@ -131,6 +133,17 @@ public <T> Configuration set(ConfigOption<T> option, T value) {
return this;
}

/**
* Returns the keys of all key/value pairs stored inside this configuration object.
*
* @return the keys of all key/value pairs stored inside this configuration object
*/
public Set<String> keySet() {
synchronized (this.confData) {
return new HashSet<>(this.confData.keySet());
}
}

public Map<String, String> toMap() {
synchronized (this.confData) {
Map<String, String> ret = new HashMap<>(this.confData.size());
@@ -247,6 +260,10 @@ private <T> Optional<T> applyWithOption(
return Optional.empty();
}

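/**
 * Returns a live view of the keys stored in this configuration. Unlike {@link #keySet()}, the
 * returned set is not a defensive copy and access is not synchronized.
 */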
public Set<String> getKeys() {
return confData.keySet();
}

@Override
public int hashCode() {
int hash = 0;
@@ -18,12 +18,133 @@
package org.apache.flink.cdc.common.factories;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.table.api.ValidationException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** A helper for working with {@link Factory}. */
@PublicEvolving
public class FactoryHelper {

private final Factory factory;
private final Factory.Context context;

private FactoryHelper(Factory factory, Factory.Context context) {
this.factory = factory;
this.context = context;
}

public static FactoryHelper createFactoryHelper(Factory factory, Factory.Context context) {
return new FactoryHelper(factory, context);
}

/**
* Validates the required and optional {@link ConfigOption}s of a factory.
*
* <p>Note: It does not check for left-over options.
*/
public static void validateFactoryOptions(Factory factory, Configuration configuration) {
validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), configuration);
}

/**
* Validates the required options and optional options.
*
* <p>Note: It does not check for left-over options.
*/
public static void validateFactoryOptions(
Set<ConfigOption<?>> requiredOptions,
Set<ConfigOption<?>> optionalOptions,
Configuration configuration) {
final List<String> missingRequiredOptions =
requiredOptions.stream()
.filter(option -> configuration.get(option) == null)
.map(ConfigOption::key)
.sorted()
.collect(Collectors.toList());

if (!missingRequiredOptions.isEmpty()) {
throw new ValidationException(
String.format(
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "%s",
String.join("\n", missingRequiredOptions)));
}

optionalOptions.forEach(configuration::getOptional);
}
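
// For illustration only: with the message format above, a configuration that
// omits a (hypothetical) required "hostname" option would fail with:
//
//   One or more required options are missing.
//
//   Missing required options are:
//
//   hostname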

/** Validates unconsumed option keys. */
public static void validateUnconsumedKeys(
String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys) {
final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
remainingOptionKeys.removeAll(consumedOptionKeys);
if (!remainingOptionKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Unsupported options found for '%s'.\n\n"
+ "Unsupported options:\n\n"
+ "%s\n\n"
+ "Supported options:\n\n"
+ "%s",
factoryIdentifier,
remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")),
String.join("\n", consumedOptionKeys)));
}
}
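
// For illustration only: a misspelled key would be rendered by the format
// above as (identifiers hypothetical):
//
//   Unsupported options found for 'mysql'.
//
//   Unsupported options:
//
//   scan.startup.modes
//
//   Supported options:
//
//   hostname
//   port
//   scan.startup.mode
//   ...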

/** Validates the options of the factory. It checks for unconsumed option keys. */
public void validate() {
Set<String> allOptionKeys =
Stream.concat(
factory.requiredOptions().stream().map(ConfigOption::key),
factory.optionalOptions().stream().map(ConfigOption::key))
.collect(Collectors.toSet());

validateFactoryOptions(factory, context.getFactoryConfiguration());
validateUnconsumedKeys(
factory.identifier(), context.getFactoryConfiguration().getKeys(), allOptionKeys);
}

/**
 * Validates the options of the factory. It checks for unconsumed option keys while ignoring
 * options with the given prefixes.
 *
 * <p>Option keys that start with any of the given {@code prefixesToSkip} are skipped during
 * validation.
 *
 * @param prefixesToSkip option key prefixes to skip during validation
 */
public void validateExcept(String... prefixesToSkip) {
Preconditions.checkArgument(
prefixesToSkip.length > 0, "Prefixes to skip can not be empty.");

final List<String> prefixesList = Arrays.asList(prefixesToSkip);

Set<String> allOptionKeys =
Stream.concat(
factory.requiredOptions().stream().map(ConfigOption::key),
factory.optionalOptions().stream().map(ConfigOption::key))
.collect(Collectors.toSet());

Set<String> filteredOptionKeys =
context.getFactoryConfiguration().getKeys().stream()
.filter(key -> prefixesList.stream().noneMatch(key::startsWith))
.collect(Collectors.toSet());

validateFactoryOptions(factory, context.getFactoryConfiguration());
validateUnconsumedKeys(factory.identifier(), filteredOptionKeys, allOptionKeys);
}

/** Default implementation of {@link Factory.Context}. */
public static class DefaultContext implements Factory.Context {

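
As a usage illustration (not part of the diff), a pipeline connector factory might drive the new validation as in this minimal sketch; the wiring below and the `debezium.` prefix are assumptions:

```java
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;

/** A minimal sketch of driving the new option validation. */
public class ValidationSketch {

    /** Rejects missing required options and any unknown option keys. */
    public static void validateStrictly(Factory factory, Factory.Context context) {
        FactoryHelper.createFactoryHelper(factory, context).validate();
    }

    /**
     * Tolerates pass-through keys such as raw Debezium properties; the
     * "debezium." prefix is an assumption for illustration.
     */
    public static void validateIgnoringPassThrough(Factory factory, Factory.Context context) {
        FactoryHelper.createFactoryHelper(factory, context).validateExcept("debezium.");
    }
}
```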
