Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
protected MongoDBClientService clientService;

@OnScheduled
public final void createClient(ProcessContext context) throws IOException {
public void createClient(ProcessContext context) throws IOException {
if (context.getProperty(CLIENT_SERVICE).isSet()) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
Expand All @@ -36,14 +39,18 @@
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand Down Expand Up @@ -85,6 +92,22 @@ public class GetMongo extends AbstractMongoQueryProcessor {
.allowableValues(YES_PP, NO_PP)
.addValidator(Validator.VALID)
.build();
static final PropertyDescriptor USER_NAME = new PropertyDescriptor.Builder()
.name("User Name")
.displayName("username")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name and displayName values are switched.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing it out. I will add it

.description("User for mongodb authentication")
.required(false)
.addValidator(Validator.VALID)
.sensitive(false)
.build();

static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name value should match the formatting of the username field and a displayName should also be present.

.description("The Password for the user")
.required(false)
.addValidator(Validator.VALID)
.sensitive(true)
.build();

private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
Expand All @@ -107,6 +130,8 @@ public class GetMongo extends AbstractMongoQueryProcessor {
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
_propertyDescriptors.add(SEND_EMPTY_RESULTS);
_propertyDescriptors.add(USER_NAME);
_propertyDescriptors.add(PASSWORD);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

final Set<Relationship> _relationships = new HashSet<>();
Expand Down Expand Up @@ -253,7 +278,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
});

outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
String uriPass = "";
if (context.getProperty(USER_NAME).getValue() != null) {
uriPass = "mongodb://" + context.getProperty(USER_NAME).getValue() + ":" + context.getProperty(PASSWORD).getValue() + "@" + getURI(context).substring(10);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe the URI is validated anywhere to ensure it starts with mongodb://, so arbitrarily starting the index at 10 isn't guaranteed to produce the expected outcome.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, there is a note in the MongoDB docs regarding usernames and passwords that contain special characters:

If the username or password includes the at sign @, colon :, slash /, or the percent sign % character, use percent encoding.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add in the validation for the starting 10 characters for the URI.

I did see the MongoDB doc, for percent encoding which would be better:
a) forcing the user to handle the percent encoding and printing it in the description as a note for user.
b) explicitly handling of it by the processor.

current implementation is goes the first way. But if you think It is better for processor to handle, I will look into it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not something I would expect the user to do manually. It should be done automatically by the processor.

} else {
uriPass = getURI(context);
}
session.getProvenanceReporter().receive(outgoingFlowFile, uriPass);
session.transfer(outgoingFlowFile, REL_SUCCESS);
sent++;
}
Expand All @@ -271,4 +302,59 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

}

@OnScheduled
public void createClient(ProcessContext context) throws IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this method appears to duplicate AbstractMongoProcessor#createClient(). I suggest the duplicated code should be refactored.

Copy link
Copy Markdown
Contributor Author

@karthik-kadajji karthik-kadajji Apr 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is true I could have done the changes in AbstractMongoProcessor, but it would end up affecting the components of PutMongo. Also Since request was for just getMongo I did it this way.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all 6 of the MongoDB processors need this change if one does, so it should be made in a consistent manner. It appears the Jira is incomplete as it only indicates the GetMongo processor, but that's what this review process is intended to mitigate. Requirements gathering is notoriously difficult and a single user's request has to be balanced against the ongoing needs of the project as a whole. Any code changes introduced need to be well-understood, complete, tested, sustainable, and maintainable. I use the analogy from camping of "leave it better than you found it."

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alopresto We started moving toward using a controller service to manage the client connection I think ~3 releases ago. The best place for these changes might be there, so we can start moving toward deprecating per-processor connection management altogether.

if (context.getProperty(CLIENT_SERVICE).isSet()) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
return;
}

if (mongoClient != null) {
closeClient();
}

getLogger().info("Creating MongoClient");

// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext;

if (sslService != null) {
final SSLContextService.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
} else {
try {
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
throw new IllegalStateException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
sslContext = sslService.createSSLContext(clientAuth);
} else {
sslContext = null;
}

try {
String uriPass = "";
if (sslContext == null) {
if (context.getProperty(USER_NAME).getValue() != null) {
uriPass = "mongodb://" + context.getProperty(USER_NAME).getValue() + ":" + context.getProperty(PASSWORD).getValue() + "@" + getURI(context).substring(10);

} else {
uriPass = getURI(context);
}
mongoClient = new MongoClient(new MongoClientURI(uriPass));
} else {
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
}
} catch (Exception e) {
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
Expand All @@ -43,6 +44,8 @@
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.ArrayList;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -668,4 +671,49 @@ public void testSendEmpty() throws Exception {
MockFlowFile flowFile = flowFiles.get(0);
Assert.assertEquals(0, flowFile.getSize());
}

@Test
public void testReadUserPaswd() throws Exception {
final String username = "myuser";
final String password = "password";
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
final MongoDatabase db = mongoClient.getDatabase(DB_NAME);
final BasicDBObject createUserCommand = new BasicDBObject("createUser", username).append("pwd", password).append("roles",
java.util.Collections.singletonList(new BasicDBObject("role", "dbOwner").append("db", DB_NAME)));

BasicDBObject getUsersInfoCommand = new BasicDBObject("usersInfo", new BasicDBObject("user", username).append("db", DB_NAME));
Document result = db.runCommand(getUsersInfoCommand);
BasicDBObject dropUserCommand = new BasicDBObject("dropUser", username);

ArrayList users = (ArrayList) result.get("users");
if (!users.isEmpty()) {
db.runCommand(dropUserCommand);
}
db.runCommand(createUserCommand);

//setting new property
runner.removeProperty(AbstractMongoProcessor.URI);
runner.setVariable("uri", "mongodb://localhost:27017/?authSource=" + DB_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(GetMongo.PASSWORD, password);
runner.setProperty(GetMongo.USER_NAME, username);

runner.setVariable("query", "{\"_id\": \"doc_2\"}");
runner.setProperty(GetMongo.QUERY, "${query}");
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);

List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> parsed = mapper.readValue(raw, Map.class);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

// Drop the user which was created
db.runCommand(dropUserCommand);

Assert.assertTrue(parsed.get("date_field").getClass() == String.class);
Assert.assertTrue(((String) parsed.get("date_field")).startsWith(format.format(CAL.getTime())));
}
}