Skip to content

Commit eff17cc

Browse files
authored
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized (#5663)
1 parent 245705d commit eff17cc

File tree

34 files changed

+1474
-224
lines changed

34 files changed

+1474
-224
lines changed

.github/workflows/backend.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,30 @@ jobs:
925925
env:
926926
MAVEN_OPTS: -Xmx4096m
927927

928+
jdbc-connectors-it-part-7:
929+
needs: [ changes, sanity-check ]
930+
if: needs.changes.outputs.api == 'true'
931+
runs-on: ${{ matrix.os }}
932+
strategy:
933+
matrix:
934+
java: [ '8', '11' ]
935+
os: [ 'ubuntu-latest' ]
936+
timeout-minutes: 90
937+
steps:
938+
- uses: actions/checkout@v2
939+
- name: Set up JDK ${{ matrix.java }}
940+
uses: actions/setup-java@v3
941+
with:
942+
java-version: ${{ matrix.java }}
943+
distribution: 'temurin'
944+
cache: 'maven'
945+
- name: run jdbc connectors integration test (part-6)
946+
if: needs.changes.outputs.api == 'true'
947+
run: |
948+
./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
949+
env:
950+
MAVEN_OPTS: -Xmx4096m
951+
928952

929953
kafka-connector-it:
930954
needs: [ changes, sanity-check ]

seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
2929
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
3030
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
3131
HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
32-
;
32+
SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has data"),
33+
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
3334

3435
private final String code;
3536
private final String description;

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,16 @@
2121
* The SaveMode for the Sink connectors that use table or other table structures to organize data
2222
*/
2323
public enum DataSaveMode {
24-
// Will drop table in MySQL, Will drop path for File Connector.
25-
DROP_SCHEMA,
2624

27-
// Only drop the data in MySQL, Only drop the files in the path for File Connector.
28-
KEEP_SCHEMA_DROP_DATA,
25+
// Preserve database structure and delete data
26+
DROP_DATA,
2927

30-
// Keep the table and data and continue to write data to the existing table for MySQL. Keep the
31-
// path and files in the path, create new files in the path.
32-
KEEP_SCHEMA_AND_DATA,
28+
// Preserve database structure, preserve data
29+
APPEND_DATA,
3330

34-
// The connector provides custom processing methods, such as running user provided SQL or shell
35-
// scripts, etc
31+
// User defined processing
3632
CUSTOM_PROCESSING,
3733

38-
// Throw error when table is exists for MySQL. Throw error when path is exists.
39-
ERROR_WHEN_EXISTS
34+
// When there exist data, an error will be reported
35+
ERROR_WHEN_DATA_EXISTS
4036
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
import org.apache.seatunnel.api.table.catalog.Catalog;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.api.table.catalog.TablePath;
23+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
24+
25+
import lombok.AllArgsConstructor;
26+
27+
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
28+
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;
29+
30+
@AllArgsConstructor
31+
public class DefaultSaveModeHandler implements SaveModeHandler {
32+
33+
public SchemaSaveMode schemaSaveMode;
34+
public DataSaveMode dataSaveMode;
35+
public Catalog catalog;
36+
public TablePath tablePath;
37+
public CatalogTable catalogTable;
38+
public String customSql;
39+
40+
public DefaultSaveModeHandler(
41+
SchemaSaveMode schemaSaveMode,
42+
DataSaveMode dataSaveMode,
43+
Catalog catalog,
44+
CatalogTable catalogTable,
45+
String customSql) {
46+
this(
47+
schemaSaveMode,
48+
dataSaveMode,
49+
catalog,
50+
catalogTable.getTableId().toTablePath(),
51+
catalogTable,
52+
customSql);
53+
}
54+
55+
@Override
56+
public void handleSchemaSaveMode() {
57+
switch (schemaSaveMode) {
58+
case RECREATE_SCHEMA:
59+
recreateSchema();
60+
break;
61+
case CREATE_SCHEMA_WHEN_NOT_EXIST:
62+
createSchemaWhenNotExist();
63+
break;
64+
case ERROR_WHEN_SCHEMA_NOT_EXIST:
65+
errorWhenSchemaNotExist();
66+
break;
67+
default:
68+
throw new UnsupportedOperationException("Unsupported save mode: " + schemaSaveMode);
69+
}
70+
}
71+
72+
@Override
73+
public void handleDataSaveMode() {
74+
switch (dataSaveMode) {
75+
case DROP_DATA:
76+
keepSchemaDropData();
77+
break;
78+
case APPEND_DATA:
79+
keepSchemaAndData();
80+
break;
81+
case CUSTOM_PROCESSING:
82+
customProcessing();
83+
break;
84+
case ERROR_WHEN_DATA_EXISTS:
85+
errorWhenDataExists();
86+
break;
87+
default:
88+
throw new UnsupportedOperationException("Unsupported save mode: " + dataSaveMode);
89+
}
90+
}
91+
92+
protected void recreateSchema() {
93+
if (tableExists()) {
94+
dropTable();
95+
}
96+
createTable();
97+
}
98+
99+
protected void createSchemaWhenNotExist() {
100+
if (!tableExists()) {
101+
createTable();
102+
}
103+
}
104+
105+
protected void errorWhenSchemaNotExist() {
106+
if (!tableExists()) {
107+
throw new SeaTunnelRuntimeException(SINK_TABLE_NOT_EXIST, "The sink table not exist");
108+
}
109+
}
110+
111+
protected void keepSchemaDropData() {
112+
if (tableExists()) {
113+
truncateTable();
114+
}
115+
}
116+
117+
protected void keepSchemaAndData() {}
118+
119+
protected void customProcessing() {
120+
executeCustomSql();
121+
}
122+
123+
protected void errorWhenDataExists() {
124+
if (dataExists()) {
125+
throw new SeaTunnelRuntimeException(
126+
SOURCE_ALREADY_HAS_DATA, "The target data source already has data");
127+
}
128+
}
129+
130+
protected boolean tableExists() {
131+
return catalog.tableExists(tablePath);
132+
}
133+
134+
protected void dropTable() {
135+
catalog.dropTable(tablePath, true);
136+
}
137+
138+
protected void createTable() {
139+
catalog.createTable(tablePath, catalogTable, true);
140+
}
141+
142+
protected void truncateTable() {
143+
catalog.truncateTable(tablePath, true);
144+
}
145+
146+
protected boolean dataExists() {
147+
return catalog.isExistsData(tablePath);
148+
}
149+
150+
protected void executeCustomSql() {
151+
catalog.executeSql(tablePath, customSql);
152+
}
153+
154+
@Override
155+
public void close() throws Exception {
156+
catalog.close();
157+
}
158+
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java renamed to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717

1818
package org.apache.seatunnel.api.sink;
1919

20-
/** The Sink Connectors which support data SaveMode should implement this interface */
21-
public interface SupportDataSaveMode {
22-
String SAVE_MODE_KEY = "savemode";
23-
/**
24-
* Return the value of DataSaveMode configured by user in the job config file.
25-
*
26-
* @return
27-
*/
28-
DataSaveMode getUserConfigSaveMode();
20+
public interface SaveModeHandler extends AutoCloseable {
2921

30-
/** The implementation of specific logic according to different {@link DataSaveMode} */
31-
void handleSaveMode(DataSaveMode userConfigSaveMode);
22+
void handleSchemaSaveMode();
23+
24+
void handleDataSaveMode();
25+
26+
default void handleSaveMode() {
27+
handleSchemaSaveMode();
28+
handleDataSaveMode();
29+
}
3230
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
public enum SchemaSaveMode {
21+
22+
// Will create when the table does not exist, delete and rebuild when the table is saved
23+
RECREATE_SCHEMA,
24+
25+
// Will Created when the table does not exist, skipped when the table is saved
26+
CREATE_SCHEMA_WHEN_NOT_EXIST,
27+
28+
// Error will be reported when the table does not exist
29+
ERROR_WHEN_SCHEMA_NOT_EXIST,
30+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
public final class SinkReplaceNameConstant {
21+
22+
public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
23+
24+
public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
25+
26+
public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
27+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink;
19+
20+
import java.util.Optional;
21+
22+
/** The Sink Connectors which support schema and data SaveMode should implement this interface */
23+
public interface SupportSaveMode {
24+
25+
String DATA_SAVE_MODE_KEY = "data_save_mode";
26+
27+
String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";
28+
29+
// This method defines the return of a specific save_mode handler
30+
Optional<SaveModeHandler> getSaveModeHandler();
31+
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,25 @@ void createDatabase(TablePath tablePath, boolean ignoreIfExists)
189189
void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
190190
throws DatabaseNotExistException, CatalogException;
191191

192+
/**
193+
* Truncate an existing table data in this catalog.
194+
*
195+
* @param tablePath Path of the table
196+
* @param ignoreIfNotExists Flag to specify behavior when a table with the given name doesn't
197+
* exist
198+
* @throws TableNotExistException thrown if the table doesn't exist in the catalog and
199+
* ignoreIfNotExists is false
200+
* @throws CatalogException in case of any runtime exception
201+
*/
202+
default void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
203+
throws TableNotExistException, CatalogException {}
204+
205+
default boolean isExistsData(TablePath tablePath) {
206+
return false;
207+
}
208+
209+
default void executeSql(TablePath tablePath, String sql) {}
210+
192211
// todo: Support for update table metadata
193212

194213
}

seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/SingleChoiceOptionTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,8 @@ public void test() {
4141
Options.key("save_mode")
4242
.singleChoice(
4343
DataSaveMode.class,
44-
Arrays.asList(
45-
DataSaveMode.DROP_SCHEMA,
46-
DataSaveMode.KEEP_SCHEMA_DROP_DATA))
47-
.defaultValue(DataSaveMode.DROP_SCHEMA)
44+
Arrays.asList(DataSaveMode.APPEND_DATA, DataSaveMode.DROP_DATA))
45+
.defaultValue(DataSaveMode.APPEND_DATA)
4846
.withDescription("save mode test");
4947

5048
OptionRule build = OptionRule.builder().optional(stringOption, saveModeOption).build();
@@ -58,6 +56,6 @@ public void test() {
5856
option = optionalOptions.get(1);
5957
singleChoiceOption = (SingleChoiceOption) option;
6058
Assertions.assertEquals(2, singleChoiceOption.getOptionValues().size());
61-
Assertions.assertEquals(DataSaveMode.DROP_SCHEMA, singleChoiceOption.defaultValue());
59+
Assertions.assertEquals(DataSaveMode.APPEND_DATA, singleChoiceOption.defaultValue());
6260
}
6361
}

0 commit comments

Comments
 (0)