[fix][starrocks] add e2e
liunaijie committed Mar 28, 2024
1 parent 2bbe781 commit 6de5d25
Showing 3 changed files with 233 additions and 1 deletion.
@@ -48,5 +48,16 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
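<!-- Added for the Kafka-to-StarRocks e2e case: connector-kafka supplies
     DefaultSeaTunnelRowSerializer for producing test records, and the
     testcontainers kafka module starts the broker. -->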
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -19,24 +19,40 @@

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@@ -52,6 +68,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -110,6 +127,31 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

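    // Sink table for the Kafka-to-StarRocks job; its columns mirror SEATUNNEL_ROW_TYPE
    // below and the schema declared in kafka-to-starrocks.conf.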
private static final String DDL_KAFKA_SINK_TABLE =
"create table "
+ DATABASE
+ "."
+ "kafka_table_sink"
+ " (\n"
+ " id BIGINT,\n"
+ " c_string STRING,\n"
+ " c_boolean BOOLEAN,\n"
+ " c_tinyint TINYINT,\n"
+ " c_int INT,\n"
+ " c_bigint BIGINT,\n"
+ " c_float FLOAT,\n"
+ " c_double DOUBLE,\n"
+ " c_decimal Decimal(2, 1),\n"
+ " c_date DATE\n"
+ ")ENGINE=OLAP\n"
+ "DUPLICATE KEY(`id`)\n"
+ "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\",\n"
+ "\"in_memory\" = \"false\","
+ "\"storage_format\" = \"DEFAULT\""
+ ")";

private static final String INIT_DATA_SQL =
"insert into "
+ DATABASE
@@ -138,6 +180,45 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
private GenericContainer<?> starRocksServer;
private static final List<SeaTunnelRow> TEST_DATASET = generateTestDataSet();

private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
new SeaTunnelRowType(
new String[] {
"id",
"c_string",
"c_boolean",
"c_tinyint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_date"
},
new SeaTunnelDataType[] {
BasicType.LONG_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(2, 1),
LocalTimeType.LOCAL_DATE_TYPE
});

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";

private static final String KAFKA_HOST = "kafkaCluster";

private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON;

private static final String DEFAULT_FIELD_DELIMITER = ",";

private KafkaProducer<byte[], byte[]> producer;

private KafkaContainer kafkaContainer;

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
@@ -169,6 +250,61 @@ public void startUp() throws Exception {
.untilAsserted(this::initializeJdbcConnection);
initializeJdbcTable();
batchInsertData();

kafkaContainer =
new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withNetwork(NETWORK)
.withNetworkAliases(KAFKA_HOST)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started");
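        // The broker can accept connections slightly after the container reports itself
        // started, so producer creation below is retried until it succeeds.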
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initKafkaProducer);

log.info("Write 100 records to topic test_topic_source");
DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
"test_topic_source",
SEATUNNEL_ROW_TYPE,
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
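        // With MessageFormat.JSON each row is serialized to a JSON object keyed by field
        // name, e.g. {"id":0,"c_string":"string",...} (illustrative); the field delimiter
        // presumably only matters for the TEXT format.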
generateTestData(serializer::serializeRow, 0, 100);
}

private void initKafkaProducer() {
Properties props = new Properties();
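        // getBootstrapServers() returns the broker address as mapped to the host, so the
        // test JVM (outside the Docker network) can reach the container.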
String bootstrapServers = kafkaContainer.getBootstrapServers();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producer = new KafkaProducer<>(props);
}

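    // Builds rows matching SEATUNNEL_ROW_TYPE; note that BigDecimal.valueOf(11, 1)
    // equals 1.1, which fits the DECIMAL(2, 1) column of the sink table.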
private void generateTestData(ProducerRecordConverter converter, int start, int end) {
for (int i = start; i < end; i++) {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
Long.valueOf(i),
"string",
Boolean.FALSE,
Byte.parseByte("1"),
Integer.parseInt("1"),
Long.parseLong("1"),
Float.parseFloat("1.1"),
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
LocalDate.of(2024, 1, 1)
});
ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
producer.send(producerRecord);
}
}

private static List<SeaTunnelRow> generateTestDataSet() {
@@ -207,6 +343,12 @@ public void tearDown() throws Exception {
if (starRocksServer != null) {
starRocksServer.close();
}
if (producer != null) {
producer.close();
}
if (kafkaContainer != null) {
kafkaContainer.close();
}
}

@TestTemplate
@@ -253,6 +395,12 @@ public void testStarRocksSink(TestContainer container)
}
}

@TestTemplate
public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/kafka-to-starrocks.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
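
    // Illustrative sketch, not part of this commit: the job result could additionally be
    // verified by counting the rows that landed in StarRocks. `jdbcConnection` is an
    // assumed field name for the connection opened by initializeJdbcConnection(); the
    // expected count matches the 100 records produced in startUp().
    private void assertKafkaSinkRowCount() throws SQLException {
        try (Statement statement = jdbcConnection.createStatement();
                ResultSet resultSet =
                        statement.executeQuery(
                                "select count(*) from " + DATABASE + ".kafka_table_sink")) {
            Assertions.assertTrue(resultSet.next());
            Assertions.assertEquals(100, resultSet.getLong(1));
        }
    }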

private void initializeJdbcConnection()
throws SQLException, ClassNotFoundException, MalformedURLException,
InstantiationException, IllegalAccessException {
@@ -274,7 +422,7 @@ private void initializeJdbcTable() {
// create source table
statement.execute(DDL_SOURCE);
// create sink table
// statement.execute(DDL_SINK);
statement.execute(DDL_KAFKA_SINK_TABLE);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
@@ -378,4 +526,8 @@ public void testCatalog() {
Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink));
starRocksCatalog.close();
}

interface ProducerRecordConverter {
ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}
}
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_source"
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = skip
schema = {
fields {
id = bigint
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_date = date
}
}
}

# For more information about how to configure SeaTunnel and the full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
}

transform {
}

sink {
StarRocks {
source_table_name = "kafka_table"
nodeUrls = ["starrocks_e2e:8030"]
username = "root"
password = ""
database = "test"
table = "kafka_table_sink"
batch_max_rows = 100
max_retries = 3
base-url = "jdbc:mysql://starrocks_e2e:9030/test"
starrocks.config = {
format = "JSON"
strip_outer_array = true
}
}
}
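
# Illustrative variant, not part of this commit: the sink can also stream-load CSV
# instead of JSON. Assuming entries under starrocks.config are forwarded as Stream Load
# properties, the equivalent CSV settings would be:
#
#   starrocks.config = {
#     format = "CSV"
#     column_separator = "\\x01"
#     row_delimiter = "\\x02"
#   }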
