Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7076][Sort] Add multi table sink for MySQL #7079

Merged
merged 3 commits into from
Dec 30, 2022

Conversation

chyueyi
Copy link
Contributor

@chyueyi chyueyi commented Dec 28, 2022

Prepare a Pull Request

Motivation

  • Users want to real time sync multi-tables data in mongoDB、KAFKA etc to MySql or other JDBC based databases in many case. We need multi table-sink connector for MySql.

  • Based on flink-connector-jdbc, sort-connector-jdbc can sink data to MySql in two formats: canal-json.

Modifications

  • Add multi table sink for MySQL
  • add MySQLRowConverter, MySQLDialect
  • modify JdbcMultiBatchingOutputFormat and AbstractJdbcRowConverter to Support Mysql data type

Verifying this change

(Please pick either of the following options)

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

    (example:)

CREATE TABLE cdc_mysql_source (
   `data` BYTES METADATA FROM 'meta.data_canal' VIRTUAL
 ) WITH (
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'migrate-all' = 'true',
'connector' = 'mysql-cdc-inlong',
'scan.incremental.snapshot.enabled' = 'false',
'hostname' = 'localhost',
'database-name' = 'test',
'server-time-zone' = 'Asia/Shanghai',
'username' = '****',
'password' = '****',
'table-name' = 'test\.[\s\S]*'
);

CREATE TABLE cdc_mysql_sink (
 `data` BYTES
 )WITH (
   'connector' = 'jdbc-inlong',
   'url' = 'jdbc:mysql://localhost:3306',
   'username' = '****',
   'password' = '****',
   'table-name' = 'test',  
   'sink.multiple.enable' = 'true',
   'sink.multiple.schema-update.policy' = 'TRY_IT_BEST',
   'sink.multiple.format' = 'canal-json',
   'sink.multiple.database-pattern' = 'test2',
   'sink.multiple.schema-pattern' = '',
   'sink.multiple.table-pattern' = '${table}'
  );

insert into cdc_mysql_sink select * from cdc_mysql_source;

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

@chyueyi chyueyi changed the title [INLONG-7076] [sort-connector-jdbc] Add multi table sink for MySQL [INLONG-7076] [Sort] Add multi table sink for MySQL Dec 28, 2022
@chyueyi chyueyi changed the title [INLONG-7076] [Sort] Add multi table sink for MySQL [INLONG-7076][Sort] Add multi table sink for MySQL Dec 28, 2022
@chyueyi
Copy link
Contributor Author

chyueyi commented Dec 28, 2022

@gong OK, I've modified it.

@gong
Copy link
Contributor

gong commented Dec 28, 2022

@kuansix please help to review it

@dockerzhang
Copy link
Contributor

@chyueyi do we need to update the document at the same time?

https://inlong.apache.org/docs/next/data_node/extract_node/mysql-cdc

if you want to change the document, you can create PR for https://github.com/apache/inlong-website.

@dockerzhang dockerzhang merged commit d2204b3 into apache:master Dec 30, 2022
@chyueyi chyueyi deleted the INLONG-7076 branch December 30, 2022 03:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Sort] Add multi table sink for MySQL
6 participants