Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
Expand All @@ -34,6 +37,7 @@
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
Expand All @@ -42,6 +46,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@SupportsBatching
Expand All @@ -53,6 +58,16 @@
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class ConvertAvroToJSON extends AbstractProcessor {

@VisibleForTesting
static final PropertyDescriptor WRAP_AS_ARRAY
= new PropertyDescriptor.Builder()
.name("Expose stream of records as array of JSON Objects")
.description("Determines how stream of records is exposed: either as a sequence of single Objects (false), writing every Object to a new line, or as an array of Objects (true). Default value is true, meaning that the Avro content is exposed as a sequence of root-level Object entries.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue(String.valueOf("true"))
.build();

static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been converted to JSON")
Expand All @@ -62,6 +77,15 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
.build();

private static final List<PropertyDescriptor> PROPERTIES
= ImmutableList.<PropertyDescriptor>builder()
.add(WRAP_AS_ARRAY)
.build();

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
Expand All @@ -77,11 +101,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
return;
}

final boolean wrapAsArray = context.getProperty(WRAP_AS_ARRAY).asBoolean();

try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);

final OutputStream out = new BufferedOutputStream(rawOut);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {

Expand All @@ -90,21 +117,24 @@ public void process(final InputStream rawIn, final OutputStream rawOut) throws I
final String json = genericData.toString(record);

int recordCount = 0;
if (reader.hasNext()) {
if (reader.hasNext() && wrapAsArray) {
out.write('[');
}

out.write(json.getBytes(StandardCharsets.UTF_8));
recordCount++;

while (reader.hasNext()) {
out.write(',');
if (wrapAsArray) {
out.write(',');
}

final GenericRecord nextRecord = reader.next(record);
out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
recordCount++;
}

if (recordCount > 1) {
if (recordCount > 1 && wrapAsArray) {
out.write(']');
}
}
Expand Down