Commit add75d7

[Feature][CDC] Support add & drop tables when restoring CDC jobs (#4254)
1 parent f7eccfe commit add75d7

31 files changed: +404 −132

config/seatunnel.yaml

Lines changed: 1 addition & 1 deletion
@@ -34,4 +34,4 @@ seatunnel:
     plugin-config:
       namespace: /tmp/seatunnel/checkpoint_snapshot
       storage.type: hdfs
-      fs.defaultFS: file:/// # Ensure that the directory has written permission
+      fs.defaultFS: file:///tmp/ # Ensure that the directory has written permission

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ public static List<CatalogTable> getCatalogTables(Config config, ClassLoader cla
         // Get the list of specified tables
         List<String> tableNames = catalogConfig.get(CatalogOptions.TABLE_NAMES);
         List<CatalogTable> catalogTables = new ArrayList<>();
-        if (tableNames != null && tableNames.size() > 1) {
+        if (tableNames != null && tableNames.size() >= 1) {
             for (String tableName : tableNames) {
                 catalogTables.add(catalog.getTable(TablePath.of(tableName)));
             }
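
The one-character change above fixes an off-by-one guard: with `size() > 1`, a catalog config that listed exactly one table never entered the lookup loop, so no CatalogTable was resolved. A minimal standalone sketch of the two guards (the table name is illustrative):

import java.util.Collections;
import java.util.List;

public class GuardSketch {
    public static void main(String[] args) {
        List<String> tableNames = Collections.singletonList("db.users");
        // Old guard: false for a single-table list, so the catalog lookup was skipped.
        System.out.println(tableNames.size() > 1); // false
        // Fixed guard: any non-empty list now enters the lookup loop.
        System.out.println(tableNames.size() >= 1); // true
    }
}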

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java

Lines changed: 62 additions & 8 deletions
@@ -42,10 +42,12 @@
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
@@ -54,15 +56,21 @@
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;

+import com.google.common.collect.Sets;
 import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;

 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 @NoArgsConstructor
 public abstract class IncrementalSource<T, C extends SourceConfig>
@@ -218,16 +226,20 @@ public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumera
             PendingSplitsState checkpointState)
             throws Exception {
         C sourceConfig = configFactory.create(0);
-        final List<TableId> remainingTables =
-                dataSourceDialect.discoverDataCollections(sourceConfig);
-        SplitAssigner.Context<C> assignerContext =
-                new SplitAssigner.Context<>(
-                        sourceConfig,
-                        new HashSet<>(remainingTables),
-                        new HashMap<>(),
-                        new HashMap<>());
+        Set<TableId> capturedTables =
+                new HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));
+
         final SplitAssigner splitAssigner;
         if (checkpointState instanceof HybridPendingSplitsState) {
+            checkpointState = restore(capturedTables, (HybridPendingSplitsState) checkpointState);
+            SnapshotPhaseState checkpointSnapshotState =
+                    ((HybridPendingSplitsState) checkpointState).getSnapshotPhaseState();
+            SplitAssigner.Context<C> assignerContext =
+                    new SplitAssigner.Context<>(
+                            sourceConfig,
+                            capturedTables,
+                            checkpointSnapshotState.getAssignedSplits(),
+                            checkpointSnapshotState.getSplitCompletedOffsets());
             splitAssigner =
                     new HybridSplitAssigner<>(
                             assignerContext,
@@ -237,6 +249,9 @@ public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumera
                             dataSourceDialect,
                             offsetFactory);
         } else if (checkpointState instanceof IncrementalPhaseState) {
+            SplitAssigner.Context<C> assignerContext =
+                    new SplitAssigner.Context<>(
+                            sourceConfig, capturedTables, new HashMap<>(), new HashMap<>());
             splitAssigner =
                     new IncrementalSplitAssigner<>(
                             assignerContext, incrementalParallelism, offsetFactory);
@@ -246,4 +261,43 @@ public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumera
         }
         return new IncrementalSourceEnumerator(enumeratorContext, splitAssigner);
     }
+
+    private HybridPendingSplitsState restore(
+            Set<TableId> capturedTables, HybridPendingSplitsState checkpointState) {
+        SnapshotPhaseState checkpointSnapshotState = checkpointState.getSnapshotPhaseState();
+        Set<TableId> checkpointCapturedTables =
+                Stream.concat(
+                                checkpointSnapshotState.getAlreadyProcessedTables().stream(),
+                                checkpointSnapshotState.getRemainingTables().stream())
+                        .collect(Collectors.toSet());
+        Set<TableId> newTables = Sets.difference(capturedTables, checkpointCapturedTables);
+        Set<TableId> deletedTables = Sets.difference(checkpointCapturedTables, capturedTables);
+
+        checkpointSnapshotState.getRemainingTables().addAll(newTables);
+        checkpointSnapshotState.getRemainingTables().removeAll(deletedTables);
+        checkpointSnapshotState.getAlreadyProcessedTables().removeAll(deletedTables);
+        Set<String> deletedSplitIds = new HashSet<>();
+        Iterator<SnapshotSplit> splitIterator =
+                checkpointSnapshotState.getRemainingSplits().iterator();
+        while (splitIterator.hasNext()) {
+            SnapshotSplit split = splitIterator.next();
+            if (deletedTables.contains(split.getTableId())) {
+                splitIterator.remove();
+                deletedSplitIds.add(split.splitId());
+            }
+        }
+        for (Map.Entry<String, SnapshotSplit> entry :
+                checkpointSnapshotState.getAssignedSplits().entrySet()) {
+            SnapshotSplit split = entry.getValue();
+            if (deletedTables.contains(split.getTableId())) {
+                deletedSplitIds.add(entry.getKey());
+            }
+        }
+        deletedSplitIds.forEach(
+                splitId -> {
+                    checkpointSnapshotState.getAssignedSplits().remove(splitId);
+                    checkpointSnapshotState.getSplitCompletedOffsets().remove(splitId);
+                });
+        return checkpointState;
+    }
 }
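
The new restore(...) method reconciles the tables discovered at restart against the table set recorded in the checkpoint: tables present only in the fresh discovery are queued for a snapshot phase, while tables present only in the checkpoint have their remaining splits, assigned splits, and completed offsets pruned. A minimal sketch of the set arithmetic it relies on, with illustrative table names (Guava's Sets.difference returns a live, unmodifiable view):

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.HashSet;
import java.util.Set;

public class RestoreDiffSketch {
    public static void main(String[] args) {
        Set<String> capturedTables = new HashSet<>(ImmutableSet.of("db.a", "db.b", "db.c"));
        Set<String> checkpointTables = new HashSet<>(ImmutableSet.of("db.a", "db.d"));

        // Tables added since the checkpoint: they still need a snapshot phase.
        Set<String> newTables = Sets.difference(capturedTables, checkpointTables);
        // Tables dropped since the checkpoint: their state entries must be removed.
        Set<String> deletedTables = Sets.difference(checkpointTables, capturedTables);

        System.out.println(newTables);     // e.g. [db.b, db.c] (iteration order unspecified)
        System.out.println(deletedTables); // [db.d]
    }
}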

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java

Lines changed: 23 additions & 7 deletions
@@ -69,6 +69,8 @@ public class IncrementalSplitAssigner<C extends SourceConfig> implements SplitAs

     private final Map<String, IncrementalSplit> assignedSplits = new HashMap<>();

+    private boolean startWithSnapshotMinimumOffset = true;
+
     public IncrementalSplitAssigner(
             SplitAssigner.Context<C> context,
             int incrementalParallelism,
@@ -94,7 +96,8 @@ public Optional<SourceSplitBase> getNext() {
         if (splitAssigned) {
             return Optional.empty();
         }
-        List<IncrementalSplit> incrementalSplits = createIncrementalSplits();
+        List<IncrementalSplit> incrementalSplits =
+                createIncrementalSplits(startWithSnapshotMinimumOffset);
         remainingSplits.addAll(incrementalSplits);
         splitAssigned = true;
         return getNext();
@@ -136,15 +139,24 @@ public void addSplits(Collection<SourceSplitBase> splits) {
                     List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
                             incrementalSplit.getCompletedSnapshotSplitInfos();
                     for (CompletedSnapshotSplitInfo info : completedSnapshotSplitInfos) {
+                        if (!context.getCapturedTables().contains(info.getTableId())) {
+                            continue;
+                        }
                         context.getSplitCompletedOffsets()
                                 .put(info.getSplitId(), info.getWatermark());
                         context.getAssignedSnapshotSplit()
                                 .put(info.getSplitId(), info.asSnapshotSplit());
                     }
                     for (TableId tableId : incrementalSplit.getTableIds()) {
+                        if (!context.getCapturedTables().contains(tableId)) {
+                            continue;
+                        }
                         tableWatermarks.put(tableId, startupOffset);
                     }
                 });
+        if (!tableWatermarks.isEmpty()) {
+            this.startWithSnapshotMinimumOffset = false;
+        }
     }

     @Override
@@ -159,7 +171,7 @@ public void notifyCheckpointComplete(long checkpointId) {

     // ------------------------------------------------------------------------------------------

-    public List<IncrementalSplit> createIncrementalSplits() {
+    public List<IncrementalSplit> createIncrementalSplits(boolean startWithSnapshotMinimumOffset) {
         Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
         assignedSplits.values().forEach(split -> split.getTableIds().forEach(allTables::remove));
         List<TableId>[] capturedTables = new List[incrementalParallelism];
@@ -175,12 +187,14 @@ public List<IncrementalSplit> createIncrementalSplits() {
         i = 0;
         List<IncrementalSplit> incrementalSplits = new ArrayList<>();
         for (List<TableId> capturedTable : capturedTables) {
-            incrementalSplits.add(createIncrementalSplit(capturedTable, i++));
+            incrementalSplits.add(
+                    createIncrementalSplit(capturedTable, i++, startWithSnapshotMinimumOffset));
         }
         return incrementalSplits;
     }

-    private IncrementalSplit createIncrementalSplit(List<TableId> capturedTables, int index) {
+    private IncrementalSplit createIncrementalSplit(
+            List<TableId> capturedTables, int index, boolean startWithSnapshotMinimumOffset) {
         final List<SnapshotSplit> assignedSnapshotSplit =
                 context.getAssignedSnapshotSplit().values().stream()
                         .filter(split -> capturedTables.contains(split.getTableId()))
@@ -191,10 +205,12 @@ private IncrementalSplit createIncrementalSplit(List<TableId> capturedTables, in
         final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = new ArrayList<>();
         Offset minOffset = null;
         for (SnapshotSplit split : assignedSnapshotSplit) {
-            // find the min offset of change log
             Offset changeLogOffset = splitCompletedOffsets.get(split.splitId());
-            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
-                minOffset = changeLogOffset;
+            if (startWithSnapshotMinimumOffset) {
+                // find the min offset of change log
+                if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+                    minOffset = changeLogOffset;
+                }
             }
             completedSnapshotSplitInfos.add(
                     new CompletedSnapshotSplitInfo(
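
The startWithSnapshotMinimumOffset flag distinguishes a first-time assignment from a restore: on a fresh run the incremental split starts from the minimum change-log watermark across all completed snapshot splits, while after a restore (once addSplits has repopulated tableWatermarks from checkpointed incremental splits) that rewind is skipped. A simplified, self-contained sketch of the guarded minimum-offset scan, with offsets reduced to plain longs (the real code compares Offset objects via isBefore):

import java.util.Arrays;
import java.util.List;

public class MinOffsetSketch {
    public static void main(String[] args) {
        // Watermarks of completed snapshot splits, simplified to longs.
        List<Long> splitCompletedOffsets = Arrays.asList(120L, 95L, 130L);
        boolean startWithSnapshotMinimumOffset = true; // false once restored watermarks exist

        Long minOffset = null;
        for (Long changeLogOffset : splitCompletedOffsets) {
            if (startWithSnapshotMinimumOffset
                    && (minOffset == null || changeLogOffset < minOffset)) {
                minOffset = changeLogOffset; // replay from the earliest watermark
            }
        }
        System.out.println(minOffset); // 95
    }
}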

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java

Lines changed: 12 additions & 12 deletions
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static com.google.common.base.Preconditions.checkState;

@@ -57,8 +58,6 @@ public class IncrementalSourceReader<T, C extends SourceConfig>

     private final Map<String, SnapshotSplit> finishedUnackedSplits;

-    private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
-
     private volatile boolean running = false;
     private final int subtaskId;

@@ -79,7 +78,6 @@ public IncrementalSourceReader(
                 context);
         this.sourceConfig = sourceConfig;
         this.finishedUnackedSplits = new HashMap<>();
-        this.uncompletedIncrementalSplits = new HashMap<>();
         this.subtaskId = context.getIndexOfSubtask();
     }

@@ -110,15 +108,15 @@ public void addSplits(List<SourceSplitBase> splits) {
                     unfinishedSplits.add(split);
                 }
             } else {
-                // the incremental split is uncompleted
-                uncompletedIncrementalSplits.put(split.splitId(), split.asIncrementalSplit());
                 unfinishedSplits.add(split.asIncrementalSplit());
             }
         }
         // notify split enumerator again about the finished unacked snapshot splits
         reportFinishedSnapshotSplitsIfNeed();
         // add all un-finished splits (including incremental split) to SourceReaderBase
-        super.addSplits(unfinishedSplits);
+        if (!unfinishedSplits.isEmpty()) {
+            super.addSplits(unfinishedSplits);
+        }
     }

     @Override
@@ -168,16 +166,18 @@ protected SourceSplitStateBase initializedState(SourceSplitBase split) {

     @Override
     public List<SourceSplitBase> snapshotState(long checkpointId) {
-        // unfinished splits
         List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);

-        // add finished snapshot splits that didn't receive ack yet
-        stateSplits.addAll(finishedUnackedSplits.values());
+        // unfinished splits
+        List<SourceSplitBase> unfinishedSplits =
+                stateSplits.stream()
+                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
+                        .collect(Collectors.toList());

-        // add incremental splits who are uncompleted
-        stateSplits.addAll(uncompletedIncrementalSplits.values());
+        // add finished snapshot splits that didn't receive ack yet
+        unfinishedSplits.addAll(finishedUnackedSplits.values());

-        return stateSplits;
+        return unfinishedSplits;
     }

     @Override
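
The reader no longer tracks uncompleted incremental splits separately: SourceReaderBase already holds them, so appending them again in snapshotState duplicated state. The rewritten method instead filters the base state by split id before re-adding the finished-but-unacked snapshot splits. A small sketch of that de-duplication, with splits reduced to strings and illustrative ids:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SnapshotStateSketch {
    public static void main(String[] args) {
        // State reported by the base reader (still contains the finished split).
        List<String> baseState = new ArrayList<>(Arrays.asList("split-1", "split-2"));
        // Snapshot splits finished locally but not yet acked by the enumerator.
        Map<String, String> finishedUnackedSplits = new HashMap<>();
        finishedUnackedSplits.put("split-2", "split-2<finished>");

        // Keep only splits not tracked as finished-unacked...
        List<String> state = baseState.stream()
                .filter(id -> !finishedUnackedSplits.containsKey(id))
                .collect(Collectors.toList());
        // ...then add each finished-unacked split exactly once.
        state.addAll(finishedUnackedSplits.values());

        System.out.println(state); // [split-1, split-2<finished>]
    }
}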

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java

Lines changed: 8 additions & 5 deletions
@@ -48,9 +48,10 @@ public void testSimpleJobParse() {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-default", actions.get(0).getName());
         Assertions.assertEquals(1, actions.get(0).getUpstream().size());
-        Assertions.assertEquals("FakeSource", actions.get(0).getUpstream().get(0).getName());
+        Assertions.assertEquals(
+                "Source[0]-FakeSource-default", actions.get(0).getUpstream().get(0).getName());

         Assertions.assertEquals(3, actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -69,10 +70,12 @@ public void testComplexJobParse() {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());

-        Assertions.assertEquals("LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-fake", actions.get(0).getName());
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
-        Assertions.assertEquals("FakeSource", actions.get(0).getUpstream().get(0).getName());
-        Assertions.assertEquals("FakeSource", actions.get(0).getUpstream().get(1).getName());
+        Assertions.assertEquals(
+                "Source[0]-FakeSource-fake", actions.get(0).getUpstream().get(0).getName());
+        Assertions.assertEquals(
+                "Source[1]-FakeSource-fake", actions.get(0).getUpstream().get(1).getName());

         Assertions.assertEquals(3, actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getUpstream().get(1).getParallelism());

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ public void testLogicalGenerator() {
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-                "{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"},{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"}]}";
+                "{\"vertices\":[{\"id\":2,\"name\":\"Source[0]-FakeSource-fake(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Source[1]-FakeSource-fake(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"Sink[0]-LocalFile-fake(id=1)\",\"parallelism\":6}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
         Assertions.assertEquals(result, logicalDagJson.toString());
     }
 }
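
Both test updates above follow the same new action-naming convention: actions are named Kind[index]-PluginName-tableName instead of the bare plugin name, which keeps vertices distinguishable when a job reads several tables through the same connector. A hypothetical helper mirroring that format (the real formatting lives in the engine's job config parser, not here):

public class ActionNameSketch {
    // Illustrative only; mirrors the pattern the updated assertions expect.
    static String actionName(String kind, int index, String plugin, String tableName) {
        return String.format("%s[%d]-%s-%s", kind, index, plugin, tableName);
    }

    public static void main(String[] args) {
        System.out.println(actionName("Source", 0, "FakeSource", "default")); // Source[0]-FakeSource-default
        System.out.println(actionName("Sink", 0, "LocalFile", "fake"));       // Sink[0]-LocalFile-fake
    }
}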

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java

Lines changed: 23 additions & 6 deletions
@@ -29,11 +29,15 @@
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
 import lombok.experimental.Tolerate;
+import lombok.extern.slf4j.Slf4j;

 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

+@Slf4j
 @SuperBuilder(toBuilder = true)
 @Getter
 @Setter
@@ -57,6 +61,13 @@ public Map<String, IQueue<Record<?>>> createShuffles(
             queue.clear();
             shuffleMap.put(queueName, queue);
         }
+
+        log.info(
+                "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                inputIndex,
+                shuffleMap.keySet());
+
         return shuffleMap;
     }

@@ -75,18 +86,24 @@ public IQueue<Record<?>>[] getShuffles(
             String queueName = generateQueueName(pipelineId, inputIndex, targetTableId);
             queues[inputIndex] = getIQueue(hazelcast, queueName);
         }
+
+        log.info(
+                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                targetIndex,
+                Stream.of(queues).map(e -> e.getName()).collect(Collectors.toList()));
+
         return queues;
     }

     private String generateQueueName(int pipelineId, int inputIndex, String tableId) {
-        return "ShuffleMultipleRow-Queue["
+        return "ShuffleMultipleRow-Queue_"
                 + getJobId()
-                + "-"
+                + "_"
                 + pipelineId
-                + "-"
+                + "_"
                 + inputIndex
-                + "-"
-                + tableId
-                + "]";
+                + "_"
+                + tableId;
     }
 }
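
generateQueueName switches from bracketed, dash-separated names (ShuffleMultipleRow-Queue[jobId-pipelineId-inputIndex-tableId]) to flat underscore-separated ones, and the new log lines record which Hazelcast queues each shuffle reader and writer was assigned. A standalone sketch of the new name format, with illustrative ids:

public class QueueNameSketch {
    // Mirrors the new naming scheme; the ids below are illustrative.
    static String generateQueueName(long jobId, int pipelineId, int inputIndex, String tableId) {
        return "ShuffleMultipleRow-Queue_" + jobId
                + "_" + pipelineId
                + "_" + inputIndex
                + "_" + tableId;
    }

    public static void main(String[] args) {
        // Old form: ShuffleMultipleRow-Queue[700001-1-0-db.users]
        System.out.println(generateQueueName(700001L, 1, 0, "db.users"));
        // -> ShuffleMultipleRow-Queue_700001_1_0_db.users
    }
}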
