File: org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -17,34 +17,43 @@

package org.apache.nifi.avro;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.io.BinaryEncoder;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"})
@CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.")
public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
@@ -59,7 +68,7 @@ private enum CodecType {
LZO
}

private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
private static final PropertyDescriptor COMPRESSION_FORMAT = new Builder()
.name("compression-format")
.displayName("Compression Format")
.description("Compression type to use when writing Avro files. Default is None.")
@@ -68,16 +77,42 @@ private enum CodecType {
.required(true)
.build();

static final PropertyDescriptor ENCODER_POOL_SIZE = new Builder()
.name("encoder-pool-size")
.displayName("Encoder Pool Size")
.description("Avro Writers require the use of an Encoder. Creation of Encoders is expensive, but once created, they can be reused. This property controls the maximum number of Encoders that" +
" can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used. This property is ignored if the" +
" Avro Writer is configured with a Schema Write Strategy of 'Embed Avro Schema'.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("32")
.build()

[Review comment - Member] Since this configuration is performance related and depends on environment, I'd suggest supporting variable registry EL. (Adopted above via the ExpressionLanguageScope.VARIABLE_REGISTRY declaration.)

private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) {
return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE;
}
};
private volatile BlockingQueue<BinaryEncoder> encoderPool;

static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema",
"The FlowFile will have the Avro schema embedded into the content, as is typical with Avro");

@OnEnabled
public void createEncoderPool(final ConfigurationContext context) {
final int capacity = context.getProperty(ENCODER_POOL_SIZE).evaluateAttributeExpressions().asInteger();
encoderPool = new LinkedBlockingQueue<>(capacity);
}

@OnDisabled
public void cleanup() {
if (encoderPool != null) {
encoderPool.clear();
}
}

@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException {
final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
@@ -103,7 +138,7 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
} else {
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out);
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
@@ -155,6 +190,7 @@ private CodecFactory getCodecFactory(String property) {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(COMPRESSION_FORMAT);
properties.add(ENCODER_POOL_SIZE);
return properties;
}
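A note on why LinkedBlockingQueue fits this pooling job: poll() is non-blocking and returns null when the queue is empty (so createWriter falls back to building a fresh Encoder), while offer() silently refuses new items once the queue is at capacity, keeping heap usage bounded at the configured Encoder Pool Size. Below is a minimal, self-contained sketch of the same borrow/return pattern; ResourcePool and ExpensiveResource are hypothetical names standing in for the controller service and BinaryEncoder.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the bounded borrow/return pattern used by the encoder pool.
class ResourcePool {
    private final BlockingQueue<ExpensiveResource> pool;

    ResourcePool(final int capacity) {
        // Bounded queue: the pool never holds more than `capacity` instances.
        this.pool = new LinkedBlockingQueue<>(capacity);
    }

    ExpensiveResource borrow() {
        // poll() is non-blocking and returns null when the pool is empty,
        // in which case the caller pays the cost of creating a new instance.
        final ExpensiveResource reusable = pool.poll();
        return reusable != null ? reusable : new ExpensiveResource();
    }

    void recycle(final ExpensiveResource resource) {
        // offer() is non-blocking: when the pool is already at capacity it
        // returns false and the resource is simply left for garbage collection.
        pool.offer(resource);
    }

    static class ExpensiveResource {
    }
}

This mirrors the lifecycle in the diff: the writer borrows via recycleQueue.poll() in its constructor and returns the encoder via recycleQueue.offer(encoder) in close().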

File: org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -17,40 +17,51 @@

package org.apache.nifi.avro;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
private final SchemaAccessWriter schemaAccessWriter;
private final RecordSchema recordSchema;
private final Schema avroSchema;
private final BinaryEncoder encoder;
private final OutputStream buffered;
private final DatumWriter<GenericRecord> datumWriter;
private final BlockingQueue<BinaryEncoder> recycleQueue;

public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema,
final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException {
public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter,
final OutputStream out, final BlockingQueue<BinaryEncoder> recycleQueue, final ComponentLog logger) {
super(out);
this.recordSchema = recordSchema;
this.schemaAccessWriter = schemaAccessWriter;
this.avroSchema = avroSchema;
this.buffered = new BufferedOutputStream(out);
this.recycleQueue = recycleQueue;

BinaryEncoder reusableEncoder = recycleQueue.poll();
if (reusableEncoder == null) {
logger.debug("Was not able to obtain a BinaryEncoder from reuse pool. This is normal for the first X number of iterations (where X is equal to the max size of the pool), " +
"but if this continues, it indicates that increasing the size of the pool will likely yield better performance for this Avro Writer.");
}

encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder);

datumWriter = new GenericDatumWriter<>(avroSchema);
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
}

[Review comment - Member] Probably we should add a debug log here to provide information on whether the current pool size fits the actual usage. If reusableEncoder comes back null too often and the user wants to improve performance, they can increase the pool size, etc. (Addressed by the debug log in the constructor above.)

@Override
@@ -88,4 +99,13 @@ public void flush() throws IOException {
public String getMimeType() {
return "application/avro-binary";
}

@Override
public void close() throws IOException {
if (encoder != null) {
recycleQueue.offer(encoder);
}

super.close();
}
}
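The constructor relies on the reuse overload of Avro's EncoderFactory: when an existing BinaryEncoder is passed as the second argument to blockingBinaryEncoder, the factory reinitializes that encoder against the new stream instead of allocating a new one along with its internal buffer. A minimal sketch of that behavior, assuming only the Avro API already used in this diff (the class name EncoderReuseSketch is hypothetical):

import java.io.ByteArrayOutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EncoderReuseSketch {
    public static void main(final String[] args) throws Exception {
        final EncoderFactory factory = EncoderFactory.get();

        // First use: nothing to reuse, so pass null and let the factory
        // allocate a new encoder and its internal buffer.
        final ByteArrayOutputStream first = new ByteArrayOutputStream();
        final BinaryEncoder encoder = factory.blockingBinaryEncoder(first, null);
        encoder.writeString("first record");
        encoder.flush();

        // Later use: passing the existing encoder reconfigures it to write to
        // the new stream, avoiding the allocation cost the pool amortizes.
        final ByteArrayOutputStream second = new ByteArrayOutputStream();
        final BinaryEncoder reused = factory.blockingBinaryEncoder(second, encoder);
        reused.writeString("second record");
        reused.flush();

        System.out.println(first.size() + " bytes, then " + second.size() + " bytes");
    }
}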
File: org/apache/nifi/avro/TestWriteAvroResult.java
@@ -17,17 +17,30 @@

package org.apache.nifi.avro;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
@@ -43,22 +56,9 @@
import java.util.Objects;
import java.util.TimeZone;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public abstract class TestWriteAvroResult {

File: org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -17,26 +17,49 @@

package org.apache.nifi.avro;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.MockComponentLog;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {

private final BlockingQueue<BinaryEncoder> encoderPool = new LinkedBlockingQueue<>(32);

@Override
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool,
new MockComponentLog("id", new Object()));
}

@Override
@@ -55,4 +78,30 @@ protected void verify(final WriteResult writeResult) {
new Schema.Parser().parse(schemaText);
}


@Test
@Ignore("This test takes many seconds to run and is only really useful for comparing performance of the writer before and after changes, so it is @Ignored, but left in place to be run manually " +
"for performance comparisons before & after changes are made.")
public void testPerf() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);

final OutputStream out = new NullOutputStream();

final Record record = new MapRecord(recordSchema, Collections.singletonMap("name", "John Doe"));
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);

final ComponentLog logger = new MockComponentLog("id", new Object());

final long start = System.nanoTime();
for (int i=0; i < 10_000_000; i++) {
try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool, logger)) {
writer.write(RecordSet.of(record.getSchema(), record));
}
}

final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
System.out.println(millis);
}
}
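As a usage note, the printed value is the total wall-clock time in milliseconds for ten million single-record writes, so throughput follows by division. A hypothetical tweak to the last two lines of testPerf, if a records-per-second figure is preferred:

final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
// Guard against division by zero on absurdly fast runs; 10_000_000 records total.
final long recordsPerSecond = millis == 0 ? 0 : 10_000_000L * 1000L / millis;
System.out.println(millis + " ms (" + recordsPerSecond + " records/sec)");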