From 9477bfd7b65a943b4faced2259131c59c0051a4b Mon Sep 17 00:00:00 2001 From: zenfenan Date: Mon, 20 Aug 2018 16:36:25 +0530 Subject: [PATCH 1/2] NIFI-5544: GetMongo refactored --- .../nifi/processors/mongodb/GetMongo.java | 231 +++++++++--------- .../nifi/processors/mongodb/GetMongoIT.java | 35 ++- 2 files changed, 143 insertions(+), 123 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index cf9e3d75224a..dd338e99d505 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -67,22 +67,26 @@ public class GetMongo extends AbstractMongoProcessor { static final String DB_NAME = "mongo.database.name"; static final String COL_NAME = "mongo.collection.name"; - static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that have the results of a successful query execution go here.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("All input flowfiles that are part of a failed query execution go here.") + .description("All input FlowFiles that are part of a failed query execution go here.") .build(); static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") - .description("All input flowfiles that are part of a successful query execution go here.") + .description("All input FlowFiles that are part of a successful query execution go here.") .build(); static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() .name("Query") .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" + " an incoming connection from another processor to provide the query as a valid JSON document inside of " + - "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " + + "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " + "that will result in a full collection fetch using a \"{}\" query.") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -96,6 +100,7 @@ public class GetMongo extends AbstractMongoProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(JsonValidator.INSTANCE) .build(); + static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() .name("Sort") .description("The fields by which to sort; must be a valid BSON document") @@ -103,6 +108,7 @@ public class GetMongo extends AbstractMongoProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(JsonValidator.INSTANCE) .build(); + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() .name("Limit") .description("The maximum number of elements to return") @@ -113,7 +119,7 @@ public class GetMongo extends AbstractMongoProcessor { static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") - .description("The number of elements returned from the server in one batch") + .description("The number of elements to be returned from the server in one batch") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) @@ -121,7 +127,7 @@ public class GetMongo extends AbstractMongoProcessor { static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() .name("results-per-flowfile") .displayName("Results Per FlowFile") - .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") + .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) @@ -129,11 +135,12 @@ public class GetMongo extends AbstractMongoProcessor { static final AllowableValue YES_PP = new AllowableValue("true", "True"); static final AllowableValue NO_PP = new AllowableValue("false", "False"); + static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder() .name("use-pretty-printing") .displayName("Pretty Print Results JSON") .description("Choose whether or not to pretty print the JSON from the results of the query. " + - "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document") + "Choosing 'True' can greatly increase the space requirements on disk depending on the complexity of the JSON document") .required(true) .defaultValue(YES_PP.getValue()) .allowableValues(YES_PP, NO_PP) @@ -142,6 +149,7 @@ public class GetMongo extends AbstractMongoProcessor { private final static Set relationships; private final static List propertyDescriptors; + private ComponentLog logger; static { List _propertyDescriptors = new ArrayList<>(); @@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile input = null; + logger = getLogger(); + if (context.hasIncomingConnection()) { input = session.get(); - if (input == null && context.hasNonLoopConnection()) { return; } } - final ComponentLog logger = getLogger(); + final Document query = getQuery(context, session, input ); - Map attributes = new HashMap<>(); - attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + if (query == null) { + return; + } - final Document query; + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue(); final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue()); + final Map attributes = new HashMap<>(); - String queryStr; - if (context.getProperty(QUERY).isSet()) { - queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); - query = Document.parse(queryStr); - } else if (!context.getProperty(QUERY).isSet() && input == null) { - queryStr = "{}"; - query = Document.parse("{}"); - } else { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - session.exportTo(input, out); - out.close(); - queryStr = new String(out.toByteArray()); - query = Document.parse(queryStr); - } catch (Exception ex) { - getLogger().error("Error reading flowfile", ex); - if (input != null) { //Likely culprit is a bad query - session.transfer(input, REL_FAILURE); - return; - } else { - throw new ProcessException(ex); - } - } - } + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); if (context.getProperty(QUERY_ATTRIBUTE).isSet()) { final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue(); - attributes.put(queryAttr, queryStr); + attributes.put(queryAttr, query.toJson()); } final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null; final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null; - final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); - final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue(); - configureMapper(jsonTypeSetting); + final MongoCollection collection = getCollection(context, input); + final FindIterable it = collection.find(query); - try { - final MongoCollection collection = getCollection(context, input); + attributes.put(DB_NAME, collection.getNamespace().getDatabaseName()); + attributes.put(COL_NAME, collection.getNamespace().getCollectionName()); - attributes.put(DB_NAME, collection.getNamespace().getDatabaseName()); - attributes.put(COL_NAME, collection.getNamespace().getCollectionName()); + if (projection != null) { + it.projection(projection); + } + if (sort != null) { + it.sort(sort); + } + if (context.getProperty(LIMIT).isSet()) { + it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger()); + } + if (context.getProperty(BATCH_SIZE).isSet()) { + it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger()); + } - final FindIterable it = query != null ? collection.find(query) : collection.find(); - if (projection != null) { - it.projection(projection); - } - if (sort != null) { - it.sort(sort); - } - if (context.getProperty(LIMIT).isSet()) { - it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger()); - } - if (context.getProperty(BATCH_SIZE).isSet()) { - it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger()); + try (MongoCursor cursor = it.iterator()) { + final List listOfDocuments = new ArrayList<>(); + Document doc; + + configureMapper(jsonTypeSetting); + + while ((doc = cursor.tryNext()) != null) { + listOfDocuments.add(doc); } - final MongoCursor cursor = it.iterator(); - ComponentLog log = getLogger(); - try { - FlowFile outgoingFlowFile; - if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { - int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); - List batch = new ArrayList<>(); - - while (cursor.hasNext()) { - batch.add(cursor.next()); - if (batch.size() == ceiling) { - try { - if (log.isDebugEnabled()) { - log.debug("Writing batch..."); - } - String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint); - writeBatch(payload, input, context, session, attributes, REL_SUCCESS); - batch = new ArrayList<>(); - } catch (Exception ex) { - getLogger().error("Error building batch", ex); - } - } - } - if (batch.size() > 0) { - try { - writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS); - } catch (Exception ex) { - getLogger().error("Error sending remainder of batch", ex); - } - } - } else { - while (cursor.hasNext()) { - outgoingFlowFile = (input == null) ? session.create() : session.create(input); - outgoingFlowFile = session.write(outgoingFlowFile, out -> { - String json; - if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()); - } else { - json = cursor.next().toJson(); - } - out.write(json.getBytes(charset)); - }); - outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes); - - session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context)); - session.transfer(outgoingFlowFile, REL_SUCCESS); + if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { + final int sizePerChunk = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); + List> chunks = chunkDocumentsList(listOfDocuments, sizePerChunk); + + for (final List chunk : chunks) { + try { + String payload = buildBatch(chunk, jsonTypeSetting, usePrettyPrint); + writeBatch(payload, input, context, session, attributes, REL_SUCCESS); + } catch (Exception e) { + logger.error("Error building batch due to {}", new Object[] {e}); } } + } else { + for (final Document document : listOfDocuments) { + FlowFile outgoingFF = (input == null) ? session.create() : session.create(input); - if (input != null) { - session.transfer(input, REL_ORIGINAL); - } + outgoingFF = session.write(outgoingFF, out -> { + String json; + + if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { + json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(document); + } else { + json = document.toJson(); + } + + out.write(json.getBytes(charset)); + }); + + outgoingFF = session.putAllAttributes(outgoingFF, attributes); - } finally { - cursor.close(); + session.getProvenanceReporter().receive(outgoingFF, getURI(context)); + session.transfer(outgoingFF, REL_SUCCESS); + } } - } catch (final RuntimeException e) { if (input != null) { - session.transfer(input, REL_FAILURE); + session.transfer(input, REL_ORIGINAL); } - context.yield(); - logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e); } + + } + + private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) { + Document query = null; + if (context.getProperty(QUERY).isSet()) { + query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue()); + } else if (!context.getProperty(QUERY).isSet() && input == null) { + query = Document.parse("{}"); + } else { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + query = Document.parse(new String(out.toByteArray())); + } catch (Exception ex) { + logger.error("Error reading FlowFile : ", ex); + if (input != null) { //Likely culprit is a bad query + session.transfer(input, REL_FAILURE); + session.commit(); + } else { + throw new ProcessException(ex); + } + } + } + + return query; + } + + private List> chunkDocumentsList(List originalList, int perChunkSize) { + final List> chunks = new ArrayList<>(); + final int originalSize = originalList.size(); + + for (int i = 0; i < originalSize; i += perChunkSize) { + chunks.add(new ArrayList<>( + originalList.subList(i, Math.min(originalSize, i + perChunkSize))) + ); + } + + return chunks; } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index aaf6fa3dfc8f..08bc8cfaf5f7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -194,8 +194,8 @@ public void testReadOneDocument() throws Exception { public void testReadMultipleDocuments() throws Exception { runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}"); runner.run(); - runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); for (int i=0; i < flowFiles.size(); i++) { flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson()); @@ -313,7 +313,7 @@ public void testQueryAttribute() { runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr); runner.run(); runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); - testQueryAttribute(attr, "{}"); + testQueryAttribute(attr, "{ }"); runner.clearTransferState(); @@ -323,7 +323,7 @@ public void testQueryAttribute() { runner.removeProperty(GetMongo.QUERY); runner.setIncomingConnection(false); runner.run(); - testQueryAttribute(attr, "{}"); + testQueryAttribute(attr, "{ }"); runner.clearTransferState(); @@ -334,7 +334,7 @@ public void testQueryAttribute() { runner.setIncomingConnection(true); runner.enqueue("{}"); runner.run(); - testQueryAttribute(attr, "{}"); + testQueryAttribute(attr, "{ }"); /* * Input flowfile with invalid query @@ -478,6 +478,7 @@ public void testKeepOriginalAttributes() { */ @Test public void testDatabaseEL() { + runner.clearTransferState(); runner.removeVariable("collection"); runner.removeVariable("db"); runner.setIncomingConnection(true); @@ -506,26 +507,36 @@ public void testDatabaseEL() { } Map> vals = new HashMap>(){{ - put("Database", new HashMap(){{ - put("db", ""); - put("collection", "test"); - }}); put("Collection", new HashMap(){{ put("db", "getmongotest"); put("collection", ""); }}); + put("Database", new HashMap(){{ + put("db", ""); + put("collection", "test"); + }}); }}; + TestRunner tmpRunner; + for (Map.Entry> entry : vals.entrySet()) { - runner.enqueue("{}", entry.getValue()); + // Creating a new runner for each set of attributes map since every subsequent runs will attempt to take the top most enqueued FlowFile + tmpRunner = TestRunners.newTestRunner(GetMongo.class); + tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); + tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); + tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + tmpRunner.setIncomingConnection(true); + + tmpRunner.enqueue("{ }", entry.getValue()); + try { - runner.run(); + tmpRunner.run(); } catch (Throwable ex) { Throwable cause = ex.getCause(); Assert.assertTrue(cause instanceof ProcessException); - Assert.assertTrue(entry.getKey(), cause.getMessage().contains(entry.getKey())); + Assert.assertTrue(entry.getKey(), ex.getMessage().contains(entry.getKey())); } - runner.clearTransferState(); + tmpRunner.clearTransferState(); } } From bcdee18cc9ed7871399c38aabbc98d1e4572c657 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Thu, 30 Aug 2018 23:52:58 +0530 Subject: [PATCH 2/2] NIFI-5544: PR Review changes --- .../nifi/processors/mongodb/GetMongo.java | 63 ++++++++----------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index dd338e99d505..507c9ce3b37f 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -264,47 +264,48 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } try (MongoCursor cursor = it.iterator()) { - final List listOfDocuments = new ArrayList<>(); - Document doc; - configureMapper(jsonTypeSetting); - while ((doc = cursor.tryNext()) != null) { - listOfDocuments.add(doc); - } - if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) { - final int sizePerChunk = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); - List> chunks = chunkDocumentsList(listOfDocuments, sizePerChunk); + int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger(); + List batch = new ArrayList<>(); + + while (cursor.hasNext()) { + batch.add(cursor.next()); + + if (batch.size() == sizePerBatch) { + try { + writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS); + batch = new ArrayList<>(); + } catch (Exception e) { + logger.error("Error building batch due to {}", new Object[] {e}); + } + } + } - for (final List chunk : chunks) { + if (batch.size() > 0) { try { - String payload = buildBatch(chunk, jsonTypeSetting, usePrettyPrint); - writeBatch(payload, input, context, session, attributes, REL_SUCCESS); + writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS); } catch (Exception e) { logger.error("Error building batch due to {}", new Object[] {e}); } } } else { - for (final Document document : listOfDocuments) { - FlowFile outgoingFF = (input == null) ? session.create() : session.create(input); - - outgoingFF = session.write(outgoingFF, out -> { - String json; + FlowFile outgoingFlowFile; + while (cursor.hasNext()) { + outgoingFlowFile = (input == null) ? session.create() : session.create(input); + outgoingFlowFile = session.write(outgoingFlowFile, out -> { if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(document); + out.write(getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()).getBytes(charset)); } else { - json = document.toJson(); + out.write(cursor.next().toJson().getBytes(charset)); } - - out.write(json.getBytes(charset)); }); - outgoingFF = session.putAllAttributes(outgoingFF, attributes); - - session.getProvenanceReporter().receive(outgoingFF, getURI(context)); - session.transfer(outgoingFF, REL_SUCCESS); + outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes); + session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context)); + session.transfer(outgoingFlowFile, REL_SUCCESS); } } @@ -341,16 +342,4 @@ private Document getQuery(ProcessContext context, ProcessSession session, FlowFi return query; } - private List> chunkDocumentsList(List originalList, int perChunkSize) { - final List> chunks = new ArrayList<>(); - final int originalSize = originalList.size(); - - for (int i = 0; i < originalSize; i += perChunkSize) { - chunks.add(new ArrayList<>( - originalList.subList(i, Math.min(originalSize, i + perChunkSize))) - ); - } - - return chunks; - } }