Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 150 additions & 57 deletions core/src/main/java/kafka/automq/table/binder/RecordBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import kafka.automq.table.metric.FieldMetric;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.avro.Schema.Type.ARRAY;
import static org.apache.avro.Schema.Type.NULL;

/**
Expand Down Expand Up @@ -124,12 +127,8 @@ private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.
Schema recordSchema = avroSchema;
FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()];

if (recordSchema.getType() == Schema.Type.UNION) {
recordSchema = recordSchema.getTypes().stream()
.filter(s -> s.getType() == Schema.Type.RECORD)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("UNION schema does not contain a RECORD type: " + avroSchema));
}
// Unwrap UNION if it contains only one non-NULL type
recordSchema = resolveUnionElement(recordSchema);

for (int icebergPos = 0; icebergPos < icebergSchema.columns().size(); icebergPos++) {
Types.NestedField icebergField = icebergSchema.columns().get(icebergPos);
Expand Down Expand Up @@ -162,17 +161,29 @@ private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, T
}

private Schema resolveUnionElement(Schema schema) {
Schema resolved = schema;
if (schema.getType() == Schema.Type.UNION) {
resolved = null;
for (Schema unionMember : schema.getTypes()) {
if (unionMember.getType() != NULL) {
resolved = unionMember;
break;
}
if (schema.getType() != Schema.Type.UNION) {
return schema;
}

// Collect all non-NULL types
List<Schema> nonNullTypes = new ArrayList<>();
for (Schema s : schema.getTypes()) {
if (s.getType() != NULL) {
nonNullTypes.add(s);
}
}
return resolved;

if (nonNullTypes.isEmpty()) {
throw new IllegalArgumentException("UNION schema contains only NULL type: " + schema);
} else if (nonNullTypes.size() == 1) {
// Only unwrap UNION if it contains exactly one non-NULL type (optional union)
return nonNullTypes.get(0);
} else {
// Multiple non-NULL types: non-optional union not supported
throw new UnsupportedOperationException(
"Non-optional UNION with multiple non-NULL types is not supported. " +
"Found " + nonNullTypes.size() + " non-NULL types in UNION: " + schema);
}
}


Expand All @@ -184,53 +195,135 @@ private Map<Schema, RecordBinder> precomputeBindersMap(TypeAdapter<Schema> typeA

for (FieldMapping mapping : fieldMappings) {
if (mapping != null) {
Type type = mapping.icebergType();
if (type.isPrimitiveType()) {
} else if (type.isStructType()) {
org.apache.iceberg.Schema schema = type.asStructType().asSchema();
RecordBinder structBinder = new RecordBinder(
schema,
mapping.avroSchema(),
typeAdapter,
batchFieldCount
);
binders.put(mapping.avroSchema(), structBinder);
} else if (type.isListType()) {
Types.ListType listType = type.asListType();
Type elementType = listType.elementType();
if (elementType.isStructType()) {
org.apache.iceberg.Schema schema = elementType.asStructType().asSchema();
RecordBinder elementBinder = new RecordBinder(
schema,
mapping.avroSchema().getElementType(),
typeAdapter,
batchFieldCount
);
binders.put(mapping.avroSchema().getElementType(), elementBinder);
}
} else if (type.isMapType()) {
Types.MapType mapType = type.asMapType();
Type keyType = mapType.keyType();
Type valueType = mapType.valueType();
if (keyType.isStructType()) {
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
}
if (valueType.isStructType()) {
org.apache.iceberg.Schema schema = valueType.asStructType().asSchema();
RecordBinder valueBinder = new RecordBinder(
schema,
mapping.avroSchema().getValueType(),
typeAdapter,
batchFieldCount
);
binders.put(mapping.avroSchema().getValueType(), valueBinder);
}
}
precomputeBindersForType(mapping.icebergType(), mapping.avroSchema(), binders, typeAdapter);
}
}
return binders;
}

/**
 * Registers the nested binders needed for one Iceberg type and its matching Avro schema.
 *
 * <p>Dispatch rules:
 * <ul>
 *   <li>primitive type: nothing to register;</li>
 *   <li>struct type: handled as a plain struct, or as a union-backed struct when the Avro schema is a UNION;</li>
 *   <li>list type / map type: delegated to the list / map helpers;</li>
 *   <li>anything else: silently ignored.</li>
 * </ul>
 *
 * @param icebergType the Iceberg type of the field being inspected
 * @param avroSchema  the Avro schema paired with {@code icebergType}
 * @param binders     map that receives the created binders, keyed by Avro schema
 * @param typeAdapter adapter handed to every binder that gets created
 */
private void precomputeBindersForType(Type icebergType, Schema avroSchema,
                                      Map<Schema, RecordBinder> binders,
                                      TypeAdapter<Schema> typeAdapter) {
    if (icebergType.isPrimitiveType()) {
        // Primitive values are bound directly; no nested binder is required.
        return;
    }

    if (icebergType.isStructType()) {
        if (avroSchema.isUnion()) {
            createUnionStructBinders(icebergType.asStructType(), avroSchema, binders, typeAdapter);
        } else {
            createStructBinder(icebergType.asStructType(), avroSchema, binders, typeAdapter);
        }
        return;
    }

    if (icebergType.isListType()) {
        createListBinder(icebergType.asListType(), avroSchema, binders, typeAdapter);
        return;
    }

    if (icebergType.isMapType()) {
        createMapBinder(icebergType.asMapType(), avroSchema, binders, typeAdapter);
    }
}

