diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 9c7d3d83f2e4..53ec0c34d0da 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -64,7 +64,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later. | iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` | | iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables | | iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified | -| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) | +| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key). Dotted names (e.g. `user.id`) are supported and resolve to the nested leaf field; the leaf and all parent structs must be required. | | iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables | | iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` | | iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` | @@ -73,7 +73,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later. | iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create | | iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence | | iceberg.table.<_table-name_\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified | -| iceberg.table.<_table-name_\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) | +| iceberg.table.<_table-name_\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key). Dotted names (e.g. `user.id`) are supported and resolve to the nested leaf field; the leaf and all parent structs must be required. | | iceberg.table.<_table-name_\>.partition-by | Comma-separated list of partition fields to use when creating the table | | iceberg.table.<_table-name_\>.route-regex | The regex used to match a record's `routeField` to a table | | iceberg.control.topic | Name of the control topic, default is `control-iceberg` | diff --git a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegration.java b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegration.java index 25778dfd78ed..d04ab78f6d43 100644 --- a/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegration.java +++ b/kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/TestIntegration.java @@ -24,9 +24,11 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -136,6 +138,47 @@ public void testIcebergSinkAutoCreate(String branch) { assertThat(spec.isPartitioned()).isEqualTo(useSchema); } + @ParameterizedTest + @NullSource + public void testIcebergSinkAutoCreateWithIdentifierFields(String branch) { + // Identifier fields require a Kafka schema with required fields, + // so this test only runs with useSchema = true + boolean useSchema = true; + + Map extraConfig = Maps.newHashMap(); + extraConfig.put("iceberg.tables.auto-create-enabled", "true"); + // Configure identifier fields + extraConfig.put("iceberg.tables.default-id-columns", "id,type"); + if (useSchema) { + // partition the table for one of the tests + extraConfig.put("iceberg.tables.default-partition-by", "hour(ts)"); + } + + runTest(branch, useSchema, extraConfig, List.of(TABLE_IDENTIFIER)); + + // Verify table was created with correct identifier fields + Table table = catalog().loadTable(TABLE_IDENTIFIER); + Schema tableSchema = table.schema(); + + Set identifierFieldIds = tableSchema.identifierFieldIds(); + assertThat(identifierFieldIds) + .as("Table should have identifier field IDs set for id and type columns") + .isNotEmpty() + .containsExactlyInAnyOrder( + tableSchema.findField("id").fieldId(), tableSchema.findField("type").fieldId()); + + List files = dataFiles(TABLE_IDENTIFIER, branch); + // may involve 1 or 2 workers + assertThat(files).hasSizeBetween(1, 2); + assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(2); + assertSnapshotProps(TABLE_IDENTIFIER, branch); + + assertGeneratedSchema(useSchema, LongType.class); + + PartitionSpec spec = catalog().loadTable(TABLE_IDENTIFIER).spec(); + assertThat(spec.isPartitioned()).isEqualTo(useSchema); + } + private void assertGeneratedSchema(boolean useSchema, Class expectedIdType) { Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER).schema(); assertThat(tableSchema.findField("id").type()).isInstanceOf(expectedIdType); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index afb68f170136..8985f141da12 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -20,8 +20,10 @@ import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -35,6 +37,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.Tasks; import org.apache.kafka.connect.errors.DataException; @@ -93,7 +96,41 @@ Table autoCreateTable(String tableName, SinkRecord sample) { structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType(); } - org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields()); + // Get ID columns configuration and map to field IDs + List idColumns = config.tableConfig(tableName).idColumns(); + + org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields()); + + Set identifierFieldIds = + idColumns.stream() + .map( + name -> { + NestedField field = initialSchema.findField(name); + if (field == null) { + throw new DataException( + String.format( + "ID column '%s' not found in schema for table %s. Available columns: %s", + name, + tableName, + initialSchema.columns().stream() + .map(NestedField::name) + .collect(Collectors.toList()))); + } + return field.fieldId(); + }) + .collect(Collectors.toSet()); + + org.apache.iceberg.Schema schema; + try { + schema = new org.apache.iceberg.Schema(structType.fields(), identifierFieldIds); + } catch (IllegalArgumentException e) { + throw new DataException( + String.format( + "Invalid identifier column configuration for table %s: %s", + tableName, e.getMessage()), + e); + } + TableIdentifier identifier = TableIdentifier.parse(tableName); createNamespaceIfNotExist(catalog, identifier.namespace()); diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java index 2b18d4b24a1f..8ceb827698b1 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.iceberg.connect.data; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; @@ -41,7 +43,11 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.StringType; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; @@ -96,4 +102,178 @@ public void testAutoCreateTable(boolean partitioned) { assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2")); assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3")); } + + @Test + @SuppressWarnings("unchecked") + public void testAutoCreateTableWithIdentifierFields() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table")); + + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.partitionBy()).thenReturn(ImmutableList.of()); + // Configure ID columns + when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", "foo1")); + when(config.tableConfig(any())).thenReturn(tableConfig); + + // Create a Kafka schema with required fields for identifier columns + org.apache.kafka.connect.data.Schema valueSchema = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // required + .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) // required + .build(); + + Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2"); + + SinkRecord record = mock(SinkRecord.class); + when(record.valueSchema()).thenReturn(valueSchema); + when(record.value()).thenReturn(value); + + IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + factory.autoCreateTable("foo.bar", record); + + ArgumentCaptor identCaptor = ArgumentCaptor.forClass(TableIdentifier.class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Schema.class); + ArgumentCaptor specCaptor = ArgumentCaptor.forClass(PartitionSpec.class); + ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + verify(catalog) + .createTable( + identCaptor.capture(), + schemaCaptor.capture(), + specCaptor.capture(), + propsCaptor.capture()); + + Schema schema = schemaCaptor.getValue(); + assertThat(schema.findField("id").type()).isEqualTo(LongType.get()); + assertThat(schema.findField("data").type()).isEqualTo(StringType.get()); + + // Verify identifier field IDs are set correctly + Set identifierFieldIds = schema.identifierFieldIds(); + assertThat(identifierFieldIds) + .as("Schema should have identifier field IDs set") + .containsExactlyInAnyOrder( + schema.findField("id").fieldId(), schema.findField("data").fieldId()); + } + + @Test + @SuppressWarnings("unchecked") + public void testAutoCreateTableWithMissingIdentifierField() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table")); + + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.partitionBy()).thenReturn(ImmutableList.of()); + // Configure ID column that doesn't exist in the data + when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.autoCreateProps()).thenReturn(ImmutableMap.of()); + when(config.tableConfig(any())).thenReturn(tableConfig); + + // Create schema without the missing column + org.apache.kafka.connect.data.Schema valueSchema = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) + .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .build(); + + Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo"); + + SinkRecord record = mock(SinkRecord.class); + when(record.valueSchema()).thenReturn(valueSchema); + when(record.value()).thenReturn(value); + + IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + + // Should throw DataException when ID column is not found in schema + assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record)) + .isInstanceOf(DataException.class) + .hasMessageContaining("ID column 'missing_column' not found in schema") + .hasMessageContaining("Available columns:"); + } + + @Test + @SuppressWarnings("unchecked") + public void testAutoCreateTableWithNestedIdentifierField() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table")); + + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.partitionBy()).thenReturn(ImmutableList.of()); + // Dotted path references the required leaf inside a required struct + when(tableConfig.idColumns()).thenReturn(ImmutableList.of("meta.id")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.autoCreateProps()).thenReturn(ImmutableMap.of()); + when(config.tableConfig(any())).thenReturn(tableConfig); + + org.apache.kafka.connect.data.Schema metaSchema = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) + .build(); + org.apache.kafka.connect.data.Schema valueSchema = + SchemaBuilder.struct() + .field("meta", metaSchema) + .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .build(); + + Struct metaValue = new Struct(metaSchema).put("id", 42L); + Struct value = new Struct(valueSchema).put("meta", metaValue).put("data", "foo"); + + SinkRecord record = mock(SinkRecord.class); + when(record.valueSchema()).thenReturn(valueSchema); + when(record.value()).thenReturn(value); + + IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + factory.autoCreateTable("foo.bar", record); + + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Schema.class); + verify(catalog).createTable(any(), schemaCaptor.capture(), any(), any()); + + Schema schema = schemaCaptor.getValue(); + int nestedLeafId = schema.findField("meta.id").fieldId(); + assertThat(schema.identifierFieldIds()) + .as("Nested leaf referenced by dotted id-column should be stamped as identifier") + .containsExactly(nestedLeafId); + } + + @Test + @SuppressWarnings("unchecked") + public void testAutoCreateTableWithOptionalIdentifierField() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table")); + + TableSinkConfig tableConfig = mock(TableSinkConfig.class); + when(tableConfig.partitionBy()).thenReturn(ImmutableList.of()); + // Configure ID column + when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id")); + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.autoCreateProps()).thenReturn(ImmutableMap.of()); + when(config.tableConfig(any())).thenReturn(tableConfig); + + // Create schema with optional identifier field + org.apache.kafka.connect.data.Schema valueSchema = + SchemaBuilder.struct() + .field("id", org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional! + .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) + .build(); + + Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo"); + + SinkRecord record = mock(SinkRecord.class); + when(record.valueSchema()).thenReturn(valueSchema); + when(record.value()).thenReturn(value); + + IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + + // Should throw DataException when ID column is optional + assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record)) + .isInstanceOf(DataException.class) + .hasMessageContaining("Invalid identifier column configuration for table foo.bar") + .hasMessageContaining("not a required field"); + } }