From b1ecbe33a99f684a9ae0a649ae2069852f688d9e Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 26 Jun 2017 11:06:35 -0400 Subject: [PATCH 1/3] NIFI-4122 Added the ability to combine multiple Mongo result documents into a single output JSON array. --- .../nifi/processors/mongodb/GetMongo.java | 101 ++++++++++++++---- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index fc00a080db23..847bfc2f618c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -18,14 +18,9 @@ */ package org.apache.nifi.processors.mongodb; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -44,10 +39,16 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; +import org.codehaus.jackson.map.ObjectMapper; -import com.mongodb.client.FindIterable; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -99,6 +100,12 @@ public ValidationResult validate(final String subject, final String value, final .required(false) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() + .name("Results Per FlowFile") + .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); private final static Set relationships; private final static List propertyDescriptors; @@ -111,6 +118,7 @@ public ValidationResult validate(final String subject, final String value, final _propertyDescriptors.add(SORT); _propertyDescriptors.add(LIMIT); _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(RESULTS_PER_FLOWFILE); _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); @@ -130,6 +138,31 @@ public final List getSupportedPropertyDescriptors() { return propertyDescriptors; } + private ObjectMapper mapper = new ObjectMapper(); + + //Turn a list of Mongo result documents into a String representation of a JSON array + private String buildBatch(List documents) throws IOException { + List docs = new ArrayList<>(); + for (Document document : documents) { + String asJson = document.toJson(); + docs.add(mapper.readValue(asJson, Map.class)); + } + + return mapper.writeValueAsString(docs); + } + + private void writeBatch(String payload, ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(payload.getBytes("UTF-8")); + } + }); + session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.transfer(flowFile, REL_SUCCESS); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ComponentLog logger = getLogger(); @@ -158,17 +191,43 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final MongoCursor cursor = it.iterator(); try { FlowFile flowFile = null; - while (cursor.hasNext()) { - flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - IOUtils.write(cursor.next().toJson(), out); + if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { + int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + List batch = new ArrayList<>(); + + while (cursor.hasNext()) { + batch.add(cursor.next()); + if (batch.size() == ceiling) { + try { + getLogger().info("Writing batch..."); + String payload = buildBatch(batch); + writeBatch(payload, context, session); + batch = new ArrayList<>(); + } catch (IOException ex) { + getLogger().error("Error building batch", ex); + } } - }); - - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); - session.transfer(flowFile, REL_SUCCESS); + } + if (batch.size() > 0) { + try { + writeBatch(buildBatch(batch), context, session); + } catch (IOException ex) { + getLogger().error("Error sending remainder of batch", ex); + } + } + } else { + while (cursor.hasNext()) { + flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + IOUtils.write(cursor.next().toJson(), out); + } + }); + + session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.transfer(flowFile, REL_SUCCESS); + } } session.commit(); From 3870223e7f5120882aaf809c810f43b3758ca6f7 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Wed, 28 Jun 2017 12:09:26 -0400 Subject: [PATCH 2/3] NIFI-4122 Made a few changes requested for this branch: * Added MIME Type. * Changed a logger from info to debug. * Fixed import ordering. * Added JUnit coverage for the Results Per FlowFile --- .../nifi/processors/mongodb/GetMongo.java | 28 ++++++++++++------- .../nifi/processors/mongodb/GetMongoTest.java | 11 ++++++++ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 847bfc2f618c..702fa14be871 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -18,6 +18,15 @@ */ package org.apache.nifi.processors.mongodb; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -31,6 +40,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -41,14 +51,6 @@ import org.bson.Document; import org.codehaus.jackson.map.ObjectMapper; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -101,7 +103,8 @@ public ValidationResult validate(final String subject, final String value, final .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() - .name("Results Per FlowFile") + .name("results-per-flowfile") + .displayName("Results Per FlowFile") .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") .required(false) .addValidator(StandardValidators.INTEGER_VALIDATOR) @@ -159,6 +162,7 @@ public void process(OutputStream out) throws IOException { out.write(payload.getBytes("UTF-8")); } }); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); session.transfer(flowFile, REL_SUCCESS); } @@ -189,6 +193,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final MongoCursor cursor = it.iterator(); + ComponentLog log = getLogger(); try { FlowFile flowFile = null; if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { @@ -199,7 +204,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session batch.add(cursor.next()); if (batch.size() == ceiling) { try { - getLogger().info("Writing batch..."); + if (log.isDebugEnabled()) { + log.debug("Writing batch..."); + } String payload = buildBatch(batch); writeBatch(payload, context, session); batch = new ArrayList<>(); @@ -224,6 +231,7 @@ public void process(OutputStream out) throws IOException { IOUtils.write(cursor.next().toJson(), out); } }); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index 810fc4d2b9af..e89b73684eb0 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; @@ -200,4 +201,14 @@ public void testLimit() throws Exception { List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson()); } + + @Test + public void testResultsPerFlowfile() throws Exception { + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "20"); + runner.run(); + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); + Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json"); + } } From 8d2cc57b6e1c2165e3a651995ac6309848e4aff4 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 29 Jun 2017 05:47:20 -0400 Subject: [PATCH 3/3] NIFI-4122 Added changes suggested during code review: * Changed validator to "positive integer validator" for results per flowfile. * Improved the unit test for results per flowfile. --- .../java/org/apache/nifi/processors/mongodb/GetMongo.java | 2 +- .../org/apache/nifi/processors/mongodb/GetMongoTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 702fa14be871..8b73b02f0384 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -107,7 +107,7 @@ public ValidationResult validate(final String subject, final String value, final .displayName("Results Per FlowFile") .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") .required(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); private final static Set relationships; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index e89b73684eb0..02f5904434e7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -42,7 +42,7 @@ import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; -@Ignore("Integration tests that cause failures in some environments. Require that they be run from Maven to run the embedded mongo maven plugin. Maven Plugin also fails in my CentOS 7 environment.") +//@Ignore("Integration tests that cause failures in some environments. Require that they be run from Maven to run the embedded mongo maven plugin. Maven Plugin also fails in my CentOS 7 environment.") public class GetMongoTest { private static final String MONGO_URI = "mongodb://localhost"; private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase(); @@ -204,9 +204,9 @@ public void testLimit() throws Exception { @Test public void testResultsPerFlowfile() throws Exception { - runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "20"); + runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); runner.run(); - runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2); List results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0); Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");