diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml
index a4147738875..1dfa220b126 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/pom.xml
@@ -17,8 +17,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- seatunnel-connector-v2-e2e
org.apache.seatunnel
+ seatunnel-connector-v2-e2e
${revision}
4.0.0
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 9b628ba46ba..33ed73bc531 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
env {
execution.parallelism = 1
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
new file mode 100644
index 00000000000..04af9f8b894
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connector-v2-e2e
+ ${revision}
+
+ 4.0.0
+
+ connector-redis-e2e
+
+
+
+
+ org.apache.seatunnel
+ connector-redis
+ ${project.version}
+ test
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
new file mode 100644
index 00000000000..8b97543a42d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.redis;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+@Slf4j
+public class RedisIT extends TestSuiteBase implements TestResource {
+ private static final String IMAGE = "redis:latest";
+ private static final String HOST = "redis-e2e";
+ private static final int PORT = 6379;
+ private static final String PASSWORD = "SeaTunnel";
+
+ private static final Tuple2> TEST_DATASET = generateTestDataSet();
+
+ private GenericContainer> redisContainer;
+
+ private Jedis jedis;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ this.redisContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withExposedPorts(PORT)
+ .withLogConsumer(new Slf4jLogConsumer(log))
+ .withCommand(String.format("redis-server --requirepass %s", PASSWORD))
+ .waitingFor(new HostPortWaitStrategy()
+ .withStartupTimeout(Duration.ofMinutes(2)));
+ Startables.deepStart(Stream.of(redisContainer)).join();
+ log.info("Redis container started");
+ this.initJedis();
+ this.initSourceData();
+ }
+
+ private void initSourceData() {
+ JsonSerializationSchema jsonSerializationSchema = new JsonSerializationSchema(TEST_DATASET._1());
+ List rows = TEST_DATASET._2();
+ for (int i = 0; i < rows.size(); i++) {
+ jedis.set("key_test" + i, new String(jsonSerializationSchema.serialize(rows.get(i))));
+ }
+ }
+
+ private static Tuple2> generateTestDataSet() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType<>(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ List rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ rows.add(row);
+ }
+ return Tuple2.apply(rowType, rows);
+ }
+
+ private void initJedis() {
+ Jedis jedis = new Jedis(redisContainer.getHost(), redisContainer.getFirstMappedPort());
+ jedis.auth(PASSWORD);
+ this.jedis = jedis;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ jedis.close();
+ redisContainer.close();
+ }
+
+ @TestTemplate
+ public void testRedis(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/redis-to-redis.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, jedis.llen("key_list"));
+ // Clear data to prevent data duplication in the next TestContainer
+ jedis.del("key_list");
+ Assertions.assertEquals(0, jedis.llen("key_list"));
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..db5d9e51220
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
new file mode 100644
index 00000000000..1c15644f197
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ }
+}
+
+transform {
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "SeaTunnel"
+ key = "key_list"
+ data_type = list
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 0c42e9ba863..8066bb049b7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -26,6 +26,7 @@
connector-assert-e2e
connector-jdbc-it
+ connector-redis-e2e
seatunnel-connector-v2-e2e