Skip to content

Commit f8158bb

Browse files
authored
[Improve][Catalog] Use default tablepath when can not get the tablepath from source config (#6276)
* Use default tablepath when can not get the tablepath from source config * revert SqlServerCDCIT run in flink engine
1 parent 2f62235 commit f8158bb

File tree

6 files changed

+70
-3
lines changed

6 files changed

+70
-3
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig re
215215
} else {
216216
Optional<String> resultTableNameOptional =
217217
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME);
218-
tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY);
218+
tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.DEFAULT);
219219
}
220220

221221
return CatalogTable.of(

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public final class TablePath implements Serializable {
3434
private final String schemaName;
3535
private final String tableName;
3636

37-
public static final TablePath EMPTY = TablePath.of(null, null, null);
37+
public static final TablePath DEFAULT = TablePath.of("default", "default", "default");
3838

3939
public static TablePath of(String fullName) {
4040
return of(fullName, false);

seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,21 @@ public void testCatalogUtilGetCatalogTable() throws FileNotFoundException, URISy
138138
Thread.currentThread().getContextClassLoader()));
139139
}
140140

141+
@Test
142+
public void testDefaultTablePath() throws FileNotFoundException, URISyntaxException {
143+
String path = getTestConfigFile("/conf/default_tablepath.conf");
144+
Config config = ConfigFactory.parseFile(new File(path));
145+
Config source = config.getConfigList("source").get(0);
146+
ReadonlyConfig sourceReadonlyConfig = ReadonlyConfig.fromConfig(source);
147+
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(sourceReadonlyConfig);
148+
Assertions.assertEquals(
149+
TablePath.DEFAULT.getDatabaseName(), catalogTable.getTablePath().getDatabaseName());
150+
Assertions.assertEquals(
151+
TablePath.DEFAULT.getSchemaName(), catalogTable.getTablePath().getSchemaName());
152+
Assertions.assertEquals(
153+
TablePath.DEFAULT.getTableName(), catalogTable.getTablePath().getTableName());
154+
}
155+
141156
@Test
142157
public void testGenericRowSchemaTest() throws FileNotFoundException, URISyntaxException {
143158
String path = getTestConfigFile("/conf/generic_row.schema.conf");
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
job.mode = "BATCH"
20+
}
21+
22+
source {
23+
MongoDB-CDC {
24+
hosts = "mongo0:27017"
25+
database = ["inventory"]
26+
collection = ["inventory.products"]
27+
username = superuser
28+
password = superpw
29+
schema = {
30+
fields {
31+
"_id": string,
32+
"name": string,
33+
"description": string,
34+
"weight": string
35+
}
36+
}
37+
}
38+
}
39+
40+
transform {
41+
}
42+
43+
sink {
44+
Console{}
45+
}

seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
25+
import org.apache.seatunnel.api.table.catalog.TablePath;
2526
import org.apache.seatunnel.api.table.type.RowKind;
2627
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2728
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -84,15 +85,21 @@ public void testComplexSchemaParse(String conf)
8485
public void testRowDataParse(String conf) throws FileNotFoundException, URISyntaxException {
8586
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
8687
row1.setRowKind(RowKind.INSERT);
88+
row1.setTableId(TablePath.DEFAULT.getFullName());
8789
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
8890
row2.setRowKind(RowKind.INSERT);
91+
row2.setTableId(TablePath.DEFAULT.getFullName());
8992
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
9093
row3.setRowKind(RowKind.INSERT);
94+
row3.setTableId(TablePath.DEFAULT.getFullName());
9195
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, "A", 100});
96+
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
9297
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
9398
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, "A_1", 100});
99+
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
94100
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
95101
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 100});
102+
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
96103
row2Delete.setRowKind(RowKind.DELETE);
97104
List<SeaTunnelRow> expected =
98105
Arrays.asList(row1, row2, row3, row1UpdateBefore, row1UpdateAfter, row2Delete);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
@Slf4j
6666
@DisabledOnContainer(
6767
value = {},
68-
type = {EngineType.SPARK, EngineType.FLINK},
68+
type = {EngineType.SPARK},
6969
disabledReason = "Currently SPARK do not support cdc")
7070
public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
7171

0 commit comments

Comments
 (0)