
data row is smaller than a column index, internal schema representation is probably out of sync with real database schema #732

Closed · Tracked by #1728
1105220927 opened this issue Dec 16, 2021 · 25 comments
Labels: bug (Something isn't working), critical (should fix before release)

@1105220927 commented Dec 16, 2021

Describe the bug
error: data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
[screenshot]
Environment:

  • Flink version: 1.13.2
  • Flink CDC version: 1.4.0
  • Database and version: MySQL 5.6.28

To Reproduce
The job for this instance triggers the error as soon as it starts.

I tried newer flink-cdc versions (2.0.1/2.0.2/2.1.0) and manually removed the MySQL version check in MysqlValidator, but the error is still triggered.
[screenshot]

Inspecting the binlog at that position, it corresponds to the table schema and shows nothing abnormal:
[screenshot]

[screenshot]

The corresponding Flink error log:
[screenshot]

Could someone please take a look?

1105220927 added the bug label Dec 16, 2021
@SEZ9 commented Dec 20, 2021

Is there any follow-up on this issue? We hit the same problem with vanilla Debezium. Any experience dealing with it?

@1105220927 (Author)

> Is there any follow-up on this issue? We hit the same problem with vanilla Debezium. Any experience dealing with it?

No idea how to handle it; we will probably switch to Canal for data synchronization. I suspect it might be a MySQL version issue. Are you also on MySQL 5.6?

@SEZ9 commented Dec 20, 2021

> I suspect it might be a MySQL version issue. Are you also on MySQL 5.6?

Same problem on MySQL 5.7.

@1105220927 (Author)

> Same problem on MySQL 5.7.

Do you hit this error with both flink-cdc and vanilla Debezium?

@1105220927 (Author)

You could file a bug with your environment info and the error details, and see whether any community developers can help.

@wuchong (Member) commented Dec 20, 2021

@1105220927 @SEZ9 Could you paste the complete Flink SQL? Was the job started in initial mode (full snapshot first, then incremental)?

@1105220927 (Author)

> Could you paste the complete Flink SQL? Was the job started in initial mode (full snapshot first, then incremental)?

I wrote this with the streaming API; the code does little beyond the basics. The startup mode is schema_only.

        // Set up the connection instance
        var instance = Instance.newInstance(jobConfig); // wrapper around the database configuration
        var builder = MySQLSource.<ChangeLog>builder();
        builder.hostname(instance.host())
                .port(instance.port())
                .username(instance.username())
                .password(instance.password())
                .deserializer(new BinlogParser())
                .databaseList(instance.databases())
                .tableList(instance.tables());
        // Set Debezium pass-through properties
        Properties properties = new Properties();
        properties.setProperty("snapshot.locking.mode", "none");
        properties.setProperty("binary.handling.mode", "base64");
        properties.setProperty("producer.max.request.size", "16777216");
        properties.setProperty("database.history.producer.max.request.size", "16777216");
        properties.setProperty("inconsistent.schema.handling.mode", "warn");
        properties.setProperty("database.history.skip.unparseable.ddl", "true");
        // the property key must not contain a trailing space
        properties.setProperty("database.history.store.only.monitored.tables.ddl", "true");

        builder.startupOptions(StartupOptions.latest());
        builder.debeziumProperties(properties);
        streamEnv.addSource(builder.build())
                .map(......)

@SEZ9 commented Dec 20, 2021

> Could you paste the complete Flink SQL? Was the job started in initial mode (full snapshot first, then incremental)?

I'm using Debezium directly with "snapshot.mode": "schema_only_recovery".
It fails as soon as it reaches a certain binlog position:
[2021-12-20 07:19:30,367] ERROR WorkerSourceTask{id=klorderbizdb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:366)
at io.debezium.connector.mysql.EventBuffer.completeTransaction(EventBuffer.java:201)
at io.debezium.connector.mysql.EventBuffer.add(EventBuffer.java:110)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$26(MySqlStreamingChangeEventSource.java:859)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 9 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1639537800, file=mysql-bin-changelog.203654, pos=2872, gtids=2b1cd284-1e54-11ea-953c-069aa761c8fc:556456946-556456947, row=1, server_id=834342492, event=2}
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:255)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:688)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:745)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:686)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$15(MySqlStreamingChangeEventSource.java:830)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:349)
... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:70)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:218)
... 13 more

@wuchong (Member) commented Dec 20, 2021

@1105220927 @SEZ9 Is this issue stably reproducible after a restart?

You could try changing database.history.store.only.monitored.tables.ddl to false and see.

@SEZ9 commented Dec 20, 2021

> Is this issue stably reproducible after a restart?

So far only a few tables hit this, with low probability, but once the binlog position reaches a change on those tables, the error always occurs.
I tried adding some Debezium config options to work around the problem, but they had no effect:
"database.history.skip.unparseable.ddl": true,
"inconsistent.schema.handling.mode": "warn",
"event.deserialization.failure.handling.mode": "ignore",
"database.history.store.only.monitored.tables.ddl": true,

@SEZ9 commented Dec 20, 2021

For now the only workaround is to exclude the problematic tables via table.include.list...

@1105220927 (Author)

> Is this issue stably reproducible after a restart?

Yes. For the affected databases, it reproduces on every startup.

@1105220927 (Author)

I've located the problem on my side; I'm not sure whether it applies to your scenario. The cause: after changing binlog_row_image to FULL, all long-lived connections must be restarted (for a replica, the replica instance itself must be restarted) before the change takes effect. Otherwise, even though the variable was changed, the binlog format is still wrong.
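
For reference, a minimal way to verify whether the change has actually taken effect for the session that writes the rows (a sketch; binlog_row_image has both global and session scope in MySQL):

    -- The global value may already report FULL...
    SHOW GLOBAL VARIABLES LIKE 'binlog_row_image';
    -- ...but a long-lived session opened before the change keeps its old value,
    -- so its row events are still written with the previous (e.g. MINIMAL) image:
    SHOW SESSION VARIABLES LIKE 'binlog_row_image';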

@wuchong (Member) commented Dec 24, 2021

@1105220927 After changing binlog_row_image, without restarting the instance, does show global variables like "binlog_row_image" return FULL?

@SEZ9 commented Dec 24, 2021

> The cause: after changing binlog_row_image to FULL, all long-lived connections must be restarted (for a replica, the replica instance itself must be restarted) before the change takes effect.

Not in my case. Our database has always had binlog_row_image = FULL, and only individual tables are affected. We have 1000+ tables in production, and 2 of them are problematic. I asked the Debezium community and they replied: "Just please bear in mind if the database history topic is recreated around a schema change then you have a problem as the unprocessed binlog can still contain the old schema." I'm going to dig further into the topic that stores the schema history.

@1105220927 (Author)

> After changing binlog_row_image, without restarting the instance, does show global variables like "binlog_row_image" return FULL?

Yes. The database variable is correct, but the binlog for updates looks like this:
[screenshot]
A normal binlog should look like this:
[screenshot]

Since this variable change only takes effect for new connections, I don't know what went wrong in the process when the variable was changed earlier that caused it not to take effect.

@1105220927 (Author) commented Dec 24, 2021

> Not in my case. Our database has always had binlog_row_image = FULL, and only individual tables are affected.

OK, then it may not be the same cause. On my side I found the problem by using the mysqlbinlog tool to compare the actual log format at the problematic binlog position.
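
For anyone who wants to run the same comparison, a minimal sketch of the inspection (the file name and position here are copied from the stack trace above purely as placeholders):

    # Decode the row events around the failing position. With binlog_row_image=FULL,
    # an UPDATE event should carry a before image (@1..@N for every column) and an
    # after image of the same width; a MINIMAL image carries only the changed columns.
    mysqlbinlog --verbose --base64-output=decode-rows \
      --start-position=2872 \
      mysql-bin-changelog.203654 | less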

@kennywgx commented May 9, 2023

I encountered the same problem, too, and I eventually figured out why it happens.

The error stack trace:

Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
	... 12 more

At the beginning, my table had only 2 columns (id, name) and I inserted some test data, like (6, 'fff'). Then I started the binlog-sync task from a specified position; before that, I had added 7 more columns to the table.

Debezium fetches the table schema by running show create table {target.table} when initializing. This SQL returns the newest table schema, which has 9 columns.
[screenshot]

Next, it starts to read binlog rows from the specified position. The first row it reads is (6, 'fff'). This row was inserted before I added the 7 columns, so it only has 2 columns (id, name). But Debezium does not know the table's earlier schema; it only knows the newest schema fetched at startup. That means Debezium cannot map the row (6, 'fff') onto 9 columns, because they do not match.
[screenshot]
[screenshot]
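
The scenario can be reduced to a few statements (a hypothetical repro sketch; the table and column names are invented to match the description above):

    CREATE TABLE t (id INT PRIMARY KEY, name VARCHAR(32));
    INSERT INTO t VALUES (6, 'fff');                     -- this binlog row carries 2 values
    ALTER TABLE t ADD COLUMN c1 INT, ADD COLUMN c2 INT;  -- ...7 new columns in total
    -- Now start the connector from a binlog position *before* the ALTER:
    -- SHOW CREATE TABLE t returns 9 columns, but the first replayed row event
    -- still carries only 2 values, producing "data row is smaller than a column
    -- index, internal schema representation is probably out of sync...".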

@kennywgx commented May 9, 2023

To avoid this problem, we should reset the binlog start position to a time after the table's columns were changed.
[screenshot]
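
In the streaming API shown earlier, that corresponds to choosing a startup offset at or after the DDL, for example (a sketch; the timestamp value is hypothetical, and whether timestamp startup is supported depends on the connector version):

    // Start reading the binlog from a point after the ALTER TABLE, so every
    // replayed row event matches the schema fetched at startup.
    builder.startupOptions(StartupOptions.timestamp(1683590400000L)); // epoch millis, after the DDL

    // Alternatively, pin an exact offset known to lie after the schema change:
    // builder.startupOptions(StartupOptions.specificOffset("mysql-bin.000123", 4567));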

@sdlcwangsong

Has anyone fixed this bug? I'm hitting the same problem.

@leonardBang (Contributor)

Thanks all for the detailed feedback. There are two reasons that lead to this issue:

  • 1. The table in MySQL is modified by multiple sessions, and some of these sessions have a wrong binlog_row_image setting, which produces unparsable binlog data.
  • 2. We only keep the latest (newest) table schema of the captured table. If the binlog comes from an earlier phase, e.g. before the table added a column, the binlog row length will be shorter than the newest schema's column count, which also leads to this issue.

It's not easy to fix the issue right now.

For (1): all we can do is skip the unparsable binlog data and require that DB users never change the binlog_row_image configuration (see the sketch below).
For (2): we would need versioned table schemas keyed by binlog position, but there's a challenge: how do we track the historical schemas of a table?
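
A mitigation sketch for (1), using the Debezium pass-through properties already shown in this thread (note that skipping silently drops the affected events, so downstream tables can diverge, as reported below):

    Properties properties = new Properties();
    // Skip row events whose width does not match the known schema instead of failing:
    properties.setProperty("inconsistent.schema.handling.mode", "skip");
    // Skip events that cannot be processed at all:
    properties.setProperty("event.processing.failure.handling.mode", "skip");
    builder.debeziumProperties(properties);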

leonardBang removed this from the V2.4.0 milestone Jun 14, 2023
ruanhang1993 mentioned this issue Jun 27, 2023
@dingqianwen (Contributor)

[screenshot]

try:

properties.setProperty("event.processing.failure.handling.mode", "skip");

@SML0127 (Contributor) commented Oct 24, 2023

> try:
> properties.setProperty("event.processing.failure.handling.mode", "skip");

With that property applied, the exception no longer occurs.
However, the update-related binlog events still fail to parse, so the source table and target table go out of sync 😢

@ratneshsahu14

Getting this error for Db2 connectors as well:

Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema

There has not been any change to the table schema, but it still fails.

@PatrickRen (Contributor)

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen closed this as not planned Feb 28, 2024