Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Oracle][MySQL][SqlServer][PostgresSQL] Fix Oracle/MySQL/SqlServer/PostgresSQL CDC parser schema change event failed #2315

Merged
merged 20 commits into from
Feb 28, 2024

Conversation

e-mhui
Copy link
Contributor

@e-mhui e-mhui commented Jul 18, 2023

Fix: #2055 #1792 #1379 #1807 #1810 #1803 #1804 #2311

When a SchemaChangeEvent is triggered, the schemaChangeRecordValue will obtain binlog information from sourceInfo. However, only MySQL CDC has binlog information, and other CDC will report an error after triggering a SchemaChangeEvent event.

image

So I provided an interface for each connector to implement their own sourceinfo parsing.

/** SchemaChangeEvent Handler. */
public interface SchemaChangeEventHandler {
    Map<String, Object> parseSource(SchemaChangeEvent event);
}

Then, it's used in JdbcSourceEventDispatcher#schemaChangeRecordValue.

        private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException {
            Map<String, Object> source = schemaChangeEventHandler.parseSource(event);
            HistoryRecord historyRecord =
                    new HistoryRecord(
                            source,
                            event.getOffset(),
                            event.getDatabase(),
                            null,
                            event.getSchema(),
                            event.getDdl(),
                            event.getTableChanges());
        // ... other code ... 

For MySQL CDC

/** MySqlSchemaChangeEventHandler to deal MySql's schema change event. */
public class MySqlSchemaChangeEventHandler implements SchemaChangeEventHandler {

    @Override
    public Map<String, Object> parseSource(SchemaChangeEvent event) {
        Map<String, Object> source = new HashMap<>();
        Struct sourceInfo = event.getSource();
        String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
        Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
        Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
        source.put(SERVER_ID_KEY, serverId);
        source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
        source.put(BINLOG_POSITION_OFFSET_KEY, pos);
        return source;
    }
}

For Oracle CDC

/** OracleSchemaChangeEventHandler to deal Oracle's schema change event. */
public class OracleSchemaChangeEventHandler implements SchemaChangeEventHandler {

    @Override
    public Map<String, Object> parseSource(SchemaChangeEvent event) {
        Map<String, Object> source = new HashMap<>();
        Struct sourceInfo = event.getSource();
        String scn = sourceInfo.getString(SCN_KEY);
        source.put(SCN_KEY, scn);
        return source;
    }
}

@e-mhui
Copy link
Contributor Author

e-mhui commented Jul 18, 2023

@ruanhang1993 pls review it.

@e-mhui e-mhui changed the title Fix Oracle CDC parser schema change event failed [Oracle] Fix Oracle CDC parser schema change event failed Jul 18, 2023
Copy link
Contributor

@ruanhang1993 ruanhang1993 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e-mhui Thanks for the work.
Maybe Oracle cdc should provide a new dispatcher by self instead of modifying the dispatcher in connector-base. WDYT?
Maybe we could change some scope of methods for JdbcSourceEventDispatcher to reuse its code.

@e-mhui
Copy link
Contributor Author

e-mhui commented Jul 19, 2023

@e-mhui Thanks for the work.
Maybe Oracle cdc should provide a new dispatcher by self instead of modifying the dispatcher in connector-base. WDYT?
Maybe we could change some scope of methods for JdbcSourceEventDispatcher to reuse its code.

@ruanhang1993 Hi, I have refactored the code, please review it again.

  1. Use the schemaChangeEventHandler.parseSource(event) interface to handle different CDC sources. The construction of the source mainly refers to the following links:
  1. Use io.debezium.connector.*.SchemaChangeKey to determine the SchemaChangeEvent.

@gtk96
Copy link
Contributor

gtk96 commented Jul 26, 2023

@e-mhui please run mvn spotless:apply fix code style,thanks

@e-mhui
Copy link
Contributor Author

e-mhui commented Jul 26, 2023

@e-mhui please run mvn spotless:apply fix code style,thanks

Thanks, it has done.

@gtk96
Copy link
Contributor

gtk96 commented Aug 30, 2023

@e-mhui please refer https://github.com/ververica/flink-cdc-connectors/blob/master/README.md#code-style fix your code style to pass ci check

@e-mhui
Copy link
Contributor Author

e-mhui commented Aug 30, 2023

@e-mhui please refer https://github.com/ververica/flink-cdc-connectors/blob/master/README.md#code-style fix your code style to pass ci check

@gtk96 I had been fixed the check style, can you rerun the workflow ?

@gtk96
Copy link
Contributor

gtk96 commented Sep 1, 2023

cc @ruanhang1993

@e-mhui
Copy link
Contributor Author

e-mhui commented Oct 18, 2023

@ruanhang1993 pls review it.

@cainbit
Copy link

cainbit commented Oct 30, 2023

@ruanhang1993 Could you please help to review this pr? Many thanks.

@ruanhang1993
Copy link
Contributor

@e-mhui Hi. We need to add some tests to test this bug.
Please tell me when you need to review again. Thanks.

@e-mhui
Copy link
Contributor Author

e-mhui commented Feb 21, 2024

@e-mhui Hi. We need to add some tests to test this bug. Please tell me when you need to review again. Thanks.

@ruanhang1993 Hi, pls review again.

@gong gong mentioned this pull request Feb 22, 2024
2 tasks
@ruanhang1993 ruanhang1993 merged commit ea7b0e8 into apache:master Feb 28, 2024
17 checks passed
@klion26
Copy link
Member

klion26 commented Feb 29, 2024

thanks @e-mhui for contributing and @ruanhang1993 for merging this!

aiwenmo pushed a commit to aiwenmo/flink-cdc-connectors that referenced this pull request Mar 6, 2024
@stardustman
Copy link

stardustman commented Mar 14, 2024

SchemaChangeEventHandler,这个接口在哪个版本里,我找了一下,最新版的flink-cdc-base 3.0.0 没有看到相关代码。是需要自己修改源码自己编译吗

oracle 表结构变动报错,用到oracle-cdc 3.0包,我这边也报错,请问你这边解决了吗

This PR will fix it.

I did't find handler packages in the /com/ververica/cdc/connectors/base/relational and the flink-connector-oracle-cdc also not found, how to deal with the schema changes myself using the release-3.0? thanks a lot.@e-mhui

@e-mhui
Copy link
Contributor Author

e-mhui commented Mar 14, 2024

@stardustman @Jetol The latest release 3.0.1 was packaged on Jan 22 and has not fixed this issue. If you want to use it in advance, you can build it yourself from the master branch. Otherwise, you will have to wait for the next release version.

package command:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

@stardustman
Copy link

stardustman commented Mar 15, 2024

@stardustman @Jetol The latest release 3.0.1 was packaged on Jan 22 and has not fixed this issue. If you want to use it in advance, you can build it yourself from the master branch. Otherwise, you will have to wait for the next release version.

package command:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

hello, I'm a new user of the flink-cdc, I have get latest package using master branch. and import the dependecyies. will the OracleSchemaChangeEventHandler work automatically or I need other configs to enable it? thanks a lot. @e-mhui

@e-mhui
Copy link
Contributor Author

e-mhui commented Mar 15, 2024

@stardustman @Jetol The latest release 3.0.1 was packaged on Jan 22 and has not fixed this issue. If you want to use it in advance, you can build it yourself from the master branch. Otherwise, you will have to wait for the next release version.
package command:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

hello, I'm a new user of the flink-cdc, I have get latest package using master branch. and import the dependecyies. will the OracleSchemaChangeEventHandler work automatically or I need other configs to enable it? thanks a lot. @e-mhui

@stardustmanYes, it will work automatically. However, you may need to configure some parameters to capture DDL events.

debezium.database.history.store.only.captured.tables.ddl = true

You can also refer to:

  1. https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/cdc-connectors/oracle-cdc/
  2. https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-property-database-history-store-only-captured-tables-ddl

@stardustman
Copy link

debezium.database.history.store.only.captured.tables.ddl

I read the doc docs/connectors/cdc-connectors/oracle-cdc/#connector-options. is it the config belongs to debezium should be started with "debezium" when using the flink cdc? but I find the demo code cdc-connectors/oracle-cdc/#datastream-source
this line debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); of the main method, log.mining.strategy didn't start with the debezium, after read the log.mining.strategy it seems the property is belongs to debezium, and the log.mining.strategy also need to be set redo_log_catalog to capture the DDL?

 public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); 

