Skip to content

Commit e15757b

Browse files
authored
[Feature][Redis] Flush data when the time reaches checkpoint.interval and update test case (#8308)
1 parent e18a4d0 commit e15757b

File tree

3 files changed

+159
-3
lines changed

3 files changed

+159
-3
lines changed

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.Optional;
4243

4344
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
4445
implements SupportMultiTableSinkWriter<Void> {
@@ -78,8 +79,7 @@ public void write(SeaTunnelRow element) throws IOException {
7879
String value = getValue(element, fields);
7980
valueBuffer.add(value);
8081
if (keyBuffer.size() >= batchSize) {
81-
doBatchWrite();
82-
clearBuffer();
82+
flush();
8383
}
8484
}
8585

@@ -221,6 +221,16 @@ private void doBatchWrite() {
221221

222222
@Override
223223
public void close() throws IOException {
224+
flush();
225+
}
226+
227+
@Override
228+
public Optional<Void> prepareCommit() {
229+
flush();
230+
return Optional.empty();
231+
}
232+
233+
private synchronized void flush() {
224234
if (!keyBuffer.isEmpty()) {
225235
doBatchWrite();
226236
clearBuffer();

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.seatunnel.e2e.connector.redis;
1818

19+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
20+
1921
import org.apache.seatunnel.api.table.type.ArrayType;
2022
import org.apache.seatunnel.api.table.type.BasicType;
2123
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -25,15 +27,20 @@
2527
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2628
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2729
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
30+
import org.apache.seatunnel.common.utils.JsonUtils;
2831
import org.apache.seatunnel.e2e.common.TestResource;
2932
import org.apache.seatunnel.e2e.common.TestSuiteBase;
33+
import org.apache.seatunnel.e2e.common.container.EngineType;
3034
import org.apache.seatunnel.e2e.common.container.TestContainer;
35+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
3136
import org.apache.seatunnel.format.json.JsonSerializationSchema;
3237

3338
import org.junit.jupiter.api.AfterAll;
3439
import org.junit.jupiter.api.Assertions;
3540
import org.junit.jupiter.api.BeforeAll;
3641
import org.junit.jupiter.api.TestTemplate;
42+
import org.junit.jupiter.api.condition.DisabledOnOs;
43+
import org.junit.jupiter.api.condition.OS;
3744
import org.testcontainers.containers.Container;
3845
import org.testcontainers.containers.GenericContainer;
3946
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,13 +59,21 @@
5259
import java.time.LocalDate;
5360
import java.time.LocalDateTime;
5461
import java.util.ArrayList;
62+
import java.util.Arrays;
5563
import java.util.Collections;
5664
import java.util.HashMap;
5765
import java.util.List;
5866
import java.util.Map;
5967
import java.util.Objects;
68+
import java.util.concurrent.CompletableFuture;
69+
import java.util.concurrent.TimeUnit;
70+
import java.util.regex.Matcher;
71+
import java.util.regex.Pattern;
72+
import java.util.stream.Collectors;
6073
import java.util.stream.Stream;
6174

75+
import static org.awaitility.Awaitility.await;
76+
6277
@Slf4j
6378
public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource {
6479

@@ -492,7 +507,7 @@ public void testFakeToRedisDeleteSetTest(TestContainer container)
492507
}
493508

494509
@TestTemplate
495-
public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
510+
public void testFakeToToRedisDeleteZSetTest(TestContainer container)
496511
throws IOException, InterruptedException {
497512
Container.ExecResult execResult =
498513
container.executeJob("/fake-to-redis-test-delete-zset.conf");
@@ -501,6 +516,69 @@ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
501516
jedis.del("zset_check");
502517
}
503518

519+
@TestTemplate
520+
@DisabledOnContainer(
521+
value = {},
522+
type = {EngineType.SPARK, EngineType.FLINK},
523+
disabledReason = "Only support for seatunnel")
524+
@DisabledOnOs(OS.WINDOWS)
525+
public void testFakeToRedisInRealTimeTest(TestContainer container)
526+
throws IOException, InterruptedException {
527+
CompletableFuture.supplyAsync(
528+
() -> {
529+
try {
530+
container.executeJob("/fake-to-redis-test-in-real-time.conf");
531+
} catch (Exception e) {
532+
log.error("Commit task exception :" + e.getMessage());
533+
throw new RuntimeException(e);
534+
}
535+
return null;
536+
});
537+
await().atMost(60000, TimeUnit.MILLISECONDS)
538+
.untilAsserted(
539+
() -> {
540+
Assertions.assertEquals(3, jedis.llen("list_check"));
541+
});
542+
jedis.del("list_check");
543+
// Get the task id
544+
Container.ExecResult execResult = container.executeBaseCommand(new String[] {"-l"});
545+
String regex = "(\\d+)\\s+";
546+
Pattern pattern = Pattern.compile(regex);
547+
List<String> runningJobId =
548+
Arrays.stream(execResult.getStdout().toString().split("\n"))
549+
.filter(s -> s.contains("fake-to-redis-test-in-real-time"))
550+
.map(
551+
s -> {
552+
Matcher matcher = pattern.matcher(s);
553+
return matcher.find() ? matcher.group(1) : null;
554+
})
555+
.filter(jobId -> jobId != null)
556+
.collect(Collectors.toList());
557+
Assertions.assertEquals(1, runningJobId.size());
558+
// Verify that the status is Running
559+
for (String jobId : runningJobId) {
560+
Container.ExecResult execResult1 =
561+
container.executeBaseCommand(new String[] {"-j", jobId});
562+
String stdout = execResult1.getStdout();
563+
ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
564+
Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "RUNNING");
565+
}
566+
// Execute cancellation task
567+
String[] batchCancelCommand =
568+
Stream.concat(Arrays.stream(new String[] {"-can"}), runningJobId.stream())
569+
.toArray(String[]::new);
570+
Assertions.assertEquals(0, container.executeBaseCommand(batchCancelCommand).getExitCode());
571+
572+
// Verify whether the cancellation is successful
573+
for (String jobId : runningJobId) {
574+
Container.ExecResult execResult1 =
575+
container.executeBaseCommand(new String[] {"-j", jobId});
576+
String stdout = execResult1.getStdout();
577+
ObjectNode jsonNodes = JsonUtils.parseObject(stdout);
578+
Assertions.assertEquals(jsonNodes.get("jobStatus").asText(), "CANCELED");
579+
}
580+
}
581+
504582
@TestTemplate
505583
public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
506584
throws IOException, InterruptedException {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "STREAMING"
21+
checkpoint.interval = 10000
22+
shade.identifier = "base64"
23+
}
24+
25+
source {
26+
FakeSource {
27+
schema = {
28+
fields {
29+
id = int
30+
val_bool = boolean
31+
val_int8 = tinyint
32+
val_int16 = smallint
33+
val_int32 = int
34+
val_int64 = bigint
35+
val_float = float
36+
val_double = double
37+
val_decimal = "decimal(16, 1)"
38+
val_string = string
39+
val_unixtime_micros = timestamp
40+
}
41+
}
42+
rows = [
43+
{
44+
kind = INSERT
45+
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
46+
},
47+
{
48+
kind = INSERT
49+
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
50+
},
51+
{
52+
kind = INSERT
53+
fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
54+
}
55+
]
56+
}
57+
}
58+
59+
sink {
60+
Redis {
61+
host = "redis-e2e"
62+
port = 6379
63+
auth = "U2VhVHVubmVs"
64+
key = "list_check"
65+
data_type = list
66+
batch_size = 33
67+
}
68+
}

0 commit comments

Comments
 (0)