Skip to content

Commit 21503bd

Browse files
authored
[Fix] Fix MultiTableSink restore failed when add new table (#5746)
1 parent d8d289b commit 21503bd

File tree

12 files changed

+292
-28
lines changed

12 files changed

+292
-28
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,13 @@ public SnapshotPhaseState snapshotState(long checkpointId) {
217217
new SnapshotPhaseState(
218218
alreadyProcessedTables,
219219
remainingSplits.isEmpty()
220-
? Collections.emptyList()
220+
? new ArrayList<>()
221221
: new ArrayList<>(remainingSplits),
222222
assignedSplits,
223223
splitCompletedOffsets,
224224
assignerCompleted,
225225
remainingTables.isEmpty()
226-
? Collections.emptyList()
226+
? new ArrayList<>()
227227
: new ArrayList<>(remainingTables),
228228
isTableIdCaseSensitive,
229229
true);

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3131

3232
import java.io.IOException;
33+
import java.util.Collection;
3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
@@ -89,15 +90,21 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
8990
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tableIdentifier, index);
9091
List<?> state =
9192
states.stream()
92-
.flatMap(
93+
.map(
9394
multiTableState ->
94-
multiTableState.getStates().get(sinkIdentifier)
95-
.stream())
95+
multiTableState.getStates().get(sinkIdentifier))
9696
.filter(Objects::nonNull)
97+
.flatMap(Collection::stream)
9798
.collect(Collectors.toList());
98-
writers.put(
99-
sinkIdentifier,
100-
sink.restoreWriter(new SinkContextProxy(index, context), state));
99+
if (state.isEmpty()) {
100+
writers.put(
101+
sinkIdentifier,
102+
sink.createWriter(new SinkContextProxy(index, context)));
103+
} else {
104+
writers.put(
105+
sinkIdentifier,
106+
sink.restoreWriter(new SinkContextProxy(index, context), state));
107+
}
101108
}
102109
}
103110
return new MultiTableSinkWriter(writers, replicaNum);

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,12 @@ public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
196196

