From 6e132e054a54bee3022f4b28567fe117bcaeaea2 Mon Sep 17 00:00:00 2001
From: 15203203634 <15203203634@163.com>
Date: Tue, 18 Oct 2022 22:14:47 +0800
Subject: [PATCH] [Feature][connectors-v2][kafka] Kafka supports custom schema
#2371
---
docs/en/connector-v2/source/kafka.md | 29 ++-
.../connector-kafka/pom.xml | 12 +-
.../seatunnel/kafka/config/Config.java | 26 +++
.../seatunnel/kafka/source/KafkaSource.java | 51 ++++-
.../kafka/source/KafkaSourceReader.java | 14 +-
.../connector-kafka-flink-e2e/pom.xml | 59 ++++++
.../e2e/flink/v2/kafka/KafkaContainer.java | 151 ++++++++++++++
.../v2/kafka/KafkaSourceJsonToConsoleIT.java | 181 +++++++++++++++++
.../v2/kafka/KafkaSourceTextToConsoleIT.java | 184 ++++++++++++++++++
.../kafka/kafkasource_json_to_console.conf | 91 +++++++++
.../kafka/kafkasource_text_to_console.conf | 92 +++++++++
.../src/test/resources/log4j.properties | 22 +++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 +
.../connector-kafka-spark-e2e/pom.xml | 53 +++++
.../e2e/spark/v2/kafka/KafkaContainer.java | 151 ++++++++++++++
.../v2/kafka/KafkaSourceJsonToConsoleIT.java | 180 +++++++++++++++++
.../v2/kafka/KafkaSourceTextToConsoleIT.java | 184 ++++++++++++++++++
.../kafka/kafkasource_json_to_console.conf | 90 +++++++++
.../kafka/kafkasource_text_to_console.conf | 93 +++++++++
.../src/test/resources/log4j.properties | 22 +++
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 +
21 files changed, 1666 insertions(+), 21 deletions(-)
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 40b1ae5ab30..dc5360d89b7 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -26,6 +26,8 @@ Source connector for Apache Kafka.
| commit_on_checkpoint | Boolean | no | true |
| kafka.* | String | no | - |
| common-options | | no | - |
+| schema | | no | - |
+| format | String | no | json |
### topic [string]
@@ -57,6 +59,13 @@ The way to specify parameters is to add the prefix `kafka.` to the original para
Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+### schema
+The structure of the data, including field names and field types.
+
+## format
+Data format. The default format is json. Optional text format. The default field separator is ", ".
+If you customize the delimiter, add the "field_delimiter" option.
+
## Example
### Simple
@@ -64,12 +73,22 @@ Source plugin common parameters, please refer to [Source Common Options](common-
```hocon
source {
- Kafka {
- topic = "seatunnel"
- bootstrap.servers = "localhost:9092"
- consumer.group = "seatunnel_group"
+ Kafka {
+ result_table_name = "kafka_name"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
}
-
+ format = text
+ field_delimiter = "#“
+ topic = "topic_1,topic_2,topic_3"
+ bootstrap.server = "localhost:9092"
+ kafka.max.poll.records = 500
+ kafka.client.id = client_1
+ }
+
}
```
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 4159e8783aa..3f8fe0ea4a6 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -28,7 +28,7 @@
4.0.0
connector-kafka
-
+
3.2.0
@@ -36,6 +36,11 @@
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
org.apache.kafka
kafka-clients
@@ -46,6 +51,11 @@
seatunnel-format-json
${project.version}
+
+ org.apache.seatunnel
+ seatunnel-format-text
+ ${project.version}
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d577b2badfa..2dedcd13df6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -50,6 +50,31 @@ public class Config {
*/
public static final String TRANSACTION_PREFIX = "transaction_prefix";
+ /**
+ * User-defined schema
+ */
+ public static final String SCHEMA = "schema";
+
+ /**
+ * data format
+ */
+ public static final String FORMAT = "format";
+
+ /**
+ * The default data format is JSON
+ */
+ public static final String DEFAULT_FORMAT = "json";
+
+ /**
+ * field delimiter
+ */
+ public static final String FIELD_DELIMITER = "field_delimiter";
+
+ /**
+ * The default field delimiter is “,”
+ */
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+
/**
* Send information according to the specified partition.
*/
@@ -64,4 +89,5 @@ public class Config {
* Determine the key of the kafka send partition
*/
public static final String PARTITION_KEY = "partition_key";
+
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a6b6199f5f0..3c608f4a59b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -20,17 +20,21 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,7 +42,10 @@
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -52,6 +59,7 @@ public class KafkaSource implements SeaTunnelSource deserializationSchema;
private SeaTunnelRowType typeInfo;
private JobContext jobContext;
@@ -92,10 +100,7 @@ public void prepare(Config config) throws PrepareFailException {
this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
});
- // TODO support user custom row type
- this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
- new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+ setDeserialization(config);
}
@Override
@@ -105,7 +110,7 @@ public SeaTunnelRowType getProducedType() {
@Override
public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
- return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+ return new KafkaSourceReader(this.metadata, deserializationSchema, readerContext);
}
@Override
@@ -122,4 +127,36 @@ public SourceSplitEnumerator restoreEnumerat
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
+
+ private void setDeserialization(Config config) {
+ if (config.hasPath(SCHEMA)) {
+ Config schema = config.getConfig(SCHEMA);
+ typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ String format = DEFAULT_FORMAT;
+ if (config.hasPath(FORMAT)) {
+ format = config.getString(FORMAT);
+ }
+ if (DEFAULT_FORMAT.equals(format)) {
+ deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+ } else if ("text".equals(format)) {
+ String delimiter = DEFAULT_FIELD_DELIMITER;
+ if (config.hasPath(FIELD_DELIMITER)) {
+ delimiter = config.getString(FIELD_DELIMITER);
+ }
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(typeInfo)
+ .delimiter(delimiter)
+ .build();
+ } else {
+ // TODO: use format SPI
+ throw new UnsupportedOperationException("Unsupported format: " + format);
+ }
+ } else {
+ typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
+ this.deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(typeInfo)
+ .delimiter(String.valueOf('\002'))
+ .build();
+ }
+ }
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index bb548695612..24252ce68a1 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -59,15 +59,15 @@ public class KafkaSourceReader implements SourceReader sourceSplitMap;
private final Map consumerThreadMap;
private final ExecutorService executorService;
- // TODO support user custom type
- private SeaTunnelRowType typeInfo;
+ private final DeserializationSchema deserializationSchema;
- KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
+ KafkaSourceReader(ConsumerMetadata metadata,
+ DeserializationSchema deserializationSchema,
SourceReader.Context context) {
this.metadata = metadata;
this.context = context;
- this.typeInfo = typeInfo;
this.sourceSplits = new HashSet<>();
+ this.deserializationSchema = deserializationSchema;
this.consumerThreadMap = new ConcurrentHashMap<>();
this.sourceSplitMap = new ConcurrentHashMap<>();
this.checkpointOffsetMap = new ConcurrentHashMap<>();
@@ -114,9 +114,7 @@ public void pollNext(Collector output) throws Exception {
List> recordList = records.records(partition);
for (ConsumerRecord record : recordList) {
- String v = stringDeserializer.deserialize(partition.topic(), record.value());
- String t = partition.topic();
- output.collect(new SeaTunnelRow(new Object[]{t, v}));
+ deserializationSchema.deserialize(record.value(), output);
if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
record.offset() >= sourceSplit.getEndOffset()) {
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
new file mode 100644
index 00000000000..2eccfa9c567
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-flink-connector-v2-e2e
+ ${revision}
+
+ 4.0.0
+
+ connector-kafka-flink-e2e
+
+
+
+ org.apache.seatunnel
+ connector-flink-e2e-base
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+
+
+ org.apache.seatunnel
+ connector-kafka
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-console
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-assert
+ ${project.version}
+ test
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..7d7fe192046
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * This container wraps Confluent Kafka and Zookeeper (optionally)
+ */
+public class KafkaContainer extends GenericContainer {
+
+ private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
+
+ public static final int KAFKA_PORT = 9093;
+
+ public static final int ZOOKEEPER_PORT = 2181;
+
+ private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+ protected String externalZookeeperConnect = null;
+
+ public KafkaContainer(final DockerImageName dockerImageName) {
+ super(dockerImageName);
+ dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+ withExposedPorts(KAFKA_PORT);
+
+ // Use two listeners with different names, it will force Kafka to communicate with itself via internal
+ // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
+ withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
+ withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+
+ withEnv("KAFKA_BROKER_ID", "1");
+ withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+ withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+ }
+
+ public KafkaContainer withEmbeddedZookeeper() {
+ externalZookeeperConnect = null;
+ return self();
+ }
+
+ public KafkaContainer withExternalZookeeper(String connectString) {
+ externalZookeeperConnect = connectString;
+ return self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
+ }
+
+ @Override
+ protected void configure() {
+ withEnv(
+ "KAFKA_ADVERTISED_LISTENERS",
+ String.format("BROKER://%s:9092", getNetwork() != null ? getNetworkAliases().get(1) : "localhost")
+ );
+
+ String command = "#!/bin/bash\n";
+ if (externalZookeeperConnect != null) {
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
+ } else {
+ addExposedPort(ZOOKEEPER_PORT);
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
+ command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
+ command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+ command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+ command += "zookeeper-server-start zookeeper.properties &\n";
+ }
+
+ // Optimization: skip the checks
+ command += "echo '' > /etc/confluent/docker/ensure \n";
+ // Run the original command
+ command += "/etc/confluent/docker/run \n";
+ withCommand("sh", "-c", command);
+ }
+
+ @Override
+ @SneakyThrows
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
+ ExecResult result = execInContainer(
+ "kafka-configs",
+ "--alter",
+ "--bootstrap-server",
+ brokerAdvertisedListener,
+ "--entity-type",
+ "brokers",
+ "--entity-name",
+ getEnvMap().get("KAFKA_BROKER_ID"),
+ "--add-config",
+ "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
+ );
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(result.toString());
+ }
+ }
+
+ protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+ return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+ }
+
+ public String getLinuxLocalIp() {
+ String ip = "";
+ try {
+ Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces();
+ while (networkInterfaces.hasMoreElements()) {
+ NetworkInterface networkInterface = networkInterfaces.nextElement();
+ Enumeration inetAddresses = networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+ ip = inetAddress.getHostAddress();
+ }
+ }
+ }
+ } catch (SocketException ex) {
+ ex.printStackTrace();
+ }
+ return ip;
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
new file mode 100644
index 00000000000..e3468e8dd34
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on spark engine.
+ */
+@Slf4j
+public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
+
+ private static final int KAFKA_PORT = 9093;
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ private KafkaProducer producer;
+
+ private KafkaContainer kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+ generateTestData();
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ private void generateTestData() {
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ ProducerRecord producerRecord = serializer.serializeRow(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSourceJsonToConsole() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_json_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
new file mode 100644
index 00000000000..f0855b4551d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on spark engine.
+ */
+@Slf4j
+public class KafkaSourceTextToConsoleIT extends FlinkContainer {
+
+ private static final int KAFKA_PORT = 9093;
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ private KafkaProducer producer;
+
+ private KafkaContainer kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+ generateTestData();
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ private void generateTestData() {
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
+ .seaTunnelRowType(seatunnelRowType)
+ .delimiter(",")
+ .build();
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+
+ ProducerRecord producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_text_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
new file mode 100644
index 00000000000..62a1dc96742
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9093"
+ topic = "test_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ # The default format is json, which is optional
+ format = json
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
new file mode 100644
index 00000000000..c1b3c0d4713
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9093"
+ topic = "test_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ format = text
+ # The default field delimiter is ","
+ field_delimiter = ","
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 95d24c889e9..aea126d7f01 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,7 @@
connector-mongodb-flink-e2e
connector-iceberg-flink-e2e
connector-influxdb-flink-e2e
+ connector-kafka-flink-e2e
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
new file mode 100644
index 00000000000..5f92a2599b9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-spark-connector-v2-e2e
+ ${revision}
+
+ 4.0.0
+
+ connector-kafka-spark-e2e
+
+
+
+ org.apache.seatunnel
+ connector-spark-e2e-base
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+
+
+ org.apache.seatunnel
+ connector-kafka
+ ${project.version}
+ test
+
+
+ org.apache.seatunnel
+ connector-console
+ ${project.version}
+ test
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..431df9205e9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * This container wraps Confluent Kafka and Zookeeper (optionally)
+ */
+public class KafkaContainer extends GenericContainer {
+
+ private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
+
+ public static final int KAFKA_PORT = 9093;
+
+ public static final int ZOOKEEPER_PORT = 2181;
+
+ private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+ protected String externalZookeeperConnect = null;
+
+ public KafkaContainer(final DockerImageName dockerImageName) {
+ super(dockerImageName);
+ dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+ withExposedPorts(KAFKA_PORT);
+
+ // Use two listeners with different names, it will force Kafka to communicate with itself via internal
+ // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
+ withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
+ withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+ withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+
+ withEnv("KAFKA_BROKER_ID", "1");
+ withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
+ withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+ withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+ }
+
+ public KafkaContainer withEmbeddedZookeeper() {
+ externalZookeeperConnect = null;
+ return self();
+ }
+
+ public KafkaContainer withExternalZookeeper(String connectString) {
+ externalZookeeperConnect = connectString;
+ return self();
+ }
+
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
+ }
+
+ @Override
+ protected void configure() {
+ withEnv(
+ "KAFKA_ADVERTISED_LISTENERS",
+ String.format("BROKER://%s:9092", getNetwork() != null ? getNetworkAliases().get(1) : "localhost")
+ );
+
+ String command = "#!/bin/bash\n";
+ if (externalZookeeperConnect != null) {
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
+ } else {
+ addExposedPort(ZOOKEEPER_PORT);
+ withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
+ command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
+ command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+ command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+ command += "zookeeper-server-start zookeeper.properties &\n";
+ }
+
+ // Optimization: skip the checks
+ command += "echo '' > /etc/confluent/docker/ensure \n";
+ // Run the original command
+ command += "/etc/confluent/docker/run \n";
+ withCommand("sh", "-c", command);
+ }
+
+ @Override
+ @SneakyThrows
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
+ ExecResult result = execInContainer(
+ "kafka-configs",
+ "--alter",
+ "--bootstrap-server",
+ brokerAdvertisedListener,
+ "--entity-type",
+ "brokers",
+ "--entity-name",
+ getEnvMap().get("KAFKA_BROKER_ID"),
+ "--add-config",
+ "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
+ );
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(result.toString());
+ }
+ }
+
+ protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+ return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+ }
+
+ public String getLinuxLocalIp() {
+ String ip = "";
+ try {
+ Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces();
+ while (networkInterfaces.hasMoreElements()) {
+ NetworkInterface networkInterface = networkInterfaces.nextElement();
+ Enumeration inetAddresses = networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+ ip = inetAddress.getHostAddress();
+ }
+ }
+ }
+ } catch (SocketException ex) {
+ ex.printStackTrace();
+ }
+ return ip;
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
new file mode 100644
index 00000000000..8b9a425a9e2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+/**
+ * This test case is used to verify that the kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on spark engine.
+ */
+@SuppressWarnings("checkstyle:EmptyLineSeparator")
+@Slf4j
+public class KafkaSourceJsonToConsoleIT extends SparkContainer {
+
+ private static final int KAFKA_PORT = 9093;
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ private KafkaProducer producer;
+
+ private KafkaContainer kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+ generateTestData();
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ private void generateTestData() {
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ ProducerRecord producerRecord = serializer.serializeRow(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSource() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_json_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
new file mode 100644
index 00000000000..c2842557cb3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on spark engine.
+ */
+@Slf4j
+public class KafkaSourceTextToConsoleIT extends SparkContainer {
+
+ private static final int KAFKA_PORT = 9093;
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ private KafkaProducer producer;
+
+ private KafkaContainer kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+ generateTestData();
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ private void generateTestData() {
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
+ .seaTunnelRowType(seatunnelRowType)
+ .delimiter(",")
+ .build();
+
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+
+ ProducerRecord producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_text_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
new file mode 100644
index 00000000000..91c92023722
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9093"
+ topic = "test_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
new file mode 100644
index 00000000000..369f34a2f22
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9093"
+ topic = "test_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ format = text
+ # The default field delimiter is ","
+ field_delimiter = ","
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 64fc9205633..885905feb84 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
connector-iotdb-spark-e2e
connector-jdbc-spark-e2e
connector-mongodb-spark-e2e
+ connector-kafka-spark-e2e
connector-influxdb-spark-e2e