Skip to content

Commit e454b80

Browse files
authored
[feature][Connector-v2][cdc] Add cdc base reader (#3407)
* [feature][Connector-v2][cdc] Add cdc base reader * [feature][Connector-v2][cdc] fix check style
1 parent cc0cb8c commit e454b80

21 files changed

+1776
-12
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
<properties>
3232
<debezium.version>1.6.4.Final</debezium.version>
33+
<hikaricp.version>4.0.3</hikaricp.version>
3334
</properties>
3435

3536
<dependencies>
@@ -42,10 +43,30 @@
4243
<groupId>io.debezium</groupId>
4344
<artifactId>debezium-embedded</artifactId>
4445
</dependency>
46+
<dependency>
47+
<groupId>com.zaxxer</groupId>
48+
<artifactId>HikariCP</artifactId>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.seatunnel</groupId>
52+
<artifactId>connector-common</artifactId>
53+
</dependency>
54+
4555
</dependencies>
4656

4757
<dependencyManagement>
4858
<dependencies>
59+
<!-- Version comes from the hikaricp.version property so it is declared in exactly one place. -->
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>${hikaricp.version}</version>
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.seatunnel</groupId>
67+
<artifactId>connector-common</artifactId>
68+
<version>${project.version}</version>
69+
</dependency>
4970
<!-- Debezium dependencies -->
5071
<dependency>
5172
<groupId>io.debezium</groupId>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.seatunnel.connectors.cdc.base.dialect;
19+
20+
import io.debezium.relational.TableId;
21+
import io.debezium.relational.history.TableChanges;
22+
import org.seatunnel.connectors.cdc.base.config.SourceConfig;
23+
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
24+
import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
25+
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
26+
27+
import java.io.Serializable;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
/**
32+
* The dialect of data source.
33+
*
34+
* @param <C> The source config of data source.
35+
*/
36+
37+
public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
38+
39+
/** Get the name of dialect. */
40+
String getName();
41+
42+
/** Discovers the list of data collection to capture. */
43+
List<TableId> discoverDataCollections(C sourceConfig);
44+
45+
/**
46+
* Discovers the captured data collections' schema by {@link SourceConfig}.
47+
*
48+
* @param sourceConfig a basic source configuration.
49+
*/
50+
Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(C sourceConfig);
51+
52+
/**
53+
* Displays current offset from the database e.g. query Mysql binary logs by query <code>
54+
* SHOW MASTER STATUS</code>.
55+
*/
56+
Offset displayCurrentOffset(C sourceConfig);
57+
58+
/** Check if the CollectionId is case-sensitive or not. */
59+
boolean isDataCollectionIdCaseSensitive(C sourceConfig);
60+
61+
/** The fetch task used to fetch data of a snapshot split or stream split. */
62+
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);
63+
64+
/** The task context used for fetch task to fetch data from external systems. */
65+
FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig);
66+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
package org.seatunnel.connectors.cdc.base.source.event;
1919

20+
import org.apache.seatunnel.api.source.SourceEvent;
21+
2022
import lombok.Data;
2123

22-
import java.io.Serializable;
2324
import java.util.List;
2425

