diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
new file mode 100644
index 00000000000..c7fae4f10bd
--- /dev/null
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -0,0 +1,90 @@
+# Apache Kafka
+
+> Apache Kafka source connector
+
+## Description
+
+Source connector for Apache Kafka.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| topic | String | Yes | - |
+| pattern | Boolean | No | - |
+| bootstrap.servers | String | Yes | - |
+| consumer.group | String | No | SeaTunnel-Consumer-Group |
+| commit_on_checkpoint | Boolean | No | - |
+| schema | Config | No | content |
+| format | String | No | json |
+| kafka.* | String | No | - |
+| result_table_name | String | No | - |
+
+
+### topic [String]
+
+Kafka topic name. If there are multiple topics, separate them with commas, for example: `tpc1,tpc2`. If `pattern` is set to `true`,
+the topic is treated as a Java regular expression, for example: `tpc.*`.
+
+### pattern [Boolean]
+
+Whether to treat `topic` as a regular expression. Set to `true` to subscribe to all topics whose names match the Java regex pattern.
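+For instance, a minimal sketch of pattern-based subscription (the topic regex and broker addresses are illustrative placeholders):
+
+```hocon
+source {
+  Kafka {
+    # subscribe to every topic whose name starts with "tpc"
+    topic = "tpc.*"
+    pattern = true
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+  }
+}
+```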
+
+### bootstrap.servers [String]
+
+The broker address list of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.
+
+### consumer.group [String]
+
+Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.
+
+### commit_on_checkpoint [Boolean]
+
+If `true`, the consumer's offset will be periodically committed to Kafka in the background.
+
+### schema [Config]
+
+User-defined data schema; refer to the Schema documentation for details. If no schema is specified, a simple schema with a single string field named `content` is used.
+
+### format [String]
+
+Data format. The default is `json`, in which case records are parsed according to `schema`; any other value causes each record to be read as a plain string.
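+As a sketch of the fallback behavior, a format value other than `json` (the value `text` below is an illustrative assumption, not a registered format) makes the reader emit each record value as one plain string:
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    # any non-json value currently disables JSON parsing
+    format = text
+  }
+}
+```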
+
+### kafka.* [String]
+
+Properties with the `kafka.` prefix are passed through to the Kafka consumer, for example: `kafka.max.poll.records = 500`. Multiple such properties can be configured, and each is added to the consumer configuration.
+For the full list, see the official KafkaConsumer configuration documentation.
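+A sketch of passing several consumer properties through the `kafka.` prefix (the property values are illustrative, not tuning advice):
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    # forwarded to the underlying KafkaConsumer configuration
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```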
+
+### result_table_name [String]
+
+The table name registered for the data read from Kafka; it can then be referenced in the `transform` SQL query.
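+For example, a sketch of referencing the registered table from a SQL transform (the schema fields are illustrative):
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  sql {
+    sql = "select name, age from kafka_name"
+  }
+}
+```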
+
+
+## Example
+
+```hocon
+source {
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    format = json
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```
\ No newline at end of file
diff --git a/docs/zh-CN/connector-v2/source/kafka.md b/docs/zh-CN/connector-v2/source/kafka.md
new file mode 100644
index 00000000000..161bf72aae0
--- /dev/null
+++ b/docs/zh-CN/connector-v2/source/kafka.md
@@ -0,0 +1,89 @@
+# Apache Kafka
+
+> Apache Kafka source connector
+
+## Description
+
+Source connector for Apache Kafka.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| topic | String | Yes | - |
+| pattern | Boolean | No | - |
+| bootstrap.servers | String | Yes | - |
+| consumer.group | String | No | SeaTunnel-Consumer-Group |
+| commit_on_checkpoint | Boolean | No | - |
+| schema | Config | No | content |
+| format | String | No | json |
+| kafka.* | String | No | - |
+| result_table_name | String | No | - |
+
+
+### topic [String]
+
+Kafka topic name. If there are multiple topics, separate them with commas, for example: `tpc1,tpc2`. If `pattern` is set to `true`,
+the topic is treated as a Java regular expression, for example: `tpc.*`.
+
+### pattern [Boolean]
+
+Whether to treat `topic` as a regular expression. Set to `true` to subscribe to all topics whose names match the Java regex pattern.
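+For instance, a minimal sketch of pattern-based subscription (the topic regex and broker addresses are illustrative placeholders):
+
+```hocon
+source {
+  Kafka {
+    # subscribe to every topic whose name starts with "tpc"
+    topic = "tpc.*"
+    pattern = true
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+  }
+}
+```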
+
+### bootstrap.servers [String]
+
+The broker address list of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.
+
+### consumer.group [String]
+
+Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.
+
+### commit_on_checkpoint [Boolean]
+
+If set to `true`, the consumer's offset will be periodically committed to Kafka in the background.
+
+### schema [Config]
+
+User-defined data schema; refer to the Schema documentation for details. If no schema is specified, a simple schema with a single string field named `content` is used.
+
+### format [String]
+
+Data format. The default is `json`, in which case records are parsed according to `schema`; any other value causes each record to be read as a plain string.
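+As a sketch of the fallback behavior, a format value other than `json` (the value `text` below is an illustrative assumption, not a registered format) makes the reader emit each record value as one plain string:
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    # any non-json value currently disables JSON parsing
+    format = text
+  }
+}
+```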
+
+### kafka.* [String]
+
+Properties with the `kafka.` prefix are passed through to the Kafka consumer, for example: `kafka.max.poll.records = 500`. Multiple such properties can be configured, and each is added to the consumer configuration.
+For the full list, see the official KafkaConsumer configuration documentation.
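+A sketch of passing several consumer properties through the `kafka.` prefix (the property values are illustrative, not tuning advice):
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    # forwarded to the underlying KafkaConsumer configuration
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```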
+
+### result_table_name [String]
+
+The table name registered for the data read from Kafka; it can then be referenced in the `transform` SQL query.
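+For example, a sketch of referencing the registered table from a SQL transform (the schema fields are illustrative):
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1"
+    bootstrap.servers = "hadoop101:9092"
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  sql {
+    sql = "select name, age from kafka_name"
+  }
+}
+```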
+
+## Example
+
+```hocon
+source {
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    format = json
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 4159e8783aa..007adf348b4 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -46,6 +46,11 @@
            <artifactId>seatunnel-format-json</artifactId>
            <version>${project.version}</version>
        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
    </dependencies>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
similarity index 84%
rename from seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
rename to seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
index d48d12cf646..059e3078ec7 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.config;
-public class Config {
+public class KafkaConfig {
/**
* The topic of kafka.
*/
@@ -50,4 +50,20 @@ public class Config {
* The prefix of kafka's transactionId, make sure different job use different prefix.
*/
public static final String TRANSACTION_PREFIX = "transaction_prefix";
+
+ /**
+ * User-defined schema
+ */
+ public static final String SCHEMA = "schema";
+
+ /**
+ * data format
+ */
+ public static final String FORMAT = "format";
+
+ /**
+ * The default data format is JSON
+ */
+ public static final String DEFAULT_FORMAT = "json";
+
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 3f71c308525..665a40f4505 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,12 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TRANSACTION_PREFIX;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
@@ -123,7 +124,7 @@ public void close() {
private Properties getKafkaProperties(Config pluginConfig) {
Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
- org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, false);
+ KafkaConfig.KAFKA_CONFIG_PREFIX, false);
Properties kafkaProperties = new Properties();
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a6b6199f5f0..35a5c583cfb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -17,20 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.BOOTSTRAP_SERVERS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.COMMIT_ON_CHECKPOINT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig.TOPIC;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,8 +40,10 @@
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
@@ -54,6 +58,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
@Override
public Boundedness getBoundedness() {
@@ -92,10 +97,7 @@ public void prepare(Config config) throws PrepareFailException {
this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
});
- // TODO support user custom row type
- this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
- new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+ setDeserialization(config);
}
@Override
@@ -105,7 +107,7 @@ public SeaTunnelRowType getProducedType() {
@Override
    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
- return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+ return new KafkaSourceReader(this.metadata, this.typeInfo, deserializationSchema, readerContext);
}
@Override
@@ -122,4 +124,27 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
+
+    private void setDeserialization(Config config) {
+        if (config.hasPath(SCHEMA)) {
+            Config schema = config.getConfig(SCHEMA);
+            typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        // TODO: use format SPI
+        // Default to the JSON format; a null deserializationSchema makes the reader
+        // fall back to emitting each record value as a plain string.
+        String format = config.hasPath(FORMAT) ? config.getString(FORMAT) : DEFAULT_FORMAT;
+        if (DEFAULT_FORMAT.equals(format)) {
+            deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+        } else {
+            deserializationSchema = null;
+        }
+    }
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index dc9c6209375..65893af5102 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
@@ -63,14 +64,17 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit>
    private final Map<TopicPartition, KafkaSourceSplit> sourceSplitMap;
    private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
    private final ExecutorService executorService;
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
// TODO support user custom type
private SeaTunnelRowType typeInfo;
KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
+                      DeserializationSchema<SeaTunnelRow> deserializationSchema,
SourceReader.Context context) {
this.metadata = metadata;
this.context = context;
this.typeInfo = typeInfo;
+ this.deserializationSchema = deserializationSchema;
this.sourceSplits = new HashSet<>();
this.consumerThreadMap = new ConcurrentHashMap<>();
this.sourceSplitMap = new ConcurrentHashMap<>();
@@ -118,9 +122,12 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                for (ConsumerRecord<byte[], byte[]> record : recordList) {
- String v = stringDeserializer.deserialize(partition.topic(), record.value());
- String t = partition.topic();
- output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                    if (deserializationSchema != null) {
+                        deserializationSchema.deserialize(record.value(), output);
+                    } else {
+                        String content = stringDeserializer.deserialize(partition.topic(), record.value());
+                        output.collect(new SeaTunnelRow(new Object[]{content}));
+                    }
if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
record.offset() >= sourceSplit.getEndOffset()) {
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
new file mode 100644
index 00000000000..2eccfa9c567
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kafka-flink-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- SeaTunnel connectors used by the test job -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java
new file mode 100644
index 00000000000..959f64f2b57
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case verifies that data produced to Kafka can be consumed by the Kafka source and written to the Console sink.
+ * It also makes sure the SeaTunnel job can submit successfully on the Flink engine.
+ */
+@Slf4j
+public class KafkaSourceToConsoleIT extends FlinkContainer {
+
+ private static final String KAFKA_IMAGE = "bitnami/kafka:latest";
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ public static final int KAFKA_PORT = 9092;
+
+ private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private GenericContainer<?> kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ DockerImageName imageName = DockerImageName.parse(KAFKA_IMAGE);
+ kafkaContainer = new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withExposedPorts(KAFKA_PORT)
+ .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+ .withEnv("KAFKA_BROKER_ID", "1")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
+ .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            // wait until the Kafka container is running before generating test data
+            .until(kafkaContainer::isRunning);
+ this.generateTestData();
+ }
+
+ private void generateTestData() {
+
+ initKafkaProducer();
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", KAFKA_HOST + ":" + KAFKA_PORT);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf
new file mode 100644
index 00000000000..b80be327351
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic"
+ result_table_name = "kafka"
+ schema = {
+ fields {
+ id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+ sql {
+ sql = "select * from kafka"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Console {}
+ Assert {
+ rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 9f9f264ae6c..c093fa5e924 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,7 @@
connector-fake-flink-e2e
connector-mongodb-flink-e2e
connector-iceberg-flink-e2e
+ connector-kafka-flink-e2e
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
new file mode 100644
index 00000000000..26d25c4c6c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.1.3-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kafka-spark-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-spark-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- SeaTunnel connectors used by the test job -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.75</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java
new file mode 100644
index 00000000000..0db3688f988
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case verifies that data produced to Kafka can be consumed by the Kafka source and written to the Console sink.
+ * It also makes sure the SeaTunnel job can submit successfully on the Spark engine.
+ */
+@Slf4j
+public class KafkaSourceToConsoleIT extends SparkContainer {
+
+ private static final String KAFKA_IMAGE = "bitnami/kafka:latest";
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ public static final int KAFKA_PORT = 9092;
+
+ private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private GenericContainer<?> kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ DockerImageName imageName = DockerImageName.parse(KAFKA_IMAGE);
+ kafkaContainer = new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withExposedPorts(KAFKA_PORT)
+ .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092")
+ .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+ .withEnv("KAFKA_BROKER_ID", "1")
+ .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF)
+ .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
+ .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            // wait until the Kafka container is running before generating test data
+            .until(kafkaContainer::isRunning);
+ this.generateTestData();
+ }
+
+ private void generateTestData() {
+
+ initKafkaProducer();
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSource() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", KAFKA_HOST + ":" + KAFKA_PORT);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf
new file mode 100644
index 00000000000..b80be327351
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic"
+ result_table_name = "kafka"
+ schema = {
+ fields {
+ id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+ sql {
+ sql = "select * from kafka"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Console {}
+ Assert {
+ rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index cc5cc2bafa3..ef436c7284d 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
connector-jdbc-spark-e2e
connector-redis-spark-e2e
connector-mongodb-spark-e2e
+ connector-kafka-spark-e2e