From bd9bd3d3fcd15e59b0cb3585d6cc7b609ead18a1 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 9 Apr 2018 07:28:40 -0400 Subject: [PATCH 01/14] NIFI-5059 Updated MongoDBLookupService to be able to detect record schemas or take one provided by the user. --- .../nifi/mongodb/MongoDBClientService.java | 1 + .../nifi-mongodb-services/pom.xml | 6 ++ .../mongodb/MongoDBControllerService.java | 11 +++ .../nifi/mongodb/MongoDBLookupService.java | 81 ++++++++++++++++--- .../nifi/mongodb/MongoDBLookupServiceIT.java | 49 +++++++++-- 5 files changed, 129 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java index b0f161811008..597ce0d93dac 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java @@ -31,6 +31,7 @@ default Document convertJson(String query) { void delete(Document query); boolean exists(Document query); Document findOne(Document query); + Document findOne(Document query, Document projection); List findMany(Document query); List findMany(Document query, int limit); List findMany(Document query, Document sort, int limit); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml index c595508d7026..cb0635712572 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml @@ -78,6 +78,12 @@ junit test + + org.apache.nifi + nifi-avro-record-utils + 1.7.0-SNAPSHOT + compile + diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java index 0faed0d5256c..1dd6b080208d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java @@ -79,6 +79,17 @@ public Document findOne(Document query) { return retVal; } + @Override + public Document findOne(Document query, Document projection) { + MongoCursor cursor = projection != null + ? this.col.find(query).projection(projection).limit(1).iterator() + : this.col.find(query).limit(1).iterator(); + Document retVal = cursor.tryNext(); + cursor.close(); + + return retVal; + } + @Override public List findMany(Document query) { return findMany(query, null, -1); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index fba228738a65..8ba97c8bd184 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -17,9 +17,11 @@ package org.apache.nifi.mongodb; +import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.lookup.LookupFailureException; @@ -32,12 +34,14 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.util.StringUtils; import org.bson.Document; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,6 +66,19 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .build(); + public static final PropertyDescriptor RECORD_SCHEMA = new PropertyDescriptor.Builder() + .name("mongo-lookup-record-schema") + .displayName("Record Schema") + .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " + + "the service will attempt to determine the schema from the results.") + .required(false) + .build(); + public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() + .name("mongo-lookup-projection") + .displayName("Projection") + .description("Specifies a projection for limiting which fields will be returned.") + .required(false) + .build(); private String lookupValueField; @@ -71,6 +88,8 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo lookupDescriptors = new ArrayList<>(); lookupDescriptors.addAll(descriptors); lookupDescriptors.add(LOOKUP_VALUE_FIELD); + lookupDescriptors.add(RECORD_SCHEMA); + lookupDescriptors.add(PROJECTION); } @Override @@ -84,24 +103,15 @@ public Optional lookup(Map coordinates) throws LookupFai } try { - Document result = this.findOne(query); + Document result = projection != null ? this.findOne(query, projection) : this.findOne(query); if(result == null) { return Optional.empty(); } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { - final List fields = new ArrayList<>(); - - for (String key : result.keySet()) { - if (key.equals("_id")) { - continue; - } - fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); - } - - final RecordSchema schema = new SimpleRecordSchema(fields); - return Optional.ofNullable(new MapRecord(schema, result)); + RecordSchema toUse = schema != null ? schema : convertSchema(result); + return Optional.ofNullable(new MapRecord(toUse, result)); } } catch (Exception ex) { getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex); @@ -109,10 +119,57 @@ public Optional lookup(Map coordinates) throws LookupFai } } + private RecordSchema convertSchema(Map result) { + List fields = new ArrayList<>(); + for (Map.Entry entry : result.entrySet()) { + + RecordField field; + if (entry.getValue() instanceof Integer) { + field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType()); + } else if (entry.getValue() instanceof Long) { + field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType()); + } else if (entry.getValue() instanceof Boolean) { + field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType()); + } else if (entry.getValue() instanceof Double) { + field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType()); + } else if (entry.getValue() instanceof Date) { + field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType()); + } else if (entry.getValue() instanceof List) { + field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType()); + } else if (entry.getValue() instanceof Map) { + RecordSchema nestedSchema = convertSchema((Map)entry.getValue()); + RecordDataType rdt = new RecordDataType(nestedSchema); + field = new RecordField(entry.getKey(), rdt); + } else { + field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType()); + } + fields.add(field); + } + + return new SimpleRecordSchema(fields); + } + + private volatile RecordSchema schema; + private volatile Document projection; + @Override @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); + String configuredSchema = context.getProperty(RECORD_SCHEMA).isSet() + ? context.getProperty(RECORD_SCHEMA).getValue() + : null; + if (!StringUtils.isBlank(configuredSchema)) { + schema = AvroTypeUtil.createSchema(new Schema.Parser().parse(configuredSchema)); + } + + String configuredProjection = context.getProperty(PROJECTION).isSet() + ? context.getProperty(PROJECTION).getValue() + : null; + if (!StringUtils.isBlank(configuredProjection)) { + projection = Document.parse(configuredProjection); + } + super.onEnabled(context); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index dcb3fb1dbe15..f7a005714063 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -19,6 +19,9 @@ import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.bson.Document; @@ -27,8 +30,12 @@ import org.junit.Before; import org.junit.Test; +import java.sql.Timestamp; +import java.util.Arrays; import java.util.Calendar; +import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -51,13 +58,13 @@ public void before() throws Exception { } @After - public void after() throws Exception { + public void after() { service.dropDatabase(); service.onDisable(); } @Test - public void testInit() throws Exception { + public void testInit() { runner.enableControllerService(service); runner.assertValid(service); } @@ -92,9 +99,25 @@ public void testLookupSingle() throws Exception { @Test public void testLookupRecord() throws Exception { runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); + runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }"); runner.enableControllerService(service); - Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); - service.insert(document); + + Date d = new Date(); + Timestamp ts = new Timestamp(new Date().getTime()); + List list = Arrays.asList("a", "b", "c", "d", "e"); + + service.insert(new Document() + .append("uuid", "x-y-z") + .append("dateField", d) + .append("longField", 10000L) + .append("stringField", "Hello, world") + .append("timestampField", ts) + .append("decimalField", Double.MAX_VALUE / 2.0) + .append("subrecordField", new Document() + .append("nestedString", "test") + .append("nestedLong", new Long(1000))) + .append("arrayField", list) + ); Map criteria = new HashMap<>(); criteria.put("uuid", "x-y-z"); @@ -103,8 +126,20 @@ public void testLookupRecord() throws Exception { Assert.assertNotNull("The value was null.", result.get()); Assert.assertTrue("The value was wrong.", result.get() instanceof MapRecord); MapRecord record = (MapRecord)result.get(); - Assert.assertEquals("The value was wrong.", "Hello, world", record.getAsString("message")); - Assert.assertEquals("The value was wrong.", "x-y-z", record.getAsString("uuid")); + RecordSchema subSchema = ((RecordDataType)record.getSchema().getField("subrecordField").get().getDataType()).getChildSchema(); + + Assert.assertEquals("The value was wrong.", "Hello, world", record.getValue("stringField")); + Assert.assertEquals("The value was wrong.", "x-y-z", record.getValue("uuid")); + Assert.assertEquals(new Long(10000), record.getValue("longField")); + Assert.assertEquals((Double.MAX_VALUE / 2.0), record.getValue("decimalField")); + Assert.assertEquals(d, record.getValue("dateField")); + Assert.assertEquals(ts.getTime(), ((Date)record.getValue("timestampField")).getTime()); + + Record subRecord = record.getAsRecord("subrecordField", subSchema); + Assert.assertNotNull(subRecord); + Assert.assertEquals("test", subRecord.getValue("nestedString")); + Assert.assertEquals(new Long(1000), subRecord.getValue("nestedLong")); + Assert.assertEquals(list, record.getValue("arrayField")); Map clean = new HashMap<>(); clean.putAll(criteria); @@ -120,7 +155,7 @@ public void testLookupRecord() throws Exception { } @Test - public void testServiceParameters() throws Exception { + public void testServiceParameters() { runner.enableControllerService(service); Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); service.insert(document); From b716f813a6bf23159a0da5933684f3bbbb5b80fd Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 24 Apr 2018 14:37:50 -0400 Subject: [PATCH 02/14] NIFI-5059 Changed it to use a schema registry. --- .../nifi-mongodb-services/pom.xml | 5 + .../nifi/mongodb/MongoDBLookupService.java | 91 ++++++++++++++----- .../nifi/mongodb/MongoDBLookupServiceIT.java | 28 ++++++ .../nifi/mongodb/TestSchemaRegistry.java | 46 ++++++++++ 4 files changed, 146 insertions(+), 24 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml index cb0635712572..2f70f8a32b5e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml @@ -84,6 +84,11 @@ 1.7.0-SNAPSHOT compile + + org.apache.nifi + nifi-schema-registry-service-api + compile + diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 8ba97c8bd184..495beadc43b7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -17,29 +17,34 @@ package org.apache.nifi.mongodb; -import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.LookupService; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.util.StringUtils; import org.bson.Document; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -59,26 +64,34 @@ public class MongoDBLookupService extends MongoDBControllerService implements LookupService { public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() - .name("mongo-lookup-value-field") - .displayName("Lookup Value Field") - .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + - "MongoDB result document minus the _id field will be returned as a record.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .build(); - public static final PropertyDescriptor RECORD_SCHEMA = new PropertyDescriptor.Builder() - .name("mongo-lookup-record-schema") - .displayName("Record Schema") + .name("mongo-lookup-value-field") + .displayName("Lookup Value Field") + .description("The field whose value will be returned when the lookup key(s) match a record. If not specified then the entire " + + "MongoDB result document minus the _id field will be returned as a record.") + .addValidator(Validator.VALID) + .required(false) + .build(); + public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() + .name("mongo-lookup-schema-registry") + .displayName("Schema Registry") .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " + "the service will attempt to determine the schema from the results.") .required(false) + .identifiesControllerService(SchemaRegistry.class) + .build(); + public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("mongo-lookup-record-schema-name") + .displayName("Record Schema Name") + .description("If specified, the value will be used to lookup a schema in the configured schema registry.") + .required(false) + .addValidator(Validator.VALID) .build(); public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() - .name("mongo-lookup-projection") - .displayName("Projection") - .description("Specifies a projection for limiting which fields will be returned.") - .required(false) - .build(); + .name("mongo-lookup-projection") + .displayName("Projection") + .description("Specifies a projection for limiting which fields will be returned.") + .required(false) + .build(); private String lookupValueField; @@ -88,10 +101,31 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo lookupDescriptors = new ArrayList<>(); lookupDescriptors.addAll(descriptors); lookupDescriptors.add(LOOKUP_VALUE_FIELD); - lookupDescriptors.add(RECORD_SCHEMA); + lookupDescriptors.add(SCHEMA_REGISTRY); + lookupDescriptors.add(RECORD_SCHEMA_NAME); lookupDescriptors.add(PROJECTION); } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + List problems = new ArrayList<>(); + + PropertyValue registry = validationContext.getProperty(SCHEMA_REGISTRY); + PropertyValue schemaName = validationContext.getProperty(RECORD_SCHEMA_NAME); + + if (registry.isSet() && !schemaName.isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("If the registry is set, the schema name parameter must be set too.") + .build()); + } else if (!registry.isSet() && schemaName.isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("If the schema name is set, the schema registry parameter must be set too.") + .build()); + } + + return problems; + } + @Override public Optional lookup(Map coordinates) throws LookupFailureException { Map clean = new HashMap<>(); @@ -156,11 +190,20 @@ private RecordSchema convertSchema(Map result) { @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); - String configuredSchema = context.getProperty(RECORD_SCHEMA).isSet() - ? context.getProperty(RECORD_SCHEMA).getValue() - : null; - if (!StringUtils.isBlank(configuredSchema)) { - schema = AvroTypeUtil.createSchema(new Schema.Parser().parse(configuredSchema)); + + SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); + final String name = context.getProperty(RECORD_SCHEMA_NAME).getValue(); + + if (registry != null) { + try { + SchemaIdentifier identifier = SchemaIdentifier.builder() + .name(name) + .build(); + schema = registry.retrieveSchema(identifier); + } catch (SchemaNotFoundException e) { + getLogger().error(String.format("Could not find schema named %s", name), e); + throw new InitializationException(e); + } } String configuredProjection = context.getProperty(PROJECTION).isSet() diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index f7a005714063..a7946dfd9634 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -18,6 +18,7 @@ package org.apache.nifi.mongodb; import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; @@ -51,6 +52,7 @@ public void before() throws Exception { runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class); service = new MongoDBLookupService(); runner.addControllerService("Client Service", service); + runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service"); runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME); runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME); runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017"); @@ -96,6 +98,32 @@ public void testLookupSingle() throws Exception { Assert.assertTrue(!result.isPresent()); } + @Test + public void testWithSchemaRegistry() throws Exception { + SchemaRegistry registry = new TestSchemaRegistry(); + runner.addControllerService("registry", registry); + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); + runner.setProperty(service, MongoDBLookupService.SCHEMA_REGISTRY, "registry"); + runner.setProperty(service, MongoDBLookupService.RECORD_SCHEMA_NAME, "user"); + runner.enableControllerService(registry); + runner.enableControllerService(service); + runner.assertValid(); + + service.insert(new Document() + .append("username", "john.smith") + .append("password", "testing1234") + ); + + Map criteria = new HashMap<>(); + criteria.put("username", "john.smith"); + Optional result = service.lookup(criteria); + Assert.assertNotNull(result.get()); + MapRecord record = (MapRecord)result.get(); + + Assert.assertEquals("john.smith", record.getAsString("username")); + Assert.assertEquals("testing1234", record.getAsString("password")); + } + @Test public void testLookupRecord() throws Exception { runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java new file mode 100644 index 000000000000..f79ec6f85d2e --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mongodb; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + @Override + public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) { + List fields = new ArrayList<>(); + fields.add(new RecordField("username", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("password", RecordFieldType.STRING.getDataType())); + return new SimpleRecordSchema(fields); + } + + @Override + public Set getSuppliedSchemaFields() { + return null; + } +} From 9e048f9eb0734076906c2cc29170bfccce8b0af1 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 19 May 2018 08:35:31 -0400 Subject: [PATCH 03/14] NIFI-5059 Updated MongoDBLookupService to be a SchemaRegistryService. --- .../nifi/mongodb/MongoDBLookupService.java | 124 +++++++----------- .../nifi/mongodb/MongoDBLookupServiceIT.java | 51 +++---- .../nifi/mongodb/TestSchemaRegistry.java | 5 +- 3 files changed, 77 insertions(+), 103 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 495beadc43b7..8c62ba0ad51e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -21,37 +21,30 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.LookupService; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.SchemaRegistryService; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.util.StringUtils; import org.bson.Document; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; @Tags({"mongo", "mongodb", "lookup", "record"}) @CapabilityDescription( @@ -61,7 +54,14 @@ "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " + "then the entire MongoDB result document minus the _id field will be returned as a record." ) -public class MongoDBLookupService extends MongoDBControllerService implements LookupService { +public class MongoDBLookupService extends SchemaRegistryService implements LookupService { + public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder() + .name("mongo-lookup-client-service") + .displayName("Client Service") + .description("A MongoDB controller service to use with this lookup service.") + .required(true) + .identifiesControllerService(MongoDBControllerService.class) + .build(); public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() .name("mongo-lookup-value-field") @@ -71,21 +71,6 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo .addValidator(Validator.VALID) .required(false) .build(); - public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() - .name("mongo-lookup-schema-registry") - .displayName("Schema Registry") - .description("If specified, this avro schema will be used for all objects loaded from MongoDB using this service. If left blank, " + - "the service will attempt to determine the schema from the results.") - .required(false) - .identifiesControllerService(SchemaRegistry.class) - .build(); - public static final PropertyDescriptor RECORD_SCHEMA_NAME = new PropertyDescriptor.Builder() - .name("mongo-lookup-record-schema-name") - .displayName("Record Schema Name") - .description("If specified, the value will be used to lookup a schema in the configured schema registry.") - .required(false) - .addValidator(Validator.VALID) - .build(); public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() .name("mongo-lookup-projection") .displayName("Projection") @@ -95,41 +80,14 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo private String lookupValueField; - private static final List lookupDescriptors; - - static { - lookupDescriptors = new ArrayList<>(); - lookupDescriptors.addAll(descriptors); - lookupDescriptors.add(LOOKUP_VALUE_FIELD); - lookupDescriptors.add(SCHEMA_REGISTRY); - lookupDescriptors.add(RECORD_SCHEMA_NAME); - lookupDescriptors.add(PROJECTION); - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - List problems = new ArrayList<>(); - - PropertyValue registry = validationContext.getProperty(SCHEMA_REGISTRY); - PropertyValue schemaName = validationContext.getProperty(RECORD_SCHEMA_NAME); - - if (registry.isSet() && !schemaName.isSet()) { - problems.add(new ValidationResult.Builder() - .explanation("If the registry is set, the schema name parameter must be set too.") - .build()); - } else if (!registry.isSet() && schemaName.isSet()) { - problems.add(new ValidationResult.Builder() - .explanation("If the schema name is set, the schema registry parameter must be set too.") - .build()); - } - - return problems; - } - @Override public Optional lookup(Map coordinates) throws LookupFailureException { - Map clean = new HashMap<>(); - clean.putAll(coordinates); + Map clean = coordinates.entrySet().stream() + .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey()))) + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue() + )); Document query = new Document(clean); if (coordinates.size() == 0) { @@ -137,13 +95,15 @@ public Optional lookup(Map coordinates) throws LookupFai } try { - Document result = projection != null ? this.findOne(query, projection) : this.findOne(query); + Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query); if(result == null) { return Optional.empty(); } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { + RecordSchema schema = loadSchema(coordinates); + RecordSchema toUse = schema != null ? schema : convertSchema(result); return Optional.ofNullable(new MapRecord(toUse, result)); } @@ -153,6 +113,19 @@ public Optional lookup(Map coordinates) throws LookupFai } } + private RecordSchema loadSchema(Map coordinates) { + Map variables = coordinates.entrySet().stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue().toString() + )); + try { + return getSchema(variables, null); + } catch (Exception ex) { + return null; + } + } + private RecordSchema convertSchema(Map result) { List fields = new ArrayList<>(); for (Map.Entry entry : result.entrySet()) { @@ -183,28 +156,16 @@ private RecordSchema convertSchema(Map result) { return new SimpleRecordSchema(fields); } - private volatile RecordSchema schema; private volatile Document projection; + private MongoDBControllerService controllerService; + private String schemaNameProperty; - @Override @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + public void onEnabled(final ConfigurationContext context) { this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); + this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class); - SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); - final String name = context.getProperty(RECORD_SCHEMA_NAME).getValue(); - - if (registry != null) { - try { - SchemaIdentifier identifier = SchemaIdentifier.builder() - .name(name) - .build(); - schema = registry.retrieveSchema(identifier); - } catch (SchemaNotFoundException e) { - getLogger().error(String.format("Could not find schema named %s", name), e); - throw new InitializationException(e); - } - } + this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue(); String configuredProjection = context.getProperty(PROJECTION).isSet() ? context.getProperty(PROJECTION).getValue() @@ -212,8 +173,6 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE if (!StringUtils.isBlank(configuredProjection)) { projection = Document.parse(configuredProjection); } - - super.onEnabled(context); } @Override @@ -228,6 +187,11 @@ public Set getRequiredKeys() { @Override protected List getSupportedPropertyDescriptors() { - return lookupDescriptors; + List _temp = new ArrayList<>(); + _temp.addAll(super.getSupportedPropertyDescriptors()); + _temp.add(LOOKUP_VALUE_FIELD); + _temp.add(PROJECTION); + + return Collections.unmodifiableList(_temp); } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index a7946dfd9634..b0370ab86d7d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -18,6 +18,7 @@ package org.apache.nifi.mongodb; import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; @@ -46,37 +47,48 @@ public class MongoDBLookupServiceIT { private TestRunner runner; private MongoDBLookupService service; + private MongoDBControllerService controllerService; @Before public void before() throws Exception { runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class); service = new MongoDBLookupService(); + controllerService = new MongoDBControllerService(); runner.addControllerService("Client Service", service); + runner.addControllerService("Client Service 2", controllerService); runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service"); - runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME); - runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME); - runner.setProperty(service, MongoDBLookupService.URI, "mongodb://localhost:27017"); + runner.setProperty(controllerService, MongoDBControllerService.DATABASE_NAME, DB_NAME); + runner.setProperty(controllerService, MongoDBControllerService.COLLECTION_NAME, COL_NAME); + runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017"); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message"); + runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2"); + SchemaRegistry registry = new TestSchemaRegistry(); + runner.addControllerService("registry", registry); + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); + runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + runner.enableControllerService(registry); + runner.enableControllerService(controllerService); + runner.enableControllerService(service); } @After public void after() { - service.dropDatabase(); - service.onDisable(); + controllerService.dropDatabase(); + controllerService.onDisable(); } @Test public void testInit() { - runner.enableControllerService(service); runner.assertValid(service); } @Test public void testLookupSingle() throws Exception { + runner.disableControllerService(service); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message"); runner.enableControllerService(service); - Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); - service.insert(document); + Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); + controllerService.insert(document); Map criteria = new HashMap<>(); criteria.put("uuid", "x-y-z"); @@ -87,7 +99,7 @@ public void testLookupSingle() throws Exception { Map clean = new HashMap<>(); clean.putAll(criteria); - service.delete(new Document(clean)); + controllerService.delete(new Document(clean)); try { result = service.lookup(criteria); @@ -100,23 +112,18 @@ public void testLookupSingle() throws Exception { @Test public void testWithSchemaRegistry() throws Exception { - SchemaRegistry registry = new TestSchemaRegistry(); - runner.addControllerService("registry", registry); - runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); - runner.setProperty(service, MongoDBLookupService.SCHEMA_REGISTRY, "registry"); - runner.setProperty(service, MongoDBLookupService.RECORD_SCHEMA_NAME, "user"); - runner.enableControllerService(registry); - runner.enableControllerService(service); runner.assertValid(); - service.insert(new Document() + controllerService.insert(new Document() .append("username", "john.smith") .append("password", "testing1234") ); Map criteria = new HashMap<>(); criteria.put("username", "john.smith"); + criteria.put("schema.name", "user"); Optional result = service.lookup(criteria); + Assert.assertTrue(result.isPresent()); Assert.assertNotNull(result.get()); MapRecord record = (MapRecord)result.get(); @@ -126,6 +133,7 @@ public void testWithSchemaRegistry() throws Exception { @Test public void testLookupRecord() throws Exception { + runner.disableControllerService(service); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }"); runner.enableControllerService(service); @@ -134,7 +142,7 @@ public void testLookupRecord() throws Exception { Timestamp ts = new Timestamp(new Date().getTime()); List list = Arrays.asList("a", "b", "c", "d", "e"); - service.insert(new Document() + controllerService.insert(new Document() .append("uuid", "x-y-z") .append("dateField", d) .append("longField", 10000L) @@ -171,7 +179,7 @@ public void testLookupRecord() throws Exception { Map clean = new HashMap<>(); clean.putAll(criteria); - service.delete(new Document(clean)); + controllerService.delete(new Document(clean)); try { result = service.lookup(criteria); @@ -184,9 +192,8 @@ public void testLookupRecord() throws Exception { @Test public void testServiceParameters() { - runner.enableControllerService(service); - Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); - service.insert(document); + Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }"); + controllerService.insert(document); Map criteria = new HashMap<>(); criteria.put("uuid", "x-y-z"); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java index f79ec6f85d2e..a7854b71a302 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java @@ -27,6 +27,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -41,6 +42,8 @@ public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) { @Override public Set getSuppliedSchemaFields() { - return null; + return new HashSet() {{ + add(SchemaField.SCHEMA_NAME); + }}; } } From 8f08aa4cb4c9b3ba4102c89fbec036056bc4a55c Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 4 Jun 2018 12:23:17 -0400 Subject: [PATCH 04/14] NIFI-5059 Added two changes from a code review. --- .../java/org/apache/nifi/mongodb/MongoDBLookupService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 8c62ba0ad51e..e91516e830ef 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -60,7 +60,7 @@ public class MongoDBLookupService extends SchemaRegistryService implements Looku .displayName("Client Service") .description("A MongoDB controller service to use with this lookup service.") .required(true) - .identifiesControllerService(MongoDBControllerService.class) + .identifiesControllerService(MongoDBClientService.class) .build(); public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder() @@ -189,6 +189,7 @@ public Set getRequiredKeys() { protected List getSupportedPropertyDescriptors() { List _temp = new ArrayList<>(); _temp.addAll(super.getSupportedPropertyDescriptors()); + _temp.add(CONTROLLER_SERVICE); _temp.add(LOOKUP_VALUE_FIELD); _temp.add(PROJECTION); From 095040cb9eb89e2e88038052beb60656a4a44137 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 4 Jun 2018 14:53:41 -0400 Subject: [PATCH 05/14] NIFI-5059 Fixed two bad references. --- .../java/org/apache/nifi/mongodb/MongoDBLookupService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index e91516e830ef..9a9317ccbba8 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -157,13 +157,13 @@ private RecordSchema convertSchema(Map result) { } private volatile Document projection; - private MongoDBControllerService controllerService; + private MongoDBClientService controllerService; private String schemaNameProperty; @OnEnabled public void onEnabled(final ConfigurationContext context) { this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue(); - this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class); + this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBClientService.class); this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue(); From 2ea5e100a42b2e07e7eba3055edda75dc1d7394f Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 7 Jun 2018 07:44:49 -0400 Subject: [PATCH 06/14] NIFI-5059 Refactored schema strategy handling. --- .../nifi-avro-record-utils/pom.xml | 6 ++ .../access/InferenceSchemaStrategy.java | 87 +++++++++++++++++++ .../nifi/schema/access/SchemaAccessUtils.java | 4 +- .../JsonInferenceSchemaRegistryService.java | 77 ++++++++++++++++ .../serialization/SchemaRegistryService.java | 16 ++-- .../nifi/serialization/FakeProcessor.groovy | 44 ++++++++++ ...tJsonInferenceSchemaRegistryService.groovy | 59 +++++++++++++ .../access/JsonSchemaAccessStrategy.java | 36 ++++++++ .../nifi/mongodb/MongoDBLookupService.java | 78 ++++++++--------- .../nifi/mongodb/MongoDBLookupServiceIT.java | 4 +- 10 files changed, 360 insertions(+), 51 deletions(-) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml index 864581e6c4f2..83db3da54c50 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml @@ -43,6 +43,12 @@ org.apache.nifi nifi-record + + org.apache.nifi + nifi-mock + 1.7.0-SNAPSHOT + test + diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java new file mode 100644 index 000000000000..6cfd35d24635 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.codehaus.jackson.map.ObjectMapper; +import sun.misc.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy { + private final Set schemaFields = EnumSet.noneOf(SchemaField.class); + + @Override + public RecordSchema getSchema(Map variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + byte[] bytes = IOUtils.readFully(contentStream, -1, true); + ObjectMapper mapper = new ObjectMapper(); + + return convertSchema(mapper.readValue(bytes, Map.class)); + } + + protected RecordSchema convertSchema(Map result) { + List fields = new ArrayList<>(); + for (Map.Entry entry : result.entrySet()) { + + RecordField field; + if (entry.getValue() instanceof Integer) { + field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType()); + } else if (entry.getValue() instanceof Long) { + field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType()); + } else if (entry.getValue() instanceof Boolean) { + field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType()); + } else if (entry.getValue() instanceof Double) { + field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType()); + } else if (entry.getValue() instanceof Date) { + field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType()); + } else if (entry.getValue() instanceof List) { + field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType()); + } else if (entry.getValue() instanceof Map) { + RecordSchema nestedSchema = convertSchema((Map)entry.getValue()); + RecordDataType rdt = new RecordDataType(nestedSchema); + field = new RecordField(entry.getKey(), rdt); + } else { + field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType()); + } + fields.add(field); + } + + return new SimpleRecordSchema(fields); + } + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } + + @Override + public RecordSchema getSchema(Map variables, Map content, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return convertSchema(content); + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index 82ea2402c2e5..a56ed76068f4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -50,7 +50,7 @@ public class SchemaAccessUtils { "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single " + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. " + "This is based on version 3.2.x of the Confluent Schema Registry."); - + public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from JSON"); public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() .name("schema-registry") @@ -176,6 +176,8 @@ public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowabl return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { return new ConfluentSchemaRegistryStrategy(schemaRegistry); + } else if (allowableValue.equalsIgnoreCase(INFER_SCHEMA.getValue())) { + return new InferenceSchemaStrategy(); } return null; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java new file mode 100644 index 000000000000..db33290a53f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.schema.access.JsonSchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION; + +public class JsonInferenceSchemaRegistryService extends SchemaRegistryService { + private String schemaAccess; + + @OnEnabled + public void onEnabled(ConfigurationContext context) { + this.storeSchemaAccessStrategy(context); + this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(2); + + final AllowableValue[] strategies = new AllowableValue[] { + SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA + }; + + properties.add(buildStrategyProperty(strategies)); + + properties.add(SCHEMA_REGISTRY); + properties.add(SCHEMA_NAME); + properties.add(SCHEMA_VERSION); + properties.add(SCHEMA_BRANCH_NAME); + properties.add(SCHEMA_TEXT); + + return properties; + } + + public RecordSchema getSchema(Map variables, Map content, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + if (schemaAccess.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue()) || schemaAccess.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { + return getSchema(variables, readSchema); + } else { + return ((JsonSchemaAccessStrategy)schemaAccessStrategy).getSchema(variables, content, readSchema); + } + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index b2991914e3de..6923f480f0f4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -58,7 +58,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService { private volatile ConfigurationContext configurationContext; - private volatile SchemaAccessStrategy schemaAccessStrategy; + protected volatile SchemaAccessStrategy schemaAccessStrategy; private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); private final List strategyList = Collections.unmodifiableList(Arrays.asList( @@ -68,16 +68,20 @@ protected PropertyDescriptor getSchemaAcessStrategyDescriptor() { return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); } + protected PropertyDescriptor buildStrategyProperty(AllowableValue[] values) { + return new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY) + .allowableValues(values) + .defaultValue(getDefaultSchemaAccessStrategy().getValue()) + .build(); + } + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(2); final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]); - properties.add(new PropertyDescriptor.Builder() - .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY) - .allowableValues(strategies) - .defaultValue(getDefaultSchemaAccessStrategy().getValue()) - .build()); + properties.add(buildStrategyProperty(strategies)); properties.add(SCHEMA_REGISTRY); properties.add(SCHEMA_NAME); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy new file mode 100644 index 000000000000..6cf32b7bf238 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/FakeProcessor.groovy @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization + +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.processor.AbstractProcessor +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSession +import org.apache.nifi.processor.exception.ProcessException + +class FakeProcessor extends AbstractProcessor { + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Schema Service") + .description("") + .identifiesControllerService(JsonInferenceSchemaRegistryService.class) + .required(true) + .build() + + @Override + void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + return [ + CLIENT_SERVICE + ] + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy new file mode 100644 index 000000000000..c93045285e1d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/groovy/org/apache/nifi/serialization/TestJsonInferenceSchemaRegistryService.groovy @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization + +import org.apache.nifi.schema.access.SchemaAccessUtils +import org.apache.nifi.serialization.record.type.RecordDataType +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Test + +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY + +class TestJsonInferenceSchemaRegistryService { + @Test + void testInfer() { + def runner = TestRunners.newTestRunner(FakeProcessor.class) + def service = new JsonInferenceSchemaRegistryService() + runner.addControllerService("schemaService", service) + runner.setProperty(FakeProcessor.CLIENT_SERVICE, "schemaService") + runner.setProperty(service, service.getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()), SchemaAccessUtils.INFER_SCHEMA) + runner.enableControllerService(service) + runner.assertValid() + + def json = [ + name: "John Smith", + age: 35, + contact: [ + email: "john.smith@example.com", + phone: "123-456-7890" + ] + ] + + def schema = service.getSchema([:], json, null) + + Assert.assertNotNull(schema) + def name = schema.getField("name") + def age = schema.getField("age") + def contact = schema.getField("contact") + Assert.assertTrue(name.isPresent()) + Assert.assertTrue(age.isPresent()) + Assert.assertTrue(contact.isPresent()) + Assert.assertTrue(contact.get().dataType instanceof RecordDataType) + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java new file mode 100644 index 000000000000..30dcfe527ec8 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/JsonSchemaAccessStrategy.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.util.Map; + +public interface JsonSchemaAccessStrategy extends SchemaAccessStrategy { + /** + * Get a schema using a Map object instead of an input stream. This is meant to be used with JSON toolkits. + * + * @param variables Variables which is used to resolve Record Schema via Expression Language. + * This can be null or empty. + * @param content JSON content in a Map object form. + * @param readSchema The schema that was read from the input content, or null if there was none. + * @return The RecordSchema if found. + */ + RecordSchema getSchema(Map variables, Map content, RecordSchema readSchema) throws SchemaNotFoundException, IOException; +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 9a9317ccbba8..47c959307ab3 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -17,35 +17,43 @@ package org.apache.nifi.mongodb; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.LookupService; import org.apache.nifi.schema.access.SchemaAccessUtils; -import org.apache.nifi.serialization.SchemaRegistryService; -import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.util.StringUtils; import org.bson.Document; +import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.nifi.schema.access.SchemaAccessUtils.INFER_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION; + @Tags({"mongo", "mongodb", "lookup", "record"}) @CapabilityDescription( "Provides a lookup service based around MongoDB. Each key that is specified \n" + @@ -54,7 +62,7 @@ "The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " + "then the entire MongoDB result document minus the _id field will be returned as a record." ) -public class MongoDBLookupService extends SchemaRegistryService implements LookupService { +public class MongoDBLookupService extends JsonInferenceSchemaRegistryService implements LookupService { public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder() .name("mongo-lookup-client-service") .displayName("Client Service") @@ -102,10 +110,9 @@ public Optional lookup(Map coordinates) throws LookupFai } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { - RecordSchema schema = loadSchema(coordinates); + RecordSchema schema = loadSchema(coordinates, result); - RecordSchema toUse = schema != null ? schema : convertSchema(result); - return Optional.ofNullable(new MapRecord(toUse, result)); + return Optional.ofNullable(new MapRecord(schema, result)); } } catch (Exception ex) { getLogger().error("Error during lookup {}", new Object[]{ query.toJson() }, ex); @@ -113,49 +120,21 @@ public Optional lookup(Map coordinates) throws LookupFai } } - private RecordSchema loadSchema(Map coordinates) { + private RecordSchema loadSchema(Map coordinates, Document doc) { Map variables = coordinates.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey(), e -> e.getValue().toString() )); + ObjectMapper mapper = new ObjectMapper(); try { - return getSchema(variables, null); + byte[] bytes = mapper.writeValueAsBytes(doc); + return getSchema(variables, new ByteArrayInputStream(bytes), null); } catch (Exception ex) { return null; } } - private RecordSchema convertSchema(Map result) { - List fields = new ArrayList<>(); - for (Map.Entry entry : result.entrySet()) { - - RecordField field; - if (entry.getValue() instanceof Integer) { - field = new RecordField(entry.getKey(), RecordFieldType.INT.getDataType()); - } else if (entry.getValue() instanceof Long) { - field = new RecordField(entry.getKey(), RecordFieldType.LONG.getDataType()); - } else if (entry.getValue() instanceof Boolean) { - field = new RecordField(entry.getKey(), RecordFieldType.BOOLEAN.getDataType()); - } else if (entry.getValue() instanceof Double) { - field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType()); - } else if (entry.getValue() instanceof Date) { - field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType()); - } else if (entry.getValue() instanceof List) { - field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType()); - } else if (entry.getValue() instanceof Map) { - RecordSchema nestedSchema = convertSchema((Map)entry.getValue()); - RecordDataType rdt = new RecordDataType(nestedSchema); - field = new RecordField(entry.getKey(), rdt); - } else { - field = new RecordField(entry.getKey(), RecordFieldType.STRING.getDataType()); - } - fields.add(field); - } - - return new SimpleRecordSchema(fields); - } - private volatile Document projection; private MongoDBClientService controllerService; private String schemaNameProperty; @@ -187,8 +166,21 @@ public Set getRequiredKeys() { @Override protected List getSupportedPropertyDescriptors() { + AllowableValue[] strategies = new AllowableValue[] { + SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, INFER_SCHEMA + }; List _temp = new ArrayList<>(); - _temp.addAll(super.getSupportedPropertyDescriptors()); + _temp.add(new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY) + .allowableValues(strategies) + .defaultValue(getDefaultSchemaAccessStrategy().getValue()) + .build()); + + _temp.add(SCHEMA_REGISTRY); + _temp.add(SCHEMA_NAME); + _temp.add(SCHEMA_VERSION); + _temp.add(SCHEMA_BRANCH_NAME); + _temp.add(SCHEMA_TEXT); _temp.add(CONTROLLER_SERVICE); _temp.add(LOOKUP_VALUE_FIELD); _temp.add(PROJECTION); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index b0370ab86d7d..c7a36b8c0bac 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -66,6 +66,8 @@ public void before() throws Exception { runner.addControllerService("registry", registry); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + runner.setProperty(service, service.getPropertyDescriptor(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY.getName()), + SchemaAccessUtils.INFER_SCHEMA); runner.enableControllerService(registry); runner.enableControllerService(controllerService); runner.enableControllerService(service); @@ -174,7 +176,7 @@ public void testLookupRecord() throws Exception { Record subRecord = record.getAsRecord("subrecordField", subSchema); Assert.assertNotNull(subRecord); Assert.assertEquals("test", subRecord.getValue("nestedString")); - Assert.assertEquals(new Long(1000), subRecord.getValue("nestedLong")); + Assert.assertEquals(new Integer(1000), subRecord.getValue("nestedLong")); Assert.assertEquals(list, record.getValue("arrayField")); Map clean = new HashMap<>(); From fda8d19d2a9ad36c6117a6ebf2d13cf8b88af45b Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 7 Jun 2018 09:22:31 -0400 Subject: [PATCH 07/14] NIFI-5059 Moved schema strategy handling to JsonInferenceSchemaRegistryService. --- .../nifi/schema/access/SchemaAccessUtils.java | 2 -- .../JsonInferenceSchemaRegistryService.java | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index a56ed76068f4..bcde9a686b97 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -176,8 +176,6 @@ public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowabl return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { return new ConfluentSchemaRegistryStrategy(schemaRegistry); - } else if (allowableValue.equalsIgnoreCase(INFER_SCHEMA.getValue())) { - return new InferenceSchemaStrategy(); } return null; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java index db33290a53f2..97425433f915 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java @@ -20,9 +20,15 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.schema.access.AvroSchemaTextStrategy; +import org.apache.nifi.schema.access.InferenceSchemaStrategy; import org.apache.nifi.schema.access.JsonSchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaNamePropertyStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; @@ -48,6 +54,26 @@ public void onEnabled(ConfigurationContext context) { this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); } + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + if (strategy == null) { + return null; + } + + if (strategy.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { + final PropertyValue schemaName = context.getProperty(SCHEMA_NAME); + final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME); + final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION); + return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion); + } else if (strategy.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { + return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); + } else if (strategy.equalsIgnoreCase(INFER_SCHEMA.getValue())) { + return new InferenceSchemaStrategy(); + } + + return null; + } + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(2); From 18ad0985d764a54e3eabd2161ec92ebb4375b423 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 14 Jun 2018 07:42:00 -0400 Subject: [PATCH 08/14] NIFI-5059 Updated to use new LookupService method. --- .../nifi/mongodb/MongoDBLookupService.java | 21 ++++++++++++------- .../nifi/mongodb/MongoDBLookupServiceIT.java | 16 ++++++++++++-- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 47c959307ab3..45313a445a6d 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -38,6 +38,7 @@ import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -90,6 +91,15 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp @Override public Optional lookup(Map coordinates) throws LookupFailureException { + /* + * Unless the user hard-coded schema.name or schema.text into the schema access options, this is going + * to force schema detection. + */ + return lookup(coordinates, new HashMap<>()); + } + + @Override + public Optional lookup(Map coordinates, Map context) throws LookupFailureException { Map clean = coordinates.entrySet().stream() .filter(e -> !schemaNameProperty.equals(String.format("${%s}", e.getKey()))) .collect(Collectors.toMap( @@ -110,7 +120,7 @@ public Optional lookup(Map coordinates) throws LookupFai } else if (!StringUtils.isEmpty(lookupValueField)) { return Optional.ofNullable(result.get(lookupValueField)); } else { - RecordSchema schema = loadSchema(coordinates, result); + RecordSchema schema = loadSchema(context, result); return Optional.ofNullable(new MapRecord(schema, result)); } @@ -120,16 +130,11 @@ public Optional lookup(Map coordinates) throws LookupFai } } - private RecordSchema loadSchema(Map coordinates, Document doc) { - Map variables = coordinates.entrySet().stream() - .collect(Collectors.toMap( - e -> e.getKey(), - e -> e.getValue().toString() - )); + private RecordSchema loadSchema(Map context, Document doc) { ObjectMapper mapper = new ObjectMapper(); try { byte[] bytes = mapper.writeValueAsBytes(doc); - return getSchema(variables, new ByteArrayInputStream(bytes), null); + return getSchema(context, new ByteArrayInputStream(bytes), null); } catch (Exception ex) { return null; } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index c7a36b8c0bac..ee84fe2fb9b8 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -123,14 +123,26 @@ public void testWithSchemaRegistry() throws Exception { Map criteria = new HashMap<>(); criteria.put("username", "john.smith"); - criteria.put("schema.name", "user"); - Optional result = service.lookup(criteria); + Map context = new HashMap<>(); + context.put("schema.name", "user"); + Optional result = service.lookup(criteria, context); Assert.assertTrue(result.isPresent()); Assert.assertNotNull(result.get()); MapRecord record = (MapRecord)result.get(); Assert.assertEquals("john.smith", record.getAsString("username")); Assert.assertEquals("testing1234", record.getAsString("password")); + + /* + * Test falling back on schema detection if a user doesn't specify the context argument + */ + result = service.lookup(criteria); + Assert.assertTrue(result.isPresent()); + Assert.assertNotNull(result.get()); + record = (MapRecord)result.get(); + + Assert.assertEquals("john.smith", record.getAsString("username")); + Assert.assertEquals("testing1234", record.getAsString("password")); } @Test From 01feb7f7b1ad9569a3b3a5cac73929d27b1d1c10 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 14 Jun 2018 07:54:14 -0400 Subject: [PATCH 09/14] NIFI-5059 fixed schema inference bug. --- .../org/apache/nifi/mongodb/MongoDBLookupService.java | 8 +++----- .../org/apache/nifi/mongodb/MongoDBLookupServiceIT.java | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java index 45313a445a6d..6c4905a141c4 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java @@ -17,7 +17,6 @@ package org.apache.nifi.mongodb; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -35,7 +34,6 @@ import org.apache.nifi.util.StringUtils; import org.bson.Document; -import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -131,10 +129,8 @@ public Optional lookup(Map coordinates, Map context, Document doc) { - ObjectMapper mapper = new ObjectMapper(); try { - byte[] bytes = mapper.writeValueAsBytes(doc); - return getSchema(context, new ByteArrayInputStream(bytes), null); + return getSchema(context, doc, null); } catch (Exception ex) { return null; } @@ -157,6 +153,8 @@ public void onEnabled(final ConfigurationContext context) { if (!StringUtils.isBlank(configuredProjection)) { projection = Document.parse(configuredProjection); } + + super.onEnabled(context); } @Override diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index ee84fe2fb9b8..6049649d63e7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -188,7 +188,7 @@ public void testLookupRecord() throws Exception { Record subRecord = record.getAsRecord("subrecordField", subSchema); Assert.assertNotNull(subRecord); Assert.assertEquals("test", subRecord.getValue("nestedString")); - Assert.assertEquals(new Integer(1000), subRecord.getValue("nestedLong")); + Assert.assertEquals(new Long(1000), subRecord.getValue("nestedLong")); Assert.assertEquals(list, record.getValue("arrayField")); Map clean = new HashMap<>(); From e15c4800b6e49603bc9568db6f32cf4f7ea09485 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Fri, 15 Jun 2018 08:20:08 -0400 Subject: [PATCH 10/14] NIFI-5059 Added test for schema text strategy --- .../nifi-mongodb-services/pom.xml | 1 + .../nifi/mongodb/MongoDBLookupServiceIT.java | 25 +++++++++++++++++++ .../src/test/resources/simple.avsc | 7 ++++++ 3 files changed, 33 insertions(+) create mode 100644 nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml index 2f70f8a32b5e..add6a93f693c 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml @@ -99,6 +99,7 @@ apache-rat-plugin + src/test/resources/simple.avsc src/test/resources/test.csv src/test/resources/test.properties src/test/resources/test.xml diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index 6049649d63e7..735a40df52ee 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -17,6 +17,7 @@ package org.apache.nifi.mongodb; +import org.apache.commons.io.IOUtils; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schemaregistry.services.SchemaRegistry; @@ -145,6 +146,30 @@ record = (MapRecord)result.get(); Assert.assertEquals("testing1234", record.getAsString("password")); } + @Test + public void testSchemaTextStrategy() throws Exception { + byte[] contents = IOUtils.toByteArray(getClass().getResourceAsStream("/simple.avsc")); + + runner.disableControllerService(service); + runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); + runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }"); + runner.setProperty(service, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(service, SchemaAccessUtils.SCHEMA_TEXT, "${schema.text}"); + runner.enableControllerService(service); + runner.assertValid(); + + controllerService.insert(new Document().append("msg", "Testing1234")); + + Map criteria = new HashMap<>(); + criteria.put("msg", "Testing1234"); + Map attrs = new HashMap<>(); + attrs.put("schema.text", new String(contents)); + + Optional results = service.lookup(criteria, attrs); + Assert.assertNotNull(results); + Assert.assertTrue(results.isPresent()); + } + @Test public void testLookupRecord() throws Exception { runner.disableControllerService(service); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc new file mode 100644 index 000000000000..2bea9cbc192f --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/resources/simple.avsc @@ -0,0 +1,7 @@ +{ + "type": "record", + "name": "SimpleRecord", + "fields": [ + { "name": "msg", "type": "string" } + ] +} \ No newline at end of file From 3b0e81113ed18f41390378e371e9a441e494ea70 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 26 Jun 2018 05:31:11 -0400 Subject: [PATCH 11/14] NIFI-5059 incremented version number to make the build work. --- .../nifi-mongodb-bundle/nifi-mongodb-services/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml index add6a93f693c..4b54073d9566 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/pom.xml @@ -81,7 +81,7 @@ org.apache.nifi nifi-avro-record-utils - 1.7.0-SNAPSHOT + 1.8.0-SNAPSHOT compile From 96525f6be662addd9e673ff9fa8ef0257c5357be Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 26 Jun 2018 07:31:32 -0400 Subject: [PATCH 12/14] NIFI-5059 fixed a stray 1.7.0 reference. --- .../nifi-record-utils/nifi-avro-record-utils/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml index 83db3da54c50..05e62f40e60e 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml @@ -46,7 +46,7 @@ org.apache.nifi nifi-mock - 1.7.0-SNAPSHOT + 1.8.0-SNAPSHOT test From 0cfa4cc43e359f26e14a57cb6b3a5688b12abe0e Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 30 Jun 2018 08:45:58 -0400 Subject: [PATCH 13/14] NIFI-5059 Added getDatabase to client service. --- .../java/org/apache/nifi/mongodb/MongoDBClientService.java | 3 +++ .../org/apache/nifi/mongodb/MongoDBControllerService.java | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java index 597ce0d93dac..5a3a4b270b52 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java @@ -17,6 +17,7 @@ package org.apache.nifi.mongodb; +import com.mongodb.client.MongoDatabase; import org.apache.nifi.controller.ControllerService; import org.bson.Document; @@ -43,4 +44,6 @@ default Document convertJson(String query) { void upsert(Document query, Document update); void dropDatabase(); void dropCollection(); + + MongoDatabase getDatabase(String name); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java index 1dd6b080208d..a1f9b2b75919 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java @@ -164,4 +164,9 @@ public void dropCollection() { this.col.drop(); this.col = null; } + + @Override + public MongoDatabase getDatabase(String name) { + return mongoClient.getDatabase(name); + } } From 05109778c76d1274c1846fa9fb2f3e43ae84243d Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 2 Jul 2018 15:47:36 -0400 Subject: [PATCH 14/14] NIFI-5059 Added changes requested in a code review. --- .../org/apache/nifi/schema/access/SchemaAccessUtils.java | 2 +- .../serialization/JsonInferenceSchemaRegistryService.java | 5 +++++ .../java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java | 5 ++--- .../{TestSchemaRegistry.java => StubSchemaRegistry.java} | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) rename nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/{TestSchemaRegistry.java => StubSchemaRegistry.java} (96%) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index bcde9a686b97..7921dff11f8d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -50,7 +50,7 @@ public class SchemaAccessUtils { "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single " + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. " + "This is based on version 3.2.x of the Confluent Schema Registry."); - public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from JSON"); + public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from Result"); public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() .name("schema-registry") diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java index 97425433f915..b3819cfa9211 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/JsonInferenceSchemaRegistryService.java @@ -54,6 +54,11 @@ public void onEnabled(ConfigurationContext context) { this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); } + @Override + protected AllowableValue getDefaultSchemaAccessStrategy() { + return INFER_SCHEMA; + } + @Override protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if (strategy == null) { diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java index 735a40df52ee..2c7f52220dd7 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java @@ -63,12 +63,10 @@ public void before() throws Exception { runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017"); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message"); runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2"); - SchemaRegistry registry = new TestSchemaRegistry(); + SchemaRegistry registry = new StubSchemaRegistry(); runner.addControllerService("registry", registry); runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, ""); runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); - runner.setProperty(service, service.getPropertyDescriptor(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY.getName()), - SchemaAccessUtils.INFER_SCHEMA); runner.enableControllerService(registry); runner.enableControllerService(controllerService); runner.enableControllerService(service); @@ -83,6 +81,7 @@ public void after() { @Test public void testInit() { runner.assertValid(service); + } @Test diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java similarity index 96% rename from nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java rename to nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java index a7854b71a302..5e1725801fd5 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/TestSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/StubSchemaRegistry.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Set; -public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry { +public class StubSchemaRegistry extends AbstractControllerService implements SchemaRegistry { @Override public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) { List fields = new ArrayList<>();