Add start and end offset for each source topic partition in iceberg snapshot as a property #15206

Open
kumarpritam863 wants to merge 12 commits into apache:main from
kumarpritam863:add_start_end_source_offset_snapshot_prop

@kumarpritam863
Contributor

Summary

This PR enhances the Kafka Connect sink to track offset ranges (start and end offsets) for each topic partition, providing better traceability of which source data was included in each Iceberg
snapshot.

Motivation

Previously, the connector only tracked a single offset value per partition, which represented the next offset to consume. This made it difficult to determine the exact range of data that was
committed in a particular snapshot. By tracking both start and end offsets, we can now:

  1. Precisely identify the range of Kafka messages included in each Iceberg snapshot
  2. Improve debugging and data lineage capabilities
  3. Enable better auditing and compliance tracking
  4. Facilitate data recovery and reprocessing scenarios
  5. Enable Flink jobs running in hybrid mode (Iceberg -> Kafka) to switch to the exact topic partition offsets

Changes

Core Data Models

TopicPartitionOffset class:

  • Changed from single offset field to startOffset and endOffset fields
  • Updated Iceberg schema to include both offset fields
  • Field IDs: START_OFFSET = 10_702, END_OFFSET = 10_703, TIMESTAMP = 10_704
  • Added getter methods: startOffset() and endOffset()

Offset class:

  • Modified to track both startOffset and endOffset instead of a single offset
  • Added a backward-compatible offset() method that returns endOffset
  • Updated constructor signature: Offset(Long startOffset, Long endOffset, OffsetDateTime timestamp)
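
The shape described above can be sketched roughly as follows. This is an illustration only, not the PR's source: the class name OffsetRangeSketch is hypothetical, and only the constructor signature and the startOffset(), endOffset(), and offset() accessors are taken from the description.

```java
import java.time.OffsetDateTime;

// Hypothetical sketch of an offset-range holder; not the actual Offset class.
final class OffsetRangeSketch {
  private final Long startOffset;         // first offset consumed in the commit cycle
  private final Long endOffset;           // next offset to consume (exclusive bound)
  private final OffsetDateTime timestamp; // timestamp carried alongside the range

  OffsetRangeSketch(Long startOffset, Long endOffset, OffsetDateTime timestamp) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.timestamp = timestamp;
  }

  Long startOffset() {
    return startOffset;
  }

  Long endOffset() {
    return endOffset;
  }

  // Backward-compatible accessor: callers written against the old
  // single-offset model still receive the "next offset to consume".
  Long offset() {
    return endOffset;
  }

  OffsetDateTime timestamp() {
    return timestamp;
  }
}
```

Keeping offset() as an alias for endOffset means existing call sites that only need the resume position should not have to change.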

Offset Tracking

SinkWriter class:

  • Enhanced save() method to track offset ranges per partition
  • For the first record in a partition: startOffset = currentOffset, endOffset = currentOffset + 1 (endOffset is exclusive, i.e. the next offset to consume)
  • For subsequent records: preserves original startOffset, updates endOffset = currentOffset + 1
  • Ensures accurate range tracking across multiple records in the same commit cycle
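
A minimal sketch of this range-tracking rule, assuming a map updated with Map.compute() (the file summary mentions compute(), but the surrounding code here is illustrative). The OffsetRangeTracker and Range names and the string map key are hypothetical; a real connector would more likely key the map by a topic-partition object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker illustrating the start/end update rule.
final class OffsetRangeTracker {
  // Half-open range [start, end): end is the next offset to consume.
  static final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  private final Map<String, Range> ranges = new HashMap<>();

  // Called once per record written in the current commit cycle.
  void record(String topic, int partition, long currentOffset) {
    ranges.compute(
        topic + "-" + partition,
        (key, existing) ->
            existing == null
                // First record for this partition: open a new range.
                ? new Range(currentOffset, currentOffset + 1)
                // Later records: keep the original start, advance the end.
                : new Range(existing.start, currentOffset + 1));
  }

  // Returns null if no record was seen for the partition.
  Range rangeFor(String topic, int partition) {
    return ranges.get(topic + "-" + partition);
  }
}
```

Recording offsets 100 through 249 for one partition yields start = 100 and end = 250, which matches the shape of the example metadata in this description.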

Worker class:

  • Updated to pass both startOffset and endOffset when creating TopicPartitionOffset objects

Snapshot Metadata

CommitState class:

  • Added topicPartitionOffsets() method to extract all topic partition offsets from ready buffer
  • Returns complete offset information for the current commit

Coordinator class:

  • Added new snapshot property: kafka.connect.topic-partition-offsets
  • Implemented topicPartitionOffsetsToJson() to serialize offset ranges to JSON
  • JSON format includes: topic, partition, startOffset, endOffset, and timestamp for each partition
  • Updated commitToTable() to store topic partition offsets in both append and delta operations
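
As a rough illustration of the serialization step, the sketch below builds the JSON array by hand so that it stays dependency-free. The OffsetJsonSketch and Entry names are hypothetical, and the PR's topicPartitionOffsetsToJson() may well use a JSON library instead. Since Iceberg snapshot summary properties are string-valued, the resulting array would be stored as a single string under the property key.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical, dependency-free sketch of serializing offset ranges to JSON.
final class OffsetJsonSketch {
  static final class Entry {
    final String topic;
    final int partition;
    final long startOffset;
    final long endOffset;
    final String timestamp; // ISO-8601 text, assumed already formatted

    Entry(String topic, int partition, long startOffset, long endOffset, String timestamp) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
      this.endOffset = endOffset;
      this.timestamp = timestamp;
    }
  }

  // Produces a compact JSON array with one object per topic partition.
  static String toJson(List<Entry> entries) {
    return entries.stream()
        .map(e -> String.format(
            Locale.ROOT,
            "{\"topic\":\"%s\",\"partition\":%d,\"startOffset\":%d,"
                + "\"endOffset\":%d,\"timestamp\":\"%s\"}",
            escape(e.topic), e.partition, e.startOffset, e.endOffset, escape(e.timestamp)))
        .collect(Collectors.joining(",", "[", "]"));
  }

  // Minimal escaping for backslashes and double quotes only.
  private static String escape(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }
}
```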

Example Snapshot Metadata

After this change, Iceberg snapshots will include metadata like:

{
  "kafka.connect.topic-partition-offsets": [
    {
      "topic": "events",
      "partition": 0,
      "startOffset": 100,
      "endOffset": 250,
      "timestamp": "2024-01-15T10:30:00Z"
    },
    {
      "topic": "events",
      "partition": 1,
      "startOffset": 50,
      "endOffset": 175,
      "timestamp": "2024-01-15T10:30:05Z"
    }
  ]
}
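
Because endOffset is computed as the last consumed offset plus one, each entry describes a half-open range, and the number of offsets it spans is a plain difference. The helper below is a hypothetical sketch of that arithmetic. The span is only an upper bound on the record count, since Kafka offsets can have gaps (for example after log compaction or because of transaction markers).

```java
// Hypothetical helper: how many offsets a [startOffset, endOffset) range spans.
final class OffsetSpanSketch {
  static long spannedOffsets(long startOffset, long endOffset) {
    if (endOffset < startOffset) {
      throw new IllegalArgumentException("endOffset must not be less than startOffset");
    }
    // endOffset is exclusive, so no +1 is needed.
    return endOffset - startOffset;
  }
}
```

For the example above, partition 0 spans 250 - 100 = 150 offsets and partition 1 spans 175 - 50 = 125 offsets.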

Copilot AI left a comment

Pull request overview

This PR enhances the Kafka Connect sink to track offset ranges (start and end offsets) for each topic partition in Iceberg snapshots, improving data lineage and traceability capabilities.

Changes:

  • Modified core data models (Offset and TopicPartitionOffset) to track both start and end offsets instead of a single offset value
  • Updated offset tracking logic in SinkWriter to maintain offset ranges across multiple records in the same commit cycle
  • Added snapshot metadata property kafka.connect.topic-partition-offsets containing JSON-serialized offset ranges for all partitions

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Offset.java | Changed from single offset to startOffset/endOffset fields with backward compatibility method |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java | Enhanced to track offset ranges per partition using compute() to preserve startOffset |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java | Updated to pass both startOffset and endOffset when creating TopicPartitionOffset objects |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitState.java | Added topicPartitionOffsets() method to extract offset information for current commit |
| kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java | Added JSON serialization of offset ranges and storage in snapshot metadata properties |
| kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java | Updated schema to include start_offset and end_offset fields with adjusted field IDs |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java | Updated test assertions to verify both startOffset and endOffset |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java | Updated test to use new Offset constructor with startOffset and endOffset |
| kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java | Updated test to use new TopicPartitionOffset constructor with offset range |
| kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/TestEventSerialization.java | Updated test data to include both startOffset and endOffset values |
