diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegrationDynamicTable.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegrationDynamicTable.java
index 65bbcde9dfed..1d3d71a54152 100644
--- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegrationDynamicTable.java
+++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegrationDynamicTable.java
@@ -20,11 +20,14 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -59,6 +62,58 @@ public void testIcebergSink(String branch) {
     assertSnapshotProps(TABLE_IDENTIFIER2, branch);
   }
 
+  /**
+   * Verifies dynamic routing works when topic-rewriting SMTs (e.g. RegexRouter) change
+   * record.topic(). Before the fix, SinkWriter tracked offsets under the rewritten topic, causing a
+   * mismatch with context.assignment() and preventing proper offset commits.
+   */
+  @Test
+  public void testDynamicRouteWithTopicRewritingSMT() {
+    String smtTable = "smttbl";
+    TableIdentifier smtTableId = TableIdentifier.of(TEST_DB, smtTable);
+    catalog().createTable(smtTableId, TestEvent.TEST_SCHEMA);
+
+    try {
+      // RegexRouter rewrites topic to "test.smttbl", then InsertField copies
+      // record.topic() (now "test.smttbl") into field "srcTopic", and dynamic
+      // routing uses "srcTopic" to pick the destination table.
+      KafkaConnectUtils.Config connectorConfig =
+          createCommonConfig(false)
+              .config("iceberg.tables.dynamic-enabled", true)
+              .config("iceberg.tables.route-field", "srcTopic")
+              .config("transforms", "rewriteTopic,insertTopic")
+              .config(
+                  "transforms.rewriteTopic.type", "org.apache.kafka.connect.transforms.RegexRouter")
+              .config("transforms.rewriteTopic.regex", ".*")
+              .config("transforms.rewriteTopic.replacement", TEST_DB + "." + smtTable)
+              .config(
+                  "transforms.insertTopic.type",
+                  "org.apache.kafka.connect.transforms.InsertField$Value")
+              .config("transforms.insertTopic.topic.field", "srcTopic");
+
+      context().connectorCatalogProperties().forEach(connectorConfig::config);
+      context().startConnector(connectorConfig);
+
+      send(testTopic(), new TestEvent(1, "type1", Instant.now(), "hello"), false);
+      send(testTopic(), new TestEvent(2, "type2", Instant.now(), "world"), false);
+      flush();
+
+      Awaitility.await()
+          .atMost(Duration.ofSeconds(30))
+          .pollInterval(Duration.ofSeconds(1))
+          .untilAsserted(() -> assertSnapshotAdded(List.of(smtTableId)));
+
+      // the two records may be written to one data file or split across two, so assert the
+      // total record count rather than an exact file count
+      List<DataFile> files = dataFiles(smtTableId, null);
+      assertThat(files).hasSizeBetween(1, 2);
+      assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2);
+      assertSnapshotProps(smtTableId, null);
+    } finally {
+      catalog().dropTable(smtTableId);
+    }
+  }
+
   @Override
   protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
     return createCommonConfig(useSchema)
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
index f81155e13777..48a01881935b 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java
@@ -75,9 +75,12 @@ private void save(SinkRecord record) {
         record.timestamp() == null
             ? null
             : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    // use the original topic and partition to track offsets, as SMTs may have changed
+    // record.topic() and record.kafkaPartition() (e.g. RegexRouter). The framework's
+    // context.assignment() and consumer offset management use the original values.
     sourceOffsets.put(
-        new TopicPartition(record.topic(), record.kafkaPartition()),
-        new Offset(record.kafkaOffset() + 1, timestamp));
+        new TopicPartition(record.originalTopic(), record.originalKafkaPartition()),
+        new Offset(record.originalKafkaOffset() + 1, timestamp));
 
     if (config.dynamicTablesEnabled()) {
       routeRecordDynamically(record);
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
index 6baf72117d04..09f7a373d5f2 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java
@@ -153,6 +153,80 @@ public void testDynamicRoute() {
     assertThat(writerResult.tableReference().identifier()).isEqualTo(TABLE_IDENTIFIER);
   }
 
+  /**
+   * Verifies that source offsets are keyed by the record's original topic and partition rather
+   * than by the topic a transform rewrote it to.
+   */
+  @Test
+  public void testOffsetTrackedByOriginalTopicPartition() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
+    when(config.dynamicTablesEnabled()).thenReturn(true);
+    when(config.tablesRouteField()).thenReturn(ROUTE_FIELD);
+
+    // NOTE(review): writerFactory below is never passed to sinkWriter, so these stubs look
+    // unused by the code under test -- confirm whether they are still needed.
+    IcebergWriterResult writeResult =
+        new IcebergWriterResult(
+            TableIdentifier.parse(TABLE_NAME),
+            ImmutableList.of(mock(DataFile.class)),
+            ImmutableList.of(),
+            Types.StructType.of());
+    IcebergWriter writer = mock(IcebergWriter.class);
+    when(writer.complete()).thenReturn(ImmutableList.of(writeResult));
+
+    IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
+    when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);
+
+    SinkWriter sinkWriter = new SinkWriter(catalog, config);
+
+    // simulate a record that has been transformed by RegexRouter (topic changed)
+    String originalTopic = "orders";
+    int originalPartition = 0;
+    long originalOffset = 42L;
+    Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+
+    SinkRecord original =
+        new SinkRecord(
+            originalTopic,
+            originalPartition,
+            null,
+            "key",
+            null,
+            ImmutableMap.of(ROUTE_FIELD, TABLE_IDENTIFIER.toString()),
+            originalOffset,
+            now.toEpochMilli(),
+            TimestampType.LOG_APPEND_TIME);
+
+    // RegexRouter changes the topic via newRecord
+    String transformedTopic = "tmp.dynamic_orders";
+    SinkRecord transformed =
+        original.newRecord(
+            transformedTopic,
+            originalPartition,
+            original.keySchema(),
+            original.key(),
+            original.valueSchema(),
+            original.value(),
+            original.timestamp());
+
+    sinkWriter.save(ImmutableList.of(transformed));
+    SinkWriterResult result = sinkWriter.completeWrite();
+
+    // offsets must be keyed by the ORIGINAL topic, not the transformed one
+    Offset offset =
+        result.sourceOffsets().get(new TopicPartition(originalTopic, originalPartition));
+    assertThat(offset).isNotNull();
+    assertThat(offset.offset()).isEqualTo(originalOffset + 1);
+    assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
+
+    // the transformed topic key should NOT be present
+    Offset wrongOffset =
+        result.sourceOffsets().get(new TopicPartition(transformedTopic, originalPartition));
+    assertThat(wrongOffset).isNull();
+  }
+
   @Test
   public void testDynamicNoRoute() {
     IcebergSinkConfig config = mock(IcebergSinkConfig.class);