2526
@Data
26-
public class CompletedSnapshotSplitReportEvent implements Serializable {
27+
public class CompletedSnapshotSplitReportEvent implements SourceEvent {
2728
// Source event that carries the watermarks of snapshot splits that have finished reading.
private static final long serialVersionUID = 1L;
2829
/** Watermarks of the completed snapshot splits, one entry per reported split. */
List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
2930
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.seatunnel.connectors.cdc.base.source.reader;
19+
20+
import static com.google.common.base.Preconditions.checkState;
21+
22+
import org.apache.seatunnel.api.source.SourceReader;
23+
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
24+
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
25+
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
26+
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
27+
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
28+
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.seatunnel.connectors.cdc.base.config.SourceConfig;
31+
import org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitReportEvent;
32+
import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
33+
import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
34+
import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
35+
import org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
36+
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
37+
import org.seatunnel.connectors.cdc.base.source.split.state.LogSplitState;
38+
import org.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
39+
import org.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
40+
41+
import java.util.ArrayList;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.concurrent.BlockingQueue;
46+
import java.util.function.Supplier;
47+
48+
/**
49+
* The multi-parallel source reader for table snapshot phase from {@link SnapshotSplit} and then
50+
* single-parallel source reader for table stream phase from {@link LogSplit}.
51+
*/
52+
@Slf4j
53+
public class IncrementalSourceReader<T, C extends SourceConfig>
54+
extends SingleThreadMultiplexSourceReaderBase<
55+
SourceRecords, T, SourceSplitBase, SourceSplitStateBase> {
56+
57+
private final Map<String, SnapshotSplit> finishedUnackedSplits;
58+
59+
private final Map<String, LogSplit> uncompletedStreamSplits;
60+
61+
private final int subtaskId;
62+
63+
private final C sourceConfig;
64+
65+
public IncrementalSourceReader(
66+
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
67+
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
68+
RecordEmitter<SourceRecords, T, SourceSplitStateBase> recordEmitter,
69+
SourceReaderOptions options,
70+
SourceReader.Context context,
71+
C sourceConfig) {
72+
super(
73+
elementsQueue,
74+
new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get),
75+
recordEmitter,
76+
options,
77+
context);
78+
this.sourceConfig = sourceConfig;
79+
this.finishedUnackedSplits = new HashMap<>();
80+
this.uncompletedStreamSplits = new HashMap<>();
81+
this.subtaskId = context.getIndexOfSubtask();
82+
}
83+
84+
@Override
85+
public void notifyCheckpointComplete(long checkpointId) throws Exception {
86+
87+
}
88+
89+
@Override
90+
public void addSplits(List<SourceSplitBase> splits) {
91+
// restore for finishedUnackedSplits
92+
List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
93+
for (SourceSplitBase split : splits) {
94+
if (split.isSnapshotSplit()) {
95+
SnapshotSplit snapshotSplit = split.asSnapshotSplit();
96+
if (snapshotSplit.isSnapshotReadFinished()) {
97+
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
98+
} else {
99+
unfinishedSplits.add(split);
100+
}
101+
} else {
102+
// the stream split is uncompleted
103+
uncompletedStreamSplits.put(split.splitId(), split.asLogSplit());
104+
unfinishedSplits.add(split.asLogSplit());
105+
}
106+
}
107+
// notify split enumerator again about the finished unacked snapshot splits
108+
reportFinishedSnapshotSplitsIfNeed();
109+
// add all un-finished splits (including stream split) to SourceReaderBase
110+
super.addSplits(unfinishedSplits);
111+
}
112+
113+
@Override
114+
protected void onSplitFinished(Map<String, SourceSplitStateBase> finishedSplitIds) {
115+
for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
116+
SourceSplitBase sourceSplit = splitState.toSourceSplit();
117+
checkState(
118+
sourceSplit.isSnapshotSplit(),
119+
String.format(
120+
"Only snapshot split could finish, but the actual split is stream split %s",
121+
sourceSplit));
122+
finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
123+
}
124+
reportFinishedSnapshotSplitsIfNeed();
125+
context.sendSplitRequest();
126+
}
127+
128+
private void reportFinishedSnapshotSplitsIfNeed() {
129+
if (!finishedUnackedSplits.isEmpty()) {
130+
List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks = new ArrayList<>();
131+
132+
for (SnapshotSplit split : finishedUnackedSplits.values()) {
133+
completedSnapshotSplitWatermarks.add(new SnapshotSplitWatermark(split.splitId(), split.getHighWatermark()));
134+
}
135+
CompletedSnapshotSplitReportEvent reportEvent = new CompletedSnapshotSplitReportEvent();
136+
reportEvent.setCompletedSnapshotSplitWatermarks(completedSnapshotSplitWatermarks);
137+
context.sendSourceEventToEnumerator(reportEvent);
138+
//TODO need enumerator return ack
139+
finishedUnackedSplits.clear();
140+
log.debug(
141+
"The subtask {} reports offsets of finished snapshot splits {}.",
142+
subtaskId,
143+
completedSnapshotSplitWatermarks);
144+
}
145+
}
146+
147+
@Override
148+
protected SourceSplitStateBase initializedState(SourceSplitBase split) {
149+
if (split.isSnapshotSplit()) {
150+
return new SnapshotSplitState(split.asSnapshotSplit());
151+
} else {
152+
return new LogSplitState(split.asLogSplit());
153+
}
154+
}
155+
156+
@Override
157+
public List<SourceSplitBase> snapshotState(long checkpointId) {
158+
// unfinished splits
159+
List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
160+
161+
// add finished snapshot splits that didn't receive ack yet
162+
stateSplits.addAll(finishedUnackedSplits.values());
163+
164+
// add stream splits who are uncompleted
165+
stateSplits.addAll(uncompletedStreamSplits.values());
166+
167+
return stateSplits;
168+
}
169+
170+
@Override
171+
protected SourceSplitBase toSplitType(String splitId, SourceSplitStateBase splitState) {
172+
return splitState.toSourceSplit();
173+
}
174+
}

0 commit comments

Comments
 (0)