diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index df918d906c03..cf9ad81c21d1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -228,7 +228,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { protected MongoDBClientService clientService; @OnScheduled - public final void createClient(ProcessContext context) throws IOException { + public void createClient(ProcessContext context) throws IOException { if (context.getProperty(CLIENT_SERVICE).isSet()) { clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); return; diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index b3b82b0e8143..ca131d308394 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -20,9 +20,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -36,14 +39,18 @@ import org.apache.nifi.context.PropertyContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.mongodb.MongoDBClientService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; import org.bson.Document; import org.bson.json.JsonWriterSettings; +import javax.net.ssl.SSLContext; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; @@ -85,6 +92,22 @@ public class GetMongo extends AbstractMongoQueryProcessor { .allowableValues(YES_PP, NO_PP) .addValidator(Validator.VALID) .build(); + static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder() + .name("User Name") + .displayName("username") + .description("User for mongodb authentication") + .required(false) + .addValidator(Validator.VALID) + .sensitive(false) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The Password for the user") + .required(false) + .addValidator(Validator.VALID) + .sensitive(true) + .build(); private final static Set relationships; private final static List propertyDescriptors; @@ -107,6 +130,8 @@ public class GetMongo extends AbstractMongoQueryProcessor { _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); _propertyDescriptors.add(SEND_EMPTY_RESULTS); + _propertyDescriptors.add(USER_NAME); + _propertyDescriptors.add(PASSWORD); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); final Set _relationships = new HashSet<>(); @@ -253,7 +278,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session }); outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes); - session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context)); + String uriPass = ""; + if (context.getProperty(USER_NAME).getValue() != null) { + uriPass = "mongodb://" + context.getProperty(USER_NAME).getValue() + ":" + context.getProperty(PASSWORD).getValue() + "@" + getURI(context).substring(10); + } else { + uriPass = getURI(context); + } + session.getProvenanceReporter().receive(outgoingFlowFile, uriPass); session.transfer(outgoingFlowFile, REL_SUCCESS); sent++; } @@ -271,4 +302,59 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } + + @OnScheduled + public void createClient(ProcessContext context) throws IOException { + if (context.getProperty(CLIENT_SERVICE).isSet()) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); + return; + } + + if (mongoClient != null) { + closeClient(); + } + + getLogger().info("Creating MongoClient"); + + // Set up the client for secure (SSL/TLS communications) if configured to do so + final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final SSLContext sslContext; + + if (sslService != null) { + final SSLContextService.ClientAuth clientAuth; + if (StringUtils.isBlank(rawClientAuth)) { + clientAuth = SSLContextService.ClientAuth.REQUIRED; + } else { + try { + clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth); + } catch (final IllegalArgumentException iae) { + throw new IllegalStateException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", + rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); + } + } + sslContext = sslService.createSSLContext(clientAuth); + } else { + sslContext = null; + } + + try { + String uriPass = ""; + if (sslContext == null) { + if (context.getProperty(USER_NAME).getValue() != null) { + uriPass = "mongodb://" + context.getProperty(USER_NAME).getValue() + ":" + context.getProperty(PASSWORD).getValue() + "@" + getURI(context).substring(10); + + } else { + uriPass = getURI(context); + } + mongoClient = new MongoClient(new MongoClientURI(uriPass)); + } else { + mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); + } + } catch (Exception e) { + getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); + throw e; + } + } + } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index ddea9a8596a0..a86907405091 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; @@ -43,6 +44,8 @@ import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Collection; +import java.util.ArrayList; + import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -668,4 +671,49 @@ public void testSendEmpty() throws Exception { MockFlowFile flowFile = flowFiles.get(0); Assert.assertEquals(0, flowFile.getSize()); } + + @Test + public void testReadUserPaswd() throws Exception { + final String username = "myuser"; + final String password = "password"; + mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); + final MongoDatabase db = mongoClient.getDatabase(DB_NAME); + final BasicDBObject createUserCommand = new BasicDBObject("createUser", username).append("pwd", password).append("roles", + java.util.Collections.singletonList(new BasicDBObject("role", "dbOwner").append("db", DB_NAME))); + + BasicDBObject getUsersInfoCommand = new BasicDBObject("usersInfo", new BasicDBObject("user", username).append("db", DB_NAME)); + Document result = db.runCommand(getUsersInfoCommand); + BasicDBObject dropUserCommand = new BasicDBObject("dropUser", username); + + ArrayList users = (ArrayList) result.get("users"); + if (!users.isEmpty()) { + db.runCommand(dropUserCommand); + } + db.runCommand(createUserCommand); + + //setting new property + runner.removeProperty(AbstractMongoProcessor.URI); + runner.setVariable("uri", "mongodb://localhost:27017/?authSource=" + DB_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(GetMongo.PASSWORD, password); + runner.setProperty(GetMongo.USER_NAME, username); + + runner.setVariable("query", "{\"_id\": \"doc_2\"}"); + runner.setProperty(GetMongo.QUERY, "${query}"); + runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); + runner.run(); + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); + ObjectMapper mapper = new ObjectMapper(); + Map parsed = mapper.readValue(raw, Map.class); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + + // Drop the user which was created + db.runCommand(dropUserCommand); + + Assert.assertTrue(parsed.get("date_field").getClass() == String.class); + Assert.assertTrue(((String) parsed.get("date_field")).startsWith(format.format(CAL.getTime()))); + } }