Skip to content

Commit 2238fda

Browse files
authored
[feature][cdc] Fixed error in mysql cdc under real-time job (#3666)
* [feature][cdc] Fixed error in mysql cdc under real-time job * [chore] license header
1 parent 49cbb9d commit 2238fda

File tree

18 files changed

+525
-12
lines changed

18 files changed

+525
-12
lines changed

plugin-mapping.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,5 @@ seatunnel.source.Jira = connector-http-jira
156156
seatunnel.source.Gitlab = connector-http-gitlab
157157
seatunnel.sink.RabbitMQ = connector-rabbitmq
158158
seatunnel.source.RabbitMQ = connector-rabbitmq
159-
seatunnel.source.OpenMldb = connector-openmldb
159+
seatunnel.source.OpenMldb = connector-openmldb
160+
seatunnel.source.MySQL-CDC = connector-cdc-mysql
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.relational;
19+
20+
import io.debezium.annotation.Immutable;
21+
import io.debezium.relational.Selectors.TableIdToStringMapper;
22+
import io.debezium.schema.DataCollectionId;
23+
24+
import java.io.Serializable;
25+
26+
/**
27+
* Unique identifier for a database table.
28+
*
29+
*/
30+
@Immutable
31+
public final class TableId implements DataCollectionId, Comparable<TableId>, Serializable {
32+
private static final long serialVersionUID = 1L;
33+
34+
/**
35+
* Parse the supplied string, extracting up to the first 3 parts into a TableID.
36+
*
37+
* @param str the string representation of the table identifier; may not be null
38+
* @return the table ID, or null if it could not be parsed
39+
*/
40+
public static TableId parse(String str) {
41+
return parse(str, true);
42+
}
43+
44+
/**
45+
* Parse the supplied string, extracting up to the first 3 parts into a TableID.
46+
*
47+
* @param str the string representation of the table identifier; may not be null
48+
* @param useCatalogBeforeSchema {@code true} if the parsed string contains only 2 items and the first should be used as
49+
* the catalog and the second as the table name, or {@code false} if the first should be used as the schema and the
50+
* second as the table name
51+
* @return the table ID, or null if it could not be parsed
52+
*/
53+
public static TableId parse(String str, boolean useCatalogBeforeSchema) {
54+
String[] parts = TableIdParser.parse(str).toArray(new String[0]);
55+
56+
return TableId.parse(parts, parts.length, useCatalogBeforeSchema);
57+
}
58+
59+
/**
60+
* Parse the supplied identifier parts, using up to the first 3 of them to build a TableID.
61+
*
62+
* @param parts the parts of the identifier; may not be null
63+
* @param numParts the number of parts to use for the table identifier
64+
* @param useCatalogBeforeSchema {@code true} if the parsed string contains only 2 items and the first should be used as
65+
* the catalog and the second as the table name, or {@code false} if the first should be used as the schema and the
66+
* second as the table name
67+
* @return the table ID, or null if it could not be parsed
68+
*/
69+
protected static TableId parse(String[] parts, int numParts, boolean useCatalogBeforeSchema) {
70+
if (numParts == 0) {
71+
return null;
72+
}
73+
if (numParts == 1) {
74+
return new TableId(null, null, parts[0]); // table only
75+
}
76+
if (numParts == 2) {
77+
if (useCatalogBeforeSchema) {
78+
return new TableId(parts[0], null, parts[1]); // catalog & table only
79+
}
80+
return new TableId(null, parts[0], parts[1]); // schema & table only
81+
}
82+
return new TableId(parts[0], parts[1], parts[2]); // catalog, schema & table
83+
}
84+
85+
private final String catalogName;
86+
private final String schemaName;
87+
private final String tableName;
88+
private final String id;
89+
90+
/**
91+
* Create a new table identifier.
92+
*
93+
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
94+
* show a catalog for this table
95+
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
96+
* show a schema for this table
97+
* @param tableName the name of the table; may not be null
98+
* @param tableIdMapper the customization of the fully qualified table name
99+
*/
100+
public TableId(String catalogName, String schemaName, String tableName, TableIdToStringMapper tableIdMapper) {
101+
this.catalogName = catalogName;
102+
this.schemaName = schemaName;
103+
this.tableName = tableName;
104+
assert this.tableName != null;
105+
this.id = tableIdMapper == null ? tableId(this.catalogName, this.schemaName, this.tableName) : tableIdMapper.toString(this);
106+
}
107+
108+
/**
109+
* Create a new table identifier.
110+
*
111+
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
112+
* show a catalog for this table
113+
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
114+
* show a schema for this table
115+
* @param tableName the name of the table; may not be null
116+
*/
117+
public TableId(String catalogName, String schemaName, String tableName) {
118+
this(catalogName, schemaName, tableName, null);
119+
}
120+
121+
/**
122+
* Get the name of the JDBC catalog.
123+
*
124+
* @return the catalog name, or null if the table does not belong to a catalog
125+
*/
126+
public String catalog() {
127+
return catalogName;
128+
}
129+
130+
/**
131+
* Get the name of the JDBC schema.
132+
*
133+
* @return the JDBC schema name, or null if the table does not belong to a JDBC schema
134+
*/
135+
public String schema() {
136+
return schemaName;
137+
}
138+
139+
/**
140+
* Get the name of the table.
141+
*
142+
* @return the table name; never null
143+
*/
144+
public String table() {
145+
return tableName;
146+
}
147+
148+
@Override
149+
public String identifier() {
150+
return id;
151+
}
152+
153+
@Override
154+
public int compareTo(TableId that) {
155+
if (this == that) {
156+
return 0;
157+
}
158+
return this.id.compareTo(that.id);
159+
}
160+
161+
public int compareToIgnoreCase(TableId that) {
162+
if (this == that) {
163+
return 0;
164+
}
165+
return this.id.compareToIgnoreCase(that.id);
166+
}
167+
168+
@Override
169+
public int hashCode() {
170+
return id.hashCode();
171+
}
172+
173+
@Override
174+
public boolean equals(Object obj) {
175+
if (obj instanceof TableId) {
176+
return this.compareTo((TableId) obj) == 0;
177+
}
178+
return false;
179+
}
180+
181+
@Override
182+
public String toString() {
183+
return identifier();
184+
}
185+
186+
/**
187+
* Returns a dot-separated String representation of this identifier, quoting all
188+
* name parts with the {@code "} char.
189+
*/
190+
public String toDoubleQuotedString() {
191+
return toQuotedString('"');
192+
}
193+
194+
/**
195+
* Returns a new {@link TableId} with all parts of the identifier quoted using the {@code "} character.
196+
*/
197+
public TableId toDoubleQuoted() {
198+
return toQuoted('"');
199+
}
200+
201+
/**
202+
* Returns a new {@link TableId} that has all parts of the identifier quoted.
203+
*
204+
* @param quotingChar the character to be used to quote the identifier parts.
205+
*/
206+
public TableId toQuoted(char quotingChar) {
207+
String catalogName = null;
208+
if (this.catalogName != null && !this.catalogName.isEmpty()) {
209+
catalogName = quote(this.catalogName, quotingChar);
210+
}
211+
212+
String schemaName = null;
213+
if (this.schemaName != null && !this.schemaName.isEmpty()) {
214+
schemaName = quote(this.schemaName, quotingChar);
215+
}
216+
217+
return new TableId(catalogName, schemaName, quote(this.tableName, quotingChar));
218+
}
219+
220+
/**
221+
* Returns a dot-separated String representation of this identifier, quoting all
222+
* name parts with the given quoting char.
223+
*/
224+
public String toQuotedString(char quotingChar) {
225+
StringBuilder quoted = new StringBuilder();
226+
227+
if (catalogName != null && !catalogName.isEmpty()) {
228+
quoted.append(quote(catalogName, quotingChar)).append(".");
229+
}
230+
231+
if (schemaName != null && !schemaName.isEmpty()) {
232+
quoted.append(quote(schemaName, quotingChar)).append(".");
233+
}
234+
235+
quoted.append(quote(tableName, quotingChar));
236+
237+
return quoted.toString();
238+
}
239+
240+
private static String tableId(String catalog, String schema, String table) {
241+
if (catalog == null || catalog.length() == 0) {
242+
if (schema == null || schema.length() == 0) {
243+
return table;
244+
}
245+
return schema + "." + table;
246+
}
247+
if (schema == null || schema.length() == 0) {
248+
return catalog + "." + table;
249+
}
250+
return catalog + "." + schema + "." + table;
251+
}
252+
253+
/**
254+
* Quotes the given identifier part, e.g. schema or table name.
255+
*/
256+
private static String quote(String identifierPart, char quotingChar) {
257+
if (identifierPart == null) {
258+
return null;
259+
}
260+
261+
if (identifierPart.isEmpty()) {
262+
return new StringBuilder().append(quotingChar).append(quotingChar).toString();
263+
}
264+
265+
if (identifierPart.charAt(0) != quotingChar && identifierPart.charAt(identifierPart.length() - 1) != quotingChar) {
266+
identifierPart = identifierPart.replace(quotingChar + "", repeat(quotingChar));
267+
identifierPart = quotingChar + identifierPart + quotingChar;
268+
}
269+
270+
return identifierPart;
271+
}
272+
273+
private static String repeat(char quotingChar) {
274+
return new StringBuilder().append(quotingChar).append(quotingChar).toString();
275+
}
276+
277+
public TableId toLowercase() {
278+
return new TableId(catalogName, schemaName, tableName.toLowerCase());
279+
}
280+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,21 @@ public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig) {
179179
return this;
180180
}
181181

182+
/** Specifies the stop options. */
183+
public JdbcSourceConfigFactory stopOptions(StopConfig stopConfig) {
184+
this.stopConfig = stopConfig;
185+
return this;
186+
}
187+
182188
public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
183189
this.port = config.get(JdbcSourceOptions.PORT);
184190
this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
191+
this.username = config.get(JdbcSourceOptions.USERNAME);
185192
this.password = config.get(JdbcSourceOptions.PASSWORD);
186193
// TODO: support multi-table
187194
this.databaseList = Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
188-
this.tableList = Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
195+
this.tableList = Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
196+
+ "." + config.get(JdbcSourceOptions.TABLE_NAME));
189197
this.distributionFactorUpper = config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
190198
this.distributionFactorLower = config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
191199
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public Optional<SourceSplitBase> getNext() {
111111
// we need to wait snapshot-assigner to be completed before
112112
// assigning the incremental split. Otherwise, records emitted from incremental split
113113
// might be out-of-order in terms of same primary key with snapshot splits.
114-
return snapshotSplitAssigner.getNext();
114+
return incrementalSplitAssigner.getNext();
115115
}
116116
// no more splits for the assigner
117117
return Optional.empty();

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ public class IncrementalSourceEnumerator
5151
*/
5252
private final TreeSet<Integer> readersAwaitingSplit;
5353