197197
@Override
198198
public void abortPrepare() {
199-
checkQueueRemain();
200199
Throwable firstE = null;
200+
try {
201+
checkQueueRemain();
202+
} catch (Exception e) {
203+
firstE = e;
204+
}
201205
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
202206
synchronized (runnable.get(i)) {
203207
for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter :
@@ -220,9 +224,13 @@ public void abortPrepare() {
220224

221225
@Override
222226
public void close() throws IOException {
223-
checkQueueRemain();
224-
executorService.shutdownNow();
225227
Throwable firstE = null;
228+
try {
229+
checkQueueRemain();
230+
} catch (Exception e) {
231+
firstE = e;
232+
}
233+
executorService.shutdownNow();
226234
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
227235
synchronized (runnable.get(i)) {
228236
for (SinkWriter<SeaTunnelRow, ?, ?> sinkWriter :

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import lombok.extern.slf4j.Slf4j;
4141

42+
import java.io.IOException;
4243
import java.sql.Connection;
4344
import java.sql.DriverManager;
4445
import java.sql.ResultSet;
@@ -47,6 +48,8 @@
4748
import java.util.List;
4849
import java.util.concurrent.CompletableFuture;
4950
import java.util.concurrent.TimeUnit;
51+
import java.util.regex.Matcher;
52+
import java.util.regex.Pattern;
5053
import java.util.stream.Stream;
5154

5255
import static org.awaitility.Awaitility.await;
@@ -184,7 +187,8 @@ public void testMysqlCdcMultiTableE2e(TestContainer container) {
184187
CompletableFuture.supplyAsync(
185188
() -> {
186189
try {
187-
container.executeJob("/mysqlcdc_to_mysql_with_multi_table.conf");
190+
container.executeJob(
191+
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf");
188192
} catch (Exception e) {
189193
log.error("Commit task exception :" + e.getMessage());
190194
throw new RuntimeException(e);
@@ -223,6 +227,104 @@ public void testMysqlCdcMultiTableE2e(TestContainer container) {
223227
SOURCE_TABLE_2)))));
224228
}
225229

230+
@TestTemplate
231+
@DisabledOnContainer(
232+
value = {},
233+
type = {EngineType.SPARK, EngineType.FLINK},
234+
disabledReason = "Currently SPARK and FLINK do not support multi table")
235+
public void testMultiTableWithRestore(TestContainer container)
236+
throws IOException, InterruptedException {
237+
// Clear related content to ensure that multiple operations are not affected
238+
clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
239+
clearTable(MYSQL_DATABASE, SOURCE_TABLE_2);
240+
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
241+
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
242+
243+
CompletableFuture.supplyAsync(
244+
() -> {
245+
try {
246+
return container.executeJob(
247+
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
248+
} catch (Exception e) {
249+
log.error("Commit task exception :" + e.getMessage());
250+
throw new RuntimeException(e);
251+
}
252+
});
253+
254+
// insert update delete
255+
upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
256+
257+
// stream stage
258+
await().atMost(60000, TimeUnit.MILLISECONDS)
259+
.untilAsserted(
260+
() ->
261+
Assertions.assertAll(
262+
() ->
263+
Assertions.assertIterableEquals(
264+
query(
265+
getSourceQuerySQL(
266+
MYSQL_DATABASE,
267+
SOURCE_TABLE_1)),
268+
query(
269+
getSourceQuerySQL(
270+
MYSQL_DATABASE2,
271+
SOURCE_TABLE_1)))));
272+
273+
Pattern jobIdPattern =
274+
Pattern.compile(
275+
".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", Pattern.DOTALL);
276+
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
277+
String jobId;
278+
if (matcher.matches()) {
279+
jobId = matcher.group(1);
280+
} else {
281+
throw new RuntimeException("Can not find jobId");
282+
}
283+
284+
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
285+
286+
// Restore the job with a newly added table
287+
CompletableFuture.supplyAsync(
288+
() -> {
289+
try {
290+
container.restoreJob(
291+
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId);
292+
} catch (Exception e) {
293+
log.error("Commit task exception :" + e.getMessage());
294+
throw new RuntimeException(e);
295+
}
296+
return null;
297+
});
298+
299+
upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_2);
300+
301+
// stream stage
302+
await().atMost(60000, TimeUnit.MILLISECONDS)
303+
.untilAsserted(
304+
() ->
305+
Assertions.assertAll(
306+
() ->
307+
Assertions.assertIterableEquals(
308+
query(
309+
getSourceQuerySQL(
310+
MYSQL_DATABASE,
311+
SOURCE_TABLE_1)),
312+
query(
313+
getSourceQuerySQL(
314+
MYSQL_DATABASE2,
315+
SOURCE_TABLE_1))),
316+
() ->
317+
Assertions.assertIterableEquals(
318+
query(
319+
getSourceQuerySQL(
320+
MYSQL_DATABASE,
321+
SOURCE_TABLE_2)),
322+
query(
323+
getSourceQuerySQL(
324+
MYSQL_DATABASE2,
325+
SOURCE_TABLE_2)))));
326+
}
327+
226328
private Connection getJdbcConnection() throws SQLException {
227329
return DriverManager.getConnection(
228330
MYSQL_CONTAINER.getJdbcUrl(),
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file demonstrates streaming processing in SeaTunnel
19+
######
20+
21+
env {
22+
# You can set engine configuration here
23+
execution.parallelism = 1
24+
job.mode = "STREAMING"
25+
checkpoint.interval = 5000
26+
}
27+
28+
source {
29+
MySQL-CDC {
30+
result_table_name = "customers_mysql_cdc"
31+
server-id = 5652
32+
username = "mysqluser"
33+
password = "mysqlpw"
34+
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
35+
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
36+
}
37+
}
38+
39+
transform {
40+
}
41+
42+
sink {
43+
jdbc {
44+
source_table_name = "customers_mysql_cdc"
45+
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
46+
driver = "com.mysql.cj.jdbc.Driver"
47+
user = "mysqluser"
48+
password = "mysqlpw"
49+
database = "mysql_cdc2"
50+
generate_sink_sql = true
51+
}
52+
}
File renamed without changes.

seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public AbstractTestContainer() {
6666

6767
protected abstract String getConnectorType();
6868

69+
protected abstract String getSavePointCommand();
70+
71+
protected abstract String getRestoreCommand();
72+
6973
protected abstract String getConnectorNamePrefix();
7074

7175
protected abstract List<String> getExtraStartShellCommands();
@@ -100,59 +104,94 @@ protected Container.ExecResult executeJob(GenericContainer<?> container, String
100104
getConnectorNamePrefix(),
101105
getConnectorType(),
102106
SEATUNNEL_HOME);
103-
return executeCommand(container, confInContainerPath);
107+
final List<String> command = new ArrayList<>();
108+
String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
109+
// base command
110+
command.add(adaptPathForWin(binPath));
111+
command.add("--config");
112+
command.add(adaptPathForWin(confInContainerPath));
113+
command.addAll(getExtraStartShellCommands());
114+
return executeCommand(container, command);
104115
}
105116

106-
protected Container.ExecResult executeCommand(GenericContainer<?> container, String configPath)
117+
protected Container.ExecResult savepointJob(GenericContainer<?> container, String jobId)
107118
throws IOException, InterruptedException {
108119
final List<String> command = new ArrayList<>();
109120
String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
110121
// base command
111122
command.add(adaptPathForWin(binPath));
123+
command.add(getSavePointCommand());
124+
command.add(jobId);
125+
command.addAll(getExtraStartShellCommands());
126+
return executeCommand(container, command);
127+
}
128+
129+
protected Container.ExecResult restoreJob(
130+
GenericContainer<?> container, String confFile, String jobId)
131+
throws IOException, InterruptedException {
132+
final String confInContainerPath = copyConfigFileToContainer(container, confFile);
133+
// copy connectors
134+
copyConnectorJarToContainer(
135+
container,
136+
confFile,
137+
getConnectorModulePath(),
138+
getConnectorNamePrefix(),
139+
getConnectorType(),
140+
SEATUNNEL_HOME);
141+
final List<String> command = new ArrayList<>();
142+
String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
143+
// base command
144+
command.add(adaptPathForWin(binPath));
112145
command.add("--config");
113-
command.add(adaptPathForWin(configPath));
146+
command.add(adaptPathForWin(confInContainerPath));
147+
command.add(getRestoreCommand());
148+
command.add(jobId);
114149
command.addAll(getExtraStartShellCommands());
150+
return executeCommand(container, command);
151+
}
115152

153+
protected Container.ExecResult executeCommand(
154+
GenericContainer<?> container, List<String> command)
155+
throws IOException, InterruptedException {
156+
String commandStr = String.join(" ", command);
116157
LOG.info(
117-
"Execute config file: {} to Container[{}] "
158+
"Execute command in container[{}] "
118159
+ "\n==================== Shell Command start ====================\n"
119160
+ "{}"
120161
+ "\n==================== Shell Command end ====================",
121-
configPath,
122162
container.getDockerImageName(),
123-
String.join(" ", command));
124-
Container.ExecResult execResult =
125-
container.execInContainer("bash", "-c", String.join(" ", command));
163+
commandStr);
164+
Container.ExecResult execResult = container.execInContainer("bash", "-c", commandStr);
126165

127-
if (execResult.getStdout() != null && execResult.getStdout().length() > 0) {
166+
if (execResult.getStdout() != null && !execResult.getStdout().isEmpty()) {
128167
LOG.info(
129-
"Execute config file: {} to Container[{}] STDOUT:"
168+
"Container[{}] command {} STDOUT:"
130169
+ "\n==================== STDOUT start ====================\n"
131170
+ "{}"
132171
+ "\n==================== STDOUT end ====================",
133-
configPath,
134172
container.getDockerImageName(),
173+
commandStr,
135174
execResult.getStdout());
136175
}
137-
if (execResult.getStderr() != null && execResult.getStderr().length() > 0) {
176+
if (execResult.getStderr() != null && !execResult.getStderr().isEmpty()) {
138177
LOG.error(
139-
"Execute config file: {} to Container[{}] STDERR:"
178+
"Container[{}] command {} STDERR:"
140179
+ "\n==================== STDERR start ====================\n"
141180
+ "{}"
142181
+ "\n==================== STDERR end ====================",
143-
configPath,
144182
container.getDockerImageName(),
183+
commandStr,
145184
execResult.getStderr());
146185
}
147186

148187
if (execResult.getExitCode() != 0) {
149188
LOG.info(
150-
"Execute config file: {} to Container[{}] Server Log:"
189+
"Container[{}] command {} Server Log:"
151190
+ "\n==================== Server Log start ====================\n"
152191
+ "{}"
153192
+ "\n==================== Server Log end ====================",
154-
configPath,
155193
container.getDockerImageName(),
194+
commandStr,
156195
container.getLogs());
157196
}
158197

0 commit comments

Comments
 (0)