Skip to content

Commit 38f1903

Browse files
authored
[Feature][API] Add options check before creating source, sink, and transform in FactoryUtil (#4424)
1 parent 9ce220b commit 38f1903

File tree

10 files changed

+69
-75
lines changed

10 files changed

+69
-75
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.configuration.Option;
2222
import org.apache.seatunnel.api.configuration.Options;
2323
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
24+
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
2425
import org.apache.seatunnel.api.configuration.util.OptionRule;
2526
import org.apache.seatunnel.api.sink.DataSaveMode;
2627
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -114,6 +115,7 @@ SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
114115
ReadonlyConfig options,
115116
ClassLoader classLoader) {
116117
TableFactoryContext context = new TableFactoryContext(acceptedTables, options, classLoader);
118+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
117119
TableSource<T, SplitT, StateT> tableSource = factory.createSource(context);
118120
validateAndApplyMetadata(acceptedTables, tableSource);
119121
return tableSource.createSource();
@@ -136,6 +138,7 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
136138
TableFactoryContext context =
137139
new TableFactoryContext(
138140
Collections.singletonList(catalogTable), options, classLoader);
141+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
139142
return factory.createSink(context).createSink();
140143
} catch (Throwable t) {
141144
throw new FactoryException(
@@ -321,6 +324,7 @@ public static SeaTunnelTransform<?> createAndPrepareTransform(
321324
TableFactoryContext context =
322325
new TableFactoryContext(
323326
Collections.singletonList(catalogTable), options, classLoader);
327+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
324328
return factory.createTransform(context).createTransform();
325329
}
326330
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,11 @@ public class Config {
164164
"The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
165165
+ "When fail is selected, data format error will block and an exception will be thrown. "
166166
+ "When skip is selected, data format error will skip this line data.");
167+
168+
public static final Option<KafkaSemantics> SEMANTICS =
169+
Options.key("semantics")
170+
.enumType(KafkaSemantics.class)
171+
.defaultValue(KafkaSemantics.NON)
172+
.withDescription(
173+
"Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
167174
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
23+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
24+
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
2425
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2526
import org.apache.seatunnel.api.serialization.Serializer;
2627
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,60 +30,40 @@
2930
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3031
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3132
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
32-
import org.apache.seatunnel.common.config.CheckConfigUtil;
33-
import org.apache.seatunnel.common.config.CheckResult;
34-
import org.apache.seatunnel.common.constants.PluginType;
35-
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
3633
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
3734
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
3835
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
3936

4037
import com.google.auto.service.AutoService;
38+
import lombok.NoArgsConstructor;
4139

4240
import java.util.Collections;
4341
import java.util.List;
4442
import java.util.Optional;
4543

46-
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
47-
4844
/**
4945
* Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
5046
* {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
5147
*/
5248
@AutoService(SeaTunnelSink.class)
49+
@NoArgsConstructor
5350
public class KafkaSink
5451
implements SeaTunnelSink<
5552
SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, KafkaAggregatedCommitInfo> {
5653

57-
private Config pluginConfig;
54+
private ReadonlyConfig pluginConfig;
5855
private SeaTunnelRowType seaTunnelRowType;
5956

60-
public KafkaSink() {}
61-
62-
public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
63-
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
64-
if (!result.isSuccess()) {
65-
throw new KafkaConnectorException(
66-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
67-
String.format(
68-
"PluginName: %s, PluginType: %s, Message: %s",
69-
getPluginName(), PluginType.SINK, result.getMsg()));
70-
}
57+
public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
7158
this.pluginConfig = pluginConfig;
7259
this.seaTunnelRowType = rowType;
7360
}
7461

7562
@Override
7663
public void prepare(Config pluginConfig) throws PrepareFailException {
77-
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
78-
if (!result.isSuccess()) {
79-
throw new KafkaConnectorException(
80-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
81-
String.format(
82-
"PluginName: %s, PluginType: %s, Message: %s",
83-
getPluginName(), PluginType.SINK, result.getMsg()));
84-
}
85-
this.pluginConfig = pluginConfig;
64+
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
65+
.validate(new KafkaSinkFactory().optionRule());
66+
this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
8667
}
8768

8869
@Override

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2221
import org.apache.seatunnel.api.sink.SinkCommitter;
2322
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
2423

@@ -33,11 +32,11 @@
3332
@Slf4j
3433
public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
3534

36-
private final Config pluginConfig;
35+
private final ReadonlyConfig pluginConfig;
3736

3837
private KafkaInternalProducer<?, ?> kafkaProducer;
3938

40-
public KafkaSinkCommitter(Config pluginConfig) {
39+
public KafkaSinkCommitter(ReadonlyConfig pluginConfig) {
4140
this.pluginConfig = pluginConfig;
4241
}
4342

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
21-
2220
import org.apache.seatunnel.api.configuration.util.OptionRule;
2321
import org.apache.seatunnel.api.table.connector.TableSink;
2422
import org.apache.seatunnel.api.table.factory.Factory;
@@ -47,7 +45,11 @@ public OptionRule optionRule() {
4745
Arrays.asList(
4846
MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
4947
Config.TOPIC)
50-
.optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
48+
.optional(
49+
Config.KAFKA_CONFIG,
50+
Config.ASSIGN_PARTITIONS,
51+
Config.TRANSACTION_PREFIX,
52+
Config.SEMANTICS)
5153
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
5254
.build();
5355
}
@@ -56,7 +58,7 @@ public OptionRule optionRule() {
5658
public TableSink createSink(TableFactoryContext context) {
5759
return () ->
5860
new KafkaSink(
59-
ConfigFactory.parseMap(context.getOptions().toMap()),
61+
context.getOptions(),
6062
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
6163
}
6264
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321
import org.apache.seatunnel.api.sink.SinkWriter;
2422
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2523
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
26-
import org.apache.seatunnel.common.config.CheckConfigUtil;
2724
import org.apache.seatunnel.common.exception.CommonErrorCode;
2825
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
2926
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
@@ -33,6 +30,7 @@
3330
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
3431
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
3532

33+
import org.apache.commons.collections4.CollectionUtils;
3634
import org.apache.kafka.clients.producer.ProducerConfig;
3735
import org.apache.kafka.clients.producer.ProducerRecord;
3836
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -45,12 +43,14 @@
4543
import java.util.Random;
4644

4745
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
46+
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
4847
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
4948
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
5049
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
5150
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
5251
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
5352
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
53+
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
5454
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
5555
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
5656

@@ -71,20 +71,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
7171
public KafkaSinkWriter(
7272
SinkWriter.Context context,
7373
SeaTunnelRowType seaTunnelRowType,
74-
Config pluginConfig,
74+
ReadonlyConfig pluginConfig,
7575
List<KafkaSinkState> kafkaStates) {
7676
this.context = context;
7777
this.seaTunnelRowType = seaTunnelRowType;
78-
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
79-
MessageContentPartitioner.setAssignPartitions(
80-
pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
78+
if (pluginConfig.get(ASSIGN_PARTITIONS) != null
79+
&& !CollectionUtils.isEmpty(pluginConfig.get(ASSIGN_PARTITIONS))) {
80+
MessageContentPartitioner.setAssignPartitions(pluginConfig.get(ASSIGN_PARTITIONS));
8181
}
82-
if (pluginConfig.hasPath(TRANSACTION_PREFIX.key())) {
83-
this.transactionPrefix = pluginConfig.getString(TRANSACTION_PREFIX.key());
82+
83+
if (pluginConfig.get(TRANSACTION_PREFIX) != null) {
84+
this.transactionPrefix = pluginConfig.get(TRANSACTION_PREFIX);
8485
} else {
8586
Random random = new Random();
8687
this.transactionPrefix = String.format("SeaTunnel%04d", random.nextInt(PREFIX_RANGE));
8788
}
89+
8890
restoreState(kafkaStates);
8991
this.seaTunnelRowSerializer = getSerializer(pluginConfig, seaTunnelRowType);
9092
if (KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(pluginConfig))) {
@@ -141,21 +143,20 @@ public void close() {
141143
}
142144
}
143145

144-
private Properties getKafkaProperties(Config pluginConfig) {
146+
private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
145147
Properties kafkaProperties = new Properties();
146-
if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
147-
pluginConfig
148-
.getObject(KAFKA_CONFIG.key())
149-
.forEach((key, value) -> kafkaProperties.put(key, value.unwrapped()));
148+
if (pluginConfig.get(KAFKA_CONFIG) != null) {
149+
pluginConfig.get(KAFKA_CONFIG).forEach((key, value) -> kafkaProperties.put(key, value));
150150
}
151-
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
151+
152+
if (pluginConfig.get(ASSIGN_PARTITIONS) != null) {
152153
kafkaProperties.put(
153154
ProducerConfig.PARTITIONER_CLASS_CONFIG,
154155
"org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
155156
}
157+
156158
kafkaProperties.put(
157-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
158-
pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
159+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.get(BOOTSTRAP_SERVERS));
159160
kafkaProperties.put(
160161
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
161162
kafkaProperties.put(
@@ -164,24 +165,18 @@ private Properties getKafkaProperties(Config pluginConfig) {
164165
}
165166

166167
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
167-
Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
168-
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
169-
MessageFormat messageFormat = readonlyConfig.get(FORMAT);
168+
ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
169+
MessageFormat messageFormat = pluginConfig.get(FORMAT);
170170
String delimiter = DEFAULT_FIELD_DELIMITER;
171-
if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
172-
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
173-
}
174-
String topic = null;
175-
if (pluginConfig.hasPath(TOPIC.key())) {
176-
topic = pluginConfig.getString(TOPIC.key());
171+
172+
if (pluginConfig.get(FIELD_DELIMITER) != null) {
173+
delimiter = pluginConfig.get(FIELD_DELIMITER);
177174
}
178-
if (pluginConfig.hasPath(PARTITION.key())) {
175+
176+
String topic = pluginConfig.get(TOPIC);
177+
if (pluginConfig.get(PARTITION) != null) {
179178
return DefaultSeaTunnelRowSerializer.create(
180-
topic,
181-
pluginConfig.getInt(PARTITION.key()),
182-
seaTunnelRowType,
183-
messageFormat,
184-
delimiter);
179+
topic, pluginConfig.get(PARTITION), seaTunnelRowType, messageFormat, delimiter);
185180
} else {
186181
return DefaultSeaTunnelRowSerializer.create(
187182
topic,
@@ -192,9 +187,9 @@ private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
192187
}
193188
}
194189

195-
private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
196-
if (pluginConfig.hasPath("semantics")) {
197-
return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
190+
private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
191+
if (pluginConfig.get(SEMANTICS) != null) {
192+
return pluginConfig.get(SEMANTICS);
198193
}
199194
return KafkaSemantics.NON;
200195
}
@@ -211,10 +206,10 @@ private void restoreState(List<KafkaSinkState> states) {
211206
}
212207

213208
private List<String> getPartitionKeyFields(
214-
Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
215-
if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
216-
List<String> partitionKeyFields =
217-
pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
209+
ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
210+
211+
if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
212+
List<String> partitionKeyFields = pluginConfig.get(PARTITION_KEY_FIELDS);
218213
List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
219214
for (String partitionKeyField : partitionKeyFields) {
220215
if (!rowTypeFieldNames.contains(partitionKeyField)) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.net.UnknownHostException;
6969
import java.nio.ByteBuffer;
7070
import java.nio.charset.StandardCharsets;
71+
import java.time.Duration;
7172
import java.time.Instant;
7273
import java.time.LocalDate;
7374
import java.util.ArrayList;
@@ -141,10 +142,12 @@ private void initializeCassandraTable() {
141142
session.execute(
142143
SimpleStatement.builder(config.getString(SOURCE_TABLE))
143144
.setKeyspace(KEYSPACE)
145+
.setTimeout(Duration.ofSeconds(10))
144146
.build());
145147
session.execute(
146148
SimpleStatement.builder(config.getString(SINK_TABLE))
147149
.setKeyspace(KEYSPACE)
150+
.setTimeout(Duration.ofSeconds(10))
148151
.build());
149152
} catch (Exception e) {
150153
throw new RuntimeException("Initializing Cassandra table failed!", e);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ sink {
7373
source_table_name = "fake1"
7474
bootstrap.servers = "kafkaCluster:9092"
7575
topic = "${c_string}"
76+
format = json
7677
partition_key_fields = ["c_map","c_string"]
7778
}
7879
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ sink {
6363
Kafka {
6464
bootstrap.servers = "kafkaCluster:9092"
6565
topic = "test_topic"
66+
format = json
6667
partition_key_fields = ["c_map","c_string"]
6768
}
6869
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,6 @@ sink {
5353
bootstrap.servers = "kafkaCluster:9092"
5454
topic = "test-canal-sink"
5555
format = canal_json
56+
partition = 0
5657
}
5758
}

0 commit comments

Comments
 (0)