Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c0a2665
added metadat and data path in case of dynamic routing
Aug 9, 2025
67619ec
spotless
Aug 9, 2025
6b15ae4
Revert "spotless"
Aug 9, 2025
8398e4c
Revert "added metadat and data path in case of dynamic routing"
Aug 9, 2025
fbf52a9
Merge branch 'apache:main' into main
kumarpritam863 Sep 12, 2025
c92ec66
Merge branch 'apache:main' into main
kumarpritam863 Sep 16, 2025
9392a6d
Merge branch 'apache:main' into main
kumarpritam863 Oct 29, 2025
ecd8b55
Merge branch 'apache:main' into main
kumarpritam863 Nov 12, 2025
5e76e04
Merge branch 'apache:main' into main
kumarpritam863 Nov 20, 2025
a1ec7e6
Merge branch 'apache:main' into main
kumarpritam863 Nov 24, 2025
4eaf70b
Merge branch 'apache:main' into main
kumarpritam863 Nov 27, 2025
1508513
Merge branch 'apache:main' into main
kumarpritam863 Dec 9, 2025
e5908c8
Merge branch 'apache:main' into main
kumarpritam863 Jan 12, 2026
cbefe9a
Merge branch 'apache:main' into main
kumarpritam863 Feb 1, 2026
57d4667
Merge branch 'apache:main' into main
kumarpritam863 Feb 5, 2026
ee658ea
Merge branch 'apache:main' into main
kumarpritam863 Feb 18, 2026
7c5976d
Merge branch 'apache:main' into main
kumarpritam863 Feb 21, 2026
8a9654f
Merge branch 'apache:main' into main
kumarpritam863 Mar 14, 2026
888e659
Merge branch 'apache:main' into main
kumarpritam863 Apr 3, 2026
a6b797b
Used originalTopic, originalOffset, originalPartition as these can be…
Apr 3, 2026
c483a53
added integration tests for dynmicRouting when source topic is mutate…
Apr 3, 2026
7abff37
fixed spotless
Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -59,6 +62,56 @@ public void testIcebergSink(String branch) {
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
}

/**
* Verifies dynamic routing works when topic-rewriting SMTs (e.g. RegexRouter) change
* record.topic(). Before the fix, SinkWriter tracked offsets under the rewritten topic, causing a
* mismatch with context.assignment() and preventing proper offset commits.
*/
@Test
public void testDynamicRouteWithTopicRewritingSMT() {
String smtTable = "smttbl";
TableIdentifier smtTableId = TableIdentifier.of(TEST_DB, smtTable);
catalog().createTable(smtTableId, TestEvent.TEST_SCHEMA);

try {
// RegexRouter rewrites topic to "test.smttbl", then InsertField copies
// record.topic() (now "test.smttbl") into field "srcTopic", and dynamic
// routing uses "srcTopic" to pick the destination table.
KafkaConnectUtils.Config connectorConfig =
createCommonConfig(false)
.config("iceberg.tables.dynamic-enabled", true)
.config("iceberg.tables.route-field", "srcTopic")
.config("transforms", "rewriteTopic,insertTopic")
.config(
"transforms.rewriteTopic.type", "org.apache.kafka.connect.transforms.RegexRouter")
.config("transforms.rewriteTopic.regex", ".*")
.config("transforms.rewriteTopic.replacement", TEST_DB + "." + smtTable)
.config(
"transforms.insertTopic.type",
"org.apache.kafka.connect.transforms.InsertField$Value")
.config("transforms.insertTopic.topic.field", "srcTopic");

context().connectorCatalogProperties().forEach(connectorConfig::config);
context().startConnector(connectorConfig);

send(testTopic(), new TestEvent(1, "type1", Instant.now(), "hello"), false);
send(testTopic(), new TestEvent(2, "type2", Instant.now(), "world"), false);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> assertSnapshotAdded(List.of(smtTableId)));

List<DataFile> files = dataFiles(smtTableId, null);
assertThat(files).hasSizeBetween(1, 2);
assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2);
assertSnapshotProps(smtTableId, null);
} finally {
catalog().dropTable(smtTableId);
}
}

@Override
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
return createCommonConfig(useSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ private void save(SinkRecord record) {
record.timestamp() == null
? null
: OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
// use the original topic and partition to track offsets, as SMTs may have changed
// record.topic() and record.kafkaPartition() (e.g. RegexRouter). The framework's
// context.assignment() and consumer offset management use the original values.
sourceOffsets.put(
new TopicPartition(record.topic(), record.kafkaPartition()),
new Offset(record.kafkaOffset() + 1, timestamp));
new TopicPartition(record.originalTopic(), record.originalKafkaPartition()),
new Offset(record.originalKafkaOffset() + 1, timestamp));

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,74 @@ public void testDynamicRoute() {
assertThat(writerResult.tableReference().identifier()).isEqualTo(TABLE_IDENTIFIER);
}

@Test
public void testOffsetTrackedByOriginalTopicPartition() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
when(config.dynamicTablesEnabled()).thenReturn(true);
when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);

IcebergWriterResult writeResult =
new IcebergWriterResult(
TableIdentifier.parse(TABLE_NAME),
ImmutableList.of(mock(DataFile.class)),
ImmutableList.of(),
Types.StructType.of());
IcebergWriter writer = mock(IcebergWriter.class);
when(writer.complete()).thenReturn(ImmutableList.of(writeResult));

IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);

SinkWriter sinkWriter = new SinkWriter(catalog, config);

// simulate a record that has been transformed by RegexRouter (topic changed)
String originalTopic = "orders";
int originalPartition = 0;
long originalOffset = 42L;
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);

SinkRecord original =
new SinkRecord(
originalTopic,
originalPartition,
null,
"key",
null,
ImmutableMap.of(ROUTE_FIELD, TABLE_IDENTIFIER.toString()),
originalOffset,
now.toEpochMilli(),
TimestampType.LOG_APPEND_TIME);

// RegexRouter changes the topic via newRecord
String transformedTopic = "tmp.dynamic_orders";
SinkRecord transformed =
original.newRecord(
transformedTopic,
originalPartition,
original.keySchema(),
original.key(),
original.valueSchema(),
original.value(),
original.timestamp());

sinkWriter.save(ImmutableList.of(transformed));
SinkWriterResult result = sinkWriter.completeWrite();

// offsets must be keyed by the ORIGINAL topic, not the transformed one
Offset offset =
result.sourceOffsets().get(new TopicPartition(originalTopic, originalPartition));
assertThat(offset).isNotNull();
assertThat(offset.offset()).isEqualTo(originalOffset + 1);
assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));

// the transformed topic key should NOT be present
Offset wrongOffset =
result.sourceOffsets().get(new TopicPartition(transformedTopic, originalPartition));
assertThat(wrongOffset).isNull();
}

@Test
public void testDynamicNoRoute() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
Expand Down
Loading