From b29afddec1b9f21309cf8c896d790bb2371bf15f Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Wed, 28 Sep 2022 10:33:19 +0800 Subject: [PATCH 1/3] [Improve][e2e] Remove the un-unified e2e module of Redis --- .../connector-redis-flink-e2e/pom.xml | 53 ----------- .../seatunnel/e2e/flink/v2/redis/RedisIT.java | 92 ------------------- .../src/test/resources/log4j.properties | 22 ----- .../redis/redis_source_and_sink.conf | 68 -------------- .../seatunnel-flink-connector-v2-e2e/pom.xml | 1 - .../connector-redis-spark-e2e/pom.xml | 53 ----------- .../seatunnel/e2e/spark/v2/redis/RedisIT.java | 92 ------------------- .../src/test/resources/log4j.properties | 22 ----- .../redis/redis_source_and_sink.conf | 71 -------------- .../seatunnel-spark-connector-v2-e2e/pom.xml | 1 - 10 files changed, 475 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/pom.xml delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/log4j.properties delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/redis/redis_source_and_sink.conf delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/pom.xml delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/log4j.properties delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/redis/redis_source_and_sink.conf diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/pom.xml deleted file mode 100644 index d92a1b3b55b..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-flink-connector-v2-e2e - ${revision} - - 4.0.0 - - connector-redis-flink-e2e - - - - org.apache.seatunnel - connector-flink-e2e-base - ${project.version} - tests - test-jar - test - - - - - org.apache.seatunnel - connector-redis - ${project.version} - test - - - org.apache.seatunnel - connector-assert - ${project.version} - test - - - - \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java deleted file mode 100644 index 93d4361388f..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.flink.v2.redis; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.e2e.flink.FlinkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import redis.clients.jedis.Jedis; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class RedisIT extends FlinkContainer { - private static final String REDIS_IMAGE = "redis:latest"; - private static final String REDIS_CONTAINER_HOST = "flink_e2e_redis"; - private static final String REDIS_HOST = "localhost"; - private static final int REDIS_PORT = 6379; - private GenericContainer redisContainer; - private Jedis jedis; - - @BeforeEach - public void startRedisContainer() { - redisContainer = new GenericContainer<>(REDIS_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(REDIS_CONTAINER_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", REDIS_PORT, REDIS_PORT))); - Startables.deepStart(Stream.of(redisContainer)).join(); - log.info("Redis container started"); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initJedis); - this.generateTestData(); - } - - private void initJedis() { - jedis = new Jedis(REDIS_HOST, REDIS_PORT); - jedis.connect(); - } - - private void generateTestData() { - jedis.set("key_test", "test"); - jedis.set("key_test1", "test1"); - jedis.set("key_test2", "test2"); - jedis.set("key_test3", "test3"); - jedis.set("key_test4", "test4"); - } - - @Test - public void testRedisSourceAndSink() throws IOException, InterruptedException { - Container.ExecResult execResult = executeSeaTunnelFlinkJob("/redis/redis_source_and_sink.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertEquals(5, jedis.llen("key_list")); - } - - @AfterEach - public void close() { - jedis.close(); - redisContainer.close(); - } -} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/log4j.properties deleted file mode 100644 index db5d9e51220..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/redis/redis_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/redis/redis_source_and_sink.conf deleted file mode 100644 index a7deb67b2ac..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/resources/redis/redis_source_and_sink.conf +++ /dev/null @@ -1,68 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - # You can set flink configuration here - execution.parallelism = 1 - job.mode = "BATCH" - #execution.checkpoint.interval = 10000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - Redis { - host = flink_e2e_redis - port = 6379 - keys = "key_test*" - data_type = key - format = text - } - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis -} - -transform { - -} - -sink { - - Redis { - host = flink_e2e_redis - port = 6379 - key = "key_list" - data_type = list - } - - Assert { - rules = [ - { - field_name = content - field_type = string - field_value = [ - { - rule_type = NOT_NULL - } - ] - } - ] - } -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 9f9f264ae6c..b492ce56c10 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -28,7 +28,6 @@ connector-flink-e2e-base - connector-redis-flink-e2e connector-file-flink-e2e connector-jdbc-flink-e2e connector-iotdb-flink-e2e diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/pom.xml deleted file mode 100644 index 25212814ad8..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-spark-connector-v2-e2e - ${revision} - - 4.0.0 - - connector-redis-spark-e2e - - - - org.apache.seatunnel - connector-spark-e2e-base - ${project.version} - tests - test-jar - test - - - - - org.apache.seatunnel - connector-assert - ${project.version} - test - - - org.apache.seatunnel - connector-redis - ${project.version} - test - - - - \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java deleted file mode 100644 index 065c4e680f2..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.spark.v2.redis; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.e2e.spark.SparkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import redis.clients.jedis.Jedis; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class RedisIT extends SparkContainer { - private static final String REDIS_IMAGE = "redis:latest"; - private static final String REDIS_CONTAINER_HOST = "spark_e2e_redis"; - private static final String REDIS_HOST = "localhost"; - private static final int REDIS_PORT = 6379; - private GenericContainer redisContainer; - private Jedis jedis; - - @BeforeEach - public void startRedisContainer() { - redisContainer = new GenericContainer<>(REDIS_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(REDIS_CONTAINER_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - redisContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", REDIS_PORT, REDIS_PORT))); - Startables.deepStart(Stream.of(redisContainer)).join(); - log.info("Redis container started"); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initJedis); - this.generateTestData(); - } - - private void initJedis() { - jedis = new Jedis(REDIS_HOST, REDIS_PORT); - jedis.connect(); - } - - private void generateTestData() { - jedis.set("key_test", "test"); - jedis.set("key_test1", "test1"); - jedis.set("key_test2", "test2"); - jedis.set("key_test3", "test3"); - jedis.set("key_test4", "test4"); - } - - @Test - public void testRedisSourceAndSink() throws IOException, InterruptedException { - Container.ExecResult execResult = executeSeaTunnelSparkJob("/redis/redis_source_and_sink.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - Assertions.assertEquals(5, jedis.llen("key_list")); - } - - @AfterEach - public void close() { - jedis.close(); - redisContainer.close(); - } -} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/log4j.properties deleted file mode 100644 index db5d9e51220..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/redis/redis_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/redis/redis_source_and_sink.conf deleted file mode 100644 index 051e2f61d4d..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/resources/redis/redis_source_and_sink.conf +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -env { - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local - job.mode = "BATCH" -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - Redis { - host = spark_e2e_redis - port = 6379 - keys = "key_test*" - data_type = key - format = text - } - - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Redis -} - -transform { - -} - -sink { - - Redis { - host = spark_e2e_redis - port = 6379 - key = "key_list" - data_type = list - } - - Assert { - rules = [ - { - field_name = content - field_type = string - field_value = [ - { - rule_type = NOT_NULL - } - ] - } - ] - } - - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml index cc5cc2bafa3..08bfa545fc8 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml @@ -33,7 +33,6 @@ connector-file-spark-e2e connector-iotdb-spark-e2e connector-jdbc-spark-e2e - connector-redis-spark-e2e connector-mongodb-spark-e2e From c5251de06cd4735e6baf3055909d4c4f211b5518 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Wed, 21 Sep 2022 17:07:24 +0800 Subject: [PATCH 2/3] [Improve][e2e] Unified e2e IT for Redis --- .../connector-assert-e2e/pom.xml | 2 +- .../assertion/fakesource_to_assert.conf | 3 - .../connector-redis-e2e/pom.xml | 37 ++++ .../e2e/connector/redis/RedisIT.java | 182 ++++++++++++++++++ .../src/test/resources/log4j.properties | 22 +++ .../src/test/resources/redis-to-redis.conf | 51 +++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 7 files changed, 294 insertions(+), 4 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml index a4147738875..1dfa220b126 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml @@ -17,8 +17,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - seatunnel-connector-v2-e2e org.apache.seatunnel + seatunnel-connector-v2-e2e ${revision} 4.0.0 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf index 9b628ba46ba..33ed73bc531 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### env { execution.parallelism = 1 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml new file mode 100644 index 00000000000..04af9f8b894 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml @@ -0,0 +1,37 @@ + + + + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-redis-e2e + + + + + org.apache.seatunnel + connector-redis + ${project.version} + test + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java new file mode 100644 index 00000000000..8b97543a42d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.redis; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.format.json.JsonSerializationSchema; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import scala.Tuple2; + +@Slf4j +public class RedisIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "redis:latest"; + private static final String HOST = "redis-e2e"; + private static final int PORT = 6379; + private static final String PASSWORD = "SeaTunnel"; + + private static final Tuple2> TEST_DATASET = generateTestDataSet(); + + private GenericContainer redisContainer; + + private Jedis jedis; + + @BeforeAll + @Override + public void startUp() throws Exception { + this.redisContainer = new GenericContainer<>(DockerImageName.parse(IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(PORT) + .withLogConsumer(new Slf4jLogConsumer(log)) + .withCommand(String.format("redis-server --requirepass %s", PASSWORD)) + .waitingFor(new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + Startables.deepStart(Stream.of(redisContainer)).join(); + log.info("Redis container started"); + this.initJedis(); + this.initSourceData(); + } + + private void initSourceData() { + JsonSerializationSchema jsonSerializationSchema = new JsonSerializationSchema(TEST_DATASET._1()); + List rows = TEST_DATASET._2(); + for (int i = 0; i < rows.size(); i++) { + jedis.set("key_test" + i, new String(jsonSerializationSchema.serialize(rows.get(i)))); + } + } + + private static Tuple2> generateTestDataSet() { + SeaTunnelRowType rowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType<>(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + rows.add(row); + } + return Tuple2.apply(rowType, rows); + } + + private void initJedis() { + Jedis jedis = new Jedis(redisContainer.getHost(), redisContainer.getFirstMappedPort()); + jedis.auth(PASSWORD); + this.jedis = jedis; + } + + @AfterAll + @Override + public void tearDown() throws Exception { + jedis.close(); + redisContainer.close(); + } + + @TestTemplate + public void testRedis(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(100, jedis.llen("key_list")); + // Clear data to prevent data duplication in the next TestContainer + jedis.del("key_list"); + Assertions.assertEquals(0, jedis.llen("key_list")); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf new file mode 100644 index 00000000000..1c15644f197 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + keys = "key_test*" + data_type = key + } +} + +transform { +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + key = "key_list" + data_type = list + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 0c42e9ba863..8066bb049b7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -26,6 +26,7 @@ connector-assert-e2e connector-jdbc-it + connector-redis-e2e seatunnel-connector-v2-e2e From 3f614bf70ba7c51e5e3fb4b103fd36ebedb7ab17 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Wed, 28 Sep 2022 14:31:12 +0800 Subject: [PATCH 3/3] [Improve][e2e] support @DisabledOnContainer --- .../e2e/connector/redis/RedisIT.java | 2 + .../junit/ContainerTestingExtension.java | 14 +++++- .../e2e/common/junit/DisabledOnContainer.java | 44 +++++++++++++++++++ .../TestCaseInvocationContextProvider.java | 16 ++++++- 4 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/DisabledOnContainer.java diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 8b97543a42d..86e853ccd8f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.format.json.JsonSerializationSchema; import lombok.extern.slf4j.Slf4j; @@ -56,6 +57,7 @@ import scala.Tuple2; +@DisabledOnContainer(value = "spark:2.4.3", disabledReason = "json-format conflicts with the Jackson version of Spark-2.4.3, see:https://github.com/apache/incubator-seatunnel/issues/2929") @Slf4j public class RedisIT extends TestSuiteBase implements TestResource { private static final String IMAGE = "redis:latest"; diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/ContainerTestingExtension.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/ContainerTestingExtension.java index 0dd9955ddd6..b2175c20e57 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/ContainerTestingExtension.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/ContainerTestingExtension.java @@ -25,10 +25,14 @@ import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.platform.commons.support.AnnotationSupport; +import org.junit.platform.commons.util.AnnotationUtils; import java.lang.annotation.Annotation; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class ContainerTestingExtension implements BeforeAllCallback, AfterAllCallback { public static final ExtensionContext.Namespace TEST_RESOURCE_NAMESPACE = @@ -57,7 +61,15 @@ public void beforeAll(ExtensionContext context) throws Exception { TestContainersFactory.class); checkExactlyOneAnnotatedField(containersFactories, TestContainers.class); - List testContainers = containersFactories.get(0).create(); + + // Filters disabled containers + final List disabledContainers = new ArrayList<>(); + AnnotationUtils.findAnnotation(context.getRequiredTestInstance().getClass(), DisabledOnContainer.class) + .ifPresent(annotation -> Collections.addAll(disabledContainers, annotation.value())); + List testContainers = containersFactories.get(0).create() + .stream() + .filter(container -> !disabledContainers.contains(container.identifier())) + .collect(Collectors.toList()); context.getStore(TEST_RESOURCE_NAMESPACE) .put(TEST_CONTAINERS_STORE_KEY, testContainers); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/DisabledOnContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/DisabledOnContainer.java new file mode 100644 index 00000000000..d908d5e2738 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/DisabledOnContainer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.common.junit; + +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +public @interface DisabledOnContainer { + + /** + * {@link TestContainer#identifier()} + */ + String[] value(); + + /** + * Custom reason to provide if the test container is disabled. + * + *

If a custom reason is supplied, it will be combined with the default + * reason for this annotation. If a custom reason is not supplied, the default + * reason will be used. + */ + String disabledReason() default ""; +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/TestCaseInvocationContextProvider.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/TestCaseInvocationContextProvider.java index 37a1be45fa8..f73f5af7f07 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/TestCaseInvocationContextProvider.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/junit/TestCaseInvocationContextProvider.java @@ -34,9 +34,13 @@ import org.junit.jupiter.api.extension.ParameterResolver; import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider; +import org.junit.platform.commons.util.AnnotationUtils; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j @@ -53,8 +57,16 @@ public boolean supportsTestTemplate(ExtensionContext context) { @SuppressWarnings("unchecked") @Override public Stream provideTestTemplateInvocationContexts(ExtensionContext context) { - List testContainers = (List) context.getStore(TEST_RESOURCE_NAMESPACE) - .get(TEST_CONTAINERS_STORE_KEY); + final List disabledContainers = new ArrayList<>(); + AnnotationUtils.findAnnotation(context.getRequiredTestMethod(), DisabledOnContainer.class) + .ifPresent(annotation -> Collections.addAll(disabledContainers, annotation.value())); + + // Filters disabled containers + List testContainers = ((List) context.getStore(TEST_RESOURCE_NAMESPACE) + .get(TEST_CONTAINERS_STORE_KEY)) + .stream() + .filter(container -> !disabledContainers.contains(container.identifier())) + .collect(Collectors.toList()); ContainerExtendedFactory containerExtendedFactory = (ContainerExtendedFactory) context.getStore(TEST_RESOURCE_NAMESPACE) .get(TEST_EXTENDED_FACTORY_STORE_KEY);