Skip to content

Commit

Permalink
NIFI-4759 Fixed a bug that left a hard-coded reference to _id in as t…
Browse files Browse the repository at this point in the history
…he update key for MongoDB upserts.
  • Loading branch information
MikeThomsen committed Jan 12, 2018
1 parent 6153fb6 commit 6b269dc
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -155,12 +153,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try {
// Read the contents of the FlowFile into a byte array
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));

// parse
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
Expand All @@ -173,13 +166,19 @@ public void process(final InputStream in) throws IOException {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
final Document query = new Document(updateKey, ((Map)doc).get(updateKey));

Object keyVal = ((Map)doc).get(updateKey);
if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) {
keyVal = new ObjectId((String) keyVal);
}

final Document query = new Document(updateKey, keyVal);

if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove("_id");
update.remove(updateKey);
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
}
logger.info("updated {} into MongoDB", new Object[] { flowFile });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -256,4 +257,74 @@ public void testUpsertWithOperators() throws Exception {
Assert.assertEquals("Msg had wrong value", msg, "Hi");
}
}

/*
* Start NIFI-4759 Regression Tests
*
* 2 issues with ID field:
*
* * Assumed _id is the update key, causing failures when the user configured a different one in the UI.
* * Treated _id as a string even when it is an ObjectID sent from another processor as a string value.
*
* Expected behavior:
*
* * update key field should work no matter what (legal) value it is set to be.
* * _ids that are ObjectID should become real ObjectIDs when added to Mongo.
* * _ids that are arbitrary strings should be still go in as strings.
*
*/
@Test
public void testNiFi_4759_Regressions() {
String[] upserts = new String[]{
"{\n" +
"\t\"_id\": \"12345\",\n" +
"\t\"$set\": {\n" +
"\t\t\"msg\": \"Hello, world\"\n" +
"\t}\n" +
"}",

"{\n" +
"\t\"_id\": \"5a5617b9c1f5de6d8276e87d\",\n" +
"\t\"$set\": {\n" +
"\t\t\"msg\": \"Hello, world\"\n" +
"\t}\n" +
"}",

"{\n" +
"\t\"updateKey\": \"12345\",\n" +
"\t\"$set\": {\n" +
"\t\t\"msg\": \"Hello, world\"\n" +
"\t}\n" +
"}",
};

String[] updateKeyProps = new String[] { "_id", "_id", "updateKey" };
Object[] updateKeys = new Object[] { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" };
int index = 0;

runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");

for (String upsert : upserts) {
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, updateKeyProps[index]);
for (int x = 0; x < 5; x++) {
runner.enqueue(upsert);
}
runner.run(5, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 5);

Document query = new Document(updateKeyProps[index], updateKeys[index]);
Document result = collection.find(query).first();
Assert.assertNotNull("Result was null", result);
Assert.assertEquals("Count was wrong", 1, collection.count(query));
runner.clearTransferState();
index++;
}
}

/*
* End NIFI-4759 Regression Tests
*/
}

0 comments on commit 6b269dc

Please sign in to comment.