From 380e342643885b8919599a1e8d329dfbb89600a6 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Fri, 16 Mar 2018 21:51:19 -0400 Subject: [PATCH 1/2] NIFI-4989 Made PutMongo able to use nested lookup keys, a query param and multiple lookup keys. --- .../nifi/processors/mongodb/PutMongo.java | 98 +++++++++++--- .../nifi/processors/mongodb/PutMongoIT.java | 122 ++++++++++++++++++ 2 files changed, 205 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 078a3e8a226d..aa5b62c98051 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -30,6 +30,9 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; @@ -38,11 +41,13 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; import org.bson.Document; import org.bson.types.ObjectId; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -86,19 +91,27 @@ public class PutMongo extends AbstractMongoProcessor { .name("Update Query Key") .description("Key name used to build the update query criteria; this property is valid only when using update mode, " + "otherwise it is ignored") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .addValidator(Validator.VALID) .defaultValue("_id") .build(); + static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder() + .name("putmongo-update-query") + .displayName("Update Query") + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder() - .displayName("Update Mode") - .name("put-mongo-update-mode") - .required(true) - .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) - .defaultValue(UPDATE_WITH_DOC.getValue()) - .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + - "or specify a document that contains update operators like $set and $unset") - .build(); + .displayName("Update Mode") + .name("put-mongo-update-mode") + .required(true) + .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS) + .defaultValue(UPDATE_WITH_DOC.getValue()) + .description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " + + "or specify a document that contains update operators like $set and $unset") + .build(); static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set in which the data is encoded") @@ -116,6 +129,7 @@ public class PutMongo extends AbstractMongoProcessor { _propertyDescriptors.add(MODE); _propertyDescriptors.add(UPSERT); _propertyDescriptors.add(UPDATE_QUERY_KEY); + _propertyDescriptors.add(UPDATE_QUERY); _propertyDescriptors.add(UPDATE_MODE); _propertyDescriptors.add(WRITE_CONCERN); _propertyDescriptors.add(CHARACTER_SET); @@ -137,6 +151,31 @@ public List getSupportedPropertyDescriptors() { return propertyDescriptors; } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + List problems = new ArrayList<>(); + + final boolean queryKey = validationContext.getProperty(UPDATE_QUERY_KEY).isSet() + && !StringUtils.isBlank(validationContext.getProperty(UPDATE_QUERY_KEY).getValue()); + final boolean query = validationContext.getProperty(UPDATE_QUERY).isSet(); + + if (queryKey && query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("Both update query key and update query cannot be set at the same time.") + .build() + ); + } else if (!queryKey && !query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("Either the update query key or the update query field must be set.") + .build() + ); + } + + return problems; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final FlowFile flowFile = session.get(); @@ -169,14 +208,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); + final String updateQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final Document query; - Object keyVal = ((Map)doc).get(updateKey); - if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) { - keyVal = new ObjectId((String) keyVal); + if (!StringUtils.isBlank(updateKey)) { + query = parseUpdateKey(updateKey, (Map)doc); + removeUpdateKeys(updateKey, (Map)doc); + } else { + query = Document.parse(updateQuery); } - final Document query = new Document(updateKey, keyVal); - if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert)); } else { @@ -196,6 +237,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } + private void removeUpdateKeys(String updateKeyParam, Map doc) { + String[] parts = updateKeyParam.split(",[\\s]*"); + for (String part : parts) { + if (part.contains(".")) { + doc.remove(part); + } + } + } + + private Document parseUpdateKey(String updateKey, Map doc) { + Document retVal; + if (updateKey.equals("_id") && ObjectId.isValid(((String) doc.get(updateKey)))) { + retVal = new Document("_id", new ObjectId((String) doc.get(updateKey))); + } else if (updateKey.contains(",")) { + String[] parts = updateKey.split(",[\\s]*"); + retVal = new Document(); + for (String part : parts) { + retVal.append(part, doc.get(part)); + } + } else { + retVal = new Document(updateKey, doc.get(updateKey)); + } + + return retVal; + } + + protected WriteConcern getWriteConcern(final ProcessContext context) { final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); WriteConcern writeConcern = null; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java index 7e7f33086bba..ea17a318965f 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.mongodb; +import com.mongodb.client.MongoCursor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; @@ -31,9 +32,11 @@ import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -96,6 +99,125 @@ public void testValidators() { Assert.assertEquals(0, results.size()); } + @Test + public void testQueryAndUpdateKey() { + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); + runner.setProperty(PutMongo.UPDATE_QUERY, "{}"); + runner.assertNotValid(); + } + + @Test + public void testNoQueryAndNoUpdateKey() { + runner.removeProperty(PutMongo.UPDATE_QUERY); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); + runner.assertNotValid(); + } + + @Test + public void testBlankUpdateKey() { + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " "); + runner.assertNotValid(); + } + + @Test + public void testUpdateQuery() { + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"$set\": {\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"grade\": \"Sr. Principle Eng.\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + Map attr = new HashMap<>(); + attr.put("mongo.update.query", document.toJson()); + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}"); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody, attr); + updateTests(document); + } + + @Test + public void testUpdateBySimpleKey() { + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"name\": \"John Smith\",\n" + + "\t\"$set\": {\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"grade\": \"Sr. Principle Eng.\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name"); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + updateTests(document); + } + + @Test + public void testUpdateByComplexKey() { + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering") + .append("contacts", new Document().append("email", "john.smith@test.com") + .append("phone", "555-555-5555")); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"contacts.phone\": \"555-555-5555\",\n" + + "\t\"contacts.email\": \"john.smith@test.com\",\n" + + "\t\"$set\": {\n" + + "\t\t\"contacts.twitter\": \"@JohnSmith\"\n" + + "\t},\n" + + "\t\"$inc\": {\n" + + "\t\t\"writes\": 1\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "contacts.phone,contacts.email"); + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor iterator = collection.find(new Document("name", "John Smith")).iterator(); + Assert.assertTrue("Document did not come back.", iterator.hasNext()); + Document val = iterator.next(); + Map contacts = (Map)val.get("contacts"); + Assert.assertNotNull(contacts); + Assert.assertTrue(contacts.containsKey("twitter") && contacts.get("twitter").equals("@JohnSmith")); + Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1)); + } + + private void updateTests(Document document) { + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor iterator = collection.find(document).iterator(); + Assert.assertTrue("Document did not come back.", iterator.hasNext()); + Document val = iterator.next(); + Assert.assertTrue(val.containsKey("email") && val.get("email").equals("john.smith@test.com")); + Assert.assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr. Principle Eng.")); + Assert.assertTrue(val.containsKey("writes") && val.get("writes").equals(1)); + } + @Test public void testInsertOne() throws Exception { Document doc = DOCUMENTS.get(0); From a9241045619ac3789e33862e4f9df159f225224a Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sun, 18 Mar 2018 08:07:19 -0400 Subject: [PATCH 2/2] NIFI-4989 Added some additional tests and fixes based on a code review. --- .../nifi/processors/mongodb/PutMongo.java | 5 +- .../nifi/processors/mongodb/PutMongoIT.java | 49 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index aa5b62c98051..0483a1338b11 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -98,6 +98,7 @@ public class PutMongo extends AbstractMongoProcessor { static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder() .name("putmongo-update-query") .displayName("Update Query") + .description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert.") .required(false) .addValidator(Validator.VALID) .expressionLanguageSupported(true) @@ -208,14 +209,14 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); - final String updateQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); final Document query; if (!StringUtils.isBlank(updateKey)) { query = parseUpdateKey(updateKey, (Map)doc); removeUpdateKeys(updateKey, (Map)doc); } else { - query = Document.parse(updateQuery); + query = Document.parse(filterQuery); } if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java index ea17a318965f..07d5251da8eb 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java @@ -151,6 +151,7 @@ public void testUpdateBySimpleKey() { .append("name", "John Smith") .append("department", "Engineering"); collection.insertOne(document); + String updateBody = "{\n" + "\t\"name\": \"John Smith\",\n" + "\t\"$set\": {\n" + @@ -169,6 +170,54 @@ public void testUpdateBySimpleKey() { updateTests(document); } + @Test + public void testUpdateWithFullDocByKeys() { + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department"); + testUpdateFullDocument(); + } + + @Test + public void testUpdateWithFullDocByQuery() { + String query = "{ \"name\": \"John Smith\"}"; + runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); + runner.setProperty(PutMongo.UPDATE_QUERY, query); + testUpdateFullDocument(); + } + + private void testUpdateFullDocument() { + Document document = new Document() + .append("name", "John Smith") + .append("department", "Engineering"); + collection.insertOne(document); + String updateBody = "{\n" + + "\t\"name\": \"John Smith\",\n" + + "\t\"department\": \"Engineering\",\n" + + "\t\"contacts\": {\n" + + "\t\t\"phone\": \"555-555-5555\",\n" + + "\t\t\"email\": \"john.smith@test.com\",\n" + + "\t\t\"twitter\": \"@JohnSmith\"\n" + + "\t}\n" + + "}"; + runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC); + runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); + runner.setValidateExpressionUsage(true); + runner.enqueue(updateBody); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + MongoCursor cursor = collection.find(document).iterator(); + Document found = cursor.next(); + Assert.assertEquals(found.get("name"), document.get("name")); + Assert.assertEquals(found.get("department"), document.get("department")); + Document contacts = (Document)found.get("contacts"); + Assert.assertNotNull(contacts); + Assert.assertEquals(contacts.get("twitter"), "@JohnSmith"); + Assert.assertEquals(contacts.get("email"), "john.smith@test.com"); + Assert.assertEquals(contacts.get("phone"), "555-555-5555"); + Assert.assertEquals(collection.count(document), 1); + } + @Test public void testUpdateByComplexKey() { Document document = new Document()