54+
private volatile boolean running;
5455
public IncrementalSourceEnumerator(
5556
SourceSplitEnumerator.Context<SourceSplitBase> context,
5657
SplitAssigner splitAssigner) {
5758
this.context = context;
5859
this.splitAssigner = splitAssigner;
5960
this.readersAwaitingSplit = new TreeSet<>();
61+
this.running = false;
6062
}
6163

6264
@Override
@@ -66,7 +68,8 @@ public void open() {
6668

6769
@Override
6870
public void run() throws Exception {
69-
71+
this.running = true;
72+
assignSplits();
7073
}
7174

7275
@Override
@@ -77,7 +80,9 @@ public void handleSplitRequest(int subtaskId) {
7780
}
7881

7982
readersAwaitingSplit.add(subtaskId);
80-
assignSplits();
83+
if (running) {
84+
assignSplits();
85+
}
8186
}
8287

8388
@Override

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public boolean waitingForCompletedSplits() {
124124
@Override
125125
public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
126126
// record each completed split's high watermark in the context's completed-offsets map, keyed by split id
127+
completedSplitWatermarks.forEach(watermark ->
128+
context.getSplitCompletedOffsets().put(watermark.getSplitId(), watermark.getHighWatermark()));
127129
}
128130

129131
@Override
@@ -205,7 +207,8 @@ private IncrementalSplit createIncrementalSplit(List<TableId> capturedTables, in
205207
}
206208
for (TableId tableId : capturedTables) {
207209
Offset watermark = tableWatermarks.get(tableId);
208-
if (minOffset == null || watermark.isBefore(minOffset)) {
210+
if (minOffset == null ||
211+
(watermark != null && watermark.isBefore(minOffset))) {
209212
minOffset = watermark;
210213
}
211214
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,16 @@ public void open() {
148148

149149
@Override
150150
public Optional<SourceSplitBase> getNext() {
151+
if (chunkSplitter == null) {
152+
return Optional.empty();
153+
}
151154
if (!remainingSplits.isEmpty()) {
152155
// return remaining splits firstly
153156
Iterator<SnapshotSplit> iterator = remainingSplits.iterator();
154157
SnapshotSplit split = iterator.next();
155158
iterator.remove();
156159
assignedSplits.put(split.splitId(), split);
160+
context.getAssignedSnapshotSplit().put(split.splitId(), split);
157161
return Optional.of(split);
158162
} else {
159163
// it's turn for new table

0 commit comments

Comments
 (0)