Apache Iceberg version
1.8.0
Query engine
Kafka Connect
Please describe the bug 🐞
Problem
When the Kafka Connect sink uses AWS Glue auto-creation and receives Protobuf schemas with either:
(1) empty messages (e.g. message KeepAlive {}) or
(2) recursive structures (e.g. google.protobuf.Struct)
SchemaUtils.SchemaGenerator.toIcebergType tries to convert those schemas into Iceberg types and either produces an empty struct (which Parquet rejects with InvalidSchemaException) or recurses indefinitely through the nested schema and blows the stack. This happens during task initialization, before any records are written.
Steps to Reproduce
- Define the Protobuf schemas:
// event_payload.proto
syntax = "proto3";
package events;
import "google/protobuf/struct.proto";
message Event {
  sfixed64 timestamp_ns = 1;
  EventData data = 2;
  message EventData {
    KeepAlive keep_alive = 1;
    ErrorReport error_report = 2;
  }
}
message KeepAlive {}
message ErrorReport {
  google.protobuf.Struct details = 1;
}
// device_reading.proto
syntax = "proto3";
package readings;
import "event_payload.proto";
import "google/protobuf/timestamp.proto";
message DeviceReading {
  string device_id = 1;
  google.protobuf.Timestamp observed_at = 2;
  events.Event.EventData data = 3;
}
- Register the schemas with Schema Registry.
- Produce a Kafka message to a topic (e.g., device-readings-topic) with the DeviceReading schema. The keep_alive field will be an empty message.
- Configure and run the Iceberg Kafka Connect sink to consume from device-readings-topic and write to an Iceberg table.
Observed Behavior
InvalidSchemaException due to empty struct:
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group keep_alive = 1 {}
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
StackOverflowError due to recursive struct:
java.lang.StackOverflowError
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:641)
Expected Behavior
The Iceberg Kafka Connect sink should handle these schemas gracefully, without crashing. A possible approach would be to convert empty or recursive structs to a StringType in the Iceberg schema.
Proposed Solution
The issue seems to be in the SchemaUtils.SchemaGenerator.toIcebergType method. Here are two suggested fixes:
- Handle empty structs:
In the STRUCT case, after building the list of fields, check if the list is empty. If it is, return StringType.get() instead of creating an empty StructType.
// Handle empty structs - Parquet cannot write empty groups
if (fields.isEmpty()) {
  return StringType.get();
}
- Handle recursive structs:
Use a Map<String, Type> to track schemas that have already been visited, keyed by a logical key (e.g., schema.type() + ":" + schema.name()). If the key is already present when toIcebergType is entered, a cycle has been detected, so return StringType.get(). The key has to be recorded before descending into the struct's fields for this check to fire.
// At the beginning of toIcebergType
String schemaKey = getSchemaKey(valueSchema);
Type result = visited.get(schemaKey);
if (result != null) {
  // Break the recursion by returning a StringType
  return StringType.get();
}
This would prevent both the InvalidSchemaException and the StackOverflowError.
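To make the two guards concrete, here is a minimal, self-contained sketch. It does not use the Kafka Connect or Iceberg APIs: Node, Field, and convert are hypothetical stand-ins invented for illustration, the converted type is rendered as plain text, and the cycle key is just the schema name rather than schema.type() + ":" + schema.name(). The google.protobuf.Struct shape is also simplified to a single Struct -> Value -> Struct loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: NOT the Kafka Connect Schema or Iceberg Type classes.
final class SchemaGuardSketch {

  /** Hypothetical schema node: a primitive when fields is null, otherwise a named struct. */
  static final class Node {
    final String name;
    final List<Field> fields;

    Node(String name, List<Field> fields) {
      this.name = name;
      this.fields = fields;
    }

    static Node primitive(String name) {
      return new Node(name, null);
    }

    static Node struct(String name) {
      return new Node(name, new ArrayList<>());
    }

    /** Adds a field and returns this node, so cyclic graphs can be wired up after creation. */
    Node field(String fieldName, Node type) {
      fields.add(new Field(fieldName, type));
      return this;
    }
  }

  static final class Field {
    final String name;
    final Node type;

    Field(String name, Node type) {
      this.name = name;
      this.type = type;
    }
  }

  /** Renders the converted type as text, falling back to "string" for empty or recursive structs. */
  static String convert(Node node) {
    return convert(node, new ArrayDeque<>());
  }

  private static String convert(Node node, Deque<String> inProgress) {
    if (node.fields == null) {
      return node.name; // primitive: nothing to guard
    }
    if (node.fields.isEmpty()) {
      return "string"; // guard 1: an empty struct would become an empty Parquet group
    }
    if (inProgress.contains(node.name)) {
      return "string"; // guard 2: this struct is already on the current path, so it is a cycle
    }
    inProgress.push(node.name);
    try {
      List<String> parts = new ArrayList<>();
      for (Field f : node.fields) {
        parts.add(f.name + ":" + convert(f.type, inProgress));
      }
      return "struct<" + String.join(",", parts) + ">";
    } finally {
      inProgress.pop(); // leave the path so sibling fields of the same type still expand
    }
  }

  public static void main(String[] args) {
    Node keepAlive = Node.struct("events.KeepAlive");
    Node struct = Node.struct("google.protobuf.Struct");
    Node value = Node.struct("google.protobuf.Value")
        .field("string_value", Node.primitive("string"))
        .field("struct_value", struct);
    struct.field("fields", value); // closes the simplified Struct -> Value -> Struct loop
    Node errorReport = Node.struct("events.ErrorReport").field("details", struct);
    Node eventData = Node.struct("events.Event.EventData")
        .field("keep_alive", keepAlive)
        .field("error_report", errorReport);
    System.out.println(convert(eventData));
  }
}
```

One design point worth discussing: the sketch tracks only the structs on the current conversion path and removes each one when its fields are done. If the visited map in the proposal were never cleared, a message type that is legitimately used by two sibling fields might also be collapsed to a string on its second appearance, which is probably not intended.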
Willingness to contribute