diff --git a/core/src/main/java/kafka/automq/table/process/RecordAssembler.java b/core/src/main/java/kafka/automq/table/process/RecordAssembler.java index d80aeed107..64750b5a7f 100644 --- a/core/src/main/java/kafka/automq/table/process/RecordAssembler.java +++ b/core/src/main/java/kafka/automq/table/process/RecordAssembler.java @@ -27,6 +27,7 @@ import org.apache.avro.SchemaNormalization; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.internal.Accessor; import java.util.ArrayList; import java.util.List; @@ -145,7 +146,9 @@ private AssemblerSchema buildFinalAssemblerSchema() { List finalFields = new ArrayList<>(baseRecord.getSchema().getFields().size() + 3); Schema baseSchema = baseRecord.getSchema(); for (Schema.Field field : baseSchema.getFields()) { - finalFields.add(new Schema.Field(field, field.schema())); + // Accessor keeps the original Schema instance (preserving logical types) while skipping default-value revalidation. + Schema.Field f = Accessor.createField(field.name(), field.schema(), field.doc(), Accessor.defaultValue(field), false, field.order()); + finalFields.add(f); } int baseFieldCount = baseSchema.getFields().size();