
Offset acknowledgment - Add logic to persist/dump offset only after records are persisted to ClickHouse #230

Closed
subkanthi opened this issue Apr 28, 2023 · 4 comments · Fixed by #443
Assignees: subkanthi
Labels: GA-1 (issues in the release scheduled for Dec 2023), high-priority, qa-verified

Comments

@subkanthi
Collaborator

subkanthi commented Apr 28, 2023

Add logic to persist records to ClickHouse and increment the offset.


            this.engine = changeEventBuilder
                    .using(new DebeziumConnectorCallback())
                    .notifying(new DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>>() {
                        @Override
                        public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records,
                                                DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
                            // TODO: persist the batch to ClickHouse before acknowledging;
                            // today the batch is acknowledged unconditionally.
                            System.out.println("Handle Batch");
                            committer.markBatchFinished();
                        }
                    })
                    .build();

There is an event callback that delivers a batch of records.
Two pieces of logic need to be implemented:

  1. The commit policy has to be reviewed.
  2. The committer object should be passed to DbWriter so that it can be used to commit the offsets after the records are merged to ClickHouse.

According to the documentation, the committer has to acknowledge every record.

/**
 * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(SourceRecord)}
 * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
 *
 * @param records the records to be processed
 * @param committer the committer that indicates to the system that we are finished
 */
void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

As the Javadoc states, the RecordCommitter must be called for each record and once when the batch is finished.
The RecordCommitter interface is thread-safe, which allows for flexible processing of records.
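A minimal sketch of the persist-then-acknowledge ordering described above. The `RecordCommitter` interface here is a stand-in mirroring Debezium's contract so the sketch is self-contained, and `persistToClickHouse` is a hypothetical placeholder for the DbWriter's merge step; the real connector would use `io.debezium.engine.DebeziumEngine.RecordCommitter` and the actual DbWriter:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring Debezium's RecordCommitter contract (not the real interface).
interface RecordCommitter<R> {
    void markProcessed(R record) throws InterruptedException;
    void markBatchFinished() throws InterruptedException;
}

public class OffsetAckSketch {

    // Hypothetical stand-in for DbWriter: returns true only once the whole
    // batch has been merged into ClickHouse.
    static boolean persistToClickHouse(List<String> records) {
        return true; // assume the insert succeeded for this sketch
    }

    // Persist first, acknowledge after: offsets are marked processed only
    // once the records are safely in ClickHouse.
    static <R> void handleBatch(List<R> records, RecordCommitter<R> committer,
                                java.util.function.Predicate<List<R>> writer)
            throws InterruptedException {
        if (!writer.test(records)) {
            return; // no ack: the records are re-delivered after restart
        }
        for (R record : records) {
            committer.markProcessed(record); // per-record ack, per the Javadoc
        }
        committer.markBatchFinished();       // batch-level ack flushes the offset
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> acked = new ArrayList<>();
        RecordCommitter<String> committer = new RecordCommitter<String>() {
            public void markProcessed(String r) { acked.add(r); }
            public void markBatchFinished()    { acked.add("<batch-finished>"); }
        };
        handleBatch(List.of("r1", "r2"), committer, OffsetAckSketch::persistToClickHouse);
        System.out.println(acked); // [r1, r2, <batch-finished>]
    }
}
```

The key property is that a failed ClickHouse write leaves the offsets unacknowledged, so on restart Debezium re-reads from the last committed position instead of skipping the lost batch.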

@subkanthi subkanthi changed the title Add logic to persist records to clickhouse and increment the offset Offset acknowledgment - Add logic to persist/dump offset only after records are persisted to ClickHouse May 2, 2023
@subkanthi subkanthi self-assigned this May 2, 2023
@subkanthi
Collaborator Author

@AlmostIvanSidorov
Test case:

  1. Start replication (MySQL, ClickHouse, sink connector).
  2. Observe offsets in the ClickHouse table (at least one row with the latest timestamp in record_insert_ts).
  3. Stop ClickHouse.
  4. Insert new rows into MySQL.
  5. Start ClickHouse; replication should resume.
  6. Only the new rows should be inserted into ClickHouse; there should be no data loss or duplicate records.

@AlmostIvanSidorov
Contributor

@subkanthi I have added tests (the same scenario as you wrote, and a stress scenario with parallel restarts of ClickHouse and inserts, updates, and deletes in MySQL). The final results of replication are correct for clickhouse_debezium_embedded:2023-04-25.

@subkanthi subkanthi added the GA-1 label Jan 11, 2024
@subkanthi
Collaborator Author

subkanthi commented Jan 26, 2024

(Attached diagrams: debezium_batch.drawio, lightweight_architecture.drawio)

Changes

  1. Every thread must ensure the complete Debezium batch is copied over.
  2. Maintain a state variable of all batches currently being handled by all threads, e.g. a HashMap of thread name -> (min offset, max offset):

     thread-1 -> (1, 10)
     thread-2 -> (11, 20)

     current_offsets_in_flight = [(1, 10), (11, 20)]

     thread-3 -> (21, 30): checks [(1, 10), (11, 20)], finds it is not safe to commit, and adds its range to the list:

     current_offsets_in_flight = [(1, 10), (11, 20), (21, 30)]

  3. If a faster thread finds that it needs to wait for an earlier thread (e.g. thread-2) to commit, it retries in the next loop. It should also stop adding more records from the common queue, i.e. pause the Debezium event loop; multiple retries risk the queue growing, since we keep adding records from Debezium.
  4. When a thread starts processing a batch, update the state variable.
  5. After a thread successfully commits its offset, remove its batch record from the state variable.
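The state variable above can be sketched as a small thread-safe coordinator. This is a minimal illustration, not the connector's actual class: each thread registers its (min, max) offset range, and a range may commit only when it is the lowest one still in flight, so the persisted offset never jumps past unfinished work:

```java
import java.util.TreeMap;

// Minimal sketch of the in-flight batch state variable (hypothetical class,
// not the connector's implementation).
public class InFlightOffsets {

    // minOffset -> maxOffset for every batch currently being processed,
    // ordered so the lowest in-flight range is always first.
    private final TreeMap<Long, Long> inFlight = new TreeMap<>();

    // Called when a thread starts processing a batch.
    public synchronized void start(long minOffset, long maxOffset) {
        inFlight.put(minOffset, maxOffset);
    }

    // A thread may commit the offset for its batch only if it owns the
    // lowest in-flight range; otherwise it retries on the next loop (and
    // the caller should pause pulling records from Debezium so the common
    // queue does not keep growing).
    public synchronized boolean safeToCommit(long minOffset) {
        return !inFlight.isEmpty() && inFlight.firstKey() == minOffset;
    }

    // Called after the batch is merged into ClickHouse and its offset is
    // committed; removes the batch record from the state variable.
    public synchronized void finish(long minOffset) {
        inFlight.remove(minOffset);
    }

    public static void main(String[] args) {
        InFlightOffsets state = new InFlightOffsets();
        state.start(1, 10);   // thread-1
        state.start(11, 20);  // thread-2
        state.start(21, 30);  // thread-3
        System.out.println(state.safeToCommit(21)); // false: earlier ranges in flight
        state.finish(1);
        state.finish(11);
        System.out.println(state.safeToCommit(21)); // true: (21, 30) is now the lowest
    }
}
```

Committing only the lowest range keeps the persisted offset monotonic: a fast thread-3 cannot commit offset 30 while offsets 1-20 are still unconfirmed, which is exactly the failure mode that would lose data on restart.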

@Selfeer
Collaborator

Selfeer commented Feb 15, 2024

The issue was manually verified by the Altinity QA team and marked as qa-verified.

Build used for testing: altinityinfra/clickhouse-sink-connector:443-678d8ae2567c2c1d89f5430fa57ede619d6ea851-lt

We've validated that after processing batches, the replica_source_info table is updated and the position values increase monotonically: the position never decreases and then increases again.

@Selfeer Selfeer added the qa-verified label Feb 15, 2024