
Kafka Connect: evolve-schema-enabled cannot migrate empty-struct oneof arms — task FAILED with InvalidSchemaException #16212

@kzajaczkowski

Description


Summary

iceberg.tables.evolve-schema-enabled: true cannot rescue an empty-struct → non-empty-struct migration that arises from Protobuf oneof arms that were initially zero-field marker messages. When a new schema version adds fields to those arms, the Parquet writer opens against the stale table schema (which still contains struct<> columns) and immediately throws InvalidSchemaException. The task is killed; data is lost.

This is distinct from #15395, which describes a silent drop (no evolution, no crash) for nested structs that were never empty. The failure here is louder — the task transitions to FAILED at the very first record of the new schema version — and the root cause is different.


Version

  • iceberg-kafka-connect 1.9.2 (reproduced against iceberg-core 1.9.2, parquet-column 1.15.2)
  • Confluent cp-kafka-connect-base 7.6.6 (Apache Kafka Connect 3.7.x)
  • Schema Registry: Confluent 7.6.6
  • Catalog: Iceberg REST (Nessie 0.103.3 + MinIO)

Proto Shape (Minimum Reproduction)

V1 — zero-field marker arms:

syntax = "proto3";
package example.v1;
import "google/protobuf/timestamp.proto";

message OrderLifecycleEvent {
  string order_id                          = 1;
  google.protobuf.Timestamp event_timestamp = 2;
  oneof event {
    OrderPlacedEvent    placed    = 3;
    OrderShippedEvent   shipped   = 4;
    OrderCancelledEvent cancelled = 5;
  }
  string reason = 6;
}
message OrderPlacedEvent    {}
message OrderShippedEvent   {}
message OrderCancelledEvent {}

V2 — Timestamp field added to each arm (BACKWARD-compatible per SR):

message OrderPlacedEvent    { google.protobuf.Timestamp placed_timestamp    = 1; }
message OrderShippedEvent   { google.protobuf.Timestamp shipped_timestamp   = 1; }
message OrderCancelledEvent { google.protobuf.Timestamp cancelled_timestamp = 1; }
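Under a working schema evolution, the table column created in step 1 below would be promoted so that every arm carries its new timestamp sub-field. A sketch of the expected post-evolution Iceberg column (assuming the converter maps google.protobuf.Timestamp to timestamptz):

```
event_0      struct<
  placed:    struct<placed_timestamp: timestamptz>
  shipped:   struct<shipped_timestamp: timestamptz>
  cancelled: struct<cancelled_timestamp: timestamptz>
>
```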

Connector Config

{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "1",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": "5000",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

Reproduction Steps

  1. Register V1 schema; produce 3 V1 records (deliberately below the flush threshold of ~40). The sink auto-creates the Iceberg table:

    event_0      struct<
      placed:    struct<>   ← empty arm
      shipped:   struct<>   ← empty arm
      cancelled: struct<>   ← empty arm
    >
    

    The coordinator logs committed to 0 table(s): the table exists with the empty-arm schema, but no Parquet data has been written yet (the silent-drop state).

  2. Produce 50 V2 records (total = 53, crosses the flush threshold). Each record sets only the shipped arm:

    {"orderId":"o-1","eventTimestamp":"2026-04-24T11:00:00Z","shipped":{"shippedTimestamp":"2026-04-24T11:00:00Z"},"reason":"dispatched"}
  3. Within seconds the task transitions to FAILED.


Observed Behaviour

Task FAILED at offset 3 (the very first V2 record):

Caused by: org.apache.kafka.connect.errors.DataException:
  An error occurred converting record, topic: example.order_lifecycle_event.v1,
  partition: 0, offset: 3
Caused by: org.apache.parquet.schema.InvalidSchemaException:
  Cannot write a schema with an empty group: optional group placed = 2 {
}
  at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
  at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:458)
  at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:114)
  at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:214)
  at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:258)
  at org.apache.iceberg.io.PartitionedFanoutWriter.close(PartitionedFanoutWriter.java:70)
  at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:100)
  at org.apache.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:103)
  at org.apache.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:118)
  at org.apache.iceberg.connect.data.SinkWriter.lambda$completeWrite$0(SinkWriter.java:57)

No schema-evolution log line (Schema for table … updated) appears before the failure. The Parquet writer is opened against the stale V1 table schema before any evolution is attempted.


Expected Behaviour

evolve-schema-enabled: true should promote all empty-arm structs to their new non-empty shape before the Parquet writer is initialised, so the write succeeds without manual intervention.


Root Cause Analysis

The bug is in RecordConverter.convertStructValue(). When the sink processes a SinkRecord, it calls the schema-update consumer (addColumn) only for struct fields whose value is non-null in the current record:

// RecordConverter.convertStructValue() — simplified
for (Field recordField : recordSchema.fields()) {
    Object value = struct.get(recordField);
    if (value == null) {
        continue;   // ← null field: no addColumn callback fired
    }
    // schema-update path runs only here ...
    schemaUpdateConsumer.addColumn(path, name, icebergType);
}

In a Protobuf oneof, the Confluent ProtobufConverter wraps all arms in a synthetic outer struct (event_0: struct<placed, shipped, cancelled>). The active arm contains a value; all other arms are null. Therefore:

  • Only the set arm (shipped) has its sub-fields discovered via addColumn.
  • placed and cancelled are null → no callback → they remain struct<> in the table schema.
  • When the writer reinitialises, it reads the partially-evolved table. Parquet's TypeUtil.checkValidWriteSchema rejects the first struct<> it encounters — in this run, optional group placed = 2 {}.
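The null-skipping behaviour can be reproduced outside the connector. The following standalone simulation (illustrative names only, not the connector's real classes) models the oneof wrapper as a map and shows that only non-null arms ever reach the addColumn path:

```java
import java.util.*;

// Standalone simulation of the null-skipping loop in
// RecordConverter.convertStructValue(). Names are illustrative.
public class NullSkipDemo {
    // Returns the arms for which an addColumn callback would fire.
    static List<String> discoveredArms(Map<String, Object> oneofArms) {
        List<String> discovered = new ArrayList<>();
        for (Map.Entry<String, Object> arm : oneofArms.entrySet()) {
            if (arm.getValue() == null) {
                continue; // null arm: schema-update path never runs
            }
            discovered.add(arm.getKey()); // addColumn fires only here
        }
        return discovered;
    }

    public static void main(String[] args) {
        // A V2 record that sets only the "shipped" arm.
        Map<String, Object> arms = new LinkedHashMap<>();
        arms.put("placed", null);
        arms.put("shipped", Map.of("shipped_timestamp", "2026-04-24T11:00:00Z"));
        arms.put("cancelled", null);
        // Only "shipped" is discovered; "placed" and "cancelled"
        // stay struct<> in the table schema.
        System.out.println(discoveredArms(arms)); // prints [shipped]
    }
}
```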

The Connect schema attached to the SinkRecord (record.valueSchema()) does contain the full V2 type for every arm, including the arms that are null in this record. The fix should use that typed schema information to evolve the column even when the value is null.


Proposed Fix

In RecordConverter.convertStructValue(), in the else branch where value == null, check whether the existing table column for that field is a struct that is currently empty. If it is, look up the expected Iceberg type from the Connect schema (via SchemaUtils.toIcebergType(recordField.schema(), config)) and fire addColumn for each sub-field:

} else {
    // value is null — but if the existing column is an empty struct and
    // the Connect schema has sub-fields, evolve the column now
    // so the Parquet writer never sees a zero-field group.
    if (existingColumn != null
            && existingColumn.type().isStructType()
            && existingColumn.type().asStructType().fields().isEmpty()
            && recordField.schema().type() == Schema.Type.STRUCT) {
        Type newType = SchemaUtils.toIcebergType(recordField.schema(), config);
        schemaUpdateConsumer.addColumn(path, name, newType);
    }
}

This approach requires no value — only the schema — and is therefore safe for any null field, not just oneof arms.
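The effect of the proposed branch can be sketched with a standalone simulation (illustrative types: a struct column is modelled as its list of sub-field names, the Connect schema as the full V2 sub-field list). A null arm whose existing column is an empty struct is now evolved from the schema alone:

```java
import java.util.*;

// Standalone sketch of the proposed null-branch fix; not the
// connector's real classes.
public class EmptyStructFixDemo {
    // Evolves tableColumns in place: a null value no longer blocks
    // evolution when the existing column is an empty struct.
    static void maybeEvolve(Map<String, List<String>> tableColumns,
                            Map<String, List<String>> connectSchema,
                            Map<String, Object> recordValues) {
        for (String arm : connectSchema.keySet()) {
            Object value = recordValues.get(arm);
            List<String> existing = tableColumns.get(arm);
            boolean emptyStruct = existing != null && existing.isEmpty();
            // Proposed fix: consult the typed schema even when value == null.
            if ((value != null || emptyStruct)
                    && !connectSchema.get(arm).isEmpty()) {
                tableColumns.put(arm, connectSchema.get(arm)); // addColumn
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        table.put("placed", List.of());
        table.put("shipped", List.of());
        table.put("cancelled", List.of());

        Map<String, List<String>> schema = Map.of(
            "placed", List.of("placed_timestamp"),
            "shipped", List.of("shipped_timestamp"),
            "cancelled", List.of("cancelled_timestamp"));

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("shipped", "2026-04-24T11:00:00Z"); // only arm set

        maybeEvolve(table, schema, record);
        // All three arms are now non-empty: no struct<> reaches Parquet.
        System.out.println(table.values().stream().noneMatch(List::isEmpty));
    }
}
```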


Relationship to #15395

#15395 ("Does not evolve schema for nested fields inside struct/list/map of structs") reports that evolution silently skips sub-fields of a non-empty existing struct when new sub-fields are added. That is a silent-drop; the task stays RUNNING.

This issue is a separate, more severe case: the existing column is a zero-field struct, and the failure mode is a task FAILED at the first write attempt. Both issues share the same null-skipping code path but trigger different downstream failures.


Workaround

evolve-schema-enabled alone cannot recover from this state. Operators must either:

  • Drop and recreate the Iceberg table (losing any metadata snapshot history), or
  • Manually add the missing sub-fields via ALTER TABLE … ADD COLUMN before restarting the connector.
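For the manual route, Iceberg's Spark SQL DDL can add nested sub-fields by dotted path. A hedged sketch for this reproduction (catalog and table names are illustrative; adjust to your environment, and add one statement per missing sub-field):

```sql
-- Spark SQL against an Iceberg catalog; names are illustrative.
ALTER TABLE catalog.db.order_lifecycle_event
  ADD COLUMN event_0.placed.placed_timestamp timestamp;
ALTER TABLE catalog.db.order_lifecycle_event
  ADD COLUMN event_0.cancelled.cancelled_timestamp timestamp;
```

After the columns exist, restarting the failed task lets the writer open against a schema with no zero-field groups.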
