From 9c20f7f2f44ea12f2a4340bc67b9a36062e57e17 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 30 Jun 2017 20:43:50 +0200 Subject: [PATCH 1/3] NIFI-4082 - Added EL on GetMongo properties --- .../nifi/processors/mongodb/GetMongo.java | 21 +++++++++++++++---- .../nifi/processors/mongodb/GetMongoTest.java | 9 +++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 8b73b02f0384..ea1e243a2be3 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -59,6 +59,13 @@ public class GetMongo extends AbstractMongoProcessor { public static final Validator DOCUMENT_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(value); + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return builder.valid(true).explanation("Contains Expression Language").build(); + } + String reason = null; try { Document.parse(value); @@ -66,7 +73,7 @@ public ValidationResult validate(final String subject, final String value, final reason = e.getClass().getName(); } - return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + return builder.explanation(reason).valid(reason == null).build(); } }; @@ -76,18 +83,21 @@ public ValidationResult validate(final String subject, final String value, final .name("Query") .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() .name("Projection") .description("The fields to be returned from the documents in the result set; must be a valid BSON document") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() .name("Sort") .description("The fields by which to sort; must be a valid BSON document") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() @@ -171,9 +181,12 @@ public void process(OutputStream out) throws IOException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ComponentLog logger = getLogger(); - final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null; - final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null; - final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null; + final Document query = context.getProperty(QUERY).isSet() + ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null; + final Document projection = context.getProperty(PROJECTION).isSet() + ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null; + final Document sort = context.getProperty(SORT).isSet() + ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; final MongoCollection collection = getCollection(context); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index e39148d17ece..ee510e6cb0f1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -120,8 +120,9 @@ public void testValidators() { Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException")); // invalid projection + runner.setVariable("projection", "{a: x,y,z}"); runner.setProperty(GetMongo.QUERY, "{a: 1}"); - runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}"); + runner.setProperty(GetMongo.PROJECTION, "${projection}"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); results = new HashSet<>(); @@ -146,7 +147,8 @@ public void testValidators() { @Test public void testReadOneDocument() throws Exception { - runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}"); + runner.setVariable("query", "{a: 1, b: 3}"); + runner.setProperty(GetMongo.QUERY, "${query}"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); @@ -180,8 +182,9 @@ public void testProjection() throws Exception { @Test public void testSort() throws Exception { + runner.setVariable("sort", "{a: -1, b: -1, c: 1}"); runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); - runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}"); + runner.setProperty(GetMongo.SORT, "${sort}"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); From f052f686f259895e16286fe81c914bfd69111936 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 12 Jul 2017 18:51:36 +0200 Subject: [PATCH 2/3] NIFI-4082 - Added EL on DB, URI and Collection --- .../mongodb/AbstractMongoProcessor.java | 27 ++++++++++++++----- .../nifi/processors/mongodb/GetMongo.java | 4 +-- .../nifi/processors/mongodb/PutMongo.java | 4 +-- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 486a0771f40f..6f165c28cf54 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; @@ -48,18 +49,21 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .name("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .description("The name of the database to use") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() @@ -124,11 +128,10 @@ public final void createClient(ProcessContext context) throws IOException { } try { - final String uri = context.getProperty(URI).getValue(); if(sslContext == null) { - mongoClient = new MongoClient(new MongoClientURI(uri)); + mongoClient = new MongoClient(new MongoClientURI(getURI(context))); } else { - mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext))); + mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); } } catch (Exception e) { getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); @@ -153,12 +156,24 @@ public final void closeClient() { } protected MongoDatabase getDatabase(final ProcessContext context) { - final String databaseName = context.getProperty(DATABASE_NAME).getValue(); + return getDatabase(context, null); + } + + protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) { + final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); return mongoClient.getDatabase(databaseName); } protected MongoCollection getCollection(final ProcessContext context) { - final String collectionName = context.getProperty(COLLECTION_NAME).getValue(); - return getDatabase(context).getCollection(collectionName); + return getCollection(context, null); + } + + protected MongoCollection getCollection(final ProcessContext context, final FlowFile flowFile) { + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue(); + return getDatabase(context, flowFile).getCollection(collectionName); + } + + protected String getURI(final ProcessContext context) { + return context.getProperty(URI).evaluateAttributeExpressions().getValue(); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index ea1e243a2be3..2b5a8c297f3e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -173,7 +173,7 @@ public void process(OutputStream out) throws IOException { } }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } @@ -246,7 +246,7 @@ public void process(OutputStream out) throws IOException { }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 51e5265f1ccd..5b5ad523d42e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -148,7 +148,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String mode = context.getProperty(MODE).getValue(); final WriteConcern writeConcern = getWriteConcern(context); - final MongoCollection collection = getCollection(context).withWriteConcern(writeConcern); + final MongoCollection collection = getCollection(context, flowFile).withWriteConcern(writeConcern); try { // Read the contents of the FlowFile into a byte array @@ -176,7 +176,7 @@ public void process(final InputStream in) throws IOException { logger.info("updated {} into MongoDB", new Object[] { flowFile }); } - session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().send(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } catch (Exception e) { logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e); From 9d28ffe2757bed1deaef55f8376378e363698d79 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 18 Jul 2017 14:52:05 +0200 Subject: [PATCH 3/3] NIFI-4082 - Added UT for EL evaluation (URI, DB, Collection) and fixed ex. message for document validator. --- .../org/apache/nifi/processors/mongodb/GetMongo.java | 2 +- .../org/apache/nifi/processors/mongodb/GetMongoTest.java | 9 ++++++--- .../org/apache/nifi/processors/mongodb/PutMongoTest.java | 9 ++++++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 2b5a8c297f3e..79f50b239cef 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -70,7 +70,7 @@ public ValidationResult validate(final String subject, final String value, final try { Document.parse(value); } catch (final RuntimeException e) { - reason = e.getClass().getName(); + reason = e.getLocalizedMessage(); } return builder.explanation(reason).valid(reason == null).build(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index ee510e6cb0f1..455d705bcb37 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -60,9 +60,12 @@ public class GetMongoTest { @Before public void setup() { runner = TestRunners.newTestRunner(GetMongo.class); - runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); - runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); - runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.setVariable("uri", MONGO_URI); + runner.setVariable("db", DB_NAME); + runner.setVariable("collection", COLLECTION_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java index 6f22976de3bf..10f81d1e0393 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -61,9 +61,12 @@ public class PutMongoTest { @Before public void setup() { runner = TestRunners.newTestRunner(PutMongo.class); - runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); - runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); - runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.setVariable("uri", MONGO_URI); + runner.setVariable("db", DATABASE_NAME); + runner.setVariable("collection", COLLECTION_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));