Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized #5663

Merged
merged 27 commits into from
Oct 30, 2023

Conversation

chl-wxp
Copy link
Contributor

@chl-wxp chl-wxp commented Oct 19, 2023

Function list

  • add save mode function interface ,see “org.apache.seatunnel.api.sink.SaveModeHandler”
  • add saveModeHandler default implementation ,see “org.apache.seatunnel.api.sink.DefaultSaveModeHandler”
  • connector-jdbc(mysql) implement the save mode function interface

…_mysql

# Conflicts:
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
user = "root"
password = "Abc!@#135_seatunnel"
generate_sink_sql = true
catalog {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this tag has deleted
#5645

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been processed

@@ -221,6 +222,13 @@ public void handleSaveMode(DataSaveMode saveMode) {
if (!catalog.tableExists(tablePath)) {
catalog.createTable(tablePath, catalogTable, true);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why retain these logics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been processed

Comment on lines 226 to 228
} catch (UnsupportedOperationException | CatalogException e) {
// TODO Temporary fix, this feature has been changed in this pr
// https://github.com/apache/seatunnel/pull/5645
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

if (!catalog.tableExists(tablePath)) {
catalog.createTable(tablePath, catalogTable, true);
}
return new DefaultSaveModeHandler(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

205 line:try (Catalog catalog = catalogOptional.get()) {

this code auto close catalog, Will savemode not interrupt abnormally?

@hailin0
Copy link
Member

hailin0 commented Oct 20, 2023

rebase dev merge this pr
#5620

…_mysql

# Conflicts:
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
#	seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
Comment on lines 33 to 34
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
;
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been processed

// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,
// Preserve database structure, preserve data
AND_DATA,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's meaning of AND_DATA? I think you mean APPEND_DATA?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been processed

String SCHEMA_SAVE_MODE_KEY = "schema_save_mode";

// This method defines the return of a specific save_mode handler
SaveModeHandler getSaveModeHandler();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use optional to avoid NPE

Suggested change
SaveModeHandler getSaveModeHandler();
Optional<SaveModeHandler> getSaveModeHandler();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has been processed

…_mysql

# Conflicts:
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
#	seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
finalCatalogTable);
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(URL, DRIVER)
.required(URL, DRIVER, SCHEMA_SAVE_MODE, DATA_SAVE_MODE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add new options into connector docs

Comment on lines 98 to 102
CatalogTable catalogTable = context.getCatalogTable();
ReadonlyConfig catalogOptions = getCatalogOptions(context);
Map<String, String> catalogOptions =
config.get(CatalogOptions.CATALOG_OPTIONS) == null
? new HashMap<>()
: config.get(CatalogOptions.CATALOG_OPTIONS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compatible with catalog

…_mysql

# Conflicts:
#	seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
#	seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@EricJoy2048 EricJoy2048 merged commit eff17cc into apache:dev Oct 30, 2023
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants