From f4383f245607c084270d45be6510c8f26f44ac78 Mon Sep 17 00:00:00 2001
From: 15203203634 <15203203634@163.com>
Date: Tue, 18 Oct 2022 20:30:18 +0800
Subject: [PATCH] [Feature][connectors-v2][kafka] Kafka supports custom schema
 #2371

---
 docs/en/connector-v2/source/kafka.md          |  29 ++-
 .../connector-kafka/pom.xml                   |  12 +-
 .../seatunnel/kafka/config/Config.java        |  26 +++
 .../seatunnel/kafka/source/KafkaSource.java   |  51 ++++-
 .../kafka/source/KafkaSourceReader.java       |  14 +-
 .../connector-kafka-flink-e2e/pom.xml         |  59 ++++++
 .../e2e/flink/v2/kafka/KafkaContainer.java    | 151 ++++++++++++++
 .../v2/kafka/KafkaSourceJsonToConsoleIT.java  | 181 +++++++++++++++++
 .../v2/kafka/KafkaSourceTextToConsoleIT.java  | 184 ++++++++++++++++++
 .../kafka/kafkasource_json_to_console.conf    |  91 +++++++++
 .../kafka/kafkasource_text_to_console.conf    |  92 +++++++++
 .../src/test/resources/log4j.properties      |  22 +++
 .../seatunnel-flink-connector-v2-e2e/pom.xml  |   1 +
 .../connector-kafka-spark-e2e/pom.xml         |  53 +++++
 .../e2e/spark/v2/kafka/KafkaContainer.java    | 151 ++++++++++++++
 .../v2/kafka/KafkaSourceJsonToConsoleIT.java  | 180 +++++++++++++++++
 .../v2/kafka/KafkaSourceTextToConsoleIT.java  | 183 +++++++++++++++++
 .../kafka/kafkasource_json_to_console.conf    |  90 +++++++++
 .../kafka/kafkasource_text_to_console.conf    |  93 +++++++++
 .../src/test/resources/log4j.properties      |  22 +++
 .../seatunnel-spark-connector-v2-e2e/pom.xml  |   1 +
 21 files changed, 1665 insertions(+), 21 deletions(-)
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 40b1ae5ab30..dc5360d89b7 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -26,6 +26,8 @@ Source connector for Apache Kafka.
 | commit_on_checkpoint | Boolean | no | true |
 | kafka.* | String | no | - |
 | common-options | | no | - |
+| schema | | no | - |
+| format | String | no | json |
 
 ### topic [string]
 
@@ -57,6 +59,13 @@ The way to specify parameters is to add the prefix `kafka.` to the original para
 
 Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
 
+### schema
+The structure of the data, including field names and field types.
+
+### format
+Data format. The default format is json; the optional text format is also supported, and its default field delimiter is ",".
+To use a custom delimiter, set the "field_delimiter" option.
+
 ## Example
 
 ### Simple
 
@@ -64,12 +73,22 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ```hocon
 source {
-  Kafka {
-    topic = "seatunnel"
-    bootstrap.servers = "localhost:9092"
-    consumer.group = "seatunnel_group"
-
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    format = text
+    field_delimiter = "#"
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "localhost:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
 }
 ```
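To make the new `format`/`field_delimiter` options concrete: with the schema above and `field_delimiter = "#"`, each Kafka record value is split positionally into the declared fields. Below is a minimal sketch against the `TextDeserializationSchema` builder that this patch wires into the connector; the single-argument `deserialize(byte[])` entry point is assumed from the `DeserializationSchema` contract, so treat it as an illustration rather than connector code.

```java
import java.nio.charset.StandardCharsets;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

public class TextFormatSketch {
    public static void main(String[] args) throws Exception {
        // Row type equivalent to: schema = { fields { name = "string", age = "int" } }
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"name", "age"},
            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});

        TextDeserializationSchema text = TextDeserializationSchema.builder()
            .seaTunnelRowType(rowType)
            .delimiter("#")   // field_delimiter = "#"
            .build();

        // "Tom#28" is expected to parse into the row [Tom, 28].
        SeaTunnelRow row = text.deserialize("Tom#28".getBytes(StandardCharsets.UTF_8));
        System.out.println(row);
    }
}
```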
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 4159e8783aa..3f8fe0ea4a6 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -28,7 +28,7 @@
     4.0.0
 
     connector-kafka
-
+    3.2.0
@@ -36,6 +36,11 @@
+
+        org.apache.seatunnel
+        connector-common
+        ${project.version}
+
     org.apache.kafka
     kafka-clients
@@ -46,6 +51,11 @@
     seatunnel-format-json
     ${project.version}
+
+        org.apache.seatunnel
+        seatunnel-format-text
+        ${project.version}
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d577b2badfa..2dedcd13df6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -50,6 +50,31 @@ public class Config {
      */
     public static final String TRANSACTION_PREFIX = "transaction_prefix";
 
+    /**
+     * User-defined schema
+     */
+    public static final String SCHEMA = "schema";
+
+    /**
+     * Data format
+     */
+    public static final String FORMAT = "format";
+
+    /**
+     * The default data format is JSON
+     */
+    public static final String DEFAULT_FORMAT = "json";
+
+    /**
+     * Field delimiter
+     */
+    public static final String FIELD_DELIMITER = "field_delimiter";
+
+    /**
+     * The default field delimiter is ","
+     */
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
     /**
      * Send information according to the specified partition.
      */
@@ -64,4 +89,5 @@ public class Config {
      * Determine the key of the kafka send partition
      */
     public static final String PARTITION_KEY = "partition_key";
+
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a6b6199f5f0..3c608f4a59b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -20,17 +20,21 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,7 +42,10 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -52,6 +59,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
 
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
 
@@ -92,10 +100,7 @@ public void prepare(Config config) throws PrepareFailException {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(config);
     }
 
     @Override
@@ -105,7 +110,7 @@ public SeaTunnelRowType getProducedType() {
 
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+        return new KafkaSourceReader(this.metadata, deserializationSchema, readerContext);
     }
 
     @Override
@@ -122,4 +127,36 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
     }
+
+    private void setDeserialization(Config config) {
+        if (config.hasPath(SCHEMA)) {
+            Config schema = config.getConfig(SCHEMA);
+            typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            String format = DEFAULT_FORMAT;
+            if (config.hasPath(FORMAT)) {
+                format = config.getString(FORMAT);
+            }
+            if (DEFAULT_FORMAT.equals(format)) {
+                deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+            } else if ("text".equals(format)) {
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER)) {
+                    delimiter = config.getString(FIELD_DELIMITER);
+                }
+                deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(typeInfo)
+                    .delimiter(delimiter)
+                    .build();
+            } else {
+                // TODO: use format SPI
+                throw new UnsupportedOperationException("Unsupported format: " + format);
+            }
+        } else {
+            typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
+            this.deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(typeInfo)
+                .delimiter(String.valueOf('\002'))
+                .build();
+        }
+    }
 }
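Taken together, `setDeserialization` selects the reader's decoding path: JSON when a schema is present and `format` is absent or `json`, delimited text for `format = text`, and a simple single-field text schema keyed on the `'\002'` delimiter when no schema is configured. A compact sketch of the two schema-related paths follows; the constructor and builder calls mirror the method above, while the single-string-field shape of `buildSimpleTextSchema()` and the `deserialize(byte[])` signature are assumptions.

```java
import java.nio.charset.StandardCharsets;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

public class DeserializationPathsSketch {
    public static void main(String[] args) throws Exception {
        // Path 1: schema declared, format absent or "json".
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"name", "age"},
            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});
        DeserializationSchema<SeaTunnelRow> json = new JsonDeserializationSchema(false, false, rowType);
        SeaTunnelRow fromJson = json.deserialize("{\"name\":\"Tom\",\"age\":28}".getBytes(StandardCharsets.UTF_8));

        // Path 2: no schema configured; mirrors the else-branch above.
        SeaTunnelRowType simple = SeaTunnelSchema.buildSimpleTextSchema();
        DeserializationSchema<SeaTunnelRow> fallback = TextDeserializationSchema.builder()
            .seaTunnelRowType(simple)
            .delimiter(String.valueOf('\002'))
            .build();
        // A value containing no '\002' should land in a single field of the row.
        SeaTunnelRow fromText = fallback.deserialize("hello kafka".getBytes(StandardCharsets.UTF_8));

        System.out.println(fromJson + " / " + fromText);
    }
}
```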
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index bb548695612..24252ce68a1 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -59,15 +59,15 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
     private final Map sourceSplitMap;
     private final Map consumerThreadMap;
     private final ExecutorService executorService;
-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
-    KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
+    KafkaSourceReader(ConsumerMetadata metadata,
+                      DeserializationSchema<SeaTunnelRow> deserializationSchema,
                       SourceReader.Context context) {
         this.metadata = metadata;
         this.context = context;
-        this.typeInfo = typeInfo;
         this.sourceSplits = new HashSet<>();
+        this.deserializationSchema = deserializationSchema;
         this.consumerThreadMap = new ConcurrentHashMap<>();
         this.sourceSplitMap = new ConcurrentHashMap<>();
         this.checkpointOffsetMap = new ConcurrentHashMap<>();
@@ -114,9 +114,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                     List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                     for (ConsumerRecord<byte[], byte[]> record : recordList) {
-                        String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                        String t = partition.topic();
-                        output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                        deserializationSchema.deserialize(record.value(), output);
 
                         if (Boundedness.BOUNDED.equals(context.getBoundedness())
                             && record.offset() >= sourceSplit.getEndOffset()) {
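With this change the reader no longer assembles a fixed (topic, raw_message) row; it hands the raw record value to the configured `DeserializationSchema`, which may emit any number of rows through the `Collector`. The two-argument overload used above presumably wraps the single-record form along these lines; this is a sketch of the assumed contract, not the actual interface source.

```java
import java.io.IOException;

import org.apache.seatunnel.api.source.Collector;

// Hypothetical sketch of the contract the reader relies on above; the real
// org.apache.seatunnel.api.serialization.DeserializationSchema may differ.
interface RowDeserialization<T> {
    T deserialize(byte[] message) throws IOException;

    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T row = deserialize(message);
        if (row != null) {
            out.collect(row);
        }
    }
}
```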
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
new file mode 100644
index 00000000000..2eccfa9c567
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+    org.apache.seatunnel
+    seatunnel-flink-connector-v2-e2e
+    ${revision}
+
+    4.0.0
+
+    connector-kafka-flink-e2e
+
+
+
+        org.apache.seatunnel
+        connector-flink-e2e-base
+        ${project.version}
+        tests
+        test-jar
+        test
+
+
+
+
+        org.apache.seatunnel
+        connector-kafka
+        ${project.version}
+        test
+
+
+        org.apache.seatunnel
+        connector-console
+        ${project.version}
+        test
+
+
+        org.apache.seatunnel
+        connector-assert
+        ${project.version}
+        test
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..7d7fe192046
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.e2e.flink.v2.kafka; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +/** + * This container wraps Confluent Kafka and Zookeeper (optionally) + */ +public class KafkaContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + + public static final int KAFKA_PORT = 9093; + + public static final int ZOOKEEPER_PORT = 2181; + + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + + protected String externalZookeeperConnect = null; + + public KafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + + withExposedPorts(KAFKA_PORT); + + // Use two listeners with different names, it will force Kafka to communicate with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + + withEnv("KAFKA_BROKER_ID", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public KafkaContainer withEmbeddedZookeeper() { + externalZookeeperConnect = null; + return self(); + } + + public KafkaContainer withExternalZookeeper(String connectString) { + externalZookeeperConnect = connectString; + return self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void configure() { + withEnv( + "KAFKA_ADVERTISED_LISTENERS", + String.format("BROKER://%s:9092", getNetwork() != null ? 
getNetworkAliases().get(1) : "localhost") + ); + + String command = "#!/bin/bash\n"; + if (externalZookeeperConnect != null) { + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); + } else { + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT); + command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n"; + command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command += "zookeeper-server-start zookeeper.properties &\n"; + } + + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + // Run the original command + command += "/etc/confluent/docker/run \n"; + withCommand("sh", "-c", command); + } + + @Override + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo); + ExecResult result = execInContainer( + "kafka-configs", + "--alter", + "--bootstrap-server", + brokerAdvertisedListener, + "--entity-type", + "brokers", + "--entity-name", + getEnvMap().get("KAFKA_BROKER_ID"), + "--add-config", + "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]" + ); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } + + public String getLinuxLocalIp() { + String ip = ""; + try { + Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); + while (networkInterfaces.hasMoreElements()) { + NetworkInterface networkInterface = networkInterfaces.nextElement(); + Enumeration inetAddresses = networkInterface.getInetAddresses(); + while (inetAddresses.hasMoreElements()) { + InetAddress inetAddress = inetAddresses.nextElement(); + if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) { + ip = inetAddress.getHostAddress(); + } + } + } + } catch (SocketException ex) { + ex.printStackTrace(); + } + return ip; + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java new file mode 100644 index 00000000000..e3468e8dd34 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the Kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on the Flink engine.
+ */
+@Slf4j
+public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_map",
+                "c_array",
+                "c_string",
+                "c_boolean",
+                "c_tinyint",
+                "c_smallint",
+                "c_int",
+                "c_bigint",
+                "c_float",
+                "c_double",
+                "c_decimal",
+                "c_bytes",
+                "c_date",
+                "c_timestamp"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                BasicType.BYTE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.DOUBLE_TYPE,
+                new DecimalType(2, 1),
+                PrimitiveByteArrayType.INSTANCE,
+                LocalTimeType.LOCAL_DATE_TYPE,
+                LocalTimeType.LOCAL_DATE_TIME_TYPE
+            }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i),
+                    Collections.singletonMap("key", Short.parseShort("1")),
+                    new Byte[]{Byte.parseByte("1")},
+                    "string",
+                    Boolean.FALSE,
+                    Byte.parseByte("1"),
+                    Short.parseShort("1"),
+                    Integer.parseInt("1"),
+                    Long.parseLong("1"),
+                    Float.parseFloat("1.1"),
+                    Double.parseDouble("1.1"),
+                    BigDecimal.valueOf(11, 1),
+                    "test".getBytes(),
+                    LocalDate.now(),
+                    LocalDateTime.now()
+                });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafkaSourceJsonToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
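The text-format variant that follows differs from the JSON test only in how rows are put on the wire: fields are joined positionally with `,` by `TextSerializationSchema`. A trimmed-down sketch of that serialization (two fields instead of fifteen; the exact rendering of the output string is an assumption):

```java
import java.nio.charset.StandardCharsets;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.text.TextSerializationSchema;

public class TextWireFormatSketch {
    public static void main(String[] args) {
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"id", "c_string"},
            new SeaTunnelDataType[]{BasicType.LONG_TYPE, BasicType.STRING_TYPE});

        TextSerializationSchema text = TextSerializationSchema.builder()
            .seaTunnelRowType(rowType)
            .delimiter(",")
            .build();

        byte[] value = text.serialize(new SeaTunnelRow(new Object[]{0L, "string"}));
        // Expected wire value: "0,string"
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }
}
```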
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
new file mode 100644
index 00000000000..f0855b4551d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the Kafka source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on the Flink engine.
+ */ +@Slf4j +public class KafkaSourceTextToConsoleIT extends FlinkContainer { + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeEach + public void startKafkaContainer() { + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1"))); + kafkaContainer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initKafkaProducer()); + generateTestData(); + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData() { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + TextSerializationSchema serializationSchema = TextSerializationSchema.builder() + .seaTunnelRowType(seatunnelRowType) + .delimiter(",") + .build(); + + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + + ProducerRecord producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row)); + producer.send(producerRecord); + } + } + + @Test + public void testKafkaSourceTextToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_text_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + @AfterEach + public void close() { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } +} diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf new file mode 100644 index 00000000000..62a1dc96742 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + kafka.auto.offset.reset = "earliest" + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + # The default format is json, which is optional + format = json + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { +} + +sink { + Console {} + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf new file mode 100644 index 00000000000..c1b3c0d4713 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + kafka.auto.offset.reset = "earliest" + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + format = text + # The default field delimiter is "," + field_delimiter = "," + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { +} + +sink { + Console {} + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 95d24c889e9..aea126d7f01 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -37,6 +37,7 @@ connector-mongodb-flink-e2e connector-iceberg-flink-e2e connector-influxdb-flink-e2e + connector-kafka-flink-e2e diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml new file mode 100644 index 00000000000..5f92a2599b9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml @@ -0,0 +1,53 @@ + + + + + org.apache.seatunnel + seatunnel-spark-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-kafka-spark-e2e + + + + org.apache.seatunnel + connector-spark-e2e-base + ${project.version} + tests + test-jar + test + + + + + org.apache.seatunnel + connector-kafka + ${project.version} + test + + + org.apache.seatunnel + connector-console + ${project.version} + test + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java new file mode 100644 index 00000000000..431df9205e9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +/** + * This container wraps Confluent Kafka and Zookeeper (optionally) + */ +public class KafkaContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + + public static final int KAFKA_PORT = 9093; + + public static final int ZOOKEEPER_PORT = 2181; + + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + + protected String externalZookeeperConnect = null; + + public KafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + + withExposedPorts(KAFKA_PORT); + + // Use two listeners with different names, it will force Kafka to communicate with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + + withEnv("KAFKA_BROKER_ID", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public KafkaContainer withEmbeddedZookeeper() { + externalZookeeperConnect = null; + return self(); + } + + public KafkaContainer withExternalZookeeper(String connectString) { + externalZookeeperConnect = connectString; + return self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void configure() { + withEnv( + "KAFKA_ADVERTISED_LISTENERS", + String.format("BROKER://%s:9092", getNetwork() != null ? 
getNetworkAliases().get(1) : "localhost") + ); + + String command = "#!/bin/bash\n"; + if (externalZookeeperConnect != null) { + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); + } else { + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT); + command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n"; + command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command += "zookeeper-server-start zookeeper.properties &\n"; + } + + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + // Run the original command + command += "/etc/confluent/docker/run \n"; + withCommand("sh", "-c", command); + } + + @Override + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo); + ExecResult result = execInContainer( + "kafka-configs", + "--alter", + "--bootstrap-server", + brokerAdvertisedListener, + "--entity-type", + "brokers", + "--entity-name", + getEnvMap().get("KAFKA_BROKER_ID"), + "--add-config", + "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]" + ); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } + + public String getLinuxLocalIp() { + String ip = ""; + try { + Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); + while (networkInterfaces.hasMoreElements()) { + NetworkInterface networkInterface = networkInterfaces.nextElement(); + Enumeration inetAddresses = networkInterface.getInetAddresses(); + while (inetAddresses.hasMoreElements()) { + InetAddress inetAddress = inetAddresses.nextElement(); + if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) { + ip = inetAddress.getHostAddress(); + } + } + } + } catch (SocketException ex) { + ex.printStackTrace(); + } + return ip; + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java new file mode 100644 index 00000000000..8b9a425a9e2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +/** + * This test case is used to verify that the kafka source is able to send data to the console. + * Make sure the SeaTunnel job can submit successfully on spark engine. 
+ */ +@SuppressWarnings("checkstyle:EmptyLineSeparator") +@Slf4j +public class KafkaSourceJsonToConsoleIT extends SparkContainer { + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeEach + public void startKafkaContainer() { + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + kafkaContainer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initKafkaProducer()); + generateTestData(); + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData() { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType); + + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + ProducerRecord producerRecord = serializer.serializeRow(row); + producer.send(producerRecord); + } + } + + @Test + public void testKafkaSource() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_json_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + @AfterEach + public void close() { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } +} diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java new file mode 100644 index 00000000000..6b5894d28e0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.e2e.spark.SparkContainer; +import org.apache.seatunnel.format.text.TextSerializationSchema; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +/** + * This test case is used to verify that the kafka source is able to send data to the console. + * Make sure the SeaTunnel job can submit successfully on spark engine. 
+ */
+@Slf4j
+public class KafkaSourceTextToConsoleIT extends SparkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        // Retry until the broker accepts a producer connection, then seed the topic.
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer())
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+        // Same row type as the JSON test, so both formats cover every SeaTunnel data type.
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_map",
+                "c_array",
+                "c_string",
+                "c_boolean",
+                "c_tinyint",
+                "c_smallint",
+                "c_int",
+                "c_bigint",
+                "c_float",
+                "c_double",
+                "c_decimal",
+                "c_bytes",
+                "c_date",
+                "c_timestamp"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                new MapType<>(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                BasicType.BYTE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.DOUBLE_TYPE,
+                new DecimalType(2, 1),
+                PrimitiveByteArrayType.INSTANCE,
+                LocalTimeType.LOCAL_DATE_TYPE,
+                LocalTimeType.LOCAL_DATE_TIME_TYPE
+            });
+
+        // Encode each row as one line of ","-separated fields, matching format = text
+        // with the default delimiter in kafkasource_text_to_console.conf.
+        TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
+            .seaTunnelRowType(seatunnelRowType)
+            .delimiter(",")
+            .build();
+
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i),
+                    Collections.singletonMap("key", Short.parseShort("1")),
+                    new Byte[]{Byte.parseByte("1")},
+                    "string",
+                    Boolean.FALSE,
+                    Byte.parseByte("1"),
+                    Short.parseShort("1"),
+                    Integer.parseInt("1"),
+                    Long.parseLong("1"),
+                    Float.parseFloat("1.1"),
+                    Double.parseDouble("1.1"),
+                    BigDecimal.valueOf(11, 1),
+                    "test".getBytes(),
+                    LocalDate.now(),
+                    LocalDateTime.now()
+                });
+
+            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
+            producer.send(producerRecord);
+        }
+        // Block until every buffered record has reached the broker before the job starts.
+        producer.flush();
+    }
+
+    @Test
+    public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
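+
+    // Editorial note: each produced record is one delimiter-separated line, illustratively:
+    //   0,key=1,1,string,false,1,1,1,1,1.1,1.1,1.1,test,2022-10-18,2022-10-18T20:30:18
+    // (the exact map/array/bytes rendering is determined by seatunnel-format-text, so
+    // treat this line as a sketch rather than the literal output).
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf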
new file mode 100644
index 00000000000..91c92023722
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    # The schema must match the rows produced by KafkaSourceJsonToConsoleIT.
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  # Assert that all 100 generated ids (0..99) arrived and none are null.
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
new file mode 100644
index 00000000000..369f34a2f22
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    # The schema must match the rows produced by KafkaSourceTextToConsoleIT.
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    format = text
+    # The default field delimiter is ","; set field_delimiter to override it.
+    field_delimiter = ","
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  # Assert that all 100 generated ids (0..99) arrived and none are null.
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 64fc9205633..885905feb84 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
+        <module>connector-kafka-spark-e2e</module>
         <module>connector-influxdb-spark-e2e</module>