diff --git a/core/src/main/java/kafka/automq/table/perf/AvroTestCase.java b/core/src/main/java/kafka/automq/table/perf/AvroTestCase.java
new file mode 100644
index 0000000000..dd4a365d47
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/AvroTestCase.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import kafka.automq.table.process.Converter;
+import kafka.automq.table.process.convert.AvroRegistryConverter;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+class AvroTestCase extends PerfTestCase {
+    private final Schema schema;
+    private final PayloadManager payloadManager;
+
+    public AvroTestCase(DataType dataType, int fieldCount, int payloadCount, PerfConfig config) {
+        super(dataType, "avro", fieldCount, payloadCount, config);
+        this.schema = createSchema(dataType, fieldCount);
+        this.payloadManager = new PayloadManager(() -> generatePayloadWithSchema(schema), payloadCount);
+    }
+
+    @Override
+    PayloadManager getPayloadManager() {
+        return payloadManager;
+    }
+
+    @Override
+    protected byte[] generatePayload() {
+        return generatePayloadWithSchema(schema);
+    }
+
+    private byte[] generatePayloadWithSchema(Schema schemaToUse) {
+        GenericRecord record = new GenericData.Record(schemaToUse);
+        fillRecord(record, dataType, fieldCount);
+        return createAvroValue(record);
+    }
+
+    @Override
+    protected Converter createConverter() {
+        StaticAvroDeserializer deserializer = new StaticAvroDeserializer(schema);
+        return new AvroRegistryConverter(deserializer, null);
+    }
+
+    private Schema createSchema(DataType dataType, int fieldCount) {
+        SchemaBuilder.FieldAssembler<Schema> assembler = SchemaBuilder.builder()
+            .record("TestRecord")
+            .fields();
+
+        for (int i = 0; i < fieldCount; i++) {
+            String fieldName = "f" + i;
+            assembler = addFieldToSchema(assembler, fieldName, dataType);
+        }
+
+        return assembler.endRecord();
+    }
+
+    private SchemaBuilder.FieldAssembler<Schema> addFieldToSchema(SchemaBuilder.FieldAssembler<Schema> assembler, String fieldName, DataType dataType) {
+        return switch (dataType) {
+            case BOOLEAN -> assembler.name(fieldName).type().booleanType().noDefault();
+            case INT -> assembler.name(fieldName).type().intType().noDefault();
+            case LONG -> assembler.name(fieldName).type().longType().noDefault();
+            case DOUBLE -> assembler.name(fieldName).type().doubleType().noDefault();
+            case TIMESTAMP -> assembler.name(fieldName).type(LogicalTypes.timestampMillis()
+                .addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0);
+            case STRING -> assembler.name(fieldName).type().stringType().noDefault();
+            case BINARY -> assembler.name(fieldName).type().bytesType().noDefault();
+            case NESTED -> {
+                Schema nestedSchema = SchemaBuilder.builder().record("nested").fields()
+                    .name("nf1").type().booleanType().noDefault()
+                    .endRecord();
+                yield assembler.name(fieldName).type(nestedSchema).noDefault();
+            }
+            case ARRAY -> assembler.name(fieldName).type().array().items(Schema.create(Schema.Type.BOOLEAN)).noDefault();
+        };
+    }
+
+    private void fillRecord(GenericRecord record, DataType dataType, int fieldCount) {
+        for (int i = 0; i < fieldCount; i++) {
+            String fieldName = "f" + i;
+            Object value = generateFieldValue(dataType);
+            record.put(fieldName, value);
+        }
+    }
+
+    private Object generateFieldValue(DataType dataType) {
+        return switch (dataType) {
+            case BOOLEAN, INT, LONG, DOUBLE, TIMESTAMP, STRING -> dataType.generateValue();
+            case BINARY -> {
+                ByteBuffer buffer = (ByteBuffer) dataType.generateValue();
+                yield buffer;
+            }
+            case NESTED -> {
+                // Recreate the nested schema inline; structurally identical to the one in createSchema
+                Schema nestedSchema = SchemaBuilder.builder().record("nested").fields()
+                    .name("nf1").type().booleanType().noDefault()
+                    .endRecord();
+                GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+                nestedRecord.put("nf1", DataType.BOOLEAN.generateValue());
+                yield nestedRecord;
+            }
+            case ARRAY -> List.of(DataType.BOOLEAN.generateValue());
+        };
+    }
+
+    private static byte[] createAvroValue(GenericRecord record) {
+        try {
+            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema());
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+            datumWriter.write(record, encoder);
+            encoder.flush();
+            byte[] avroBytes = outputStream.toByteArray();
+
+            // Confluent-style framing: magic byte + 4-byte schema id, then the Avro payload
+            ByteBuf buf = Unpooled.buffer(1 + 4 + avroBytes.length);
+            buf.writeByte((byte) 0x0);
+            buf.writeInt(0);
+            buf.writeBytes(avroBytes);
+
+            return buf.array();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/BenchmarkResult.java b/core/src/main/java/kafka/automq/table/perf/BenchmarkResult.java
new file mode 100644
index 0000000000..c1c4ffe222
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/BenchmarkResult.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import java.util.concurrent.TimeUnit;
+
+public class BenchmarkResult {
+    private final String formatName;
+    private final String dataTypeName;
+    private final long durationMs;
+    private final long recordsProcessed;
+    private final String errorMessage;
+
+    private BenchmarkResult(String formatName, String dataTypeName, long durationMs, long recordsProcessed, String errorMessage) {
+        this.formatName = formatName;
+        this.dataTypeName = dataTypeName;
+        this.durationMs = durationMs;
+        this.recordsProcessed = recordsProcessed;
+        this.errorMessage = errorMessage;
+    }
+
+    public static BenchmarkResult success(String formatName, String dataTypeName, long durationMs, long recordsProcessed) {
+        return new BenchmarkResult(formatName, dataTypeName, durationMs, recordsProcessed, null);
+    }
+
+    public static BenchmarkResult failure(String formatName, String dataTypeName, String errorMessage) {
+        return new BenchmarkResult(formatName, dataTypeName, 0, 0, errorMessage);
+    }
+
+    public String getFormatName() {
+        return formatName;
+    }
+
+    public String getDataTypeName() {
+        return dataTypeName;
+    }
+
+    public long getDurationMs() {
+        return durationMs;
+    }
+
+    public long getRecordsProcessed() {
+        return recordsProcessed;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public boolean isSuccess() {
+        return errorMessage == null;
+    }
+
+    /**
+     * Records per second: recordsProcessed * 1000 / durationMs, guarded against division by zero.
+     */
+    public long getThroughput() {
+        if (durationMs == 0) {
+            return 0;
+        }
+        return recordsProcessed * TimeUnit.SECONDS.toMillis(1) / durationMs;
+    }
+
+    @Override
+    public String toString() {
+        if (isSuccess()) {
+            return String.format("%s %s: %d ms, %d records, %d records/sec",
+                formatName, dataTypeName, durationMs, recordsProcessed, getThroughput());
+        } else {
+            return String.format("%s %s: FAILED - %s", formatName, dataTypeName, errorMessage);
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/BenchmarkWorkerConfig.java b/core/src/main/java/kafka/automq/table/perf/BenchmarkWorkerConfig.java
new file mode 100644
index 0000000000..a0e1529a7a
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/BenchmarkWorkerConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import kafka.automq.table.worker.WorkerConfig;
+
+import org.apache.kafka.server.record.ErrorsTolerance;
+import org.apache.kafka.server.record.TableTopicConvertType;
+import org.apache.kafka.server.record.TableTopicSchemaType;
+import org.apache.kafka.server.record.TableTopicTransformType;
+
+import java.util.Collections;
+import java.util.List;
+
+class BenchmarkWorkerConfig extends WorkerConfig {
+
+    public BenchmarkWorkerConfig() {
+        super();
+    }
+
+    @Override
+    public String namespace() {
+        return "test";
+    }
+
+    @Override
+    public TableTopicSchemaType schemaType() {
+        return TableTopicSchemaType.NONE;
+    }
+
+    @Override
+    public TableTopicConvertType valueConvertType() {
+        return TableTopicConvertType.BY_SCHEMA_ID;
+    }
+
+    @Override
+    public TableTopicConvertType keyConvertType() {
+        return TableTopicConvertType.STRING;
+    }
+
+    @Override
+    public TableTopicTransformType transformType() {
+        return TableTopicTransformType.FLATTEN;
+    }
+
+    @Override
+    public String valueSubject() {
+        return null;
+    }
+
+    @Override
+    public String valueMessageFullName() {
+        return null;
+    }
+
+    @Override
+    public String keySubject() {
+        return null;
+    }
+
+    @Override
+    public String keyMessageFullName() {
+        return null;
+    }
+
+    @Override
+    public List<String> idColumns() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String partitionByConfig() {
+        return null;
+    }
+
+    @Override
+    public List<String> partitionBy() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean upsertEnable() {
+        return false;
+    }
+
+    @Override
+    public ErrorsTolerance errorsTolerance() {
+        return ErrorsTolerance.ALL;
+    }
+
+    @Override
+    public String cdcField() {
+        return null;
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/DataType.java b/core/src/main/java/kafka/automq/table/perf/DataType.java
new file mode 100644
index 0000000000..a167b51e73
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/DataType.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+
+public enum DataType {
+    BOOLEAN("boolean", () -> ThreadLocalRandom.current().nextBoolean()),
+    INT("int", () -> ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)),
+    LONG("long", () -> ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)),
+    DOUBLE("double", () -> ThreadLocalRandom.current().nextDouble(Long.MAX_VALUE)),
+    TIMESTAMP("timestamp", () -> ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)),
+    STRING("string", () -> RandomStringUtils.randomAlphabetic(32)),
+    BINARY("binary", () -> {
+        byte[] bytes = new byte[32];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        return ByteBuffer.wrap(bytes);
+    }),
+    NESTED("nested", null),
+    ARRAY("array", null);
+
+    private final String name;
+    // Supplier holds runtime-only generators (often non-serializable lambdas). Enum
+    // instances are serialized by name, not by fields; mark this field transient to
+    // avoid accidental Java serialization of the supplier and silence static analyzers.
+    private final transient Supplier<Object> valueGenerator;
+
+    DataType(String name, Supplier<Object> valueGenerator) {
+        this.name = name;
+        this.valueGenerator = valueGenerator;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Object generateValue() {
+        if (valueGenerator == null) {
+            throw new UnsupportedOperationException("Complex type " + name + " requires specific generator");
+        }
+        return valueGenerator.get();
+    }
+
+    public static DataType fromString(String name) {
+        for (DataType type : values()) {
+            if (type.name.equals(name)) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+    public PerfTestCase createAvroTestCase(int fieldCount, int payloadCount, PerfConfig config) {
+        return new AvroTestCase(this, fieldCount, payloadCount, config);
+    }
+
+    public PerfTestCase createProtobufTestCase(int fieldCount, int payloadCount, PerfConfig config) {
+        return new ProtobufTestCase(this, fieldCount, payloadCount, config);
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/Deserializers.java b/core/src/main/java/kafka/automq/table/perf/Deserializers.java
new file mode 100644
index 0000000000..19c6276630
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/Deserializers.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+class StaticAvroDeserializer implements Deserializer<Object> {
+    final Schema schema;
+    final DatumReader<GenericRecord> reader;
+    final DecoderFactory decoderFactory = DecoderFactory.get();
+
+    public StaticAvroDeserializer(Schema schema) {
+        this.schema = schema;
+        this.reader = new GenericDatumReader<>(schema);
+    }
+
+    @Override
+    public Object deserialize(String topic, byte[] data) {
+        try {
+            // Skip the 5-byte header (magic byte + schema id) prepended by the test cases
+            return this.reader.read(null, decoderFactory.binaryDecoder(data, 5, data.length - 5, null));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
+
+class StaticProtobufDeserializer implements Deserializer<Message> {
+    final Descriptors.Descriptor descriptor;
+
+    public StaticProtobufDeserializer(Descriptors.Descriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    @Override
+    public Message deserialize(String s, byte[] bytes) {
+        try {
+            return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(bytes, 5, bytes.length - 5));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/FieldsPerf.java b/core/src/main/java/kafka/automq/table/perf/FieldsPerf.java
deleted file mode 100644
index 5bd060803e..0000000000
--- a/core/src/main/java/kafka/automq/table/perf/FieldsPerf.java
+++ /dev/null
@@ -1,854 +0,0 @@
-/*
- * Copyright 2025, AutoMQ HK Limited.
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package kafka.automq.table.perf; - -import kafka.automq.table.deserializer.proto.schema.DynamicSchema; -import kafka.automq.table.deserializer.proto.schema.MessageDefinition; -import kafka.automq.table.process.Converter; -import kafka.automq.table.process.DefaultRecordProcessor; -import kafka.automq.table.process.RecordProcessor; -import kafka.automq.table.process.convert.AvroRegistryConverter; -import kafka.automq.table.process.convert.ProtobufRegistryConverter; -import kafka.automq.table.process.convert.RawConverter; -import kafka.automq.table.worker.IcebergTableManager; -import kafka.automq.table.worker.IcebergWriter; -import kafka.automq.table.worker.WorkerConfig; - -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.server.record.TableTopicSchemaType; - -import com.automq.stream.utils.Systems; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Descriptors; -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.Message; - -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.inmemory.InMemoryCatalog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -public class FieldsPerf { - private static final Logger LOGGER = LoggerFactory.getLogger(FieldsPerf.class.getName()); - private static final Map IN_MEMORY_FILES; - private static final List TASKS; - - private static final List FORMAT_TYPES; - private static final long RECORDS_COUNT = Systems.getEnvLong("RECORDS_COUNT", 10000000); - private static final Map> TASK_SUPPLIERS = new HashMap<>(); - - static { - // Get static field of InMemoryFileIO by reflection - try { - Class clazz = Class.forName("org.apache.iceberg.inmemory.InMemoryFileIO"); - java.lang.reflect.Field field = clazz.getDeclaredField("IN_MEMORY_FILES"); - field.setAccessible(true); - //noinspection unchecked - IN_MEMORY_FILES = (Map) field.get(null); - } catch (Throwable e) { - throw new RuntimeException(e); - } - String tasksStr = System.getenv("TASKS"); - if (StringUtils.isBlank(tasksStr)) { - tasksStr = "boolean,int,long,double,timestamp,string,binary,nested,array"; - } - TASKS = List.of(tasksStr.split(",")); - - String formatTypesStr = System.getenv("FORMAT_TYPES"); - if 
(StringUtils.isBlank(formatTypesStr)) { - formatTypesStr = "proto,avro"; - } - FORMAT_TYPES = List.of(formatTypesStr.split(",")); - - // https://iceberg.apache.org/spec/#schemas-and-data-types - TASK_SUPPLIERS.put("boolean", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().booleanType().noDefault(), - () -> ThreadLocalRandom.current().nextBoolean()); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "bool", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextBoolean()); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("int", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().intType().noDefault(), - () -> ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "int32", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("long", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().longType().noDefault(), - () -> ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "int64", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("double", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().doubleType().noDefault(), - () -> ThreadLocalRandom.current().nextDouble(Long.MAX_VALUE)); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "double", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextDouble(Long.MAX_VALUE)); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("timestamp", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type(LogicalTypes.timestampMillis() - .addToSchema( - Schema.create(Schema.Type.LONG) - 
) - ).withDefault(0), - () -> ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "int64", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("string", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().stringType().noDefault(), - () -> RandomStringUtils.randomAlphabetic(32)); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "string", - name, - number, - null - ), - (msgBuilder, name) -> { - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), RandomStringUtils.randomAlphabetic(32)); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("binary", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().bytesType().noDefault(), - () -> { - byte[] bytes = new byte[32]; - ThreadLocalRandom.current().nextBytes(bytes); - return ByteBuffer.wrap(bytes); - }); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> msgBuilder.addField( - "required", - "bytes", - name, - number, - null - ), - (msgBuilder, name) -> { - byte[] bytes = new byte[32]; - ThreadLocalRandom.current().nextBytes(bytes); - msgBuilder.setField(msgBuilder.getDescriptorForType().findFieldByName(name), bytes); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("nested", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Schema nestedSchema = SchemaBuilder.builder().record("nested").fields().name("nf1").type().booleanType().noDefault().endRecord(); - Pair> perfArgs = perfAvroArgs( - s -> s.type(nestedSchema).noDefault(), - () -> { - GenericRecord record = new GenericData.Record(nestedSchema); - record.put("nf1", ThreadLocalRandom.current().nextBoolean()); - return record; - }); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> { - MessageDefinition nested = MessageDefinition.newBuilder("NestedType" + name) - .addField("required", "bool", "nf1", 1, null) - .build(); - msgBuilder.addMessageDefinition(nested); - msgBuilder.addField("required", "NestedType" + name, name, number, null); - }, - (msgBuilder, name) -> { - Descriptors.FieldDescriptor fieldByName = msgBuilder.getDescriptorForType().findFieldByName(name); - Descriptors.Descriptor nested = msgBuilder.getDescriptorForType().findNestedTypeByName("NestedType" + name); - DynamicMessage built = DynamicMessage.newBuilder(nested) - 
.setField(nested.findFieldByName("nf1"), ThreadLocalRandom.current().nextBoolean()).build(); - - msgBuilder.setField(fieldByName, built); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - TASK_SUPPLIERS.put("array", List.of( - new FormatTypeSupplier( - "avro", - () -> { - Pair> perfArgs = perfAvroArgs( - s -> s.type().array().items(Schema.create(Schema.Type.BOOLEAN)).noDefault(), - () -> List.of(ThreadLocalRandom.current().nextBoolean())); - return new TypeCost("avro", new AvroPerf(perfArgs.getLeft(), perfArgs.getRight()).run()); - } - ), - new FormatTypeSupplier( - "proto", - () -> { - Pair> result = perfProtobufArgs( - (msgBuilder, name, number) -> { - msgBuilder.addField("repeated", "bool", name, number, null); - }, - (msgBuilder, name) -> { - msgBuilder.addRepeatedField(msgBuilder.getDescriptorForType().findFieldByName(name), ThreadLocalRandom.current().nextBoolean()); - } - ); - return new TypeCost("proto", new ProtoBufPerf(result.getLeft(), result.getRight()).run()); - } - ) - )); - } - - public static void main(String[] args) { - Map>> typeCosts = Map.of( - "avro", new ArrayList<>(), - "proto", new ArrayList<>() - ); - - for (String task : TASKS) { - List formatTypeSuppliers = TASK_SUPPLIERS.get(task); - for (FormatTypeSupplier formatTypeSupplier : formatTypeSuppliers) { - - if (FORMAT_TYPES.contains(formatTypeSupplier.format)) { - TypeCost typeCost = formatTypeSupplier.supplier.get(); - typeCosts.get(typeCost.type).add(Pair.of(task, typeCost.cost())); - } - } - } - for (Map.Entry>> entry : typeCosts.entrySet()) { - LOGGER.info("type: {}", entry.getKey()); - LOGGER.info("task cost: {}", entry.getValue()); - } - } - - static Pair> perfAvroArgs( - Function, SchemaBuilder.FieldAssembler> fieldTypeBuilder, - Supplier valueSupplier - ) { - SchemaBuilder.FieldAssembler fa = SchemaBuilder.builder().record("test").fields(); - int fieldCountPerRecord = 32; - for (int i = 0; i < fieldCountPerRecord; i++) { - fa = fieldTypeBuilder.apply(fa.name("f" + i)); - } - org.apache.avro.Schema schema = fa.endRecord(); - - int payloadsCount = 1000; - List payloads = new ArrayList<>(payloadsCount); - for (int i = 0; i < payloadsCount; i++) { - GenericRecord avroRecord = new GenericData.Record(schema); - for (int j = 0; j < fieldCountPerRecord; j++) { - avroRecord.put("f" + j, valueSupplier.get()); - } - payloads.add(createAvroValue(avroRecord)); - } - return Pair.of(schema, payloads); - } - - @FunctionalInterface - public interface FieldTypeBuilder { - void apply(MessageDefinition.Builder msgDefBuilder, String fieldName, int fieldNumber); - } - - @FunctionalInterface - public interface FieldValueBuilder { - void apply(DynamicMessage.Builder msgDefBuilder, String fieldName); - } - - static Pair> perfProtobufArgs( - FieldTypeBuilder fieldTypeBuilder, - FieldValueBuilder fieldValueBuilder) { - int fieldCountPerRecord = 32; - int payloadsCount = 1000; - - DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder(); - schemaBuilder.setPackage("example"); - schemaBuilder.setName("DynamicTest.proto"); - - MessageDefinition.Builder msgDefBuilder = MessageDefinition.newBuilder("TestMessage"); - - for (int i = 0; i < fieldCountPerRecord; i++) { - String fieldName = "f" + i; - int fieldNumber = i + 1; - fieldTypeBuilder.apply(msgDefBuilder, fieldName, fieldNumber); - } - - schemaBuilder.addMessageDefinition(msgDefBuilder.build()); - DynamicSchema schema; - try { - schema = schemaBuilder.build(); - } catch (Exception e) { - throw new 
RuntimeException("Schema build failed", e); - } - - Descriptors.Descriptor descriptor = schema.getMessageDescriptor("TestMessage"); - - List payloads = new ArrayList<>(payloadsCount); - for (int i = 0; i < payloadsCount; i++) { - DynamicMessage.Builder msgBuilder = DynamicMessage.newBuilder(descriptor); - for (int j = 0; j < fieldCountPerRecord; j++) { - String fieldName = "f" + j; - fieldValueBuilder.apply(msgBuilder, fieldName); - } - payloads.add(createProtoBufValue(msgBuilder.build())); - } - - return Pair.of(descriptor, payloads); - } - - static class AvroPerf extends AbstractPerf { - public AvroPerf(Schema schema, List payloads) { - super(schema, payloads); - } - - @Override - kafka.automq.table.process.Converter getKafkaRecordConverter(Schema schema) { - StaticAvroDeserializer deserializer = new StaticAvroDeserializer(schema); - return new AvroRegistryConverter(deserializer, null); - } - } - - - static class ProtoBufPerf extends AbstractPerf { - public ProtoBufPerf(Descriptors.Descriptor descriptor, List payloads) { - super(descriptor, payloads); - } - - @Override - kafka.automq.table.process.Converter getKafkaRecordConverter(Descriptors.Descriptor descriptor) { - StaticProtobufDeserializer deserializer = new StaticProtobufDeserializer(descriptor); - return new ProtobufRegistryConverter(deserializer); - } - } - - - abstract static class AbstractPerf { - final T schema; - final List payloads; - long timeCost; - - - public AbstractPerf(T schema, List payloads) { - this.schema = schema; - this.payloads = payloads; - } - - public long run() { - try { - // warm up - run0(100000); - long start = System.nanoTime(); - run0(RECORDS_COUNT); - timeCost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - return timeCost; - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - public long timeCost() { - return timeCost; - } - - private void run0(long count) throws IOException { - TableIdentifier tableId = TableIdentifier.parse("test.d1"); - WorkerConfig config = new IWorkerConfig(); - IcebergWriter writer = null; - int size = 0; - for (int i = 0; i < count; i++) { - if (writer == null) { - InMemoryCatalog catalog = new InMemoryCatalog(); - catalog.initialize("test", ImmutableMap.of()); - Converter recordConvert = getKafkaRecordConverter(schema); - RecordProcessor processor = new DefaultRecordProcessor("", RawConverter.INSTANCE, recordConvert); - writer = new IcebergWriter(new IcebergTableManager(catalog, tableId, config), processor, config); - writer.setOffset(0, i); - } - byte[] payload = payloads.get(i % payloads.size()); - size += payload.length; - - writer.write(0, new SimpleRecord(i, payload)); - - if (size > 32 * 1024 * 1024) { - size = 0; - writer.complete(); - IN_MEMORY_FILES.clear(); - writer = null; - } - } - writer.complete(); - } - - abstract kafka.automq.table.process.Converter getKafkaRecordConverter(T schema); - - } - - - static byte[] createAvroValue(GenericRecord record) { - try { - DatumWriter datumWriter = new GenericDatumWriter<>(record.getSchema()); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); - datumWriter.write(record, encoder); - encoder.flush(); - byte[] avroBytes = outputStream.toByteArray(); - - ByteBuf buf = Unpooled.buffer(1 + 4 + avroBytes.length); - buf.writeByte((byte) 0x0); - buf.writeInt(0); - buf.writeBytes(avroBytes); - - return buf.array(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - static byte[] 
createProtoBufValue(Message message) { - try { - byte[] protobufBytes = message.toByteArray(); - - ByteBuf buf = Unpooled.buffer(1 + 4 + protobufBytes.length); - buf.writeByte((byte) 0x0); - buf.writeInt(0); - buf.writeBytes(protobufBytes); - - return buf.array(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - static class StaticAvroDeserializer implements Deserializer { - final org.apache.avro.Schema schema; - final DatumReader reader; - final DecoderFactory decoderFactory = DecoderFactory.get(); - - public StaticAvroDeserializer(org.apache.avro.Schema schema) { - this.schema = schema; - this.reader = new GenericDatumReader<>(schema); - } - - public Object deserialize(String topic, byte[] data) { - try { - return this.reader.read(null, decoderFactory.binaryDecoder(data, 5, data.length - 5, null)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - static class StaticProtobufDeserializer implements Deserializer { - final Descriptors.Descriptor descriptor; - - public StaticProtobufDeserializer(Descriptors.Descriptor descriptor) { - this.descriptor = descriptor; - } - - @Override - public Message deserialize(String s, byte[] bytes) { - try { - return DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(bytes, 5, bytes.length - 5)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - static class IWorkerConfig extends WorkerConfig { - - public IWorkerConfig() { - super(); - } - - public String namespace() { - return "test"; - } - - public TableTopicSchemaType schemaType() { - return TableTopicSchemaType.SCHEMA; - } - - public List idColumns() { - return Collections.emptyList(); - } - - public String partitionByConfig() { - return null; - } - - public List partitionBy() { - return Collections.emptyList(); - } - - public boolean upsertEnable() { - return false; - } - - public String cdcField() { - return null; - } - } - - static class SimpleRecord implements org.apache.kafka.common.record.Record { - final long offset; - final byte[] value; - - public SimpleRecord(long offset, byte[] value) { - this.offset = offset; - this.value = value; - } - - @Override - public long offset() { - return offset; - } - - @Override - public int sequence() { - return 0; - } - - @Override - public int sizeInBytes() { - return value.length; - } - - @Override - public long timestamp() { - return 0; - } - - @Override - public void ensureValid() { - - } - - @Override - public int keySize() { - return 0; - } - - @Override - public boolean hasKey() { - return false; - } - - @Override - public ByteBuffer key() { - return null; - } - - @Override - public int valueSize() { - return 0; - } - - @Override - public boolean hasValue() { - return true; - } - - @Override - public ByteBuffer value() { - return ByteBuffer.wrap(value); - } - - @Override - public boolean hasMagic(byte b) { - return false; - } - - @Override - public boolean isCompressed() { - return false; - } - - @Override - public boolean hasTimestampType(TimestampType type) { - return false; - } - - @Override - public Header[] headers() { - return new Header[0]; - } - } - - static final class TypeCost { - private final String type; - private final long cost; - - TypeCost(String type, long cost) { - this.type = type; - this.cost = cost; - } - - public String type() { - return type; - } - - public long cost() { - return cost; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; 
- var that = (TypeCost) obj; - return Objects.equals(this.type, that.type) && - this.cost == that.cost; - } - - @Override - public int hashCode() { - return Objects.hash(type, cost); - } - - @Override - public String toString() { - return "TypeCost[" + - "type=" + type + ", " + - "cost=" + cost + ']'; - } - - } - - static final class FormatTypeSupplier { - private final String format; - private final Supplier supplier; - - FormatTypeSupplier(String format, Supplier supplier) { - this.format = format; - this.supplier = supplier; - } - - public String format() { - return format; - } - - public Supplier supplier() { - return supplier; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) - return true; - if (obj == null || obj.getClass() != this.getClass()) - return false; - var that = (FormatTypeSupplier) obj; - return Objects.equals(this.format, that.format) && - Objects.equals(this.supplier, that.supplier); - } - - @Override - public int hashCode() { - return Objects.hash(format, supplier); - } - - @Override - public String toString() { - return "FormatTypeSupplier[" + - "format=" + format + ", " + - "supplier=" + supplier + ']'; - } - - } - -} diff --git a/core/src/main/java/kafka/automq/table/perf/FieldsPerformanceTest.java b/core/src/main/java/kafka/automq/table/perf/FieldsPerformanceTest.java new file mode 100644 index 0000000000..57e82927f0 --- /dev/null +++ b/core/src/main/java/kafka/automq/table/perf/FieldsPerformanceTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.automq.table.perf;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FieldsPerformanceTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FieldsPerformanceTest.class);
+
+    public static void main(String[] args) {
+        PerfConfig config = new PerfConfig();
+
+        Map<String, List<Pair<String, Long>>> results = new HashMap<>();
+        results.put("avro", new ArrayList<>());
+        results.put("proto", new ArrayList<>());
+
+        LOGGER.info("Starting performance tests with {} records per test", config.getRecordsCount());
+        LOGGER.info("Enabled data types: {}", config.getEnabledDataTypes());
+        LOGGER.info("Enabled formats: {}", config.getEnabledFormats());
+
+        for (DataType dataType : config.getEnabledDataTypes()) {
+            for (SerializationFormat format : config.getEnabledFormats()) {
+
+                PerfTestCase testCase = createTestCase(dataType, format, config);
+
+                try {
+                    PerfTestCase.clearInMemoryFiles();
+
+                    LOGGER.info("Running benchmark: {} {}", format.getName(), dataType.getName());
+                    BenchmarkResult result = testCase.runBenchmark(config.getRecordsCount());
+
+                    if (result.isSuccess()) {
+                        // Unify metric to duration (ms), consistent with original FieldsPerf "task cost"
+                        results.get(format.getName()).add(Pair.of(dataType.getName(), result.getDurationMs()));
+                        LOGGER.info("Completed: {} {} - {} ms",
+                            format.getName(), dataType.getName(), result.getDurationMs());
+                    } else {
+                        LOGGER.error("Failed: {} {} - {}",
+                            format.getName(), dataType.getName(), result.getErrorMessage());
+                    }
+
+                } catch (Exception e) {
+                    LOGGER.error("Failed: {} {} - {}", format.getName(), dataType.getName(), e.getMessage(), e);
+                }
+            }
+        }
+
+        // Output results in the same format as original
+        results.forEach((format, formatResults) -> {
+            LOGGER.info("type: {}", format);
+            LOGGER.info("task cost: {}", formatResults);
+        });
+    }
+
+    private static PerfTestCase createTestCase(DataType dataType, SerializationFormat format, PerfConfig config) {
+        return switch (format) {
+            case AVRO -> dataType.createAvroTestCase(config.getFieldCountPerRecord(), config.getPayloadsCount(), config);
+            case PROTOBUF -> dataType.createProtobufTestCase(config.getFieldCountPerRecord(), config.getPayloadsCount(), config);
+        };
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/PayloadManager.java b/core/src/main/java/kafka/automq/table/perf/PayloadManager.java
new file mode 100644
index 0000000000..012054b019
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/PayloadManager.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import java.util.function.Supplier;
+
+public class PayloadManager {
+    private final byte[][] payloadPool;
+    private final int poolSize;
+    private int currentIndex = 0;
+
+    public PayloadManager(Supplier<byte[]> payloadGenerator, int poolSize) {
+        this.poolSize = Math.min(poolSize, 10000); // Limit max pool size
+        this.payloadPool = new byte[this.poolSize][];
+
+        // Pre-generate all payloads
+        for (int i = 0; i < this.poolSize; i++) {
+            this.payloadPool[i] = payloadGenerator.get();
+        }
+    }
+
+    public byte[] nextPayload() {
+        byte[] payload = payloadPool[currentIndex];
+        currentIndex = (currentIndex + 1) % poolSize;
+        return payload;
+    }
+
+    public void reset() {
+        currentIndex = 0;
+    }
+
+    public int getPoolSize() {
+        return poolSize;
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/PerfConfig.java b/core/src/main/java/kafka/automq/table/perf/PerfConfig.java
new file mode 100644
index 0000000000..64f6c7708a
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/PerfConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import com.automq.stream.utils.Systems;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class PerfConfig {
+    private final long recordsCount;
+    private final int fieldCountPerRecord;
+    private final int payloadsCount;
+    private final int batchSizeBytes;
+    private final Set<DataType> enabledDataTypes;
+    private final Set<SerializationFormat> enabledFormats;
+
+    public PerfConfig() {
+        this.recordsCount = Systems.getEnvLong("RECORDS_COUNT", 10_000_000L);
+        this.fieldCountPerRecord = parseIntEnv("FIELD_COUNT_PER_RECORD", 32);
+        this.payloadsCount = parseIntEnv("PAYLOADS_COUNT", 1000);
+        this.batchSizeBytes = parseIntEnv("BATCH_SIZE_BYTES", 32 * 1024 * 1024);
+        this.enabledDataTypes = parseDataTypes(System.getenv("TASKS"));
+        this.enabledFormats = parseFormats(System.getenv("FORMAT_TYPES"));
+    }
+
+    private int parseIntEnv(String envName, int defaultValue) {
+        String value = System.getenv(envName);
+        if (StringUtils.isBlank(value)) {
+            return defaultValue;
+        }
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            return defaultValue;
+        }
+    }
+
+    private Set<DataType> parseDataTypes(String tasksStr) {
+        if (StringUtils.isBlank(tasksStr)) {
+            return EnumSet.allOf(DataType.class);
+        }
+        return Arrays.stream(tasksStr.split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .map(DataType::fromString)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+    }
+
+    private Set<SerializationFormat> parseFormats(String formatsStr) {
+        if (StringUtils.isBlank(formatsStr)) {
+            return EnumSet.of(SerializationFormat.AVRO);
+        }
+        return Arrays.stream(formatsStr.split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .map(SerializationFormat::fromString)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+    }
+
+    public long getRecordsCount() {
+        return recordsCount;
+    }
+
+    public int getFieldCountPerRecord() {
+        return fieldCountPerRecord;
+    }
+
+    public int getPayloadsCount() {
+        return payloadsCount;
+    }
+
+    public int getBatchSizeBytes() {
+        return batchSizeBytes;
+    }
+
+    public Set<DataType> getEnabledDataTypes() {
+        return enabledDataTypes;
+    }
+
+    public Set<SerializationFormat> getEnabledFormats() {
+        return enabledFormats;
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/PerfTestCase.java b/core/src/main/java/kafka/automq/table/perf/PerfTestCase.java
new file mode 100644
index 0000000000..cbe0c0d250
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/PerfTestCase.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import kafka.automq.table.process.DefaultRecordProcessor;
+import kafka.automq.table.process.RecordProcessor;
+import kafka.automq.table.process.convert.RawConverter;
+import kafka.automq.table.process.transform.FlattenTransform;
+import kafka.automq.table.worker.IcebergTableManager;
+import kafka.automq.table.worker.IcebergWriter;
+import kafka.automq.table.worker.WorkerConfig;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.TimestampType;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class PerfTestCase {
+    // Cache InMemoryFileIO files map to avoid repeated reflection overhead
+    private static final Map<String, ?> IN_MEMORY_FILES;
+
+    static {
+        Map<String, ?> files;
+        try {
+            Class<?> clazz = Class.forName("org.apache.iceberg.inmemory.InMemoryFileIO");
+            java.lang.reflect.Field field = clazz.getDeclaredField("IN_MEMORY_FILES");
+            field.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            Map<String, ?> f = (Map<String, ?>) field.get(null);
+            files = f;
+        } catch (Exception e) {
+            // Fallback to empty map; clear operation becomes a no-op
+            files = new java.util.HashMap<>();
+        }
+        IN_MEMORY_FILES = files;
+    }
+
+    protected final DataType dataType;
+    protected final String formatName;
+    protected final int fieldCount;
+    protected final int payloadCount;
+    protected final int batchSizeBytes;
+
+    public PerfTestCase(DataType dataType, String formatName, int fieldCount, int payloadCount, PerfConfig config) {
+        this.dataType = dataType;
+        this.formatName = formatName;
+        this.fieldCount = fieldCount;
+        this.payloadCount = payloadCount;
+        this.batchSizeBytes = config.getBatchSizeBytes();
+    }
+
+    abstract PayloadManager getPayloadManager();
+
+    public BenchmarkResult runBenchmark(long recordsCount) {
+        try {
+            // Simple warmup
+            runTest(100_000);
+
+            // Actual test
+            long startTime = System.nanoTime();
+            runTest(recordsCount);
+            long endTime = System.nanoTime();
+
+            long durationMs = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
+            return BenchmarkResult.success(formatName, dataType.getName(), durationMs, recordsCount);
+        } catch (Exception e) {
+            return BenchmarkResult.failure(formatName, dataType.getName(), e.getMessage());
+        }
+    }
+
+    private void runTest(long recordsCount) throws IOException {
+        TableIdentifier tableId = TableIdentifier.parse("test.benchmark");
+        WorkerConfig workerConfig = new BenchmarkWorkerConfig();
+        IcebergWriter writer = null;
+        int currentBatchSize = 0;
+        final int batchSizeLimit = this.batchSizeBytes;
+
+        for (long i = 0; i < recordsCount; i++) {
+            if (writer == null) {
+                InMemoryCatalog catalog = new InMemoryCatalog();
+                catalog.initialize("test", ImmutableMap.of());
+                RecordProcessor processor = new DefaultRecordProcessor("", RawConverter.INSTANCE, createConverter(), List.of(FlattenTransform.INSTANCE));
+                writer = new IcebergWriter(new IcebergTableManager(catalog, tableId, workerConfig), processor, workerConfig);
+                writer.setOffset(0, i);
+            }
+
+            byte[] payload = getPayloadManager().nextPayload();
+            currentBatchSize += payload.length;
+
+            writer.write(0, new SimpleRecord(i, payload));
+
+            if (currentBatchSize > batchSizeLimit) {
+                writer.complete();
+                clearInMemoryFiles();
+                writer = null;
+                currentBatchSize = 0;
+            }
+        }
+
+        if (writer != null) {
+            writer.complete();
+        }
+    }
+
+    public static void clearInMemoryFiles() {
+        try {
+            IN_MEMORY_FILES.clear();
+        } catch (Exception ignored) {
+            // Ignore cleanup failures
+        }
+    }
+
+    protected abstract byte[] generatePayload();
+
+    protected abstract kafka.automq.table.process.Converter createConverter();
+
+    static class SimpleRecord implements org.apache.kafka.common.record.Record {
+        final long offset;
+        final byte[] value;
+
+        public SimpleRecord(long offset, byte[] value) {
+            this.offset = offset;
+            this.value = value;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public int sequence() {
+            return 0;
+        }
+
+        @Override
+        public int sizeInBytes() {
+            return value.length;
+        }
+
+        @Override
+        public long timestamp() {
+            return 0;
+        }
+
+        @Override
+        public void ensureValid() {
+
+        }
+
+        @Override
+        public int keySize() {
+            return 0;
+        }
+
+        @Override
+        public boolean hasKey() {
+            return false;
+        }
+
+        @Override
+        public ByteBuffer key() {
+            return null;
+        }
+
+        @Override
+        public int valueSize() {
+            return 0;
+        }
+
+        @Override
+        public boolean hasValue() {
+            return true;
+        }
+
+        @Override
+        public ByteBuffer value() {
+            return ByteBuffer.wrap(value);
+        }
+
+        @Override
+        public boolean hasMagic(byte b) {
+            return false;
+        }
+
+        @Override
+        public boolean isCompressed() {
+            return false;
+        }
+
+        @Override
+        public boolean hasTimestampType(TimestampType type) {
+            return false;
+        }
+
+        @Override
+        public Header[] headers() {
+            return new Header[0];
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/ProtobufTestCase.java b/core/src/main/java/kafka/automq/table/perf/ProtobufTestCase.java
new file mode 100644
index 0000000000..d8b685f39d
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/ProtobufTestCase.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+import kafka.automq.table.deserializer.proto.schema.DynamicSchema;
+import kafka.automq.table.deserializer.proto.schema.MessageDefinition;
+import kafka.automq.table.process.Converter;
+import kafka.automq.table.process.convert.ProtobufRegistryConverter;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+class ProtobufTestCase extends PerfTestCase {
+    private final Descriptors.Descriptor descriptor;
+    private final PayloadManager payloadManager;
+
+    public ProtobufTestCase(DataType dataType, int fieldCount, int payloadCount, PerfConfig config) {
+        super(dataType, "proto", fieldCount, payloadCount, config);
+        this.descriptor = createDescriptor(dataType, fieldCount);
+        this.payloadManager = new PayloadManager(() -> generatePayloadWithDescriptor(descriptor), payloadCount);
+    }
+
+    @Override
+    PayloadManager getPayloadManager() {
+        return payloadManager;
+    }
+
+    @Override
+    protected byte[] generatePayload() {
+        return generatePayloadWithDescriptor(descriptor);
+    }
+
+    private byte[] generatePayloadWithDescriptor(Descriptors.Descriptor descriptorToUse) {
+        DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptorToUse);
+        fillBuilder(builder, dataType, fieldCount);
+        return createProtoBufValue(builder.build());
+    }
+
+    @Override
+    protected Converter createConverter() {
+        StaticProtobufDeserializer deserializer = new StaticProtobufDeserializer(descriptor);
+        return new ProtobufRegistryConverter(deserializer);
+    }
+
+    private Descriptors.Descriptor createDescriptor(DataType dataType, int fieldCount) {
+        DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
+        schemaBuilder.setPackage("example");
+        schemaBuilder.setName("DynamicTest.proto");
+
+        MessageDefinition.Builder msgDefBuilder = MessageDefinition.newBuilder("TestMessage");
+
+        for (int i = 0; i < fieldCount; i++) {
+            String fieldName = "f" + i;
+            int fieldNumber = i + 1;
+            addFieldToMessage(msgDefBuilder, fieldName, fieldNumber, dataType);
+        }
+
+        schemaBuilder.addMessageDefinition(msgDefBuilder.build());
+
+        try {
+            DynamicSchema schema = schemaBuilder.build();
+            return schema.getMessageDescriptor("TestMessage");
+        } catch (Exception e) {
+            throw new RuntimeException("Schema build failed", e);
+        }
+    }
+
+    private void addFieldToMessage(MessageDefinition.Builder msgDefBuilder, String fieldName, int fieldNumber, DataType dataType) {
+        switch (dataType) {
+            case BOOLEAN -> msgDefBuilder.addField("required", "bool", fieldName, fieldNumber, null);
+            case INT -> msgDefBuilder.addField("required", "int32", fieldName, fieldNumber, null);
+            case LONG, TIMESTAMP -> msgDefBuilder.addField("required", "int64", fieldName, fieldNumber, null);
+            case DOUBLE -> msgDefBuilder.addField("required", "double", fieldName, fieldNumber, null);
+            case STRING -> msgDefBuilder.addField("required", "string", fieldName, fieldNumber, null);
+            case BINARY -> msgDefBuilder.addField("required", "bytes", fieldName, fieldNumber, null);
+            case NESTED -> {
+                MessageDefinition nested = MessageDefinition.newBuilder("NestedType" + fieldName)
+                    .addField("required", "bool", "nf1", 1, null)
+                    .build();
+                msgDefBuilder.addMessageDefinition(nested);
+                msgDefBuilder.addField("required", "NestedType" + fieldName, fieldName, fieldNumber, null);
+            }
+            case ARRAY -> msgDefBuilder.addField("repeated", "bool", fieldName, fieldNumber, null);
+        }
+    }
+
+    private void fillBuilder(DynamicMessage.Builder builder, DataType dataType, int fieldCount) {
+        for (int i = 0; i < fieldCount; i++) {
+            String fieldName = "f" + i;
+            setFieldValue(builder, fieldName, dataType);
+        }
+    }
+
+    private void setFieldValue(DynamicMessage.Builder builder, String fieldName, DataType dataType) {
+        Descriptors.FieldDescriptor field = builder.getDescriptorForType().findFieldByName(fieldName);
+
+        switch (dataType) {
+            case BOOLEAN, INT, LONG, DOUBLE, TIMESTAMP, STRING ->
+                builder.setField(field, dataType.generateValue());
+            case BINARY -> {
+                // Protobuf reflection accepts a raw byte[] for bytes fields
+                byte[] bytes = new byte[32];
+                java.util.concurrent.ThreadLocalRandom.current().nextBytes(bytes);
+                builder.setField(field, bytes);
+            }
+            case NESTED -> {
+                Descriptors.Descriptor nestedDescriptor = builder.getDescriptorForType().findNestedTypeByName("NestedType" + fieldName);
+                DynamicMessage nestedMessage = DynamicMessage.newBuilder(nestedDescriptor)
+                    .setField(nestedDescriptor.findFieldByName("nf1"), DataType.BOOLEAN.generateValue())
+                    .build();
+                builder.setField(field, nestedMessage);
+            }
+            case ARRAY ->
+                builder.addRepeatedField(field, DataType.BOOLEAN.generateValue());
+        }
+    }
+
+    private static byte[] createProtoBufValue(Message message) {
+        try {
+            byte[] protobufBytes = message.toByteArray();
+
+            // Frame as magic byte (0x0) + 4-byte schema id + encoded message
+            ByteBuf buf = Unpooled.buffer(1 + 4 + protobufBytes.length);
+            buf.writeByte((byte) 0x0);
+            buf.writeInt(0);
+            buf.writeBytes(protobufBytes);
+
+            return buf.array();
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/automq/table/perf/SerializationFormat.java b/core/src/main/java/kafka/automq/table/perf/SerializationFormat.java
new file mode 100644
index 0000000000..f36268eb6d
--- /dev/null
+++ b/core/src/main/java/kafka/automq/table/perf/SerializationFormat.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2025, AutoMQ HK Limited.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.automq.table.perf;
+
+public enum SerializationFormat {
+    AVRO("avro"),
+    PROTOBUF("proto");
+
+    private final String name;
+
+    SerializationFormat(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static SerializationFormat fromString(String name) {
+        for (SerializationFormat format : values()) {
+            if (format.name.equals(name)) {
+                return format;
+            }
+        }
+        // Unknown format names yield null; callers are expected to handle this
+        return null;
+    }
+}