From c5251de06cd4735e6baf3055909d4c4f211b5518 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Wed, 21 Sep 2022 17:07:24 +0800 Subject: [PATCH] [Improve][e2e] Unified e2e IT for Redis --- .../connector-assert-e2e/pom.xml | 2 +- .../assertion/fakesource_to_assert.conf | 3 - .../connector-redis-e2e/pom.xml | 37 ++++ .../e2e/connector/redis/RedisIT.java | 182 ++++++++++++++++++ .../src/test/resources/log4j.properties | 22 +++ .../src/test/resources/redis-to-redis.conf | 51 +++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 7 files changed, 294 insertions(+), 4 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml index a4147738875..1dfa220b126 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml @@ -17,8 +17,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - seatunnel-connector-v2-e2e org.apache.seatunnel + seatunnel-connector-v2-e2e ${revision} 4.0.0 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf index 9b628ba46ba..33ed73bc531 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### env { execution.parallelism = 1 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml new file mode 100644 index 00000000000..04af9f8b894 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml @@ -0,0 +1,37 @@ + + + + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-redis-e2e + + + + + org.apache.seatunnel + connector-redis + ${project.version} + test + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java new file mode 100644 index 00000000000..8b97543a42d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.redis; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.format.json.JsonSerializationSchema; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import scala.Tuple2; + +@Slf4j +public class RedisIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "redis:latest"; + private static final String HOST = "redis-e2e"; + private static final int PORT = 6379; + private static final String PASSWORD = "SeaTunnel"; + + private static final Tuple2> TEST_DATASET = generateTestDataSet(); + + private GenericContainer redisContainer; + + private Jedis jedis; + + @BeforeAll + @Override + public void startUp() throws Exception { + this.redisContainer = new GenericContainer<>(DockerImageName.parse(IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(PORT) + .withLogConsumer(new Slf4jLogConsumer(log)) + .withCommand(String.format("redis-server --requirepass %s", PASSWORD)) + .waitingFor(new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + Startables.deepStart(Stream.of(redisContainer)).join(); + log.info("Redis container started"); + this.initJedis(); + this.initSourceData(); + } + + private void initSourceData() { + JsonSerializationSchema jsonSerializationSchema = new JsonSerializationSchema(TEST_DATASET._1()); + List rows = TEST_DATASET._2(); + for (int i = 0; i < rows.size(); i++) { + jedis.set("key_test" + i, new String(jsonSerializationSchema.serialize(rows.get(i)))); + } + } + + private static Tuple2> generateTestDataSet() { + SeaTunnelRowType rowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType<>(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + rows.add(row); + } + return Tuple2.apply(rowType, rows); + } + + private void initJedis() { + Jedis jedis = new Jedis(redisContainer.getHost(), redisContainer.getFirstMappedPort()); + jedis.auth(PASSWORD); + this.jedis = jedis; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + jedis.close(); + redisContainer.close(); + } + + @TestTemplate + public void testRedis(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(100, jedis.llen("key_list")); + // Clear data to prevent data duplication in the next TestContainer + jedis.del("key_list"); + Assertions.assertEquals(0, jedis.llen("key_list")); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf new file mode 100644 index 00000000000..1c15644f197 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + keys = "key_test*" + data_type = key + } +} + +transform { +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + key = "key_list" + data_type = list + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 0c42e9ba863..8066bb049b7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -26,6 +26,7 @@ connector-assert-e2e connector-jdbc-it + connector-redis-e2e seatunnel-connector-v2-e2e