Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/docs/kafka-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key). Dotted names (e.g. `user.id`) are supported and resolve to the nested leaf field; the leaf and all parent structs must be required. |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
Expand All @@ -73,7 +73,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.<_table-name_\>.commit-branch | Table-specific branch for commits; `iceberg.tables.default-commit-branch` is used if not specified |
| iceberg.table.<_table-name_\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.<_table-name_\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key). Dotted names (e.g. `user.id`) are supported and resolve to the nested leaf field; the leaf and all parent structs must be required. |
| iceberg.table.<_table-name_\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.<_table-name_\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -136,6 +138,47 @@ public void testIcebergSinkAutoCreate(String branch) {
assertThat(spec.isPartitioned()).isEqualTo(useSchema);
}

@ParameterizedTest
@NullSource
public void testIcebergSinkAutoCreateWithIdentifierFields(String branch) {
  // Only the schema-backed path is exercised: identifier fields need a Kafka schema
  // whose fields are required.
  final boolean useSchema = true;

  Map<String, String> connectorProps = Maps.newHashMap();
  connectorProps.put("iceberg.tables.auto-create-enabled", "true");
  connectorProps.put("iceberg.tables.default-id-columns", "id,type");
  // useSchema is always true in this test, so the partition spec is always configured
  connectorProps.put("iceberg.tables.default-partition-by", "hour(ts)");

  runTest(branch, useSchema, connectorProps, List.of(TABLE_IDENTIFIER));

  // The auto-created table must carry exactly the configured identifier fields
  Table createdTable = catalog().loadTable(TABLE_IDENTIFIER);
  Schema createdSchema = createdTable.schema();
  Set<Integer> actualIdentifierIds = createdSchema.identifierFieldIds();
  int idFieldId = createdSchema.findField("id").fieldId();
  int typeFieldId = createdSchema.findField("type").fieldId();
  assertThat(actualIdentifierIds)
      .as("Table should have identifier field IDs set for id and type columns")
      .isNotEmpty()
      .containsExactlyInAnyOrder(idFieldId, typeFieldId);

  // The data may be spread over 1 or 2 files because 1 or 2 workers may be involved
  List<DataFile> writtenFiles = dataFiles(TABLE_IDENTIFIER, branch);
  assertThat(writtenFiles).hasSizeBetween(1, 2);
  long totalRecords = writtenFiles.stream().mapToLong(DataFile::recordCount).sum();
  assertThat(totalRecords).isEqualTo(2);
  assertSnapshotProps(TABLE_IDENTIFIER, branch);

  assertGeneratedSchema(useSchema, LongType.class);

  // Reload the table and check that it is partitioned exactly when a schema was used
  PartitionSpec createdSpec = catalog().loadTable(TABLE_IDENTIFIER).spec();
  assertThat(createdSpec.isPartitioned()).isEqualTo(useSchema);
}

