From 75dce9c1ed1018cc14562478fd83d727501700d3 Mon Sep 17 00:00:00 2001 From: Joe Date: Wed, 9 Sep 2015 10:40:15 +0200 Subject: [PATCH 1/2] New property (wrap as array) in avro2json converter Create a new property (wrap as array) in ConvertAvroToJSON, which determines how a stream of records is exposed: either as a sequence of single Objects (false), writing every Object to a new line, or as an array of Objects (true). Default value is true, meaning that the Avro content is exposed as a JSON array of Objects. A value of false is useful when you want to write your records as single instances to a target component (e.g. Kafka). Let's assume you have Avro content as a stream of records (record1, record2, ...). If wrap as array is false, the converter will expose the records as a sequence of single JSON objects: record1 record2 ... recordN Please bear in mind that the final output as a whole is then not valid JSON content. You can then forward this content e.g. to Kafka, where every record will be a single Kafka message. If wrap as array is true, the output looks like this: [record1,record2,...,recordN] It is useful when you want to convert your Avro content to a valid JSON array. 
--- .../nifi-avro-processors/pom.xml | 4 +++ .../processors/avro/ConvertAvroToJSON.java | 36 +++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml index 989f762b1b98..fa9a59eb43ca 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/pom.xml @@ -54,6 +54,10 @@ junit test + + com.google.guava + guava + diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index 8832a7327df3..b7b23fa06036 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -23,8 +23,11 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashSet; +import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -34,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; @@ -42,6 +46,7 @@ import org.apache.nifi.processor.Relationship; import 
org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; @SideEffectFree @SupportsBatching @@ -53,6 +58,16 @@ @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") public class ConvertAvroToJSON extends AbstractProcessor { + @VisibleForTesting + static final PropertyDescriptor WRAP_AS_ARRAY /* true: multiple records are written as [r1,r2,...]; false: they are written one per line */ + = new PropertyDescriptor.Builder() + .name("Expose stream of records as array of JSON Objects") + .description("Determines how a stream of records is exposed: either as a sequence of single Objects (false), writing every Object to a new line, or as an array of Objects (true). Default value is true, meaning that the Avro content is exposed as an array of JSON Objects.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after it has been converted to JSON") @@ -62,6 +77,15 @@ public class ConvertAvroToJSON extends AbstractProcessor { .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") .build(); + private static final List<PropertyDescriptor> PROPERTIES + = ImmutableList.<PropertyDescriptor>builder() + .add(WRAP_AS_ARRAY) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } @Override public Set getRelationships() { final Set rels = new HashSet<>(); @@ -77,11 +101,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro return; } + final boolean wrapAsArray = context.getProperty(WRAP_AS_ARRAY).asBoolean(); + try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream rawIn, final OutputStream rawOut) throws 
IOException { try (final InputStream in = new BufferedInputStream(rawIn); + final OutputStream out = new BufferedOutputStream(rawOut); final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { @@ -90,7 +117,7 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I final String json = genericData.toString(record); int recordCount = 0; - if (reader.hasNext()) { + if (reader.hasNext() && wrapAsArray) { out.write('['); } @@ -98,13 +125,16 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I recordCount++; while (reader.hasNext()) { - out.write(','); + if (wrapAsArray) { + out.write(','); + } + final GenericRecord nextRecord = reader.next(record); out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8)); recordCount++; } - if (recordCount > 1) { + if (recordCount > 1 && wrapAsArray) { out.write(']'); } } From 96d6c1cc13e15c2467eb0216668eb5e18a7d48af Mon Sep 17 00:00:00 2001 From: Joe Date: Thu, 10 Sep 2015 13:53:25 +0200 Subject: [PATCH 2/2] NIFI 945: Test new property in avro2json converter --- .../processors/avro/ConvertAvroToJSON.java | 2 ++ .../avro/TestConvertAvroToJSON.java | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index b7b23fa06036..4d6a35d52290 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -127,6 +127,8 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I while (reader.hasNext()) { if (wrapAsArray) { out.write(','); + } else { + 
out.write('\n'); /* fixed LF rather than the platform line separator, so the output bytes are the same on every OS and charset */ } final GenericRecord nextRecord = reader.next(record); diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 2d84202bcb54..8c3c778aec6e 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -99,4 +99,31 @@ private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final Dat return out; } + @Test + public void testMultipleAvroMessagesNoWrapArray() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + runner.setProperty(ConvertAvroToJSON.WRAP_AS_ARRAY, "false"); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "George"); + user2.put("favorite_number", 1024); + user2.put("favorite_color", "red"); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}"); 
+ } + }