From 0e69e6cacebefc2e6c49a0af436f9a6ac515de06 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 8 Nov 2018 14:54:05 -0500 Subject: [PATCH 1/2] NIFI-5805: Pool the BinaryEncoders used by the WriteAvroResultWithExternalSchema writer. Unfortunately, the writer that embeds schemas does not allow for this optimization due to the Avro API --- .../apache/nifi/avro/AvroRecordSetWriter.java | 59 +++++++++++++++---- .../WriteAvroResultWithExternalSchema.java | 28 ++++++--- .../apache/nifi/avro/TestWriteAvroResult.java | 40 ++++++------- .../TestWriteAvroResultWithoutSchema.java | 56 ++++++++++++++++-- 4 files changed, 137 insertions(+), 46 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 7e498414d44f..2d021d48caa9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -17,27 +17,22 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; +import org.apache.avro.io.BinaryEncoder; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; @@ -45,6 +40,19 @@ import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + @Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"}) @CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.") public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { @@ -59,7 +67,7 @@ private enum CodecType { LZO } - private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() + private static final PropertyDescriptor COMPRESSION_FORMAT = new Builder() .name("compression-format") .displayName("Compression Format") .description("Compression type to use when writing Avro files. Default is None.") @@ -68,16 +76,40 @@ private enum CodecType { .required(true) .build(); + static final PropertyDescriptor ENCODER_POOL_SIZE = new Builder() + .name("encoder-pool-size") + .displayName("Encoder Pool Size") + .description("Avro Writers require the use of an Encoder. Creation of Encoders is expensive, but once created, they can be reused. This property controls the maximum number of Encoders that" + + " can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("32") + .build(); + private final Map compiledAvroSchemaCache = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE; } }; + private volatile BlockingQueue encoderPool; static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema", "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); + @OnEnabled + public void createEncoderPool(final ConfigurationContext context) { + final int capacity = context.getProperty(ENCODER_POOL_SIZE).asInteger(); + encoderPool = new LinkedBlockingQueue<>(capacity); + } + + @OnDisabled + public void cleanup() { + if (encoderPool != null) { + encoderPool.clear(); + } + } + @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); @@ -103,7 +135,7 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat)); } else { - return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out); + return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool); } } catch (final SchemaNotFoundException e) { throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); @@ -155,6 +187,7 @@ private CodecFactory getCodecFactory(String property) { protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(COMPRESSION_FORMAT); + properties.add(ENCODER_POOL_SIZE); return properties; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index 8464e4521214..2507fc5ac842 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -17,11 +17,6 @@ package org.apache.nifi.avro; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -33,6 +28,12 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final SchemaAccessWriter schemaAccessWriter; private final RecordSchema recordSchema; @@ -40,17 +41,21 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final BinaryEncoder encoder; private final OutputStream buffered; private final DatumWriter datumWriter; + private final BlockingQueue recycleQueue; public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, - final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException { + final SchemaAccessWriter schemaAccessWriter, final OutputStream out, final BlockingQueue recycleQueue) { super(out); this.recordSchema = recordSchema; this.schemaAccessWriter = schemaAccessWriter; this.avroSchema = avroSchema; this.buffered = new BufferedOutputStream(out); + this.recycleQueue = recycleQueue; + + BinaryEncoder reusableEncoder = recycleQueue.poll(); + encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder); datumWriter = new GenericDatumWriter<>(avroSchema); - encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null); } @Override @@ -88,4 +93,13 @@ public void flush() throws IOException { public String getMimeType() { return "application/avro-binary"; } + + @Override + public void close() throws IOException { + if (encoder != null) { + recycleQueue.offer(encoder); + } + + super.close(); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java index 8c20ba7d2131..4751f7473e6c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -17,17 +17,30 @@ package org.apache.nifi.avro; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Array; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.math.BigDecimal; import java.io.OutputStream; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; @@ -43,22 +56,9 @@ import java.util.Objects; import java.util.TimeZone; -import org.apache.avro.Conversions; -import org.apache.avro.LogicalType; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData.Array; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public abstract class TestWriteAvroResult { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index 33c0857e6941..14bd2c1260fb 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -17,26 +17,46 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; +import org.apache.nifi.schema.access.NopSchemaAccessWriter; import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy; import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.NullOutputStream; import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { + private final BlockingQueue encoderPool = new LinkedBlockingQueue<>(32); + @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out); + return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool); } @Override @@ -55,4 +75,28 @@ protected void verify(final WriteResult writeResult) { new Schema.Parser().parse(schemaText); } + + @Test + @Ignore("This test takes many seconds to run and is only really useful for comparing performance of the writer before and after changes, so it is @Ignored, but left in place to be run manually " + + "for performance comparisons before & after changes are made.") + public void testPerf() throws IOException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final OutputStream out = new NullOutputStream(); + + final Record record = new MapRecord(recordSchema, Collections.singletonMap("name", "John Doe")); + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + + final long start = System.nanoTime(); + for (int i=0; i < 10_000_000; i++) { + try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool)) { + writer.write(RecordSet.of(record.getSchema(), record)); + } + } + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.out.println(millis); + } } From 3573036182fc6b3ac2d5177d571eb4f0eb10120e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 9 Nov 2018 09:36:36 -0500 Subject: [PATCH 2/2] NIFI-5805: Addressed PR review feedback --- .../java/org/apache/nifi/avro/AvroRecordSetWriter.java | 9 ++++++--- .../nifi/avro/WriteAvroResultWithExternalSchema.java | 10 ++++++++-- .../nifi/avro/TestWriteAvroResultWithoutSchema.java | 9 +++++++-- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 2d021d48caa9..0e70d96068a3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -80,9 +81,11 @@ private enum CodecType { .name("encoder-pool-size") .displayName("Encoder Pool Size") .description("Avro Writers require the use of an Encoder. Creation of Encoders is expensive, but once created, they can be reused. This property controls the maximum number of Encoders that" + - " can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used.") + " can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used. This property is ignored if the" + + " Avro Writer is configured with a Schema Write Strategy of 'Embed Avro Schema'.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("32") .build(); @@ -99,7 +102,7 @@ protected boolean removeEldestEntry(final Map.Entry eldest) { @OnEnabled public void createEncoderPool(final ConfigurationContext context) { - final int capacity = context.getProperty(ENCODER_POOL_SIZE).asInteger(); + final int capacity = context.getProperty(ENCODER_POOL_SIZE).evaluateAttributeExpressions().asInteger(); encoderPool = new LinkedBlockingQueue<>(capacity); } @@ -135,7 +138,7 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat)); } else { - return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool); + return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger()); } } catch (final SchemaNotFoundException e) { throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index 2507fc5ac842..a50efa01f833 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -23,6 +23,7 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.record.Record; @@ -43,8 +44,8 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final DatumWriter datumWriter; private final BlockingQueue recycleQueue; - public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, - final SchemaAccessWriter schemaAccessWriter, final OutputStream out, final BlockingQueue recycleQueue) { + public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter, + final OutputStream out, final BlockingQueue recycleQueue, final ComponentLog logger) { super(out); this.recordSchema = recordSchema; this.schemaAccessWriter = schemaAccessWriter; @@ -53,6 +54,11 @@ public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSc this.recycleQueue = recycleQueue; BinaryEncoder reusableEncoder = recycleQueue.poll(); + if (reusableEncoder == null) { + logger.debug("Was not able to obtain a BinaryEncoder from reuse pool. This is normal for the first X number of iterations (where X is equal to the max size of the pool), " + + "but if this continues, it indicates that increasing the size of the pool will likely yield better performance for this Avro Writer."); + } + encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder); datumWriter = new GenericDatumWriter<>(avroSchema); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index 14bd2c1260fb..c592df0921e8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -23,6 +23,7 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.NopSchemaAccessWriter; import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy; import org.apache.nifi.serialization.RecordSetWriter; @@ -35,6 +36,7 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.util.MockComponentLog; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -56,7 +58,8 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool); + return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool, + new MockComponentLog("id", new Object())); } @Override @@ -89,9 +92,11 @@ public void testPerf() throws IOException { final Record record = new MapRecord(recordSchema, Collections.singletonMap("name", "John Doe")); final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + final ComponentLog logger = new MockComponentLog("id", new Object()); + final long start = System.nanoTime(); for (int i=0; i < 10_000_000; i++) { - try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool)) { + try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool, logger)) { writer.write(RecordSet.of(record.getSchema(), record)); } }