
[Bug] With scanNewlyAddedTableEnabled(true) set, newly added tables are not captured #2306

Closed
2 tasks done
edge535 opened this issue Jul 17, 2023 · 15 comments
Labels
bug Something isn't working

Comments

@edge535

edge535 commented Jul 17, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.16.1

Flink CDC version

2.4.0

Database and its version

mysql 5.7.38

Minimal reproduce step

  1. With .scanNewlyAddedTableEnabled(true) set, Flink CDC does not capture tables added while the job is running.
  2. With .scanNewlyAddedTableEnabled(true) commented out, Flink CDC cannot restore from a savepoint after the range of captured databases is expanded.
    What I need: on first startup, Flink CDC captures the database test and also picks up tables added at runtime; after another database to capture is added, e.g. test2, the job can be restarted from a savepoint.

What did you expect to see?

Flink code:
val dataBaseList = "test"
val tableList = "test.*"
MySqlSource.builder()
  .scanNewlyAddedTableEnabled(true)
  .debeziumProperties(mergedDebeziumProperties(prop))
  .hostname(address)
  .port(port)
  .username(userName)
  .password(password)
  .databaseList(dataBaseList)
  .tableList(tableList)
  .deserializer(withFieldsDescSchema)
  .startupOptions(StartupOptions.initial())
  .build()

  1. As above, create a Flink CDC MySQL source. With .scanNewlyAddedTableEnabled(true) set, Flink CDC cannot capture newly added tables.
    e.g.: before Flink CDC starts, MySQL contains two tables, table1 and table2; after startup, data from both is captured normally. Now create a new table, table3, and insert rows into it: Flink CDC does not capture the data in table3.
    With .scanNewlyAddedTableEnabled(true) commented out, the job does capture the data in table3.

2. If I change
val dataBaseList = "test" val tableList = "test.*"
to
val dataBaseList = "test,test2" val tableList = "test.*,test.*"
meaning I need to capture all tables in both the test and test2 databases, then with .scanNewlyAddedTableEnabled(true) commented out, restarting from a savepoint fails with:
io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=1689577430000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=144, nextPosition=198630518, flags=0}, data=TableMapEventData{tableId=1233, database='test2', table='fire_spot_output', columnTypes=8, 8, 3, 10, 10, 8, 15, 8, 15, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, -10, 8, 18, 18, 8, 3, -10, -10, -10, columnMetadata=0, 0, 0, 0, 0, 0, 512, 0, 64, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 2580, 0, 0, 0, 0, 0, 2580, 2580, 2580, columnNullability={1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35}, eventMetadata=null}}' at offset {transaction_id=null, ts_sec=1689577431, file=mysql-bin.006103, pos=195573871, server_id=1, event=3742} for table tj-fire-electricity.fire_spot_output whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case. Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=198630355 --stop-position=198630518 --verbose mysql-bin.006103

What I expect to see is that, with .scanNewlyAddedTableEnabled(true) set, Flink CDC captures tables added while the job is running, and that after databaseList has been changed to widen the range of captured databases, the job can restore from a savepoint.

What did you see instead?

With .scanNewlyAddedTableEnabled(true) set, Flink CDC cannot capture tables added while the job is running.
Without the option, newly added tables actually are captured at runtime; however, after the range of captured databases is expanded, the job then cannot restore from a savepoint and fails with the error described above.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@edge535 edge535 added the bug Something isn't working label Jul 17, 2023
@lss1231

lss1231 commented Jul 25, 2023

Is this a bug, or is it designed to work like this?

@Level1Accelerator

Earlier versions could capture tables newly added under .tableList(db.*); 2.4.1 can't anymore. It's broken now...

@z070204z

2.4.1 has the same bug; I can't capture new tables under .tableList(test.*) either. I had to roll back to an older version.

@z070204z

I tracked the problem down to BinlogSplitReader; it looks like it was deliberately designed this way:
// Use still need to capture new sharding table if user disable scan new added table,
// The history records for all new added tables(including sharding table and normal table)
// will be capture after restore from a savepoint if user enable scan new added table

[screenshot: the condition in BinlogSplitReader]

@z070204z

I removed that check and tested briefly; so far everything seems to work normally.
[screenshot: the modified code]

@edge535
Author

edge535 commented Aug 22, 2023

Is this a bug, or is it designed to work like this?

I think it is a bug; it does not conform to the API definition.

@edge535
Author

edge535 commented Aug 22, 2023

Earlier versions could capture tables newly added under .tableList(db.*); 2.4.1 can't anymore. It's broken now...

This design is counterintuitive in the first place, and it doesn't match the API definition either.

@edge535
Author

edge535 commented Aug 22, 2023

2.4.1 has the same bug; I can't capture new tables under .tableList(test.*) either. I had to roll back to an older version.

I rolled back too, and others say it was deliberately designed this way, so we may be stuck unable to upgrade from now on.

@edge535
Author

edge535 commented Aug 22, 2023

I removed that check and tested briefly; so far everything seems to work normally. [screenshot]

Thanks, I'll give it a try too.

@z070204z

2.4.1 has the same bug; I can't capture new tables under .tableList(test.*) either. I had to roll back to an older version.

I rolled back too, and others say it was deliberately designed this way, so we may be stuck unable to upgrade from now on.

Where did you see that it was deliberately designed this way? I asked under that change, but got no reply.
60eee3a

@z070204z

2.4.1 has the same bug; I can't capture new tables under .tableList(test.*) either. I had to roll back to an older version.

I rolled back too, and others say it was deliberately designed this way, so we may be stuck unable to upgrade from now on.

If it really is by design and won't be changed, I'll just have to fork it. Sigh, what a hassle; I don't like forking the code and patching it.

@z070204z

z070204z commented Aug 22, 2023

Haha, here is GPT-4's analysis:

The change mainly adjusts the logic for capturing dynamically added tables. Analysis of the change:

Original code:

1. The comment notes a corner case in which a newly added table can be missed: the table is added dynamically after the enumerator has discovered the captured tables but before the lowest binlog offset of all table splits. That window should be very short, so the case is not supported for now.
The logic: if maxSplitHighWatermarkMap does not contain the tableId and capturedTableFilter includes the tableId, return true.

Modified code:

2. The new comment says that new sharding tables still need to be captured even when the user disables scanning for newly added tables. If the user enables it, the history records for all newly added tables (both sharding tables and normal tables) are captured after restoring from a savepoint.
The new logic first checks whether scanning for newly added tables is enabled. If it is disabled, newly added sharding tables without history records are handled as in the original code; if it is enabled, it returns false directly.

Reasons for the change:

1. The original code captured newly added tables with fixed logic, regardless of the user's configuration.
2. The modified code takes the configuration into account: with the option disabled, new sharding tables are still captured but other newly added tables are not; with it enabled, the history records of all newly added tables are captured after restoring from a savepoint.
3. The change was probably made to offer more flexible configuration, letting users decide whether newly added tables should be captured.

In short, the main purpose of the change was to provide more configuration options to suit different users' needs, adjusting and optimizing the capture logic for newly added tables.
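The before/after condition described in this analysis can be modeled as a small standalone sketch. The names maxSplitHighWatermarkMap and capturedTableFilter follow the snippet quoted earlier in the thread, but the real BinlogSplitReader holds far more state, so treat this purely as an illustration of the condition, not the actual implementation:

```java
import java.util.Map;
import java.util.Set;

// Simplified, self-contained model of the capture decision for a table seen
// in the binlog phase. This is NOT the real BinlogSplitReader code.
public class CaptureDecision {

    // Original logic: a table with no recorded high watermark that matches
    // the table filter is treated as newly added and is captured.
    static boolean shouldCaptureOriginal(
            String tableId,
            Map<String, Long> maxSplitHighWatermarkMap,
            Set<String> capturedTableFilter) {
        return !maxSplitHighWatermarkMap.containsKey(tableId)
                && capturedTableFilter.contains(tableId);
    }

    // Modified logic: the same check only runs when scanNewlyAddedTableEnabled
    // is false; when it is true, the live binlog reader returns false, so new
    // tables are only picked up via the savepoint-restore path.
    static boolean shouldCaptureModified(
            String tableId,
            Map<String, Long> maxSplitHighWatermarkMap,
            Set<String> capturedTableFilter,
            boolean scanNewlyAddedTableEnabled) {
        if (!scanNewlyAddedTableEnabled) {
            return shouldCaptureOriginal(tableId, maxSplitHighWatermarkMap, capturedTableFilter);
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("test.table1", 100L, "test.table2", 100L);
        Set<String> filter = Set.of("test.table1", "test.table2", "test.table3");

        // test.table3 is new (no watermark yet) and matches the filter.
        System.out.println(shouldCaptureOriginal("test.table3", watermarks, filter));        // prints true
        System.out.println(shouldCaptureModified("test.table3", watermarks, filter, false)); // prints true
        System.out.println(shouldCaptureModified("test.table3", watermarks, filter, true));  // prints false
    }
}
```

This matches the behavior reported in the issue: with the option enabled, the running binlog reader ignores the new table instead of starting to capture it.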

@lss1231

lss1231 commented Aug 22, 2023 via email

@loserwang1024
Contributor

Maybe we can add a scan.table-discovery.interval option, like the Kafka connector's, to find new tables periodically without restarting the job.
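For comparison, the Flink Kafka table connector already supports periodic discovery through its scan.topic-partition-discovery.interval option. A MySqlSource equivalent might look like the hypothetical sketch below; note that scanTableDiscoveryInterval does not exist in Flink CDC 2.4 and is only an illustration of this proposal:

```java
// Hypothetical API sketch of the proposal above. Neither the option name nor
// the commented-out builder method exists in Flink CDC 2.4; deserializer and
// other required settings are omitted for brevity.
MySqlSource<String> source =
        MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test.*")
                .scanNewlyAddedTableEnabled(true)
                // proposed: re-run table discovery periodically, no restart needed
                // .scanTableDiscoveryInterval(Duration.ofMinutes(5))
                .build();
```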

@ruanhang1993 ruanhang1993 added this to the V3.1.0 milestone Jan 8, 2024
@ruanhang1993
Contributor

Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag 'Flink CDC'. Thanks.


@loserwang1024 Would you like to help to create an issue about this problem? Thanks.
