diff --git a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java index db0a6f6242..803ee2157a 100644 --- a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java +++ b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java @@ -47,7 +47,10 @@ public class AvroValueAdapter extends AbstractTypeAdapter { @Override protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) { - if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) { + if (sourceValue instanceof Utf8) { + return sourceValue; + } + if (sourceValue instanceof GenericData.EnumSymbol) { return sourceValue.toString(); } return super.convertString(sourceValue, sourceSchema, targetType); diff --git a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java index 2e6b155ec4..ebed2e14c3 100644 --- a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java +++ b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java @@ -258,7 +258,7 @@ private long calculateFieldCount(Object value, Type icebergType) { switch (icebergType.typeId()) { case STRING: - return FieldMetric.count((String) value); + return FieldMetric.count((CharSequence) value); case BINARY: return FieldMetric.count((ByteBuffer) value); case FIXED: diff --git a/core/src/main/java/kafka/automq/table/metric/FieldMetric.java b/core/src/main/java/kafka/automq/table/metric/FieldMetric.java index 807a1f31f6..ab8ae5967d 100644 --- a/core/src/main/java/kafka/automq/table/metric/FieldMetric.java +++ b/core/src/main/java/kafka/automq/table/metric/FieldMetric.java @@ -19,14 +19,19 @@ package kafka.automq.table.metric; +import org.apache.avro.util.Utf8; + import java.nio.ByteBuffer; public class FieldMetric { - public static int count(String value) { + public static int count(CharSequence value) { if (value == null) { return 0; } + if (value instanceof Utf8) { + return (((Utf8) value).getByteLength() + 23) / 24; + } if (value.isEmpty()) { return 1; } diff --git a/core/src/main/java/kafka/automq/table/process/DefaultRecordProcessor.java b/core/src/main/java/kafka/automq/table/process/DefaultRecordProcessor.java index 81b111cb39..a1d7ce8d96 100644 --- a/core/src/main/java/kafka/automq/table/process/DefaultRecordProcessor.java +++ b/core/src/main/java/kafka/automq/table/process/DefaultRecordProcessor.java @@ -25,6 +25,8 @@ import kafka.automq.table.process.exception.SchemaRegistrySystemException; import kafka.automq.table.process.exception.TransformException; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.Record; @@ -45,6 +47,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static kafka.automq.table.process.RecordAssembler.KAFKA_VALUE_FIELD; +import static kafka.automq.table.process.RecordAssembler.ensureOptional; /** * Default implementation of RecordProcessor using a two-stage processing pipeline. 
@@ -56,16 +59,25 @@ public class DefaultRecordProcessor implements RecordProcessor { private static final Schema HEADER_SCHEMA = Schema.createMap(Schema.create(Schema.Type.BYTES)); private static final String HEADER_SCHEMA_IDENTITY = String.valueOf(HEADER_SCHEMA.hashCode()); + private static final ConversionResult EMPTY_HEADERS_RESULT = + new ConversionResult(Map.of(), HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY); private final String topicName; private final Converter keyConverter; private final Converter valueConverter; private final List transformChain; + private final RecordAssembler recordAssembler; // Reusable assembler + private final String transformIdentity; // precomputed transform chain identity + + private static final int VALUE_WRAPPER_SCHEMA_CACHE_MAX = 32; + private final Cache valueWrapperSchemaCache = new LRUCache<>(VALUE_WRAPPER_SCHEMA_CACHE_MAX); public DefaultRecordProcessor(String topicName, Converter keyConverter, Converter valueConverter) { this.transformChain = new ArrayList<>(); this.topicName = topicName; this.keyConverter = keyConverter; this.valueConverter = valueConverter; + this.recordAssembler = new RecordAssembler(); + this.transformIdentity = ""; // no transforms } public DefaultRecordProcessor(String topicName, Converter keyConverter, Converter valueConverter, List transforms) { @@ -73,6 +85,15 @@ public DefaultRecordProcessor(String topicName, Converter keyConverter, Converte this.topicName = topicName; this.keyConverter = keyConverter; this.valueConverter = valueConverter; + this.recordAssembler = new RecordAssembler(); + + // Precompute transform identity (names joined by comma) + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < this.transformChain.size(); i++) { + if (i > 0) sb.append(','); + sb.append(this.transformChain.get(i).getName()); + } + this.transformIdentity = sb.toString(); } @Override @@ -86,15 +107,19 @@ public ProcessingResult process(int partition, Record kafkaRecord) { GenericRecord baseRecord = wrapValue(valueResult); GenericRecord transformedRecord = applyTransformChain(baseRecord, partition, kafkaRecord); - GenericRecord finalRecord = new RecordAssembler(transformedRecord) + + String schemaIdentity = generateCompositeSchemaIdentity(headerResult, keyResult, valueResult); + + GenericRecord record = recordAssembler + .reset(transformedRecord) .withHeader(headerResult) .withKey(keyResult) + .withSchemaIdentity(schemaIdentity) .withMetadata(partition, kafkaRecord.offset(), kafkaRecord.timestamp()) .assemble(); - Schema finalSchema = finalRecord.getSchema(); - String schemaIdentity = generateCompositeSchemaIdentity(headerResult, keyResult, valueResult, transformChain); + Schema schema = record.getSchema(); - return new ProcessingResult(finalRecord, finalSchema, schemaIdentity); + return new ProcessingResult(record, schema, schemaIdentity); } catch (ConverterException e) { return getProcessingResult(kafkaRecord, "Convert operation failed for record: %s", DataError.ErrorType.CONVERT_ERROR, e); } catch (TransformException e) { @@ -132,14 +157,26 @@ private ProcessingResult getProcessingResult(Record kafkaRecord, String format, private ConversionResult processHeaders(Record kafkaRecord) throws ConverterException { try { - Map headers = new HashMap<>(); Header[] recordHeaders = kafkaRecord.headers(); - if (recordHeaders != null) { - for (Header header : recordHeaders) { - ByteBuffer value = header.value() != null ? 
- ByteBuffer.wrap(header.value()) : null; - headers.put(header.key(), value); - } + if (recordHeaders == null || recordHeaders.length == 0) { + return EMPTY_HEADERS_RESULT; + } + + int n = recordHeaders.length; + + // Single header: use Collections.singletonMap, which (unlike Map.of) accepts a null value + if (n == 1) { + Header h = recordHeaders[0]; + ByteBuffer value = h.value() != null ? ByteBuffer.wrap(h.value()) : null; + Map<String, ByteBuffer> headers = Collections.singletonMap(h.key(), value); + return new ConversionResult(headers, HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY); + } + + // Multiple headers: pre-size the HashMap to avoid rehashing + Map<String, ByteBuffer> headers = new HashMap<>(Math.max(16, (int) (n / 0.75f) + 1)); + for (Header header : recordHeaders) { + ByteBuffer value = header.value() != null ? ByteBuffer.wrap(header.value()) : null; + headers.put(header.key(), value); } return new ConversionResult(headers, HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY); } catch (Exception e) { @@ -148,15 +185,21 @@ private ConversionResult processHeaders(Record kafkaRecord) throws ConverterExce } private GenericRecord wrapValue(ConversionResult valueResult) { - Schema.Field valueField = new Schema.Field(KAFKA_VALUE_FIELD, valueResult.getSchema(), null, null); Object valueContent = valueResult.getValue(); - - Schema recordSchema = Schema.createRecord("KafkaValueWrapper", null, "kafka.automq.table.process", false); - recordSchema.setFields(Collections.singletonList(valueField)); + Schema recordSchema = valueWrapperSchemaCache.get(valueResult.getSchemaIdentity()); + if (recordSchema == null) { + Schema.Field valueField = new Schema.Field( + KAFKA_VALUE_FIELD, + ensureOptional(valueResult.getSchema()), + null, null); + Schema schema = Schema.createRecord("KafkaValueWrapper", null, "kafka.automq.table.process", false); + schema.setFields(Collections.singletonList(valueField)); + valueWrapperSchemaCache.put(valueResult.getSchemaIdentity(), schema); + recordSchema = schema; + } GenericRecord baseRecord = new GenericData.Record(recordSchema); baseRecord.put(KAFKA_VALUE_FIELD, valueContent); - return baseRecord; } @@ -181,23 +224,12 @@ private GenericRecord applyTransformChain(GenericRecord baseRecord, int partitio private String generateCompositeSchemaIdentity( ConversionResult headerResult, ConversionResult keyResult, - ConversionResult valueResult, - List<Transform> transforms) { - - // Extract schema identities with null safety + ConversionResult valueResult) { + // Extract schema identities String headerIdentity = headerResult.getSchemaIdentity(); String keyIdentity = keyResult.getSchemaIdentity(); String valueIdentity = valueResult.getSchemaIdentity(); - - // Generate transform chain identity - String transformIdentity = transforms.isEmpty() ?
- "noTransform" : - transforms.stream() - .map(Transform::getName) - .collect(java.util.stream.Collectors.joining("->")); - - // Join all parts with the identity separator - return String.join("|", headerIdentity, keyIdentity, valueIdentity, transformIdentity); + return "h:" + headerIdentity + "|v:" + valueIdentity + "|k:" + keyIdentity + "|t:" + transformIdentity; } @Override diff --git a/core/src/main/java/kafka/automq/table/process/RecordAssembler.java b/core/src/main/java/kafka/automq/table/process/RecordAssembler.java index 1fc6466edb..1fadf34958 100644 --- a/core/src/main/java/kafka/automq/table/process/RecordAssembler.java +++ b/core/src/main/java/kafka/automq/table/process/RecordAssembler.java @@ -19,8 +19,12 @@ package kafka.automq.table.process; +import org.apache.kafka.common.cache.LRUCache; + +import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaNormalization; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -34,6 +38,7 @@ * This class also serves as the holder for the public contract of field names. */ public final class RecordAssembler { + private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); public static final String KAFKA_HEADER_FIELD = "_kafka_header"; public static final String KAFKA_KEY_FIELD = "_kafka_key"; @@ -53,15 +58,31 @@ public final class RecordAssembler { .name(METADATA_TIMESTAMP_FIELD).doc("Record timestamp").type().longType().noDefault() .endRecord(); - private final GenericRecord baseRecord; + private static final int SCHEMA_CACHE_MAX = 32; + // Cache of assembled schema + precomputed indexes bound to a schema identity + private final LRUCache assemblerSchemaCache = new LRUCache<>(SCHEMA_CACHE_MAX); + + // Reusable state - reset for each record + private GenericRecord baseRecord; private ConversionResult headerResult; private ConversionResult keyResult; private int partition; private long offset; private long timestamp; + private String schemaIdentity; - public RecordAssembler(GenericRecord baseRecord) { + public RecordAssembler() { + } + + public RecordAssembler reset(GenericRecord baseRecord) { this.baseRecord = baseRecord; + this.headerResult = null; + this.keyResult = null; + this.partition = 0; + this.offset = 0L; + this.timestamp = 0L; + this.schemaIdentity = null; + return this; } public RecordAssembler withHeader(ConversionResult headerResult) { @@ -74,6 +95,12 @@ public RecordAssembler withKey(ConversionResult keyResult) { return this; } + + public RecordAssembler withSchemaIdentity(String schemaIdentity) { + this.schemaIdentity = schemaIdentity; + return this; + } + public RecordAssembler withMetadata(int partition, long offset, long timestamp) { this.partition = partition; this.offset = offset; @@ -82,40 +109,192 @@ public RecordAssembler withMetadata(int partition, long offset, long timestamp) } public GenericRecord assemble() { - Schema finalSchema = buildFinalSchema(); - GenericRecord finalRecord = new GenericData.Record(finalSchema); - populateFields(finalRecord); - return finalRecord; + AssemblerSchema aSchema = getOrCreateAssemblerSchema(); + // Return a lightweight view that implements GenericRecord + // and adapts schema position/name lookups to the underlying values + // without copying the base record data. + return new AssembledRecordView(aSchema, baseRecord, + headerResult != null ? headerResult.getValue() : null, + keyResult != null ? 
keyResult.getValue() : null, + partition, offset, timestamp); + } + + private AssemblerSchema getOrCreateAssemblerSchema() { + if (schemaIdentity == null) { + long baseFp = SchemaNormalization.parsingFingerprint64(baseRecord.getSchema()); + long keyFp = keyResult != null ? SchemaNormalization.parsingFingerprint64(keyResult.getSchema()) : 0L; + long headerFp = headerResult != null ? SchemaNormalization.parsingFingerprint64(headerResult.getSchema()) : 0L; + long metadataFp = SchemaNormalization.parsingFingerprint64(METADATA_SCHEMA); + + schemaIdentity = "v:" + Long.toUnsignedString(baseFp) + + "|k:" + Long.toUnsignedString(keyFp) + + "|h:" + Long.toUnsignedString(headerFp) + + "|m:" + Long.toUnsignedString(metadataFp); + } + final String cacheKey = schemaIdentity; + AssemblerSchema cached = assemblerSchemaCache.get(cacheKey); + if (cached != null) { + return cached; + } + AssemblerSchema created = buildFinalAssemblerSchema(); + assemblerSchemaCache.put(cacheKey, created); + return created; } - private Schema buildFinalSchema() { - List finalFields = new ArrayList<>(); + private AssemblerSchema buildFinalAssemblerSchema() { + List finalFields = new ArrayList<>(baseRecord.getSchema().getFields().size() + 3); Schema baseSchema = baseRecord.getSchema(); for (Schema.Field field : baseSchema.getFields()) { - finalFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); + finalFields.add(new Schema.Field(field, field.schema())); + } + + int baseFieldCount = baseSchema.getFields().size(); + int headerIndex = -1; + int keyIndex = -1; + int metadataIndex = -1; + + if (headerResult != null) { + Schema optionalHeaderSchema = ensureOptional(headerResult.getSchema()); + finalFields.add(new Schema.Field(KAFKA_HEADER_FIELD, optionalHeaderSchema, "Kafka record headers", JsonProperties.NULL_VALUE)); + headerIndex = baseFieldCount; + } + if (keyResult != null) { + Schema optionalKeySchema = ensureOptional(keyResult.getSchema()); + finalFields.add(new Schema.Field(KAFKA_KEY_FIELD, optionalKeySchema, "Kafka record key", JsonProperties.NULL_VALUE)); + keyIndex = (headerIndex >= 0) ? baseFieldCount + 1 : baseFieldCount; } - finalFields.add(new Schema.Field(KAFKA_HEADER_FIELD, headerResult.getSchema(), "Kafka record headers", null)); - finalFields.add(new Schema.Field(KAFKA_KEY_FIELD, keyResult.getSchema(), "Kafka record key", null)); - finalFields.add(new Schema.Field(KAFKA_METADATA_FIELD, METADATA_SCHEMA, "Kafka record metadata", null)); + Schema optionalMetadataSchema = ensureOptional(METADATA_SCHEMA); + finalFields.add(new Schema.Field(KAFKA_METADATA_FIELD, optionalMetadataSchema, "Kafka record metadata", JsonProperties.NULL_VALUE)); + metadataIndex = baseFieldCount + (headerIndex >= 0 ? 1 : 0) + (keyIndex >= 0 ? 
1 : 0); - return Schema.createRecord(baseSchema.getName() + "WithMetadata", null, + Schema finalSchema = Schema.createRecord(baseSchema.getName() + "WithMetadata", null, "kafka.automq.table.process", false, finalFields); + + return new AssemblerSchema(finalSchema, baseFieldCount, headerIndex, keyIndex, metadataIndex); } - private void populateFields(GenericRecord finalRecord) { - Schema baseSchema = baseRecord.getSchema(); - for (Schema.Field field : baseSchema.getFields()) { - finalRecord.put(field.name(), baseRecord.get(field.name())); + public static Schema ensureOptional(Schema schema) { + if (schema.getType() == Schema.Type.UNION) { + boolean hasNull = false; + List types = schema.getTypes(); + for (Schema type : types) { + if (type.getType() == Schema.Type.NULL) { + hasNull = true; + break; + } + } + if (hasNull) { + return schema; + } + List withNull = new ArrayList<>(types.size() + 1); + withNull.add(NULL_SCHEMA); + withNull.addAll(types); + return Schema.createUnion(withNull); + } + return Schema.createUnion(List.of(NULL_SCHEMA, schema)); + } + + /** + * A read-only GenericRecord view that adapts accesses (by name or position) + * to the underlying base record and the synthetic kafka fields. + */ + private static final class AssembledRecordView implements GenericRecord { + private final Schema finalSchema; + private final GenericRecord baseRecord; + private final Object headerValue; // May be null if not present in schema + private final Object keyValue; // May be null if not present in schema + private final int baseFieldCount; + private final int headerIndex; // -1 if absent + private final int keyIndex; // -1 if absent + private final int metadataIndex; // always >= 0 + + private GenericRecord metadataRecord; + + AssembledRecordView(AssemblerSchema aSchema, + GenericRecord baseRecord, + Object headerValue, + Object keyValue, + int partition, + long offset, + long timestamp) { + this.finalSchema = aSchema.schema; + this.baseRecord = baseRecord; + this.headerValue = headerValue; + this.keyValue = keyValue; + + this.baseFieldCount = aSchema.baseFieldCount; + this.headerIndex = aSchema.headerIndex; + this.keyIndex = aSchema.keyIndex; + this.metadataIndex = aSchema.metadataIndex; + + this.metadataRecord = new GenericData.Record(METADATA_SCHEMA); + metadataRecord.put(METADATA_PARTITION_FIELD, partition); + metadataRecord.put(METADATA_OFFSET_FIELD, offset); + metadataRecord.put(METADATA_TIMESTAMP_FIELD, timestamp); + } + + @Override + public void put(String key, Object v) { + throw new UnsupportedOperationException("AssembledRecordView is read-only"); } - finalRecord.put(KAFKA_HEADER_FIELD, headerResult.getValue()); - finalRecord.put(KAFKA_KEY_FIELD, keyResult.getValue()); + @Override + public Object get(String key) { + Schema.Field field = finalSchema.getField(key); + if (field == null) { + return null; + } + return get(field.pos()); + } + + @Override + public Schema getSchema() { + return finalSchema; + } + + @Override + public void put(int i, Object v) { + throw new UnsupportedOperationException("AssembledRecordView is read-only"); + } - GenericRecord metadata = new GenericData.Record(METADATA_SCHEMA); - metadata.put(METADATA_PARTITION_FIELD, partition); - metadata.put(METADATA_OFFSET_FIELD, offset); - metadata.put(METADATA_TIMESTAMP_FIELD, timestamp); - finalRecord.put(KAFKA_METADATA_FIELD, metadata); + @Override + public Object get(int i) { + if (i < 0 || i >= finalSchema.getFields().size()) { + throw new IndexOutOfBoundsException("Field position out of bounds: " + i); + } + // 
Base fields delegate directly + if (i < baseFieldCount) { + return baseRecord.get(i); + } + // Synthetic fields + if (i == headerIndex) { + return headerValue; + } + if (i == keyIndex) { + return keyValue; + } + if (i == metadataIndex) { + return metadataRecord; + } + // Should not happen if schema is consistent + return null; + } + } + + private static final class AssemblerSchema { + final Schema schema; + final int baseFieldCount; + final int headerIndex; + final int keyIndex; + final int metadataIndex; + + AssemblerSchema(Schema schema, int baseFieldCount, int headerIndex, int keyIndex, int metadataIndex) { + this.schema = schema; + this.baseFieldCount = baseFieldCount; + this.headerIndex = headerIndex; + this.keyIndex = keyIndex; + this.metadataIndex = metadataIndex; + } } } diff --git a/core/src/main/java/kafka/automq/table/process/convert/RawConverter.java b/core/src/main/java/kafka/automq/table/process/convert/RawConverter.java index f5cf73f88a..f358426272 100644 --- a/core/src/main/java/kafka/automq/table/process/convert/RawConverter.java +++ b/core/src/main/java/kafka/automq/table/process/convert/RawConverter.java @@ -24,6 +24,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaNormalization; import java.nio.ByteBuffer; @@ -31,7 +32,7 @@ public class RawConverter implements Converter { public static final RawConverter INSTANCE = new RawConverter(); private static final Schema SCHEMA = SchemaBuilder.builder().bytesType(); - private static final String SCHEMA_IDENTITY = String.valueOf(SCHEMA.hashCode()); + private static final String SCHEMA_IDENTITY = Long.toUnsignedString(SchemaNormalization.parsingFingerprint64(SCHEMA)); @Override public ConversionResult convert(String topic, ByteBuffer buffer) throws ConverterException { diff --git a/core/src/main/java/kafka/automq/table/process/convert/StringConverter.java b/core/src/main/java/kafka/automq/table/process/convert/StringConverter.java index c8eddfe630..f2e6081359 100644 --- a/core/src/main/java/kafka/automq/table/process/convert/StringConverter.java +++ b/core/src/main/java/kafka/automq/table/process/convert/StringConverter.java @@ -24,6 +24,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaNormalization; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -32,7 +33,7 @@ public class StringConverter implements Converter { public static final StringConverter INSTANCE = new StringConverter(); private static final Schema SCHEMA = SchemaBuilder.builder().stringType(); - private static final String SCHEMA_IDENTITY = String.valueOf(SCHEMA.hashCode()); + private static final String SCHEMA_IDENTITY = Long.toUnsignedString(SchemaNormalization.parsingFingerprint64(SCHEMA)); @Override public ConversionResult convert(String topic, ByteBuffer buffer) throws ConverterException { diff --git a/core/src/main/java/kafka/automq/table/process/transform/DebeziumUnwrapTransform.java b/core/src/main/java/kafka/automq/table/process/transform/DebeziumUnwrapTransform.java index 853454d08a..3f2a3f0835 100644 --- a/core/src/main/java/kafka/automq/table/process/transform/DebeziumUnwrapTransform.java +++ b/core/src/main/java/kafka/automq/table/process/transform/DebeziumUnwrapTransform.java @@ -25,8 +25,8 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +35,8 @@ import java.util.Map; import java.util.Objects; +import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap; + /** * Transform for unwrapping Debezium CDC formatted records. * @@ -83,6 +85,10 @@ public class DebeziumUnwrapTransform implements Transform { .optionalString(CDC_FIELD_SOURCE) .endRecord(); + // Cache enriched schemas keyed by base schema fingerprint (bounded, concurrent) + private static final int ENRICHED_SCHEMA_CACHE_MAX = 1024; + private final Map enrichedSchemaCache = new BoundedConcurrentHashMap<>(ENRICHED_SCHEMA_CACHE_MAX); + @Override public void configure(Map configs) { // ignore @@ -172,39 +178,38 @@ private GenericRecord enrichWithMetadata(GenericRecord businessData, TransformContext context) throws TransformException { try { Schema schemaWithMetadata = createSchemaWithMetadata(businessData.getSchema()); - GenericRecordBuilder builder = new GenericRecordBuilder(schemaWithMetadata); + // Build the enriched record using GenericData.Record to reduce allocations + GenericData.Record result = new GenericData.Record(schemaWithMetadata); for (Schema.Field field : businessData.getSchema().getFields()) { - builder.set(field.name(), businessData.get(field.name())); + result.put(field.name(), businessData.get(field.name())); } - Schema cdcSchema = schemaWithMetadata.getField(CDC_RECORD_NAME).schema(); - GenericRecordBuilder cdcBuilder = new GenericRecordBuilder(cdcSchema); - - cdcBuilder.set(CDC_FIELD_OP, mapOperation(operation)); + GenericData.Record cdc = new GenericData.Record(CDC_SCHEMA); + cdc.put(CDC_FIELD_OP, mapOperation(operation)); Object tsMs = debeziumRecord.get(FIELD_TS_MS); if (tsMs instanceof Long) { - cdcBuilder.set(CDC_FIELD_TS, tsMs); + cdc.put(CDC_FIELD_TS, tsMs); } - cdcBuilder.set(CDC_FIELD_OFFSET, context.getKafkaRecord().offset()); + cdc.put(CDC_FIELD_OFFSET, context.getKafkaRecord().offset()); GenericRecord source = getRecordValue(debeziumRecord, FIELD_SOURCE); if (source != null) { - String schema = null; + String schemaName = null; if (source.hasField("schema")) { - schema = getStringValue(source, "schema"); + schemaName = getStringValue(source, "schema"); } - String db = (schema == null) ? getStringValue(source, "db") : schema; + String db = (schemaName == null) ? getStringValue(source, "db") : schemaName; String table = getStringValue(source, "table"); if (db != null && table != null) { - cdcBuilder.set(CDC_FIELD_SOURCE, db + "." + table); + cdc.put(CDC_FIELD_SOURCE, db + "." + table); } } - builder.set(CDC_RECORD_NAME, cdcBuilder.build()); - return builder.build(); + result.put(CDC_RECORD_NAME, cdc); + return result; } catch (Exception e) { throw new TransformException("Failed to enrich record with Debezium metadata:" + e.getMessage(), e); @@ -224,22 +229,25 @@ private String mapOperation(String originalOp) { } private Schema createSchemaWithMetadata(Schema originalSchema) { - List enhancedFields = new ArrayList<>(); - for (Schema.Field field : originalSchema.getFields()) { - enhancedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); - } - enhancedFields.add(new Schema.Field(CDC_RECORD_NAME, CDC_SCHEMA, "CDC metadata", null)); - - String enhancedName = originalSchema.getName() != null ? 
- originalSchema.getName() + "_cdc_enriched" : "enriched_record"; - - return Schema.createRecord( - enhancedName, - "Record enriched with CDC metadata", - originalSchema.getNamespace(), - false, - enhancedFields - ); + long fp = org.apache.avro.SchemaNormalization.parsingFingerprint64(originalSchema); + return enrichedSchemaCache.computeIfAbsent(fp, k -> { + List enhancedFields = new ArrayList<>(); + for (Schema.Field field : originalSchema.getFields()) { + enhancedFields.add(new Schema.Field(field, field.schema())); + } + enhancedFields.add(new Schema.Field(CDC_RECORD_NAME, CDC_SCHEMA, "CDC metadata", null)); + + String enhancedName = originalSchema.getName() != null ? + originalSchema.getName() + "_cdc_enriched" : "enriched_record"; + + return Schema.createRecord( + enhancedName, + "Record enriched with CDC metadata", + originalSchema.getNamespace(), + false, + enhancedFields + ); + }); } diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java index 8eb44ef4b7..b9d9421827 100644 --- a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java +++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java @@ -158,6 +158,54 @@ private static GenericRecord serializeAndDeserialize(GenericRecord record, Schem } } + private static Map toStringKeyMap(Object value) { + if (value == null) { + return null; + } + Map map = (Map) value; + Map result = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey() == null ? null : entry.getKey().toString(); + result.put(key, normalizeValue(entry.getValue())); + } + return result; + } + + private static Object normalizeValue(Object value) { + if (value == null) { + return null; + } + if (value instanceof CharSequence) { + return value.toString(); + } + if (value instanceof List) { + List list = (List) value; + List normalized = new ArrayList<>(list.size()); + for (Object element : list) { + normalized.add(normalizeValue(element)); + } + return normalized; + } + if (value instanceof Map) { + return toStringKeyMap(value); + } + return value; + } + + private static Map normalizeMapValues(Object value) { + if (value == null) { + return null; + } + Map map = (Map) value; + Map result = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + @SuppressWarnings("unchecked") + K key = (K) entry.getKey(); + result.put(key, normalizeValue(entry.getValue())); + } + return result; + } + @Test public void testSchemaEvolution() { @@ -190,7 +238,7 @@ public void testSchemaEvolution() { Record bind = recordBinder.bind(avroRecord); assertEquals(12345L, bind.get(0)); // id - assertEquals("John Doe", bind.get(1)); // name + assertEquals("John Doe", bind.get(1).toString()); // name assertNull(bind.get(2)); // age - doesn't exist in Avro record } @@ -222,7 +270,7 @@ public void testWrapperReusability() { Record bind1 = recordBinder.bind(record1); assertEquals(1L, bind1.get(0)); - assertEquals("Alice", bind1.get(1)); + assertEquals("Alice", bind1.get(1).toString()); // Reuse wrapper for second record GenericRecord record2 = new GenericData.Record(avroSchema); @@ -231,7 +279,7 @@ public void testWrapperReusability() { Record bind2 = recordBinder.bind(record2); assertEquals(2L, bind2.get(0)); - assertEquals("Bob", bind2.get(1)); + assertEquals("Bob", bind2.get(1).toString()); } @@ -258,7 +306,7 @@ public void testStringConversion() { Record icebergRecord = new RecordBinder(icebergSchema, 
avroSchema).bind(record); // Verify the field value - assertEquals("test_string", icebergRecord.getField("stringField")); + assertEquals("test_string", icebergRecord.getField("stringField").toString()); // Send the record to the table testSendRecord(icebergSchema, icebergRecord); @@ -697,7 +745,7 @@ public void testListConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Verify the field value - assertEquals(Arrays.asList("a", "b", "c"), icebergRecord.getField("listField")); + assertEquals(Arrays.asList("a", "b", "c"), normalizeValue(icebergRecord.getField("listField"))); // Send the record to the table testSendRecord(icebergSchema, icebergRecord); @@ -727,7 +775,7 @@ public void testStringMapConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Verify the field value - assertEquals(map, icebergRecord.getField("mapField")); + assertEquals(map, normalizeValue(icebergRecord.getField("mapField"))); // Send the record to the table testSendRecord(icebergSchema, icebergRecord); @@ -757,7 +805,7 @@ public void testIntMapConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Verify the field value - assertEquals(map, icebergRecord.getField("mapField")); + assertEquals(map, normalizeValue(icebergRecord.getField("mapField"))); // Send the record to the table testSendRecord(icebergSchema, icebergRecord); @@ -812,8 +860,7 @@ public void testMapWithNonStringKeysConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Convert the list of records back to a map - @SuppressWarnings("unchecked") - Map mapField = (Map) icebergRecord.getField("mapField"); + Map mapField = normalizeMapValues(icebergRecord.getField("mapField")); // Verify the field value assertEquals(expectedMap, mapField); @@ -856,7 +903,7 @@ public void testNestedRecordConversion() { // Verify the field values Record nestedIcebergRecord = (Record) icebergRecord.getField("nestedField"); - assertEquals("nested_string", nestedIcebergRecord.getField("nestedStringField")); + assertEquals("nested_string", nestedIcebergRecord.getField("nestedStringField").toString()); assertEquals(42, nestedIcebergRecord.getField("nestedIntField")); // Send the record to the table @@ -891,7 +938,7 @@ public void testOptionalFieldConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Verify the field values - assertEquals("optional_string", icebergRecord.getField("optionalStringField")); + assertEquals("optional_string", icebergRecord.getField("optionalStringField").toString()); assertEquals(42, icebergRecord.getField("optionalIntField")); assertNull(icebergRecord.getField("optionalStringNullField")); assertNull(icebergRecord.getField("optionalIntNullField")); @@ -925,7 +972,7 @@ public void testDefaultFieldConversion() { Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); // Verify the field values - assertEquals("default_string", icebergRecord.getField("defaultStringField")); + assertEquals("default_string", icebergRecord.getField("defaultStringField").toString()); assertEquals(42, icebergRecord.getField("defaultIntField")); // Send the record to the table @@ -973,7 +1020,7 @@ 
public void testUnionFieldConversion() { // Verify the field value Object unionField1 = icebergRecord.getField("unionField1"); - assertEquals("union_string", unionField1); + assertEquals("union_string", unionField1.toString()); Object unionField2 = icebergRecord.getField("unionField2"); assertEquals(42, unionField2); @@ -1118,8 +1165,8 @@ public void testFieldCountWithComplexTypes() { Record icebergRecord = recordBinder.bind(avroRecord); // Access fields to trigger counting - assertEquals(Arrays.asList("a", "b", "c"), icebergRecord.getField("stringList")); - assertEquals(map, icebergRecord.getField("stringMap")); + assertEquals(Arrays.asList("a", "b", "c"), normalizeValue(icebergRecord.getField("stringList"))); + assertEquals(map, normalizeValue(icebergRecord.getField("stringMap"))); // Total: 4 (list) + 5 (map) = 9 fields long fieldCount = recordBinder.getAndResetFieldCount(); diff --git a/core/src/test/java/kafka/automq/table/process/DefaultRecordProcessorTest.java b/core/src/test/java/kafka/automq/table/process/DefaultRecordProcessorTest.java index d115e353a7..c7d3683cde 100644 --- a/core/src/test/java/kafka/automq/table/process/DefaultRecordProcessorTest.java +++ b/core/src/test/java/kafka/automq/table/process/DefaultRecordProcessorTest.java @@ -24,8 +24,12 @@ import kafka.automq.table.process.convert.RawConverter; import kafka.automq.table.process.convert.StringConverter; import kafka.automq.table.process.exception.ConverterException; +import kafka.automq.table.process.exception.InvalidDataException; +import kafka.automq.table.process.exception.SchemaRegistrySystemException; +import kafka.automq.table.process.exception.TransformException; import kafka.automq.table.process.transform.FlattenTransform; +import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.Record; @@ -39,19 +43,26 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroSerializer; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("S3Unit") @@ -251,8 +262,262 @@ void testHeaderProcessing() { assertEquals(ByteBuffer.wrap("v1".getBytes()), headerMap.get("h1")); } + @Test + void testProcessHeadersWithMultipleEntriesIncludingNullValue() { + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), new RawConverter()); + Header[] headers = { + new RecordHeader("h1", "v1".getBytes(StandardCharsets.UTF_8)), + new RecordHeader("h2", null), + new RecordHeader("h3", 
"v3".getBytes(StandardCharsets.UTF_8)) + }; + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), headers); + + ProcessingResult result = processor.process(TEST_PARTITION, kafkaRecord); + + assertTrue(result.isSuccess()); + GenericRecord finalRecord = result.getFinalRecord(); + @SuppressWarnings("unchecked") + Map headerMap = (Map) finalRecord.get(RecordAssembler.KAFKA_HEADER_FIELD); + assertEquals(3, headerMap.size()); + assertEquals(ByteBuffer.wrap("v1".getBytes(StandardCharsets.UTF_8)), headerMap.get("h1")); + assertNull(headerMap.get("h2")); + assertEquals(ByteBuffer.wrap("v3".getBytes(StandardCharsets.UTF_8)), headerMap.get("h3")); + } + + @Test + void testProcessHeadersReuseEmptyResultInstance() { + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), new RawConverter()); + + Record recordWithoutHeaders = createKafkaRecord("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8), null); + ProcessingResult firstResult = processor.process(TEST_PARTITION, recordWithoutHeaders); + assertTrue(firstResult.isSuccess()); + @SuppressWarnings("unchecked") + Map firstHeaders = (Map) firstResult.getFinalRecord().get(RecordAssembler.KAFKA_HEADER_FIELD); + assertTrue(firstHeaders.isEmpty()); + + Record recordWithEmptyHeaders = createKafkaRecord("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8), new Header[0]); + ProcessingResult secondResult = processor.process(TEST_PARTITION, recordWithEmptyHeaders); + assertTrue(secondResult.isSuccess()); + @SuppressWarnings("unchecked") + Map secondHeaders = (Map) secondResult.getFinalRecord().get(RecordAssembler.KAFKA_HEADER_FIELD); + assertTrue(secondHeaders.isEmpty()); + assertSame(firstHeaders, secondHeaders); + } + + @Test + void testMetadataFieldsPopulatedOnSuccess() { + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), new RawConverter()); + int partition = 7; + long offset = 456L; + long timestamp = 1_234_567_890L; + Record kafkaRecord = new SimpleRecord(offset, timestamp, + "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8), new Header[0]); + + ProcessingResult result = processor.process(partition, kafkaRecord); + + assertTrue(result.isSuccess()); + GenericRecord finalRecord = result.getFinalRecord(); + GenericRecord metadata = (GenericRecord) finalRecord.get(RecordAssembler.KAFKA_METADATA_FIELD); + assertNotNull(metadata); + assertEquals(partition, ((Integer) metadata.get(RecordAssembler.METADATA_PARTITION_FIELD)).intValue()); + assertEquals(offset, ((Long) metadata.get(RecordAssembler.METADATA_OFFSET_FIELD)).longValue()); + assertEquals(timestamp, ((Long) metadata.get(RecordAssembler.METADATA_TIMESTAMP_FIELD)).longValue()); + } + + @Test + void testWrapValueSchemaCacheReusedBetweenCalls() { + Schema valueSchema = SchemaBuilder.record("CachedValue") + .namespace("kafka.automq.table.process.test") + .fields() + .name("field").type().stringType().noDefault() + .endRecord(); + AtomicBoolean alternateIdentity = new AtomicBoolean(false); + + Converter keyConverter = new StringConverter(); + Converter valueConverter = (topic, buffer) -> { + GenericRecord record = new GenericRecordBuilder(valueSchema) + .set("field", alternateIdentity.get() ? "value-b" : "value-a") + .build(); + String identity = alternateIdentity.get() ? 
"value-schema-b" : "value-schema-a"; + return new ConversionResult(record, identity); + }; + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter); + + Record kafkaRecord1 = createKafkaRecord("key1".getBytes(StandardCharsets.UTF_8), "value1".getBytes(StandardCharsets.UTF_8), new Header[0]); + ProcessingResult result1 = processor.process(TEST_PARTITION, kafkaRecord1); + assertTrue(result1.isSuccess()); + Cache cache = extractValueWrapperSchemaCache(processor); + assertEquals(1L, cache.size()); + + Record kafkaRecord2 = createKafkaRecord("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8), new Header[0]); + ProcessingResult result2 = processor.process(TEST_PARTITION, kafkaRecord2); + assertTrue(result2.isSuccess()); + assertEquals(1L, cache.size()); + + alternateIdentity.set(true); + Record kafkaRecord3 = createKafkaRecord("key3".getBytes(StandardCharsets.UTF_8), "value3".getBytes(StandardCharsets.UTF_8), new Header[0]); + ProcessingResult result3 = processor.process(TEST_PARTITION, kafkaRecord3); + assertTrue(result3.isSuccess()); + assertEquals(2L, cache.size()); + } + + @Test + void testCompositeSchemaIdentityReflectsTransformChain() { + Schema valueSchema = SchemaBuilder.record("IdentityRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("field").type().stringType().noDefault() + .endRecord(); + Converter keyConverter = new StringConverter(); + Converter valueConverter = (topic, buffer) -> { + GenericRecord record = new GenericRecordBuilder(valueSchema) + .set("field", "payload") + .build(); + return new ConversionResult(record, "identity-value"); + }; + + DefaultRecordProcessor ordered = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter, + List.of(new NamedPassthroughTransform("A"), new NamedPassthroughTransform("B"))); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), new Header[0]); + ProcessingResult orderedResult = ordered.process(TEST_PARTITION, kafkaRecord); + assertTrue(orderedResult.isSuccess()); + String orderedIdentity = orderedResult.getFinalSchemaIdentity(); + assertTrue(orderedIdentity.endsWith("|t:A,B")); + + DefaultRecordProcessor reversed = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter, + List.of(new NamedPassthroughTransform("B"), new NamedPassthroughTransform("A"))); + ProcessingResult reversedResult = reversed.process(TEST_PARTITION, kafkaRecord); + assertTrue(reversedResult.isSuccess()); + assertNotEquals(orderedIdentity, reversedResult.getFinalSchemaIdentity()); + } + + @Test + void testTransformReturningNullProducesError() { + Schema valueSchema = SchemaBuilder.record("NullRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("field").type().stringType().noDefault() + .endRecord(); + Converter keyConverter = new StringConverter(); + Converter valueConverter = (topic, buffer) -> { + GenericRecord record = new GenericRecordBuilder(valueSchema) + .set("field", "payload") + .build(); + return new ConversionResult(record, "null-transform-identity"); + }; + + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter, List.of(new NullingTransform())); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), new Header[0]); + + ProcessingResult result = processor.process(TEST_PARTITION, kafkaRecord); + + 
assertFalse(result.isSuccess()); + assertEquals(DataError.ErrorType.TRANSFORMATION_ERROR, result.getError().getType()); + assertTrue(result.getError().getMessage().contains("NullingTransform")); + } + + @Test + void testTransformThrowsInvalidDataException() { + Schema valueSchema = SchemaBuilder.record("InvalidRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("field").type().stringType().noDefault() + .endRecord(); + Converter keyConverter = new StringConverter(); + Converter valueConverter = (topic, buffer) -> { + GenericRecord record = new GenericRecordBuilder(valueSchema) + .set("field", "payload") + .build(); + return new ConversionResult(record, "invalid-transform-identity"); + }; + + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter, List.of(new InvalidDataThrowingTransform())); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), new Header[0]); + + ProcessingResult result = processor.process(TEST_PARTITION, kafkaRecord); + + assertFalse(result.isSuccess()); + assertEquals(DataError.ErrorType.DATA_ERROR, result.getError().getType()); + assertTrue(result.getError().getMessage().contains("Invalid data")); + } + + @Test + void testTransformThrowsTransformException() { + Schema valueSchema = SchemaBuilder.record("TransformRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("field").type().stringType().noDefault() + .endRecord(); + Converter keyConverter = new StringConverter(); + Converter valueConverter = (topic, buffer) -> { + GenericRecord record = new GenericRecordBuilder(valueSchema) + .set("field", "payload") + .build(); + return new ConversionResult(record, "transform-exception-identity"); + }; + + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, keyConverter, valueConverter, List.of(new ThrowingTransform())); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), new Header[0]); + + ProcessingResult result = processor.process(TEST_PARTITION, kafkaRecord); + + assertFalse(result.isSuccess()); + assertEquals(DataError.ErrorType.TRANSFORMATION_ERROR, result.getError().getType()); + assertTrue(result.getError().getMessage().contains("transform failure")); + } + + @Test + void testConverterRestClientNotFoundReturnsDataError() { + Converter restNotFoundConverter = (topic, buffer) -> { + RestClientException restException = new RestClientException("missing", HTTP_NOT_FOUND, 40403); + throw new RuntimeException("wrapper", restException); + }; + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), restNotFoundConverter); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), null); + + ProcessingResult result = processor.process(TEST_PARTITION, kafkaRecord); + + assertFalse(result.isSuccess()); + assertEquals(DataError.ErrorType.CONVERT_ERROR, result.getError().getType()); + String errorMessage = result.getError().getMessage(); + assertNotNull(errorMessage); + assertTrue(errorMessage.contains("Schema or subject not found for record"), () -> "actual message: " + errorMessage); + assertTrue(errorMessage.contains("topic=" + TEST_TOPIC), () -> "actual message: " + errorMessage); + } + + @Test + void testConverterRestClientServerErrorPropagates() { + Converter restErrorConverter = (topic, buffer) -> { + RestClientException 
restException = new RestClientException("server", 500, 50001); + throw new RuntimeException("wrapper", restException); + }; + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), restErrorConverter); + Record kafkaRecord = createKafkaRecord("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8), null); + + assertThrows(SchemaRegistrySystemException.class, () -> processor.process(TEST_PARTITION, kafkaRecord)); + } + + @Test + void testProcessWithNullRecordThrows() { + DefaultRecordProcessor processor = new DefaultRecordProcessor(TEST_TOPIC, new RawConverter(), new RawConverter()); + + assertThrows(NullPointerException.class, () -> processor.process(TEST_PARTITION, null)); + } + + private Cache extractValueWrapperSchemaCache(DefaultRecordProcessor processor) { + try { + Field cacheField = DefaultRecordProcessor.class.getDeclaredField("valueWrapperSchemaCache"); + cacheField.setAccessible(true); + @SuppressWarnings("unchecked") + Cache cache = (Cache) cacheField.get(processor); + return cache; + } catch (ReflectiveOperationException e) { + throw new AssertionError("Failed to access valueWrapperSchemaCache", e); + } + } + /** - * A simplified implementation of the Record interface for testing purposes. + * Test helper implementations. */ private static class SimpleRecord implements Record { private final long offset; @@ -345,4 +610,68 @@ public Header[] headers() { return headers; } } + + private static final class NamedPassthroughTransform implements Transform { + private final String name; + + private NamedPassthroughTransform(String name) { + this.name = name; + } + + @Override + public void configure(Map configs) { + // no-op + } + + @Override + public GenericRecord apply(GenericRecord record, TransformContext context) { + return record; + } + + @Override + public String getName() { + return name; + } + } + + private static final class NullingTransform implements Transform { + @Override + public void configure(Map configs) { + // no-op + } + + @Override + public GenericRecord apply(GenericRecord record, TransformContext context) { + return null; + } + } + + private static final class InvalidDataThrowingTransform implements Transform { + @Override + public void configure(Map configs) { + // no-op + } + + @Override + public GenericRecord apply(GenericRecord record, TransformContext context) { + throw new InvalidDataException("Invalid data from transform"); + } + + @Override + public String getName() { + return "InvalidDataTransform"; + } + } + + private static final class ThrowingTransform implements Transform { + @Override + public void configure(Map configs) { + // no-op + } + + @Override + public GenericRecord apply(GenericRecord record, TransformContext context) { + throw new TransformException("transform failure"); + } + } } diff --git a/core/src/test/java/kafka/automq/table/process/RecordAssemblerTest.java b/core/src/test/java/kafka/automq/table/process/RecordAssemblerTest.java new file mode 100644 index 0000000000..de3fa1470a --- /dev/null +++ b/core/src/test/java/kafka/automq/table/process/RecordAssemblerTest.java @@ -0,0 +1,171 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.automq.table.process; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +@Tag("S3Unit") +class RecordAssemblerTest { + + @Test + void ensureOptionalShouldWrapNonUnionSchema() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + + Schema optionalSchema = RecordAssembler.ensureOptional(stringSchema); + + assertEquals(Schema.Type.UNION, optionalSchema.getType()); + List types = optionalSchema.getTypes(); + assertEquals(2, types.size()); + assertEquals(Schema.Type.NULL, types.get(0).getType()); + assertSame(stringSchema, types.get(1)); + } + + @Test + void ensureOptionalShouldPrefixNullWhenMissing() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema intSchema = Schema.create(Schema.Type.INT); + Schema unionWithoutNull = Schema.createUnion(List.of(stringSchema, intSchema)); + + Schema optionalSchema = RecordAssembler.ensureOptional(unionWithoutNull); + + assertEquals(Schema.Type.UNION, optionalSchema.getType()); + List types = optionalSchema.getTypes(); + assertEquals(3, types.size()); + assertEquals(Schema.Type.NULL, types.get(0).getType()); + assertSame(stringSchema, types.get(1)); + assertSame(intSchema, types.get(2)); + } + + @Test + void ensureOptionalShouldReturnOriginalUnionWhenNullPresent() { + Schema unionWithNull = Schema.createUnion(List.of(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + + Schema result = RecordAssembler.ensureOptional(unionWithNull); + + assertSame(unionWithNull, result); + } + + @Test + void assembleShouldExposeOptionalSyntheticFields() { + Schema baseSchema = SchemaBuilder.record("BaseRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("name").type().stringType().noDefault() + .endRecord(); + GenericRecord baseRecord = new GenericData.Record(baseSchema); + baseRecord.put("name", "Alice"); + + Schema headerSchema = SchemaBuilder.record("HeaderRecord") + .namespace("kafka.automq.table.process.test") + .fields() + .name("headerKey").type().stringType().noDefault() + .endRecord(); + GenericRecord headerRecord = new GenericData.Record(headerSchema); + headerRecord.put("headerKey", "headerValue"); + ConversionResult headerResult = new ConversionResult(headerRecord, "header-identity"); + + Schema keySchema = Schema.create(Schema.Type.BYTES); + ByteBuffer keyValue = ByteBuffer.wrap("key-value".getBytes(StandardCharsets.UTF_8)); + ConversionResult keyResult = new 
ConversionResult(keyValue, keySchema, "key-identity"); + + int partition = 5; + long offset = 42L; + long timestamp = 1_700_000_000_000L; + + RecordAssembler assembler = new RecordAssembler(); + GenericRecord assembledRecord = assembler.reset(baseRecord) + .withHeader(headerResult) + .withKey(keyResult) + .withMetadata(partition, offset, timestamp) + .assemble(); + + Schema finalSchema = assembledRecord.getSchema(); + assertEquals(4, finalSchema.getFields().size()); + + Schema headerFieldSchema = finalSchema.getField(RecordAssembler.KAFKA_HEADER_FIELD).schema(); + assertEquals(Schema.Type.UNION, headerFieldSchema.getType()); + assertEquals(Schema.Type.NULL, headerFieldSchema.getTypes().get(0).getType()); + assertSame(headerSchema, headerFieldSchema.getTypes().get(1)); + + Schema keyFieldSchema = finalSchema.getField(RecordAssembler.KAFKA_KEY_FIELD).schema(); + assertEquals(Schema.Type.UNION, keyFieldSchema.getType()); + assertEquals(Schema.Type.NULL, keyFieldSchema.getTypes().get(0).getType()); + assertSame(keySchema, keyFieldSchema.getTypes().get(1)); + + Schema metadataFieldSchema = finalSchema.getField(RecordAssembler.KAFKA_METADATA_FIELD).schema(); + assertEquals(Schema.Type.UNION, metadataFieldSchema.getType()); + assertEquals(Schema.Type.NULL, metadataFieldSchema.getTypes().get(0).getType()); + Schema metadataSchema = metadataFieldSchema.getTypes().get(1); + assertEquals("KafkaMetadata", metadataSchema.getName()); + + assertEquals("Alice", assembledRecord.get("name").toString()); + assertSame(headerRecord, assembledRecord.get(RecordAssembler.KAFKA_HEADER_FIELD)); + assertSame(keyValue, assembledRecord.get(RecordAssembler.KAFKA_KEY_FIELD)); + + GenericRecord metadataRecord = (GenericRecord) assembledRecord.get(RecordAssembler.KAFKA_METADATA_FIELD); + assertNotNull(metadataRecord); + assertEquals(partition, metadataRecord.get(RecordAssembler.METADATA_PARTITION_FIELD)); + assertEquals(offset, metadataRecord.get(RecordAssembler.METADATA_OFFSET_FIELD)); + assertEquals(timestamp, metadataRecord.get(RecordAssembler.METADATA_TIMESTAMP_FIELD)); + } + + @Test + void assembleShouldSkipHeaderWhenAbsent() { + Schema baseSchema = SchemaBuilder.record("BaseRecordNoHeader") + .namespace("kafka.automq.table.process.test") + .fields() + .name("id").type().longType().noDefault() + .endRecord(); + GenericRecord baseRecord = new GenericData.Record(baseSchema); + baseRecord.put("id", 100L); + + Schema keySchema = Schema.create(Schema.Type.STRING); + ConversionResult keyResult = new ConversionResult("primary-key", keySchema, "key-identity"); + + RecordAssembler assembler = new RecordAssembler(); + GenericRecord assembledRecord = assembler.reset(baseRecord) + .withKey(keyResult) + .withMetadata(1, 2L, 3L) + .assemble(); + + Schema finalSchema = assembledRecord.getSchema(); + assertNull(finalSchema.getField(RecordAssembler.KAFKA_HEADER_FIELD)); + assertNotNull(finalSchema.getField(RecordAssembler.KAFKA_KEY_FIELD)); + assertNotNull(finalSchema.getField(RecordAssembler.KAFKA_METADATA_FIELD)); + + assertEquals("primary-key", assembledRecord.get(RecordAssembler.KAFKA_KEY_FIELD)); + GenericRecord metadataRecord = (GenericRecord) assembledRecord.get(RecordAssembler.KAFKA_METADATA_FIELD); + assertNotNull(metadataRecord.getSchema().getField(RecordAssembler.METADATA_PARTITION_FIELD)); + } +}
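Note on the schema-identity change used throughout this diff: the converters switch from Schema.hashCode()-based identities to SchemaNormalization.parsingFingerprint64, and RecordAssembler keys its assembled-schema cache on the same kind of fingerprint. The sketch below is a minimal standalone illustration of that idea, not part of the change itself; the class and schema names are hypothetical. The fingerprint is computed from the schema's Parsing Canonical Form, so structurally identical schemas map to the same 64-bit value no matter how they were built, which makes it a compact and stable cache key.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaNormalization;

public class SchemaIdentitySketch {
    public static void main(String[] args) {
        // Two structurally identical schemas built independently.
        Schema a = SchemaBuilder.record("User").fields().requiredString("name").endRecord();
        Schema b = SchemaBuilder.record("User").fields().requiredString("name").endRecord();

        // parsingFingerprint64 hashes the Parsing Canonical Form, so both
        // schemas yield the same identity string, suitable as a cache key.
        String idA = Long.toUnsignedString(SchemaNormalization.parsingFingerprint64(a));
        String idB = Long.toUnsignedString(SchemaNormalization.parsingFingerprint64(b));
        System.out.println(idA + " == " + idB + " : " + idA.equals(idB));
    }
}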