From 87c53d39e4c966acc8ae035d7130a9ecbb3c934a Mon Sep 17 00:00:00 2001
From: 15203203634 <15203203634@163.com>
Date: Fri, 7 Oct 2022 07:09:39 +0800
Subject: [PATCH] [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema
---
 docs/en/connector-v2/source/Kafka.md          |  90 +++++++++
 docs/zh-CN/connector-v2/source/kafka.md       |  89 +++++++++
 .../seatunnel/kafka/config/KafkaConfig.java   |  69 +++++++
 .../connector-kafka-flink-e2e/pom.xml         |  59 ++++++
 .../e2e/flink/v2/kafka/KafkaContainer.java    | 125 ++++++++++++
 .../v2/kafka/KafkaSourceToConsoleIT.java      | 180 ++++++++++++++++++
 .../kafka/kafkasource_to_console.conf         |  85 +++++++++
 .../src/test/resources/log4j.properties       |  22 +++
 .../connector-kafka-spark-e2e/pom.xml         |  58 ++++++
 .../e2e/spark/v2/kafka/KafkaContainer.java    | 125 ++++++++++++
 .../v2/kafka/KafkaSourceToConsoleIT.java      | 179 +++++++++++++++++
 .../kafka/kafkasource_to_console.conf         |  86 +++++++++
 .../src/test/resources/log4j.properties       |  22 +++
 13 files changed, 1189 insertions(+)
 create mode 100644 docs/en/connector-v2/source/Kafka.md
 create mode 100644 docs/zh-CN/connector-v2/source/kafka.md
 create mode 100644 seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties

diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
new file mode 100644
index 00000000000..c7fae4f10bd
--- /dev/null
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -0,0 +1,90 @@
+# Apache Kafka
+
+> Apache Kafka source connector
+
+## Description
+
+Source connector for Apache Kafka.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| topic | String | Yes | - |
+| pattern | Boolean | No | - |
+| bootstrap.servers | String | Yes | - |
+| consumer.group | String | No | SeaTunnel-Consumer-Group |
+| commit_on_checkpoint | Boolean | No | - |
+| schema | Config | No | content |
+| format | String | No | json |
+| kafka.* | String | No | - |
+| result_table_name | String | No | - |
+
+### topic [String]
+
+Kafka topic name. If there are multiple topics, separate them with commas, for example: `tpc1,tpc2`. If `pattern` is set to `true`, `topic` is treated as a Java regular expression, for example: `tpc.*`.
+
+### pattern [Boolean]
+
+Whether to treat `topic` as a regular expression. Set to `true` to subscribe to all topics matching the Java regex given in `topic`.
+
+### bootstrap.servers [String]
+
+The server addresses of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.
+
+### consumer.group [String]
+
+Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.
+
+### commit_on_checkpoint [Boolean]
+
+If `true`, the consumer's offsets will be committed periodically in the background.
+
+### schema [Config]
+
+User-defined schema of the data; refer to the Schema documentation. If no schema is set, the message value is read into a single field named `content`.
+
+### format [String]
+
+Data format. By default messages are parsed as JSON; any other setting causes the message value to be read as a plain string. The default value is `json`.
+
+### kafka.* [String]
+
+Options with the `kafka.` prefix are passed through to the underlying KafkaConsumer, for example: `kafka.max.poll.records = 500`. Multiple such options can be configured, and each is added to the consumer's configuration. For details, see the KafkaConsumer configuration documentation.
+
+### result_table_name [String]
+
+The table name registered for the data read by this source, which can be referenced in transform SQL.
+
+## Example
+
+```hocon
+source {
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    format = json
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```
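+
+When `pattern` is enabled, `topic` switches from a comma-separated list to a Java regular expression. Below is a minimal sketch of that mode; the regex, broker addresses, and table name are illustrative only:
+
+```hocon
+source {
+  Kafka {
+    # With `pattern = true`, `topic` is a Java regex: every existing topic
+    # whose name matches it is consumed.
+    topic = "tpc.*"
+    pattern = true
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    consumer.group = "SeaTunnel-Consumer-Group"
+    result_table_name = "kafka_regex_table"
+  }
+}
+```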
\ No newline at end of file
diff --git a/docs/zh-CN/connector-v2/source/kafka.md b/docs/zh-CN/connector-v2/source/kafka.md
new file mode 100644
index 00000000000..161bf72aae0
--- /dev/null
+++ b/docs/zh-CN/connector-v2/source/kafka.md
@@ -0,0 +1,89 @@
+# Apache Kafka
+
+> Apache Kafka source connector
+
+## Description
+
+Source connector for Apache Kafka.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| topic | String | Yes | - |
+| pattern | Boolean | No | - |
+| bootstrap.servers | String | Yes | - |
+| consumer.group | String | No | SeaTunnel-Consumer-Group |
+| commit_on_checkpoint | Boolean | No | - |
+| schema | Config | No | content |
+| format | String | No | json |
+| kafka.* | String | No | - |
+| result_table_name | String | No | - |
+
+### topic [String]
+
+Kafka topic name. If there are multiple topics, separate them with commas, for example: `tpc1,tpc2`. If `pattern` is set to `true`, `topic` is treated as a Java regular expression, for example: `tpc.*`.
+
+### pattern [Boolean]
+
+Whether to treat `topic` as a regular expression. Set to `true` to subscribe to all topics matching the Java regex given in `topic`.
+
+### bootstrap.servers [String]
+
+The server addresses of the Kafka cluster, for example: `hadoop101:9092,hadoop102:9092,hadoop103:9092`.
+
+### consumer.group [String]
+
+Kafka consumer group. The default value is `SeaTunnel-Consumer-Group`.
+
+### commit_on_checkpoint [Boolean]
+
+If `true`, the consumer's offsets will be committed periodically in the background.
+
+### schema [Config]
+
+User-defined schema of the data; refer to the Schema documentation. If no schema is set, the message value is read into a single field named `content`.
+
+### format [String]
+
+Data format. By default messages are parsed as JSON; any other setting causes the message value to be read as a plain string. The default value is `json`.
+
+### kafka.* [String]
+
+Options with the `kafka.` prefix are passed through to the underlying KafkaConsumer, for example: `kafka.max.poll.records = 500`. Multiple such options can be configured, and each is added to the consumer's configuration. For details, see the KafkaConsumer configuration documentation.
+
+### result_table_name [String]
+
+The table name registered for the data read by this source, which can be referenced in transform SQL.
+
+## Example
+
+```hocon
+source {
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    format = json
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+}
+```
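+
+To make the `kafka.*` pass-through concrete, here is a minimal sketch combining it with background offset committing; the property values are illustrative, and `kafka.auto.offset.reset` mirrors the e2e configs later in this patch:
+
+```hocon
+source {
+  Kafka {
+    topic = "tpc1,tpc2"
+    bootstrap.servers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
+    # Commit the consumer's offsets periodically in the background.
+    commit_on_checkpoint = true
+    # Options prefixed with `kafka.` are forwarded verbatim to the
+    # underlying KafkaConsumer configuration.
+    kafka.max.poll.records = 500
+    kafka.auto.offset.reset = "earliest"
+  }
+}
+```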
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
new file mode 100644
index 00000000000..059e3078ec7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public class KafkaConfig {
+
+    /**
+     * The Kafka topic to consume; multiple topics are separated by commas.
+     */
+    public static final String TOPIC = "topic";
+
+    /**
+     * Whether the topic is interpreted as a Java regex pattern instead of a topic list.
+     */
+    public static final String PATTERN = "pattern";
+
+    /**
+     * The server addresses of the Kafka cluster.
+     */
+    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+
+    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
+
+    /**
+     * The consumer group used by the Kafka client to consume messages.
+     */
+    public static final String CONSUMER_GROUP = "consumer.group";
+
+    /**
+     * Whether the consumer's offsets are committed periodically in the background.
+     */
+    public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
+
+    /**
+     * The prefix of Kafka's transactionId; make sure different jobs use different prefixes.
+     */
+    public static final String TRANSACTION_PREFIX = "transaction_prefix";
+
+    /**
+     * User-defined schema.
+     */
+    public static final String SCHEMA = "schema";
+
+    /**
+     * Data format.
+     */
+    public static final String FORMAT = "format";
+
+    /**
+     * The default data format is JSON.
+     */
+    public static final String DEFAULT_FORMAT = "json";
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
new file mode 100644
index 00000000000..2eccfa9c567
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kafka-flink-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..f35ae73a592
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.e2e.flink.v2.kafka; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * This container wraps Confluent Kafka and Zookeeper (optionally) + */ +public class KafkaContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + + public static final int KAFKA_PORT = 9093; + + public static final int ZOOKEEPER_PORT = 2181; + + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + + protected String externalZookeeperConnect = null; + + public KafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + + withExposedPorts(KAFKA_PORT); + + // Use two listeners with different names, it will force Kafka to communicate with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + + withEnv("KAFKA_BROKER_ID", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public KafkaContainer withEmbeddedZookeeper() { + externalZookeeperConnect = null; + return self(); + } + + public KafkaContainer withExternalZookeeper(String connectString) { + externalZookeeperConnect = connectString; + return self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void configure() { + withEnv( + "KAFKA_ADVERTISED_LISTENERS", + String.format("BROKER://%s:9092", getNetwork() != null ? 
getNetworkAliases().get(1) : "localhost") + ); + + String command = "#!/bin/bash\n"; + if (externalZookeeperConnect != null) { + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); + } else { + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT); + command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n"; + command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command += "zookeeper-server-start zookeeper.properties &\n"; + } + + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + // Run the original command + command += "/etc/confluent/docker/run \n"; + withCommand("sh", "-c", command); + } + + @Override + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo); + ExecResult result = execInContainer( + "kafka-configs", + "--alter", + "--bootstrap-server", + brokerAdvertisedListener, + "--entity-type", + "brokers", + "--entity-name", + getEnvMap().get("KAFKA_BROKER_ID"), + "--add-config", + "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]" + ); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java new file mode 100644 index 00000000000..ca5c39a1a12 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceToConsoleIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the Kafka source can read data and deliver it to the Console sink.
+ * Make sure the SeaTunnel job can submit successfully on the Flink engine.
+ */ +@Slf4j +public class KafkaSourceToConsoleIT extends FlinkContainer { + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeEach + public void startKafkaContainer() { + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + kafkaContainer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initKafkaProducer()); + generateTestData(); + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData() { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType); + + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + ProducerRecord producerRecord = serializer.serializeRow(row); + producer.send(producerRecord); + } + } + + @Test + public void testKafkaSource() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + @AfterEach + public void close() { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf new file 
mode 100644 index 00000000000..d9a13ee19e2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_to_console.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + kafka.auto.offset.reset = "earliest" + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { +} + +sink { + Console {} + Assert { + rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
new file mode 100644
index 00000000000..ce014673e47
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-kafka-spark-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-spark-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.75</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
new file mode 100644
index 00000000000..b0d9a96931c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import lombok.SneakyThrows; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * This container wraps Confluent Kafka and Zookeeper (optionally) + */ +public class KafkaContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + + public static final int KAFKA_PORT = 9093; + + public static final int ZOOKEEPER_PORT = 2181; + + private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; + + protected String externalZookeeperConnect = null; + + public KafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + + withExposedPorts(KAFKA_PORT); + + // Use two listeners with different names, it will force Kafka to communicate with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener + withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + + withEnv("KAFKA_BROKER_ID", "1"); + withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF); + withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + ""); + withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); + } + + public KafkaContainer withEmbeddedZookeeper() { + externalZookeeperConnect = null; + return self(); + } + + public KafkaContainer withExternalZookeeper(String connectString) { + externalZookeeperConnect = connectString; + return self(); + } + + public String getBootstrapServers() { + return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void configure() { + withEnv( + "KAFKA_ADVERTISED_LISTENERS", + String.format("BROKER://%s:9092", getNetwork() != null ? 
getNetworkAliases().get(1) : "localhost") + ); + + String command = "#!/bin/bash\n"; + if (externalZookeeperConnect != null) { + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); + } else { + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT); + command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n"; + command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; + command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; + command += "zookeeper-server-start zookeeper.properties &\n"; + } + + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + // Run the original command + command += "/etc/confluent/docker/run \n"; + withCommand("sh", "-c", command); + } + + @Override + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo); + ExecResult result = execInContainer( + "kafka-configs", + "--alter", + "--bootstrap-server", + brokerAdvertisedListener, + "--entity-type", + "brokers", + "--entity-name", + getEnvMap().get("KAFKA_BROKER_ID"), + "--add-config", + "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]" + ); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } + + protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java new file mode 100644 index 00000000000..1f9567522b0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceToConsoleIT.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that the Kafka source can read data and deliver it to the Console sink.
+ * Make sure the SeaTunnel job can submit successfully on the Spark engine.
+ */ +@SuppressWarnings("checkstyle:EmptyLineSeparator") +@Slf4j +public class KafkaSourceToConsoleIT extends SparkContainer { + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeEach + public void startKafkaContainer() { + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(log)); + kafkaContainer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initKafkaProducer()); + generateTestData(); + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData() { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType); + + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + ProducerRecord producerRecord = serializer.serializeRow(row); + producer.send(producerRecord); + } + } + + @Test + public void testKafkaSource() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + @AfterEach + public void close() { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf new file mode 100644 index 00000000000..b7d13256cb3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_to_console.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + source.parallelism = 1 + job.mode = "BATCH" +} + + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + kafka.auto.offset.reset = "earliest" + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { +} + +sink { + Console {} + Assert { + rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n