From a1536c27e3e8378e73782a0075f258bc9886457b Mon Sep 17 00:00:00 2001
From: 15203203634 <15203203634@163.com>
Date: Mon, 12 Sep 2022 21:55:08 +0800
Subject: [PATCH] [Feature][seatunnel-connectors-v2][connector-kafka] Kafka
 supports custom schema

---
 .../connector-kafka/pom.xml                   | 11 +++
 .../config/{Config.java => KafkaConfig.java}  | 10 ++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java |  5 +-
 .../seatunnel/kafka/source/KafkaSource.java   | 70 +++++++++++--------
 .../kafka/source/KafkaSourceReader.java       | 24 ++++---
 .../connector-kafka-flink-e2e/pom.xml         | 47 +++++++++++++
 .../seatunnel/e2e/flink/v2/kafka/KafkaIT.java | 20 ++++++
 .../resources/kafka/kafka_to_console.conf     | 58 +++++++++++++++
 .../src/test/resources/log4j.properties       | 22 ++++++
 .../seatunnel-flink-connector-v2-e2e/pom.xml  |  1 +
 .../flink/v2/KafkaToConsoleExample.java       | 51 ++++++++++++++
 .../resources/examples/kafka_to_console.conf  | 52 ++++++++++++++
 12 files changed, 330 insertions(+), 41 deletions(-)
 rename seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/{Config.java => KafkaConfig.java} (86%)
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaIT.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafka_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
 create mode 100644 seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java
 create mode 100644 seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/kafka_to_console.conf

diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 75e56237b24..50e469d3982 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -34,6 +34,17 @@
 
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
similarity index 86%
rename from seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
rename to seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
index ee1492044e6..a21993ee0c3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
-public class Config {
+public class KafkaConfig {
     /**
      * The topic of kafka.
      */
@@ -50,4 +50,12 @@ public class Config {
      * The prefix of kafka's transactionId, make sure different jobs use different prefixes.
      */
     public static final String TRANSACTION_PREFIX = "transaction_prefix";
+
+    public static final String SCHEMA = "schema";
+
+    public static final String FORMAT = "format";
+
+    public static final String DEFAULT_FORMAT = "json";
+
+    public static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index b3a20d0b152..dfd23f042c4 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,12 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TRANSACTION_PREFIX;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
@@ -124,7 +125,7 @@ public void close() {
 
     private Properties getKafkaProperties(Config pluginConfig) {
         Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-            org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, true);
+            KafkaConfig.KAFKA_CONFIG_PREFIX, true);
         Properties kafkaProperties = new Properties();
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
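
Note on the KafkaSinkWriter change above: extractSubConfig(pluginConfig, KafkaConfig.KAFKA_CONFIG_PREFIX, true) copies every "kafka."-prefixed option into plain producer Properties. A minimal sketch of that behavior against the plain Typesafe Config API — the option names in the parsed block are made up for illustration, and the real connector goes through SeaTunnel's TypesafeConfigUtils rather than this hand-rolled loop:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import java.util.Properties;

    public class KafkaPrefixExtractionSketch {
        public static void main(String[] args) {
            // Options as a user might write them in the Kafka sink/source block.
            Config pluginConfig = ConfigFactory.parseString(
                "topic = test_csv\n"
                    + "kafka.max.poll.records = 100\n"
                    + "kafka.client.id = seatunnel-demo\n");

            // Keep only "kafka.*" entries, strip the prefix, and stringify values,
            // mirroring what getKafkaProperties does with the extracted sub-config.
            Properties kafkaProperties = new Properties();
            pluginConfig.entrySet().forEach(entry -> {
                String key = entry.getKey();
                if (key.startsWith("kafka.")) {
                    kafkaProperties.put(key.substring("kafka.".length()),
                        String.valueOf(entry.getValue().unwrapped()));
                }
            });

            // Prints {client.id=seatunnel-demo, max.poll.records=100} (order may vary).
            System.out.println(kafkaProperties);
        }
    }
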
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a4d534e6775..113e3844a15 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -17,14 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.BOOTSTRAP_SERVER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.COMMIT_ON_CHECKPOINT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TOPIC;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -38,8 +39,11 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
@@ -47,9 +51,9 @@
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
-
-    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
+    public static final String IDENTIFIER = "Kafka";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
     private final ConsumerMetadata metadata = new ConsumerMetadata();
     private SeaTunnelRowType typeInfo;
@@ -62,50 +66,47 @@ public Boundedness getBoundedness() {
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return IDENTIFIER;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVER);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.metadata.setTopic(config.getString(TOPIC));
-        if (config.hasPath(PATTERN)) {
-            this.metadata.setPattern(config.getBoolean(PATTERN));
+        this.metadata.setTopic(pluginConfig.getString(TOPIC));
+        if (pluginConfig.hasPath(PATTERN)) {
+            this.metadata.setPattern(pluginConfig.getBoolean(PATTERN));
         }
-        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setBootstrapServer(pluginConfig.getString(BOOTSTRAP_SERVER));
         this.metadata.setProperties(new Properties());
 
-        if (config.hasPath(CONSUMER_GROUP)) {
-            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        if (pluginConfig.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(pluginConfig.getString(CONSUMER_GROUP));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(KafkaConfig.DEFAULT_CONSUMER_GROUP);
         }
 
-        if (config.hasPath(COMMIT_ON_CHECKPOINT)) {
-            this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
+        if (pluginConfig.hasPath(COMMIT_ON_CHECKPOINT)) {
+            this.metadata.setCommitOnCheckpoint(pluginConfig.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
+        TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(pluginConfig);
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return deserializationSchema.getProducedType();
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+        return new KafkaSourceReader<>(this.metadata, this.deserializationSchema, readerContext);
     }
 
@@ -122,4 +123,15 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
     }
+
+    private void setDeserialization(Config pluginConfig) {
+        SeaTunnelRowType rowType;
+        if (pluginConfig.hasPath(KafkaConfig.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(KafkaConfig.SCHEMA);
+            rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            rowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        deserializationSchema = (DeserializationSchema<SeaTunnelRow>) new JsonDeserializationSchema(false, false, rowType);
+    }
 }
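
To see what the new setDeserialization path produces, here is a self-contained sketch of schema-to-row deserialization. It reuses only calls that appear in this patch (SeaTunnelSchema.buildWithConfig, getSeaTunnelRowType, the JsonDeserializationSchema constructor) plus the single-argument deserialize(byte[]) declared on DeserializationSchema; the field names and JSON payload are illustrative, not part of the patch:

    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
    import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
    import org.apache.seatunnel.format.json.JsonDeserializationSchema;
    import org.apache.seatunnel.shade.com.typesafe.config.Config;
    import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class CustomSchemaSketch {
        public static void main(String[] args) throws IOException {
            // Same shape as the user-facing option:
            //   schema = { fields { name = "string", age = "int" } }
            Config schema = ConfigFactory.parseString(
                "fields {\n  name = \"string\"\n  age = \"int\"\n}");

            // schema config -> SeaTunnelRowType, as in KafkaSource#setDeserialization.
            SeaTunnelRowType rowType =
                SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();

            // Raw Kafka record value -> typed SeaTunnelRow via the JSON format.
            JsonDeserializationSchema json = new JsonDeserializationSchema(false, false, rowType);
            SeaTunnelRow row = json.deserialize(
                "{\"name\":\"seatunnel\",\"age\":18}".getBytes(StandardCharsets.UTF_8));

            System.out.println(row.getField(0)); // seatunnel
            System.out.println(row.getField(1)); // 18
        }
    }
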
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index d4272e4a3fd..432032e1f19 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -46,7 +47,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
+public class KafkaSourceReader<T> implements SourceReader<T, KafkaSourceSplit> {
 
     private static final long THREAD_WAIT_TIME = 500L;
     private static final long POLL_TIMEOUT = 10000L;
@@ -58,15 +59,16 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
     private final Set<KafkaSourceSplit> sourceSplits;
     private final Map<TopicPartition, Long> endOffset;
-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+
+    private final DeserializationSchema<T> deserializationSchema;
     private volatile boolean isRunning;
 
-    KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
+    KafkaSourceReader(ConsumerMetadata metadata,
+                      DeserializationSchema<T> deserializationSchema,
                       SourceReader.Context context) {
         this.metadata = metadata;
         this.context = context;
-        this.typeInfo = typeInfo;
+        this.deserializationSchema = deserializationSchema;
         this.sourceSplits = new HashSet<>();
         this.endOffset = new HashMap<>();
     }
 
@@ -87,7 +89,7 @@ public void close() throws IOException {
     }
 
     @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+    public void pollNext(Collector<T> output) throws Exception {
         if (sourceSplits.isEmpty() || sourceSplits.size() != this.endOffset.size()) {
             Thread.sleep(THREAD_WAIT_TIME);
             return;
@@ -101,9 +103,13 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
 
             for (TopicPartition partition : partitions) {
                 for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
-                    String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                    String t = partition.topic();
-                    output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                    if (deserializationSchema != null) {
+                        deserializationSchema.deserialize(record.value(), output);
+                    } else {
+                        String v = stringDeserializer.deserialize(partition.topic(), record.value());
+                        String t = partition.topic();
+                        output.collect((T) new SeaTunnelRow(new Object[]{t, v}));
+                    }
 
                     if (Boundedness.BOUNDED.equals(context.getBoundedness())
                         && record.offset() >= endOffset.get(partition)) {
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
new file mode 100644
index 00000000000..368db8e2892
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kafka-flink-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaIT.java
new file mode 100644
index 00000000000..df35eaac284
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaIT.java
@@ -0,0 +1,20 @@
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class KafkaIT extends FlinkContainer {
+
+    /**
+     * Kafka source -> Console sink
+     */
+    @Test
+    public void testKafkaSourceToConsoleSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafka_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafka_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafka_to_console.conf
new file mode 100644
index 00000000000..6e5da70b53f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafka_to_console.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Kafka {
+    result_table_name = "kafka"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    #format = text                  # defaults to json; set to text for delimited messages
+    #text.column.delimiter = ","    # other examples: "|", " ", "@", "_"
+    topic = "test_csv"
+    bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+  sql {
+    sql = "select name,age from kafka"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file
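
With the schema above and the default json format, each Kafka record value is expected to look like {"name":"seatunnel","age":18} and is deserialized into a two-field row. The commented format = text / text.column.delimiter = "," options point at the intended alternative — a delimited payload such as seatunnel,18 split on the configured delimiter — but note that this patch only wires up the JSON deserializer in KafkaSource#setDeserialization, so the text path is not exercised here.
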
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 61d7911a548..c5b66fba959 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
         <module>connector-assert-flink-e2e</module>
         <module>connector-fake-flink-e2e</module>
         <module>connector-mongodb-flink-e2e</module>
+        <module>connector-kafka-flink-e2e</module>
     </modules>
 
 </project>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java
new file mode 100644
index 00000000000..ecb4ef72751
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.flink.v2;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class KafkaToConsoleExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        String configFile = getTestConfigFile("/examples/kafka_to_console.conf");
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(configFile);
+        flinkCommandArgs.setCheckConfig(false);
+        flinkCommandArgs.setVariables(null);
+        Command<FlinkCommandArgs> flinkCommand =
+                new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
+    }
+
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = KafkaToConsoleExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/kafka_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/kafka_to_console.conf
new file mode 100644
index 00000000000..f4120af17a5
--- /dev/null
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/kafka_to_console.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Kafka {
+    result_table_name = "kafka"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    #format = text                  # defaults to json; set to text for delimited messages
+    #text.column.delimiter = ","    # other examples: "|", " ", "@", "_"
+    topic = "test_csv"
+    bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+  }
+}
+
+transform {
+  sql {
+    sql = "select name,age from kafka"
+  }
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file
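
To feed the example topic with records matching the declared schema, here is a minimal producer sketch using the standard Kafka client API; the broker address, topic, and payloads are placeholders to adjust for your environment:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class KafkaDemoProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // adjust to your brokers
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // Send a few JSON records shaped like the schema: { name string, age int }.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    String value = "{\"name\":\"user_" + i + "\",\"age\":" + (18 + i) + "}";
                    producer.send(new ProducerRecord<>("test_csv", value));
                }
            }
        }
    }
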