will the "log.mining.strategy" need to start with "debezium"? thanks a lot for bothering you many times😄. @e-mhui

@e-mhui
Copy link
Contributor Author

e-mhui commented Mar 15, 2024

debezium.database.history.store.only.captured.tables.ddl

I read the doc docs/connectors/cdc-connectors/oracle-cdc/#connector-options. is it the config belongs to debezium should be started with "debezium" when using the flink cdc? but I find the demo code cdc-connectors/oracle-cdc/#datastream-source this line debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); of the main method, log.mining.strategy didn't start with the debezium, after read the log.mining.strategy it seems the property is belongs to debezium, and the log.mining.strategy also need to be set redo_log_catalog to capture the DDL?

 public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); 

will the "log.mining.strategy" need to start with "debezium"? thanks a lot for bothering you many times😄. @e-mhui

It is a parameter of debezium, so you should add the prefix debezium..

image

stardustman added a commit to stardustman/flink-cdc that referenced this pull request Mar 15, 2024
fix error the "log.mining.strategy" should be start with prefix "debezium." according to https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/cdc-connectors/oracle-cdc/#connector-options and the help from @e-mhui with the link apache#2315 (comment)
abatorQ pushed a commit to abatorQ/flink-cdc that referenced this pull request Mar 18, 2024
@stardustman
Copy link

SchemaChangeEventHandler,这个接口在哪个版本里,我找了一下,最新版的flink-cdc-base 3.0.0 没有看到相关代码。是需要自己修改源码自己编译吗

oracle 表结构变动报错,用到oracle-cdc 3.0包,我这边也报错,请问你这边解决了吗

你好,你是实现 DebeziumDeserializationSchema 接口自定义 Deserialization 来接收数据的么? 在这里是怎样区分 DDL 和实际表的数据的呢?有没有脱敏的 demo 可以发一下?谢谢!@Jetol @e-mhui

joyCurry30 pushed a commit to joyCurry30/flink-cdc-connectors that referenced this pull request Mar 22, 2024
@mdzz9527
Copy link

合并到哪个版本了呢

@shangeyao
Copy link

合并到哪个版本了呢

3.1

wuzhenhua01 pushed a commit to wuzhenhua01/flink-cdc-connectors that referenced this pull request Aug 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] [Oracle] Fix Oracle CDC cannot capture newly added tables during task running