@@ -47,7 +47,10 @@ public class AvroValueAdapter extends AbstractTypeAdapter<Schema>

@Override
protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) {
if (sourceValue instanceof Utf8) {
return sourceValue;
}
if (sourceValue instanceof GenericData.EnumSymbol) {
return sourceValue.toString();
}
return super.convertString(sourceValue, sourceSchema, targetType);
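The split above keeps Utf8 values as-is (Utf8 already implements CharSequence, so downstream code such as FieldMetric.count(CharSequence) can read its byte length without allocating a String), while enum symbols are still materialized as plain strings. A minimal standalone sketch of that branching, with an illustrative enum schema and a fallback standing in for super.convertString:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;

public class ConvertStringSketch {
    // Utf8 passes through untouched; enum symbols become java.lang.String.
    static Object convertString(Object sourceValue) {
        if (sourceValue instanceof Utf8) {
            return sourceValue;
        }
        if (sourceValue instanceof GenericData.EnumSymbol) {
            return sourceValue.toString();
        }
        return sourceValue; // placeholder for super.convertString(sourceValue, sourceSchema, targetType)
    }

    public static void main(String[] args) {
        Schema colors = SchemaBuilder.enumeration("Color").symbols("RED", "GREEN");
        System.out.println(convertString(new Utf8("hello")).getClass().getSimpleName());                          // Utf8
        System.out.println(convertString(new GenericData.EnumSymbol(colors, "RED")).getClass().getSimpleName());  // String
    }
}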
@@ -258,7 +258,7 @@ private long calculateFieldCount(Object value, Type icebergType) {

switch (icebergType.typeId()) {
case STRING:
return FieldMetric.count((String) value);
return FieldMetric.count((CharSequence) value);
case BINARY:
return FieldMetric.count((ByteBuffer) value);
case FIXED:
@@ -19,14 +19,19 @@

package kafka.automq.table.metric;

import org.apache.avro.util.Utf8;

import java.nio.ByteBuffer;

public class FieldMetric {

public static int count(String value) {
public static int count(CharSequence value) {
if (value == null) {
return 0;
}
if (value instanceof Utf8) {
return (((Utf8) value).getByteLength() + 23) / 24;
}
if (value.isEmpty()) {
return 1;
}
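Worked example of the Utf8 branch just added: the expression (getByteLength() + 23) / 24 is integer math for ceil(byteLength / 24), i.e. one count unit per started 24-byte block of UTF-8 data, computed without decoding the bytes into a String. The expected values below follow from that arithmetic alone (the non-Utf8 remainder of the method is not shown in this diff):

import org.apache.avro.util.Utf8;

public class FieldMetricUtf8Example {
    // Mirrors only the Utf8 branch of FieldMetric.count(CharSequence).
    static int countUtf8(Utf8 value) {
        return (value.getByteLength() + 23) / 24;
    }

    public static void main(String[] args) {
        System.out.println(countUtf8(new Utf8("abc")));           // 3 bytes  -> 1
        System.out.println(countUtf8(new Utf8("a".repeat(24))));  // 24 bytes -> 1
        System.out.println(countUtf8(new Utf8("a".repeat(25))));  // 25 bytes -> 2
    }
}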
@@ -25,6 +25,8 @@
import kafka.automq.table.process.exception.SchemaRegistrySystemException;
import kafka.automq.table.process.exception.TransformException;

import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.Record;

@@ -45,6 +47,7 @@

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static kafka.automq.table.process.RecordAssembler.KAFKA_VALUE_FIELD;
import static kafka.automq.table.process.RecordAssembler.ensureOptional;

/**
* Default implementation of RecordProcessor using a two-stage processing pipeline.
@@ -56,23 +59,41 @@
public class DefaultRecordProcessor implements RecordProcessor {
private static final Schema HEADER_SCHEMA = Schema.createMap(Schema.create(Schema.Type.BYTES));
private static final String HEADER_SCHEMA_IDENTITY = String.valueOf(HEADER_SCHEMA.hashCode());
private static final ConversionResult EMPTY_HEADERS_RESULT =
new ConversionResult(Map.of(), HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY);
private final String topicName;
private final Converter keyConverter;
private final Converter valueConverter;
private final List<Transform> transformChain;
private final RecordAssembler recordAssembler; // Reusable assembler
private final String transformIdentity; // precomputed transform chain identity

private static final int VALUE_WRAPPER_SCHEMA_CACHE_MAX = 32;
private final Cache<String, Schema> valueWrapperSchemaCache = new LRUCache<>(VALUE_WRAPPER_SCHEMA_CACHE_MAX);

public DefaultRecordProcessor(String topicName, Converter keyConverter, Converter valueConverter) {
this.transformChain = new ArrayList<>();
this.topicName = topicName;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.recordAssembler = new RecordAssembler();
this.transformIdentity = ""; // no transforms
}

public DefaultRecordProcessor(String topicName, Converter keyConverter, Converter valueConverter, List<Transform> transforms) {
this.transformChain = transforms;
this.topicName = topicName;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.recordAssembler = new RecordAssembler();

// Precompute transform identity (names joined by comma)
StringBuilder sb = new StringBuilder();
for (int i = 0; i < this.transformChain.size(); i++) {
if (i > 0) sb.append(',');
sb.append(this.transformChain.get(i).getName());
}
this.transformIdentity = sb.toString();
}

@Override
@@ -86,15 +107,19 @@ public ProcessingResult process(int partition, Record kafkaRecord) {

GenericRecord baseRecord = wrapValue(valueResult);
GenericRecord transformedRecord = applyTransformChain(baseRecord, partition, kafkaRecord);
GenericRecord finalRecord = new RecordAssembler(transformedRecord)

String schemaIdentity = generateCompositeSchemaIdentity(headerResult, keyResult, valueResult);

GenericRecord record = recordAssembler
.reset(transformedRecord)
.withHeader(headerResult)
.withKey(keyResult)
.withSchemaIdentity(schemaIdentity)
.withMetadata(partition, kafkaRecord.offset(), kafkaRecord.timestamp())
.assemble();
Schema finalSchema = finalRecord.getSchema();
String schemaIdentity = generateCompositeSchemaIdentity(headerResult, keyResult, valueResult, transformChain);
Schema schema = record.getSchema();

return new ProcessingResult(finalRecord, finalSchema, schemaIdentity);
return new ProcessingResult(record, schema, schemaIdentity);
} catch (ConverterException e) {
return getProcessingResult(kafkaRecord, "Convert operation failed for record: %s", DataError.ErrorType.CONVERT_ERROR, e);
} catch (TransformException e) {
@@ -132,14 +157,26 @@ private ProcessingResult getProcessingResult(Record kafkaRecord, String format,

private ConversionResult processHeaders(Record kafkaRecord) throws ConverterException {
try {
Map<String, ByteBuffer> headers = new HashMap<>();
Header[] recordHeaders = kafkaRecord.headers();
if (recordHeaders != null) {
for (Header header : recordHeaders) {
ByteBuffer value = header.value() != null ?
ByteBuffer.wrap(header.value()) : null;
headers.put(header.key(), value);
}
if (recordHeaders == null || recordHeaders.length == 0) {
return EMPTY_HEADERS_RESULT;
}

int n = recordHeaders.length;

// Single header (the zero-header case is handled above): build an immutable singleton map
if (n == 1) {
Header h = recordHeaders[0];
ByteBuffer value = h.value() != null ? ByteBuffer.wrap(h.value()) : null;
// Collections.singletonMap accepts a null value, unlike Map.of
Map<String, ByteBuffer> headers = Collections.singletonMap(h.key(), value);
return new ConversionResult(headers, HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY);
}

// Larger maps: pre-size HashMap
Map<String, ByteBuffer> headers = new HashMap<>(Math.max(16, (int) (n / 0.75f) + 1));
for (Header header : recordHeaders) {
ByteBuffer value = header.value() != null ? ByteBuffer.wrap(header.value()) : null;
headers.put(header.key(), value);
}
return new ConversionResult(headers, HEADER_SCHEMA, HEADER_SCHEMA_IDENTITY);
} catch (Exception e) {
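The pre-sizing expression above picks an initial capacity so that all n headers fit under HashMap's default load factor of 0.75 without triggering a resize, with 16 as the floor for small maps. A small worked example of just that arithmetic (the comments show the evaluated expression and the power-of-two table size HashMap derives from it):

public class HeaderMapSizingExample {
    // Same expression as in processHeaders.
    static int capacityFor(int n) {
        return Math.max(16, (int) (n / 0.75f) + 1);
    }

    public static void main(String[] args) {
        System.out.println(capacityFor(2));   // 16 -> table of 16, threshold 12, no resize for 2 entries
        System.out.println(capacityFor(12));  // 17 -> table of 32, threshold 24, no resize for 12 entries
        System.out.println(capacityFor(24));  // 33 -> table of 64, threshold 48, no resize for 24 entries
    }
}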
@@ -148,15 +185,21 @@ private ConversionResult processHeaders(Record kafkaRecord) throws ConverterExce
}

private GenericRecord wrapValue(ConversionResult valueResult) {
Schema.Field valueField = new Schema.Field(KAFKA_VALUE_FIELD, valueResult.getSchema(), null, null);
Object valueContent = valueResult.getValue();

Schema recordSchema = Schema.createRecord("KafkaValueWrapper", null, "kafka.automq.table.process", false);
recordSchema.setFields(Collections.singletonList(valueField));
Schema recordSchema = valueWrapperSchemaCache.get(valueResult.getSchemaIdentity());
if (recordSchema == null) {
Schema.Field valueField = new Schema.Field(
KAFKA_VALUE_FIELD,
ensureOptional(valueResult.getSchema()),
null, null);
Schema schema = Schema.createRecord("KafkaValueWrapper", null, "kafka.automq.table.process", false);
schema.setFields(Collections.singletonList(valueField));
valueWrapperSchemaCache.put(valueResult.getSchemaIdentity(), schema);
recordSchema = schema;
}

GenericRecord baseRecord = new GenericData.Record(recordSchema);
baseRecord.put(KAFKA_VALUE_FIELD, valueContent);

return baseRecord;
}
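The wrapper schema is now built once per distinct value-schema identity and memoized in a bounded LRU cache instead of being recreated for every record. A minimal sketch of that get-or-compute pattern, assuming Kafka's org.apache.kafka.common.cache.LRUCache; the record field here is a placeholder, since the real field name and schema come from the ConversionResult:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;

public class WrapperSchemaCacheSketch {
    private static final int MAX_ENTRIES = 32;
    private final Cache<String, Schema> cache = new LRUCache<>(MAX_ENTRIES);

    // Build the wrapper schema at most once per value-schema identity;
    // the least recently used entry is evicted once MAX_ENTRIES is exceeded.
    Schema wrapperFor(String schemaIdentity) {
        Schema schema = cache.get(schemaIdentity);
        if (schema == null) {
            schema = SchemaBuilder.record("KafkaValueWrapper")
                    .namespace("kafka.automq.table.process")
                    .fields()
                    .optionalBytes("value") // placeholder field name and type
                    .endRecord();
            cache.put(schemaIdentity, schema);
        }
        return schema;
    }
}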

@@ -181,23 +224,12 @@ private GenericRecord applyTransformChain(GenericRecord baseRecord, int partitio
private String generateCompositeSchemaIdentity(
ConversionResult headerResult,
ConversionResult keyResult,
ConversionResult valueResult,
List<Transform> transforms) {

// Extract schema identities with null safety
ConversionResult valueResult) {
// Extract schema identities
String headerIdentity = headerResult.getSchemaIdentity();
String keyIdentity = keyResult.getSchemaIdentity();
String valueIdentity = valueResult.getSchemaIdentity();

// Generate transform chain identity
String transformIdentity = transforms.isEmpty() ?
"noTransform" :
transforms.stream()
.map(Transform::getName)
.collect(java.util.stream.Collectors.joining("->"));

// Join all parts with the identity separator
return String.join("|", headerIdentity, keyIdentity, valueIdentity, transformIdentity);
return "h:" + headerIdentity + "|v:" + valueIdentity + "|k:" + keyIdentity + "|t:" + transformIdentity;
}
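With hypothetical component identities (all values below are made up), the composite key produced by this method would look like the following; note the segments are ordered header, value, key, transform:

// headerIdentity = "0", keyIdentity = "42", valueIdentity = "17", transformIdentity = "flatten,debezium"
// -> "h:0|v:17|k:42|t:flatten,debezium"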

@Override