private void assertGeneratedSchema(boolean useSchema, Class<? extends Type> expectedIdType) {
Schema tableSchema = catalog().loadTable(TABLE_IDENTIFIER).schema();
assertThat(tableSchema.findField("id").type()).isInstanceOf(expectedIdType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -35,6 +37,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.Tasks;
import org.apache.kafka.connect.errors.DataException;
Expand Down Expand Up @@ -93,7 +96,41 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}

org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
// Get ID columns configuration and map to field IDs
List<String> idColumns = config.tableConfig(tableName).idColumns();

org.apache.iceberg.Schema initialSchema = new org.apache.iceberg.Schema(structType.fields());

Set<Integer> identifierFieldIds =
idColumns.stream()
.map(
name -> {
NestedField field = initialSchema.findField(name);
if (field == null) {
throw new DataException(
String.format(
"ID column '%s' not found in schema for table %s. Available columns: %s",
name,
tableName,
initialSchema.columns().stream()
.map(NestedField::name)
.collect(Collectors.toList())));
}
return field.fieldId();
})
.collect(Collectors.toSet());

org.apache.iceberg.Schema schema;
try {
schema = new org.apache.iceberg.Schema(structType.fields(), identifierFieldIds);
} catch (IllegalArgumentException e) {
throw new DataException(
String.format(
"Invalid identifier column configuration for table %s: %s",
tableName, e.getMessage()),
e);
}

TableIdentifier identifier = TableIdentifier.parse(tableName);

createNamespaceIfNotExist(catalog, identifier.namespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.connect.data;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -28,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -41,7 +43,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.StringType;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -96,4 +102,178 @@ public void testAutoCreateTable(boolean partitioned) {
assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
}

@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTableWithIdentifierFields() {
  // Sample record: both configured identifier columns use required Kafka schemas
  org.apache.kafka.connect.data.Schema kafkaSchema =
      SchemaBuilder.struct()
          .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
          .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
          .build();
  Struct payload = new Struct(kafkaSchema).put("id", 123L).put("data", "foo2");

  SinkRecord sample = mock(SinkRecord.class);
  when(sample.valueSchema()).thenReturn(kafkaSchema);
  when(sample.value()).thenReturn(payload);

  // Table-level config: unpartitioned, with "id" and "data" as identifier columns
  TableSinkConfig perTableConfig = mock(TableSinkConfig.class);
  when(perTableConfig.partitionBy()).thenReturn(ImmutableList.of());
  when(perTableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));

  IcebergSinkConfig sinkConfig = mock(IcebergSinkConfig.class);
  when(sinkConfig.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", "foo1"));
  when(sinkConfig.tableConfig(any())).thenReturn(perTableConfig);

  // The catalog reports the table as missing, which forces the auto-create path
  Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
  when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));

  new IcebergWriterFactory(catalog, sinkConfig).autoCreateTable("foo.bar", sample);

  // Only the schema handed to createTable is inspected; the other arguments are unconstrained
  ArgumentCaptor<Schema> createdSchemaCaptor = ArgumentCaptor.forClass(Schema.class);
  verify(catalog).createTable(any(), createdSchemaCaptor.capture(), any(), any());

  Schema createdSchema = createdSchemaCaptor.getValue();
  assertThat(createdSchema.findField("id").type()).isEqualTo(LongType.get());
  assertThat(createdSchema.findField("data").type()).isEqualTo(StringType.get());

  // Both configured columns must be present in the identifier field ID set
  int idFieldId = createdSchema.findField("id").fieldId();
  int dataFieldId = createdSchema.findField("data").fieldId();
  Set<Integer> actualIdentifierIds = createdSchema.identifierFieldIds();
  assertThat(actualIdentifierIds)
      .as("Schema should have identifier field IDs set")
      .containsExactlyInAnyOrder(idFieldId, dataFieldId);
}

@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTableWithMissingIdentifierField() {
  // The record schema only has "id" and "data"; the configured ID column is absent from it
  org.apache.kafka.connect.data.Schema kafkaSchema =
      SchemaBuilder.struct()
          .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
          .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
          .build();
  Struct payload = new Struct(kafkaSchema).put("id", 123L).put("data", "foo");

  SinkRecord sample = mock(SinkRecord.class);
  when(sample.valueSchema()).thenReturn(kafkaSchema);
  when(sample.value()).thenReturn(payload);

  // Table-level config points at a column that does not exist in the data
  TableSinkConfig perTableConfig = mock(TableSinkConfig.class);
  when(perTableConfig.partitionBy()).thenReturn(ImmutableList.of());
  when(perTableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));

  IcebergSinkConfig sinkConfig = mock(IcebergSinkConfig.class);
  when(sinkConfig.autoCreateProps()).thenReturn(ImmutableMap.of());
  when(sinkConfig.tableConfig(any())).thenReturn(perTableConfig);

  // The catalog reports the table as missing, which forces the auto-create path
  Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
  when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));

  IcebergWriterFactory writerFactory = new IcebergWriterFactory(catalog, sinkConfig);

  // An unknown ID column must surface as a DataException whose message names the column
  assertThatThrownBy(() -> writerFactory.autoCreateTable("foo.bar", sample))
      .isInstanceOf(DataException.class)
      .hasMessageContaining("ID column 'missing_column' not found in schema")
      .hasMessageContaining("Available columns:");
}

