From 8821a46c00be7cff788a56da90bd026cd88946cb Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 3 Nov 2016 09:24:33 -0400 Subject: [PATCH 1/3] NIFI-969 Added support for standard JSON polish --- .../processors/avro/ConvertAvroToJSON.java | 163 ++++++++++-------- .../avro/TestConvertAvroToJSON.java | 59 +++++++ 2 files changed, 148 insertions(+), 74 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index 2ddf66e4d1fa..e91159cfd1ef 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -18,6 +18,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -32,10 +33,14 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.JsonEncoder; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -43,13 +48,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; @@ -92,6 +97,14 @@ public class ConvertAvroToJSON extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .build(); + static final PropertyDescriptor USE_STANDARD_JSON = new PropertyDescriptor.Builder() + .name("Use Standard JSON") + .description("Determines if the resulting JSON output is in AVRO-JSON format or standard JSON format. Default 'false'") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .required(true) + .defaultValue("false") + .allowableValues("false", "true") + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -102,31 +115,50 @@ public class ConvertAvroToJSON extends AbstractProcessor { .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") .build(); - private List properties; - private volatile Schema schema = null; + private final static List PROPERTIES; - @Override - protected void init(ProcessorInitializationContext context) { - super.init(context); + private final static Set RELATIONSHIPS; - final List properties = new ArrayList<>(); + static { + List properties = new ArrayList<>(); properties.add(CONTAINER_OPTIONS); properties.add(WRAP_SINGLE_RECORD); properties.add(SCHEMA); - this.properties = Collections.unmodifiableList(properties); + properties.add(USE_STANDARD_JSON); + PROPERTIES = Collections.unmodifiableList(properties); + + Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(relationships); + } + + private volatile Schema schema; + + private volatile boolean useContainer; + + private volatile boolean wrapSingleRecord; + + private volatile boolean useStandardJson; + + @OnScheduled + public void schedule(ProcessContext context) { + String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue(); + this.useContainer = containerOption.equals(CONTAINER_ARRAY); + this.wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; + String stringSchema = context.getProperty(SCHEMA).getValue(); + this.schema = stringSchema == null ? null : new Schema.Parser().parse(stringSchema); + this.useStandardJson = context.getProperty(USE_STANDARD_JSON).asBoolean(); } @Override protected List getSupportedPropertyDescriptors() { - return properties; + return PROPERTIES; } @Override public Set getRelationships() { - final Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - return rels; + return RELATIONSHIPS; } @Override @@ -136,81 +168,54 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro return; } - final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue(); - final boolean useContainer = containerOption.equals(CONTAINER_ARRAY); - // Wrap a single record (inclusive of no records) only when a container is being used - final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; - - final String stringSchema = context.getProperty(SCHEMA).getValue(); - final boolean schemaLess = stringSchema != null; - try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { final GenericData genericData = GenericData.get(); - if (schemaLess) { - if (schema == null) { - schema = new Schema.Parser().parse(stringSchema); - } - try (final InputStream in = new BufferedInputStream(rawIn); - final OutputStream out = new BufferedOutputStream(rawOut)) { - final DatumReader reader = new GenericDatumReader(schema); - final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); - final GenericRecord record = reader.read(null, decoder); - - // Schemaless records are singletons, so both useContainer and wrapSingleRecord - // need to be true before we wrap it with an array + try (OutputStream out = new BufferedOutputStream(rawOut); InputStream in = new BufferedInputStream(rawIn)) { + DatumReader reader = new GenericDatumReader(schema); + if (schema != null) { + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); + GenericRecord currRecord = reader.read(null, decoder); if (useContainer && wrapSingleRecord) { out.write('['); } - - final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8); + byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT + : (useStandardJson ? toStandardJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); out.write(outputBytes); - if (useContainer && wrapSingleRecord) { out.write(']'); } - } - } else { - try (final InputStream in = new BufferedInputStream(rawIn); - final OutputStream out = new BufferedOutputStream(rawOut); - final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { - - int recordCount = 0; - GenericRecord currRecord = null; - if (reader.hasNext()) { - currRecord = reader.next(); - recordCount++; - } - - // Open container if desired output is an array format and there are are multiple records or - // if configured to wrap single record - if (reader.hasNext() && useContainer || wrapSingleRecord) { - out.write('['); - } - - // Determine the initial output record, inclusive if we should have an empty set of Avro records - final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8); - out.write(outputBytes); - - while (reader.hasNext()) { - if (useContainer) { - out.write(','); - } else { - out.write('\n'); + } else { + try (DataFileStream stream = new DataFileStream<>(in, reader)) { + int recordCount = 0; + GenericRecord currRecord = null; + if (stream.hasNext()) { + currRecord = stream.next(); + recordCount++; + } + if (stream.hasNext() && useContainer || wrapSingleRecord) { + out.write('['); + } + byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT + : (useStandardJson ? toStandardJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); + out.write(outputBytes); + while (stream.hasNext()) { + if (useContainer) { + out.write(','); + } else { + out.write('\n'); + } + + currRecord = stream.next(currRecord); + out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); + recordCount++; + } + if (recordCount > 1 && useContainer || wrapSingleRecord) { + out.write(']'); } - - currRecord = reader.next(currRecord); - out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); - recordCount++; - } - - // Close container if desired output is an array format and there are multiple records or if - // configured to wrap a single record - if (recordCount > 1 && useContainer || wrapSingleRecord) { - out.write(']'); } } } @@ -225,4 +230,14 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); session.transfer(flowFile, REL_SUCCESS); } + + private byte[] toStandardJSON(Schema shcemaToUse, GenericRecord datum) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DatumWriter writer = new GenericDatumWriter(shcemaToUse); + JsonEncoder encoder = EncoderFactory.get().jsonEncoder(shcemaToUse, bos); + writer.write(datum, encoder); + encoder.flush(); + bos.flush(); + return bos.toByteArray(); + } } diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 0884eb311bed..733ae57ef4e3 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -238,6 +238,37 @@ public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() thro out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); } + @Test + public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer_StandardJson() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + String stringSchema = schema.toString(); + runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); + runner.setProperty(ConvertAvroToJSON.USE_STANDARD_JSON, "true"); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.write(user1, encoder); + + encoder.flush(); + out1.flush(); + byte[] test = out1.toByteArray(); + runner.enqueue(test); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\":\"Alyssa\",\"favorite_number\":{\"int\":256},\"favorite_color\":null}"); + } + @Test public void testMultipleAvroMessages() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); @@ -265,6 +296,34 @@ public void testMultipleAvroMessages() throws IOException { out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null},{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}]"); } + @Test + public void testMultipleAvroMessagesStandardJson() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); + runner.setProperty(ConvertAvroToJSON.USE_STANDARD_JSON, "true"); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "George"); + user2.put("favorite_number", 1024); + user2.put("favorite_color", "red"); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("[{\"name\":\"Alyssa\",\"favorite_number\":{\"int\":256},\"favorite_color\":null},{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}]"); + } + @Test public void testNonJsonHandledProperly() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); From 9b433eea03b897abaffa41f19fa4b5f465b2daae Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 9 Nov 2016 11:22:22 -0500 Subject: [PATCH 2/3] NIFI-969 addressed PR comments --- .../nifi/processors/avro/ConvertAvroToJSON.java | 15 +++++++-------- .../processors/avro/TestConvertAvroToJSON.java | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index e91159cfd1ef..c2531a2b4d5d 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -97,8 +97,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .build(); - static final PropertyDescriptor USE_STANDARD_JSON = new PropertyDescriptor.Builder() - .name("Use Standard JSON") + static final PropertyDescriptor USE_AVRO_JSON = new PropertyDescriptor.Builder().name("Use AVRO JSON") .description("Determines if the resulting JSON output is in AVRO-JSON format or standard JSON format. Default 'false'") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .required(true) @@ -124,7 +123,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { properties.add(CONTAINER_OPTIONS); properties.add(WRAP_SINGLE_RECORD); properties.add(SCHEMA); - properties.add(USE_STANDARD_JSON); + properties.add(USE_AVRO_JSON); PROPERTIES = Collections.unmodifiableList(properties); Set relationships = new HashSet<>(); @@ -139,7 +138,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { private volatile boolean wrapSingleRecord; - private volatile boolean useStandardJson; + private volatile boolean useAvroJson; @OnScheduled public void schedule(ProcessContext context) { @@ -148,7 +147,7 @@ public void schedule(ProcessContext context) { this.wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; String stringSchema = context.getProperty(SCHEMA).getValue(); this.schema = stringSchema == null ? null : new Schema.Parser().parse(stringSchema); - this.useStandardJson = context.getProperty(USE_STANDARD_JSON).asBoolean(); + this.useAvroJson = context.getProperty(USE_AVRO_JSON).asBoolean(); } @Override @@ -183,7 +182,7 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I out.write('['); } byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT - : (useStandardJson ? toStandardJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); + : (useAvroJson ? toAvroJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); out.write(outputBytes); if (useContainer && wrapSingleRecord) { out.write(']'); @@ -200,7 +199,7 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I out.write('['); } byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT - : (useStandardJson ? toStandardJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); + : (useAvroJson ? toAvroJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); out.write(outputBytes); while (stream.hasNext()) { if (useContainer) { @@ -231,7 +230,7 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I session.transfer(flowFile, REL_SUCCESS); } - private byte[] toStandardJSON(Schema shcemaToUse, GenericRecord datum) throws IOException { + private byte[] toAvroJSON(Schema shcemaToUse, GenericRecord datum) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DatumWriter writer = new GenericDatumWriter(shcemaToUse); JsonEncoder encoder = EncoderFactory.get().jsonEncoder(shcemaToUse, bos); diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 733ae57ef4e3..a1f8093cd0f5 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -246,7 +246,7 @@ public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer_Standa Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); String stringSchema = schema.toString(); runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema); - runner.setProperty(ConvertAvroToJSON.USE_STANDARD_JSON, "true"); + runner.setProperty(ConvertAvroToJSON.USE_AVRO_JSON, "true"); final GenericRecord user1 = new GenericData.Record(schema); user1.put("name", "Alyssa"); @@ -302,7 +302,7 @@ public void testMultipleAvroMessagesStandardJson() throws IOException { final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); - runner.setProperty(ConvertAvroToJSON.USE_STANDARD_JSON, "true"); + runner.setProperty(ConvertAvroToJSON.USE_AVRO_JSON, "true"); final GenericRecord user1 = new GenericData.Record(schema); user1.put("name", "Alyssa"); From 94f4862ad8df202c92f16c4779f51408f4b20804 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 9 Nov 2016 11:53:30 -0500 Subject: [PATCH 3/3] NIFI-969 addressed PR comments round 2 --- .../org/apache/nifi/processors/avro/ConvertAvroToJSON.java | 6 +++--- .../apache/nifi/processors/avro/TestConvertAvroToJSON.java | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index c2531a2b4d5d..faaad3fb3da3 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -230,10 +230,10 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I session.transfer(flowFile, REL_SUCCESS); } - private byte[] toAvroJSON(Schema shcemaToUse, GenericRecord datum) throws IOException { + private byte[] toAvroJSON(Schema schemaToUse, GenericRecord datum) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DatumWriter writer = new GenericDatumWriter(shcemaToUse); - JsonEncoder encoder = EncoderFactory.get().jsonEncoder(shcemaToUse, bos); + DatumWriter writer = new GenericDatumWriter(schemaToUse); + JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schemaToUse, bos); writer.write(datum, encoder); encoder.flush(); bos.flush(); diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index a1f8093cd0f5..e25bf8a24c3b 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -239,7 +240,7 @@ public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() thro } @Test - public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer_StandardJson() throws IOException { + public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer_AvroJson() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); @@ -297,7 +298,7 @@ public void testMultipleAvroMessages() throws IOException { } @Test - public void testMultipleAvroMessagesStandardJson() throws IOException { + public void testMultipleAvroMessagesAvroJson() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); @@ -313,6 +314,7 @@ public void testMultipleAvroMessagesStandardJson() throws IOException { user2.put("favorite_number", 1024); user2.put("favorite_color", "red"); + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2); runner.enqueue(out1.toByteArray());