From b963fb76b96edd383e6fda1568ea21fd49a69990 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Sat, 9 Jun 2018 17:35:59 +0530 Subject: [PATCH 1/2] NIFI-5284: Added JSON_TYPE support to RunMongoAggregation --- .../mongodb/AbstractMongoProcessor.java | 37 +++++++++++++++++++ .../nifi/processors/mongodb/GetMongo.java | 36 +----------------- .../mongodb/RunMongoAggregation.java | 14 ++++--- .../mongodb/RunMongoAggregationIT.java | 36 +++++++++++++++++- 4 files changed, 82 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 339bee70e5df..d2939ac8ef3c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.nifi.processors.mongodb; +import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; @@ -29,6 +30,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -45,6 +47,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -57,6 +61,13 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; + protected static final String JSON_TYPE_EXTENDED = "Extended"; + protected static final String JSON_TYPE_STANDARD = "Standard"; + protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", + "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); + protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", + "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); + protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() .name("Mongo URI") .displayName("Mongo URI") @@ -65,6 +76,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .displayName("Mongo Database Name") @@ -73,6 +85,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") @@ -80,6 +93,19 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + protected static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() + .allowableValues(JSON_EXTENDED, JSON_STANDARD) + .defaultValue(JSON_TYPE_EXTENDED) + .displayName("JSON Type") + .name("json-type") + .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl-context-service") .displayName("SSL Context Service") @@ -88,6 +114,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("ssl-client-auth") .displayName("Client Auth") @@ -155,6 +182,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { descriptors.add(CLIENT_AUTH); } + protected ObjectMapper objectMapper; protected MongoClient mongoClient; @OnScheduled @@ -275,4 +303,13 @@ protected void writeBatch(String payload, FlowFile parent, ProcessContext contex session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, rel); } + + protected void configureMapper(String setting) { + objectMapper = new ObjectMapper(); + if (setting.equals(JSON_TYPE_STANDARD)) { + objectMapper.registerModule(ObjectIdSerializer.getModule()); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + objectMapper.setDateFormat(df); + } + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 10eb0c723a21..8fc03be6dbd3 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -45,8 +45,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,7 +53,6 @@ import java.util.Map; import java.util.Set; - @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_ALLOWED) @CapabilityDescription("Creates FlowFiles from documents in MongoDB") @@ -134,24 +131,6 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(Validator.VALID) .build(); - static final String JSON_TYPE_EXTENDED = "Extended"; - static final String JSON_TYPE_STANDARD = "Standard"; - static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", - "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); - static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", - "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); - static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() - .allowableValues(JSON_EXTENDED, JSON_STANDARD) - .defaultValue(JSON_TYPE_EXTENDED) - .displayName("JSON Type") - .name("json-type") - .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + - " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + - " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .build(); - private final static Set relationships; private final static List propertyDescriptors; @@ -189,8 +168,6 @@ public final List getSupportedPropertyDescriptors() { return propertyDescriptors; } - private ObjectMapper mapper; - //Turn a list of Mongo result documents into a String representation of a JSON array private String buildBatch(List documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException { StringBuilder builder = new StringBuilder(); @@ -198,7 +175,7 @@ private String buildBatch(List documents, String jsonTypeSetting, Stri Document document = documents.get(index); String asJson; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document); + asJson = getObjectWriter(objectMapper, prettyPrintSetting).writeValueAsString(document); } else { asJson = document.toJson(new JsonWriterSettings(true)); } @@ -210,15 +187,6 @@ private String buildBatch(List documents, String jsonTypeSetting, Stri return "[" + builder.toString() + "]"; } - private void configureMapper(String setting) { - mapper = new ObjectMapper(); - if (setting.equals(JSON_TYPE_STANDARD)) { - mapper.registerModule(ObjectIdSerializer.getModule()); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - mapper.setDateFormat(df); - } - } - private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer(); @@ -334,7 +302,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session flowFile = session.write(flowFile, out -> { String json; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next()); + json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()); } else { json = cursor.next().toJson(); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 19969c420adf..25ea08625bd7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -96,6 +96,7 @@ static final List buildAggregationQuery(String query) throws IOException { _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(QUERY_ATTRIBUTE); _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(RESULTS_PER_FLOWFILE); @@ -120,11 +121,14 @@ public final List getSupportedPropertyDescriptors() { return propertyDescriptors; } - static String buildBatch(List batch) { - ObjectMapper mapper = new ObjectMapper(); + private String buildBatch(List batch, ProcessContext context) { + + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + configureMapper(jsonTypeSetting); + String retVal; try { - retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); + retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); } catch (Exception e) { retVal = null; } @@ -167,13 +171,13 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { - writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + writeBatch(buildBatch(batch, context), flowFile, context, session, attrs, REL_RESULTS); batch = new ArrayList(); } } if (batch.size() > 0) { - writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + writeBatch(buildBatch(batch, context), flowFile, context, session, attrs, REL_RESULTS); } if (flowFile != null) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index f2ddbca13c85..02d9ad4619a9 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -33,6 +33,7 @@ import org.junit.Test; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ public class RunMongoAggregationIT { private TestRunner runner; private MongoClient mongoClient; private Map mappings; + private Calendar now = Calendar.getInstance(); @Before public void setup() { @@ -68,7 +70,7 @@ public void setup() { for (int x = 0; x < values.length; x++) { for (int y = 0; y < x + 2; y++) { - Document doc = new Document().append("val", values[x]); + Document doc = new Document().append("val", values[x]).append("date", now.getTime()); collection.insertOne(doc); } mappings.put(values[x], x + 2); @@ -78,7 +80,6 @@ public void setup() { @After public void teardown() { runner = null; - mongoClient.getDatabase(DB_NAME).drop(); } @@ -164,6 +165,37 @@ public void testInvalidQuery(){ runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1); } + @Test + public void testJsonTypes() throws IOException { + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_STANDARD); + runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { \"myArray\": [ \"$val\", \"$date\" ] } } ]"); + runner.enqueue("test"); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + ObjectMapper mapper = new ObjectMapper(); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1).equalsIgnoreCase( format.format(now.getTime()))); + } + + runner.clearTransferState(); + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_EXTENDED); + runner.enqueue("test"); + runner.run(1, true, true); + + flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1) == now.getTimeInMillis()); + } + } + private void evaluateRunner(int original) throws IOException { runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size()); runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original); From 9ec1257604c25f4cdc340a9324ddbe68c9600917 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Sun, 10 Jun 2018 11:44:18 +0530 Subject: [PATCH 2/2] NIFI-5284: Further improvements --- .../mongodb/AbstractMongoProcessor.java | 3 ++- .../nifi/processors/mongodb/GetMongo.java | 2 +- .../mongodb/RunMongoAggregation.java | 27 +++++++++---------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index d2939ac8ef3c..fa3bc108114f 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -304,8 +304,9 @@ protected void writeBatch(String payload, FlowFile parent, ProcessContext contex session.transfer(flowFile, rel); } - protected void configureMapper(String setting) { + protected synchronized void configureMapper(String setting) { objectMapper = new ObjectMapper(); + if (setting.equals(JSON_TYPE_STANDARD)) { objectMapper.registerModule(ObjectIdSerializer.getModule()); DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 8fc03be6dbd3..356f3f4712c1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -205,7 +205,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final ComponentLog logger = getLogger(); - Map attributes = new HashMap(); + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); final Document query; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 25ea08625bd7..684c5f500c74 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -121,11 +121,7 @@ public final List getSupportedPropertyDescriptors() { return propertyDescriptors; } - private String buildBatch(List batch, ProcessContext context) { - - final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); - configureMapper(jsonTypeSetting); - + private String buildBatch(List batch) { String retVal; try { retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); @@ -147,12 +143,15 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } } - String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); - String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); - Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); - Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + + configureMapper(jsonTypeSetting); - Map attrs = new HashMap(); + Map attrs = new HashMap<>(); if (queryAttr != null && queryAttr.trim().length() > 0) { attrs.put(queryAttr, query); } @@ -166,18 +165,18 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro it.batchSize(batchSize != null ? batchSize : 1); iter = it.iterator(); - List batch = new ArrayList(); + List batch = new ArrayList<>(); while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { - writeBatch(buildBatch(batch, context), flowFile, context, session, attrs, REL_RESULTS); - batch = new ArrayList(); + writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); + batch = new ArrayList<>(); } } if (batch.size() > 0) { - writeBatch(buildBatch(batch, context), flowFile, context, session, attrs, REL_RESULTS); + writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); } if (flowFile != null) {