Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the problem where binlog position is not updated when capturing DDL #925

Merged
merged 1 commit into from
Mar 11, 2022

Conversation

qidian99
Copy link
Contributor

No description provided.

@leonardBang
Copy link
Contributor

Thanks @qidian99 for the contribution, LGTM,but we should give a PR description for other developers/users can get your point when they check this PR.

@leonardBang leonardBang merged commit 64e3408 into apache:master Mar 11, 2022
@ldwnt
Copy link

ldwnt commented Dec 28, 2022

I applied the fix of https://github.com/ververica/flink-cdc-connectors/pull/925/files/2829d2d6d50a9eadf26f928185d636b570f26497 on branch 2.1. The binlog position is actually updated with any DDL. However, if the job is restarted, it still consumes the last ddl. Please see to the logs below (lines starting with @@ are the logs I add), the binlog position is 1021282711
before job stop:

2022-12-28 22:08:30,384 INFO  [Source: @s -> @p -> (Sink: @ds, Sink: @ls) (1/1)#0] com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [92] - @@ schema change, position: 1021282711@mysql-bin.000153, element: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1672236510, file=mysql-bin.000153, pos=1021282711, gtids=1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622, server_id=1}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=mdm_source}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1672236510261,db=mdm_source,table=b,server_id=1,gtid=a64821b2-7c37-11ed-8e09-00163e1fb04f:24240623,file=mysql-bin.000153,pos=1021282584,row=0},historyRecord={"source":{"file":"mysql-bin.000153","pos":1021282584,"server_id":1},"position":{"transaction_id":null,"ts_sec":1672236510,"file":"mysql-bin.000153","pos":1021282711,"gtids":"1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622","server_id":1},"databaseName":"mdm_source","ddl":"alter table b comment = '5'","tableChanges":[{"type":"ALTER","id":"\"mdm_source\".\"b\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":["id"],"columns":[{"name":"id","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"position":1,"optional":false,"autoIncremented":false,"generated":false},{"name":"name","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"position":2,"optional":true,"autoIncremented":false,"generated":false}]}}]}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

after job restart:

2022-12-28 22:11:11,449 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1171] - Connected to MySQL binlog at mysql-sit.deepq.tech:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000153, currentBinlogPosition=1021282711, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622, currentGtidSet=1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622, restartBinlogFilename=mysql-bin.000153, restartBinlogPosition=1021282711, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]
2022-12-28 22:11:11,451 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.util.Threads [287] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-12-28 22:11:11,452 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [909] - Waiting for keepalive thread to start
2022-12-28 22:11:11,553 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [916] - Keepalive thread is running
2022-12-28 22:11:13,310 INFO  [Source: @s -> @p -> (Sink: @ds, Sink: @ls) (1/1)#0] com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [80] - @@ isSchemaChangeEvent element: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1672236672, file=mysql-bin.000153, pos=1021282711, gtids=1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622, server_id=1}} ConnectRecord{topic='mysql_binlog_source', kafkaPartition=0, key=Struct{databaseName=mdm_source}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1672236672622,db=mdm_source,table=b,server_id=1,gtid=a64821b2-7c37-11ed-8e09-00163e1fb04f:24240623,file=mysql-bin.000153,pos=1021282584,row=0},historyRecord={"source":{"file":"mysql-bin.000153","pos":1021282584,"server_id":1},"position":{"transaction_id":null,"ts_sec":1672236672,"file":"mysql-bin.000153","pos":1021282711,"gtids":"1b2fd828-7496-11ed-bef5-00163e1487f9:1-28723416,92a63284-9cf9-11eb-87d8-00163e147fb0:1-78665182,a64821b2-7c37-11ed-8e09-00163e1fb04f:1-24240622","server_id":1},"databaseName":"mdm_source","ddl":"alter table b comment = '5'","tableChanges":[{"type":"ALTER","id":"\"mdm_source\".\"b\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":["id"],"columns":[{"name":"id","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"position":1,"optional":false,"autoIncremented":false,"generated":false},{"name":"name","jdbcType":4,"typeName":"INT","typeExpression":"INT","charsetName":null,"position":2,"optional":true,"autoIncremented":false,"generated":false}]}}]}}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants