From 646a52fe8f1f1fb473eef42488c106ddfd825cf1 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 9 Jun 2018 15:38:02 -0400 Subject: [PATCH 1/3] NIFI-5288 Quietly convert Java arrays to Lists so the MongoDB API can handle them. --- .../nifi-mongodb-processors/pom.xml | 17 +++++++++ .../processors/mongodb/PutMongoRecord.java | 26 +++++++++++++- .../processors/mongodb/PutMongoRecordIT.java | 35 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml index 32420be318dc..4d020759aca5 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml @@ -91,5 +91,22 @@ 1.7.0-SNAPSHOT compile + + org.apache.nifi + nifi-avro-record-utils + 1.7.0-SNAPSHOT + test + + + org.apache.nifi + nifi-schema-registry-service-api + test + + + org.apache.nifi + nifi-record-serialization-services + 1.7.0-SNAPSHOT + test + diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java index 63ebda9f1217..13c52e70fe8d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java @@ -131,7 +131,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session for (String name : schema.getFieldNames()) { document.put(name, contentMap.get(name)); } - inserts.add(document); + inserts.add(convertArrays(document)); if (inserts.size() == ceiling) { collection.insertMany(inserts); added += inserts.size(); @@ -154,4 +154,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } session.commit(); } + + private Document convertArrays(Document doc) { + Document retVal = new Document(); + for (Map.Entry entry : doc.entrySet()) { + if (entry.getValue() != null && entry.getValue().getClass().isArray()) { + List items = new ArrayList(); + Object[] values = (Object[])entry.getValue(); + for (int index = 0; index < values.length; index++) { + if (values[index] instanceof Map || values[index] instanceof Document) { + items.add(convertArrays(new Document((Map)values[index]))); + } else { + items.add(values[index]); + } + } + retVal.put(entry.getKey(), items); + } else if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) { + retVal.put(entry.getKey(), convertArrays(new Document((Map)entry.getValue()))); + } else { + retVal.put(entry.getKey(), entry.getValue()); + } + } + + return retVal; + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java index 5332695cdd76..26d5807933a2 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java @@ -17,12 +17,17 @@ package org.apache.nifi.processors.mongodb; +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockSchemaRegistry; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; @@ -43,6 +48,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -191,4 +197,33 @@ public void testInsertNestedRecords() throws Exception { assertEquals(4, collection.count()); //assertEquals(doc, collection.find().first()); } + + @Test + public void testArrayConversion() throws Exception { + TestRunner runner = init(PutMongoRecord.class); + MockSchemaRegistry registry = new MockSchemaRegistry(); + String rawSchema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"nom\",\"type\":\"string\"," + + "\"doc\":\"Type inferred from '\\\"HAMEL\\\"'\"},{\"name\":\"prenom\",\"type\":\"string\",\"doc\":" + + "\"Type inferred from '\\\"YVES\\\"'\"},{\"name\":\"tab\",\"type\":{\"type\":\"array\",\"items\":\"string\"}" + + ",\"doc\":\"Type inferred from '[\\\"aa\\\",\\\"bb\\\"]'\"}]}"; + RecordSchema schema = AvroTypeUtil.createSchema(new Schema.Parser().parse(rawSchema)); + registry.addSchema("yves", schema); + JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("registry", registry); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader"); + runner.enableControllerService(registry); + runner.enableControllerService(reader); + runner.assertValid(); + + Map attrs = new HashMap<>(); + attrs.put("schema.name", "yves"); + + runner.enqueue("{\"nom\":\"HAMEL\",\"prenom\":\"YVES\",\"tab\":[\"aa\",\"bb\"]}", attrs); + runner.run(); + + runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0); + runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1); + } } From 7f783534a71a12fe83d2d98ffa0b84cf16a828a9 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 9 Jun 2018 20:26:42 -0400 Subject: [PATCH 2/3] NIFI-5288 Made changes based on code review. --- .../nifi/processors/mongodb/PutMongoRecordIT.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java index 26d5807933a2..2a24b323e631 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java @@ -202,12 +202,10 @@ public void testInsertNestedRecords() throws Exception { public void testArrayConversion() throws Exception { TestRunner runner = init(PutMongoRecord.class); MockSchemaRegistry registry = new MockSchemaRegistry(); - String rawSchema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"nom\",\"type\":\"string\"," + - "\"doc\":\"Type inferred from '\\\"HAMEL\\\"'\"},{\"name\":\"prenom\",\"type\":\"string\",\"doc\":" + - "\"Type inferred from '\\\"YVES\\\"'\"},{\"name\":\"tab\",\"type\":{\"type\":\"array\",\"items\":\"string\"}" + - ",\"doc\":\"Type inferred from '[\\\"aa\\\",\\\"bb\\\"]'\"}]}"; + String rawSchema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}," + + "{\"name\":\"arrayTest\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}"; RecordSchema schema = AvroTypeUtil.createSchema(new Schema.Parser().parse(rawSchema)); - registry.addSchema("yves", schema); + registry.addSchema("test", schema); JsonTreeReader reader = new JsonTreeReader(); runner.addControllerService("registry", registry); runner.addControllerService("reader", reader); @@ -218,9 +216,9 @@ public void testArrayConversion() throws Exception { runner.assertValid(); Map attrs = new HashMap<>(); - attrs.put("schema.name", "yves"); + attrs.put("schema.name", "test"); - runner.enqueue("{\"nom\":\"HAMEL\",\"prenom\":\"YVES\",\"tab\":[\"aa\",\"bb\"]}", attrs); + runner.enqueue("{\"name\":\"John Smith\",\"arrayTest\":[\"a\",\"b\",\"c\"]}", attrs); runner.run(); runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0); From a86659553329d5f38fafacfb755d7ac1c0906271 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 9 Jun 2018 20:55:54 -0400 Subject: [PATCH 3/3] NIFI-5288 Theoretically supports nested arrays. --- .../processors/mongodb/PutMongoRecord.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java index 13c52e70fe8d..37706bc8475b 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java @@ -159,16 +159,7 @@ private Document convertArrays(Document doc) { Document retVal = new Document(); for (Map.Entry entry : doc.entrySet()) { if (entry.getValue() != null && entry.getValue().getClass().isArray()) { - List items = new ArrayList(); - Object[] values = (Object[])entry.getValue(); - for (int index = 0; index < values.length; index++) { - if (values[index] instanceof Map || values[index] instanceof Document) { - items.add(convertArrays(new Document((Map)values[index]))); - } else { - items.add(values[index]); - } - } - retVal.put(entry.getKey(), items); + retVal.put(entry.getKey(), convertArrays((Object[])entry.getValue())); } else if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) { retVal.put(entry.getKey(), convertArrays(new Document((Map)entry.getValue()))); } else { @@ -178,4 +169,19 @@ private Document convertArrays(Document doc) { return retVal; } + + private List convertArrays(Object[] input) { + List retVal = new ArrayList(); + for (Object o : input) { + if (o != null && o.getClass().isArray()) { + retVal.add(convertArrays((Object[])o)); + } else if (o instanceof Map) { + retVal.add(convertArrays(new Document((Map)o))); + } else { + retVal.add(o); + } + } + + return retVal; + } }