Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-92] fix bug that flink never flush lsn to PG #107

Closed
wants to merge 1 commit into from

Conversation

eric3zhao
Copy link
Contributor

  1. support debezium heartbeat.interval.ms option
  2. flush lsn to pg after offset was writen to checkpoint

@wuchong
Copy link
Member

wuchong commented Feb 24, 2021

Hi @eric3zhao , did you validate whether this PR works well in your production envrionment? Is it possible to add a test to cover this?

Besides, could you drop the first commit which has been merged into master?

@eric3zhao
Copy link
Contributor Author

@wuchong I have not validate this in PRD environment, I only do some test in DEV. test case is:
after method DebeziumSourceFunction#snapshotState is completed

  • step.1

read offset info from checkpoint "sourceOffset":{"transaction_id":null,"lsn_proc":358680102240,"lsn":358680102240,"ts_usec":1614064954691947}

  • step.2

use SELECT * from pg_get_replication_slots() get slot info from PG

slot_name plugin slot_type datoid temporary active active_pid xmin catalog_xmin restart_lsn confirmed_flush_lsn
modelhome_zwz_test_book decoderbufs logical 17497 f f 1394055 53/7E0016A8 53/83000560
  • step.3

compare sourceOffset.lsn_proc to confirmed_flush_lsn

358680102240 -> 0x5383000560 -> 53/83000560 os offset in checkpoint equals to confirmed_flush_lsn in PG

I also want to write a unit test for this, but I have no idea how to compare the offset in the checkpoint with the lsn in the database in java code

last, I checkout the branch from tag: release-1.1.0 far behind master now, I will rebase my fix branch to master

@wuchong
Copy link
Member

wuchong commented Feb 25, 2021

Please do not use "git merge" to rebase branches, otherwise the changes is hard to track. Please use "git rebase" instead.

@eric3zhao
Copy link
Contributor Author

sorry, but I have already push the commits to the remote, should I create a new PR based on master?

@wuchong
Copy link
Member

wuchong commented Feb 25, 2021

You can force push the branch .

…and flush lsn to pg after offset was writen to checkpoint
@eric3zhao
Copy link
Contributor Author

You can force push the branch .

done

@Tan-JiaLiang
Copy link

that's great! i found the same situation in my postgres database in these days, it is luck for me to found this PR, i hope this PR will merge into master as soon as possible, thanks for the commitors @eric3zhao @wuchong

@wuchong
Copy link
Member

wuchong commented Feb 26, 2021

Thanks for the contribution @eric3zhao . However, I think there are 3 problem in the PR:

  1. it's very hack to get and invoke on ChangeEventSourceCoordinator. Actually, Debezium provides API to commit offsets by io.debezium.engine.DebeziumEngine.RecordCommitter.
  2. we should commit offsets when checkpoint complete instead of performing checkpoint. Otherwise, we may lose data if the checkpoint is failed.
  3. there is no tests to cover this case.

I have fixed the problem in 4127661.

@wuchong wuchong closed this Feb 26, 2021
@eric3zhao
Copy link
Contributor Author

eric3zhao commented Feb 27, 2021

great job, but I recommend update debeziumOffset to the heartbeat‘s offset when received heartbeat record, not just skip it

if (isHeartbeatEvent(record)) {
 emitRecordsUnderCheckpointLock(new ArrayDeque<>(), record.sourcePartition(), record.sourceOffset());					
  continue;
}

because, if skip heartbeat record, offset is updated only when the table data is updated. Suppose I set 'table-name' = 'table_A' then if table_A has on data updated for a long time the WAL disk space keep growing. However, if the data of any table in the PG database is updated, the offset in heartbeat record will be updated.

@wuchong
Copy link
Member

wuchong commented Feb 27, 2021

@eric3zhao , you are right, otherwise the LSN can't be advanced if there is no updates. But we only need to update debeziumOffset under checkpoint lock instead of emitRecordsUnderCheckpointLock.

Do you want to contribute a pull request?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support ' heartbeat.interval​.ms' options to reduce WAL disk space consumption of PG
3 participants