@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTableWithNestedIdentifierField() {
  // Record layout: a required struct "meta" holding a required "id", plus a top-level "data"
  org.apache.kafka.connect.data.Schema nestedSchema =
      SchemaBuilder.struct()
          .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
          .build();
  org.apache.kafka.connect.data.Schema kafkaSchema =
      SchemaBuilder.struct()
          .field("meta", nestedSchema)
          .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
          .build();

  Struct nestedPayload = new Struct(nestedSchema).put("id", 42L);
  Struct payload = new Struct(kafkaSchema).put("meta", nestedPayload).put("data", "foo");

  SinkRecord sample = mock(SinkRecord.class);
  when(sample.valueSchema()).thenReturn(kafkaSchema);
  when(sample.value()).thenReturn(payload);

  // The ID column is given as a dotted path to the leaf inside the struct
  TableSinkConfig perTableConfig = mock(TableSinkConfig.class);
  when(perTableConfig.partitionBy()).thenReturn(ImmutableList.of());
  when(perTableConfig.idColumns()).thenReturn(ImmutableList.of("meta.id"));

  IcebergSinkConfig sinkConfig = mock(IcebergSinkConfig.class);
  when(sinkConfig.autoCreateProps()).thenReturn(ImmutableMap.of());
  when(sinkConfig.tableConfig(any())).thenReturn(perTableConfig);

  // The catalog reports the table as missing, which forces the auto-create path
  Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
  when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));

  new IcebergWriterFactory(catalog, sinkConfig).autoCreateTable("foo.bar", sample);

  ArgumentCaptor<Schema> createdSchemaCaptor = ArgumentCaptor.forClass(Schema.class);
  verify(catalog).createTable(any(), createdSchemaCaptor.capture(), any(), any());

  // The only identifier field must be the nested leaf that the dotted name resolves to
  Schema createdSchema = createdSchemaCaptor.getValue();
  int expectedLeafId = createdSchema.findField("meta.id").fieldId();
  assertThat(createdSchema.identifierFieldIds())
      .as("Nested leaf referenced by dotted id-column should be stamped as identifier")
      .containsExactly(expectedLeafId);
}

@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTableWithOptionalIdentifierField() {
Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));

TableSinkConfig tableConfig = mock(TableSinkConfig.class);
when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
// Configure ID column
when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id"));

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
when(config.tableConfig(any())).thenReturn(tableConfig);

// Create schema with optional identifier field
org.apache.kafka.connect.data.Schema valueSchema =
SchemaBuilder.struct()
.field("id", org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional!
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test covers schemaForceOptional=false plus a Kafka-level optional field, which is the easy case the user can fix. The harder case — schemaForceOptional=true plus a Kafka-level required field — is the one a real deployment hits, and it's the path that currently produces a misleading error per the inline on IcebergWriterFactory.java:115. I'd add a third test that stubs config.schemaForceOptional() to true and asserts whatever the right behavior turns out to be after that thread settles.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create a follow-up PR that covers the changes suggested here and in https://github.com/apache/iceberg/pull/15615/changes#r3311617558 👍🏽

.field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
.build();

Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");

SinkRecord record = mock(SinkRecord.class);
when(record.valueSchema()).thenReturn(valueSchema);
when(record.value()).thenReturn(value);

IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);

// Should throw DataException when ID column is optional
assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
.isInstanceOf(DataException.class)
.hasMessageContaining("Invalid identifier column configuration for table foo.bar")
.hasMessageContaining("not a required field");
}
}