/**
 * Registers a binder for an Iceberg STRUCT whose Avro counterpart is a UNION.
 *
 * <p>A synthetic Avro record (named after the union schema) is assembled with an int field
 * {@code tag} followed by one field per non-NULL union member, named {@code field0},
 * {@code field1}, ... in member order. The resulting binder is stored under the original
 * union schema, not under the synthetic record.
 *
 * @param structType  Iceberg struct type describing the union
 * @param avroSchema  the Avro UNION schema
 * @param binders     map that receives the created binder
 * @param typeAdapter adapter handed to the created binder
 */
private void createUnionStructBinders(Types.StructType structType, Schema avroSchema,
                                      Map<Schema, RecordBinder> binders,
                                      TypeAdapter<Schema> typeAdapter) {
    org.apache.iceberg.Schema icebergStructSchema = structType.asSchema();

    SchemaBuilder.FieldAssembler<Schema> assembler = SchemaBuilder.record(avroSchema.getName()).fields()
        .name("tag").type().intType().noDefault();

    // NULL members get no field; the index only advances for members that do.
    int memberIndex = 0;
    for (Schema member : avroSchema.getTypes()) {
        if (member.getType() == NULL) {
            continue;
        }
        assembler.name("field" + memberIndex).type(member).noDefault();
        memberIndex++;
    }

    Schema taggedRecordSchema = assembler.endRecord();
    binders.put(avroSchema, new RecordBinder(icebergStructSchema, taggedRecordSchema, typeAdapter, batchFieldCount));
}

/**
 * Builds a {@code RecordBinder} for a STRUCT field and registers it under its Avro schema.
 *
 * @param structType  Iceberg struct type to bind
 * @param avroSchema  Avro schema paired with the struct; also used as the map key
 * @param binders     map that receives the created binder
 * @param typeAdapter adapter handed to the created binder
 */
private void createStructBinder(Types.StructType structType, Schema avroSchema,
                                Map<Schema, RecordBinder> binders,
                                TypeAdapter<Schema> typeAdapter) {
    binders.put(
        avroSchema,
        new RecordBinder(structType.asSchema(), avroSchema, typeAdapter, batchFieldCount));
}

/**
 * Registers a binder for the elements of a LIST, but only when the element type is a STRUCT.
 * Lists of any other element type need no binder here.
 *
 * @param listType    Iceberg list type being inspected
 * @param avroSchema  Avro schema of the list; its element schema is used for the binder
 * @param binders     map that receives the created binder
 * @param typeAdapter adapter handed to the created binder
 */
private void createListBinder(Types.ListType listType, Schema avroSchema,
                              Map<Schema, RecordBinder> binders,
                              TypeAdapter<Schema> typeAdapter) {
    Type itemType = listType.elementType();
    if (!itemType.isStructType()) {
        return;
    }
    Schema itemAvroSchema = avroSchema.getElementType();
    createStructBinder(itemType.asStructType(), itemAvroSchema, binders, typeAdapter);
}

/**
 * Registers binders for STRUCT keys/values of a MAP type.
 *
 * <p>Two Avro encodings are distinguished by the schema type: an ARRAY (of key/value records)
 * is routed to the array-based helper, everything else to the native-MAP helper.
 *
 * @param mapType     Iceberg map type being inspected
 * @param avroSchema  Avro schema paired with the map
 * @param binders     map that receives the created binders
 * @param typeAdapter adapter handed to the created binders
 */
private void createMapBinder(Types.MapType mapType, Schema avroSchema,
                             Map<Schema, RecordBinder> binders,
                             TypeAdapter<Schema> typeAdapter) {
    Type mapKeyType = mapType.keyType();
    Type mapValueType = mapType.valueType();

    if (avroSchema.getType() == ARRAY) {
        // MAP encoded as an ARRAY of records carrying "key" and "value" fields.
        createMapAsArrayBinder(mapKeyType, mapValueType, avroSchema, binders, typeAdapter);
        return;
    }

    // Otherwise the schema is treated as a native Avro MAP.
    createMapAsMapBinder(mapKeyType, mapValueType, avroSchema, binders, typeAdapter);
}

/**
 * Registers binders for a MAP that Avro encodes as an ARRAY of entry records.
 *
 * <p>The entry record is the array's element schema; its {@code key} and {@code value} fields
 * supply the Avro schemas for the key and value binders. A binder is created only for the
 * side(s) whose Iceberg type is a STRUCT.
 *
 * @param keyType     Iceberg type of the map key
 * @param valueType   Iceberg type of the map value
 * @param avroSchema  Avro ARRAY schema whose elements are the entry records
 * @param binders     map that receives the created binders
 * @param typeAdapter adapter handed to the created binders
 */
private void createMapAsArrayBinder(Type keyType, Type valueType, Schema avroSchema,
                                    Map<Schema, RecordBinder> binders,
                                    TypeAdapter<Schema> typeAdapter) {
    Schema entrySchema = avroSchema.getElementType();

    if (keyType.isStructType()) {
        Schema entryKeySchema = entrySchema.getField("key").schema();
        createStructBinder(keyType.asStructType(), entryKeySchema, binders, typeAdapter);
    }

    if (valueType.isStructType()) {
        Schema entryValueSchema = entrySchema.getField("value").schema();
        createStructBinder(valueType.asStructType(), entryValueSchema, binders, typeAdapter);
    }
}

/**
 * Registers a binder for a MAP that Avro encodes as a native MAP.
 *
 * <p>Only STRUCT values get a binder (built from the Avro value schema). STRUCT keys are
 * rejected outright.
 *
 * @param keyType     Iceberg type of the map key
 * @param valueType   Iceberg type of the map value
 * @param avroSchema  Avro MAP schema
 * @param binders     map that receives the created binder
 * @param typeAdapter adapter handed to the created binder
 * @throws UnsupportedOperationException if {@code keyType} is a STRUCT
 */
private void createMapAsMapBinder(Type keyType, Type valueType, Schema avroSchema,
                                  Map<Schema, RecordBinder> binders,
                                  TypeAdapter<Schema> typeAdapter) {
    if (keyType.isStructType()) {
        // Struct keys cannot be expressed with a native Avro MAP.
        throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
    }

    if (!valueType.isStructType()) {
        return;
    }

    Schema mapValueSchema = avroSchema.getValueType();
    createStructBinder(valueType.asStructType(), mapValueSchema, binders, typeAdapter);
}

private static class AvroRecordView implements Record {
private final GenericRecord avroRecord;
private final org.apache.iceberg.Schema icebergSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.automq.table.process.convert;

import com.google.protobuf.Descriptors;

import org.apache.avro.Schema;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.iceberg.avro.CodecSetup;

import java.util.Arrays;

/**
 * {@link ProtobufData} variant that adjusts the Avro schema generated for individual protobuf fields:
 * <ul>
 *   <li>map fields whose Avro form is an ARRAY are tagged with Iceberg's LogicalMap logical type, so the
 *       downstream Avro-to-Iceberg conversion keeps them as MAP instead of a generic
 *       {@literal ARRAY<record<key,value>>};</li>
 *   <li>non-repeated fields that are optional or belong to a oneof are wrapped as
 *       {@code union(type, null)} unless their schema already is a UNION.</li>
 * </ul>
 */
public class LogicalMapProtobufData extends ProtobufData {
    private static final LogicalMapProtobufData INSTANCE = new LogicalMapProtobufData();
    private static final Schema NULL = Schema.create(Schema.Type.NULL);

    /**
     * Returns the shared instance.
     */
    public static LogicalMapProtobufData get() {
        return INSTANCE;
    }

    /**
     * Produces the Avro schema for a protobuf field, applying the map tagging and nullable wrapping
     * described on the class.
     *
     * @param f protobuf field descriptor
     * @return the schema from the superclass, possibly annotated in place (map fields) or wrapped in a
     *         new {@code union(type, null)}
     */
    @Override
    public Schema getSchema(Descriptors.FieldDescriptor f) {
        Schema schema = super.getSchema(f);

        if (f.isMapField()) {
            // protobuf maps are materialized as ARRAY<entry{key,value}> in Avro
            Schema arrayCandidate = resolveNonNull(schema);
            if (arrayCandidate != null && arrayCandidate.getType() == Schema.Type.ARRAY) {
                // Sets the logicalType property on the array schema itself; the logical type is
                // obtained from CodecSetup.
                CodecSetup.getLogicalMap().addToSchema(arrayCandidate);
            }
            return schema;
        }

        if (needsNullBranch(f, schema)) {
            // The value type stays the first branch so a non-null protobuf default remains a valid
            // Avro default (Avro defaults must match the first union branch); the trailing null
            // branch lets unset fields be represented as null.
            return Schema.createUnion(Arrays.asList(schema, NULL));
        }

        return schema;
    }

    /**
     * Tells whether a non-map field must be wrapped as {@code union(type, null)}: it is not repeated,
     * it is optional or a oneof member, and its schema is not already a UNION.
     */
    private boolean needsNullBranch(Descriptors.FieldDescriptor f, Schema schema) {
        return !f.isRepeated()
            && (f.isOptional() || f.getContainingOneof() != null)
            && schema.getType() != Schema.Type.UNION;
    }

    /**
     * Strips a UNION down to its first non-NULL member.
     *
     * @return {@code schema} itself when it is {@code null} or not a UNION; otherwise the first
     *         non-NULL member, or {@code null} when the union has none
     */
    private Schema resolveNonNull(Schema schema) {
        if (schema == null || schema.getType() != Schema.Type.UNION) {
            return schema;
        }
        return schema.getTypes().stream()
            .filter(member -> member.getType() != Schema.Type.NULL)
            .findFirst()
            .orElse(null);
    }
}
Loading
Loading