Skip to content

Commit

Permalink
closes #135 BulkOperation return BulkWriteException
Browse files Browse the repository at this point in the history
  • Loading branch information
William Delanoue committed Aug 27, 2015
1 parent 521f88d commit fea1abc
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 49 deletions.
96 changes: 63 additions & 33 deletions src/main/java/com/github/fakemongo/FongoConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import com.mongodb.MongoClient;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
import com.mongodb.WriteConcernException;
import com.mongodb.WriteConcernResult;
import com.mongodb.WriteResult;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.bulk.BulkWriteUpsert;
import com.mongodb.bulk.DeleteRequest;
import com.mongodb.bulk.InsertRequest;
import com.mongodb.bulk.UpdateRequest;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.bulk.WriteRequest;
import static com.mongodb.bulk.WriteRequest.Type.INSERT;
import static com.mongodb.bulk.WriteRequest.Type.REPLACE;
Expand All @@ -46,22 +49,22 @@
import com.mongodb.operation.FongoBsonArrayWrapper;
import com.mongodb.util.JSON;
import java.util.ArrayList;
import static java.util.Arrays.asList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.BsonInt64;
import org.bson.BsonNull;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.FieldNameValidator;
import org.bson.codecs.Codec;
import org.bson.codecs.Decoder;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -151,19 +154,7 @@ public WriteConcernResult update(MongoNamespace namespace, boolean ordered, Writ
public WriteConcernResult delete(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<DeleteRequest> deletes) {
LOG.info("delete() namespace:{} deletes:{}", namespace, deletes);
final DBCollection collection = dbCollection(namespace);
int count = 0;
for (DeleteRequest delete : deletes) {
final DBObject parse = dbObject(delete.getFilter());
if (delete.isMulti()) {
final WriteResult writeResult = collection.remove(parse, writeConcern);
count += writeResult.getN();
} else {
final DBObject dbObject = collection.findAndRemove(parse);
if (dbObject != null) {
count++;
}
}
}
int count = delete(collection, writeConcern, deletes);
if (writeConcern.isAcknowledged()) {
return WriteConcernResult.acknowledged(count, count != 0, null);
} else {
Expand All @@ -175,22 +166,56 @@ public WriteConcernResult delete(MongoNamespace namespace, boolean ordered, Writ
public BulkWriteResult insertCommand(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<InsertRequest> inserts) {
LOG.info("insertCommand() namespace:{} inserts:{}", namespace, inserts);
final DBCollection collection = dbCollection(namespace);
final List<BulkWriteUpsert> list = new ArrayList<BulkWriteUpsert>();
int i = 0;
for (InsertRequest insert : inserts) {
final DBObject parse = dbObject(insert.getDocument());
final WriteResult writeResult = collection.insert(parse, writeConcern);
if (writeResult.getUpsertedId() != null) {
list.add(new BulkWriteUpsert(i, new BsonObjectId((ObjectId) writeResult.getUpsertedId())));
BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner(fongo.getServerAddress(), ordered, writeConcern);
IndexMap indexMap = IndexMap.create();
final BulkWriteOperation bulkWriteOperation = collection.initializeOrderedBulkOperation();

try {
for (InsertRequest insert : inserts) {
FieldNameValidator validator = new CollectibleDocumentFieldNameValidator();
for (String updateName : insert.getDocument().keySet()) {
if (!validator.validate(updateName)) {
throw new IllegalArgumentException("Invalid BSON field name " + updateName);
}
}

bulkWriteOperation.insert(dbObject(insert.getDocument()));
indexMap = indexMap.add(1, 0);
}
final com.mongodb.BulkWriteResult bulkWriteResult = bulkWriteOperation.execute(writeConcern);
bulkWriteBatchCombiner.addResult(bulkWriteResult(bulkWriteResult), indexMap);
} catch (WriteConcernException writeException) {
if (writeException.getResponse().get("wtimeout") != null) {
bulkWriteBatchCombiner.addWriteConcernErrorResult(getWriteConcernError(writeException));
} else {
bulkWriteBatchCombiner.addWriteErrorResult(getBulkWriteError(writeException), indexMap);
}
i++;
LOG.debug("insertCommand() namespace:{} insert:{}, parse:{}", namespace, insert.getDocument(), parse.getClass());
}
if (writeConcern.isAcknowledged()) {
return BulkWriteResult.acknowledged(WriteRequest.Type.INSERT, inserts.size(), writeConcern.isAcknowledged() ? inserts.size() : null, list);
} else {
return BulkWriteResult.unacknowledged();
return bulkWriteBatchCombiner.getResult();
}

private static final List<String> IGNORED_KEYS = asList("ok", "err", "code");

BulkWriteError getBulkWriteError(final WriteConcernException writeException) {
return new BulkWriteError(writeException.getErrorCode(), writeException.getErrorMessage(),
translateGetLastErrorResponseToErrInfo(writeException.getResponse()), 0);
}

WriteConcernError getWriteConcernError(final WriteConcernException writeException) {
return new WriteConcernError(writeException.getErrorCode(),
((BsonString) writeException.getResponse().get("err")).getValue(),
translateGetLastErrorResponseToErrInfo(writeException.getResponse()));
}

private BsonDocument translateGetLastErrorResponseToErrInfo(final BsonDocument response) {
BsonDocument errInfo = new BsonDocument();
for (Map.Entry<String, BsonValue> entry : response.entrySet()) {
if (IGNORED_KEYS.contains(entry.getKey())) {
continue;
}
errInfo.put(entry.getKey(), entry.getValue());
}
return errInfo;
}

@Override
Expand Down Expand Up @@ -255,6 +280,15 @@ public BulkWriteResult updateCommand(MongoNamespace namespace, boolean ordered,
public BulkWriteResult deleteCommand(MongoNamespace namespace, boolean ordered, WriteConcern writeConcern, List<DeleteRequest> deletes) {
LOG.info("deleteCommand() namespace:{} deletes:{}", namespace, deletes);
final DBCollection collection = dbCollection(namespace);
int count = delete(collection, writeConcern, deletes);
if (writeConcern.isAcknowledged()) {
return BulkWriteResult.acknowledged(WriteRequest.Type.DELETE, count, writeConcern.isAcknowledged() ? deletes.size() : null, Collections.<BulkWriteUpsert>emptyList());
} else {
return BulkWriteResult.unacknowledged();
}
}

private int delete(DBCollection collection, WriteConcern writeConcern, List<DeleteRequest> deletes) {
int count = 0;
for (DeleteRequest delete : deletes) {
final DBObject parse = dbObject(delete.getFilter());
Expand All @@ -268,11 +302,7 @@ public BulkWriteResult deleteCommand(MongoNamespace namespace, boolean ordered,
}
}
}
if (writeConcern.isAcknowledged()) {
return BulkWriteResult.acknowledged(WriteRequest.Type.DELETE, count, writeConcern.isAcknowledged() ? deletes.size() : null, Collections.<BulkWriteUpsert>emptyList());
} else {
return BulkWriteResult.unacknowledged();
}
return count;
}

@Override
Expand Down
29 changes: 13 additions & 16 deletions src/main/java/com/mongodb/FongoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,21 @@ public CommandResult okErrorResult(int code, String err) {
return new CommandResult(result, fongo.getServerAddress());
}

private BsonDocument bsonResultNotOk(int code, String err) {
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0.0));
if (err != null) {
result.put("err", new BsonString(err));
}
result.put("code", new BsonInt32(code));
return result;
}

public CommandResult notOkErrorResult(String err) {
return notOkErrorResult(err, null);
}

public CommandResult notOkErrorResult(String err, String errmsg) {
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0));
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0.0));
if (err != null) {
result.put("err", new BsonString(err));
}
Expand All @@ -396,29 +405,17 @@ public CommandResult notOkErrorResult(String err, String errmsg) {
}

public CommandResult notOkErrorResult(int code, String err) {
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0.0));
if (err != null) {
result.put("err", new BsonString(err));
}
result.put("code", new BsonInt32(code));
final BsonDocument result = bsonResultNotOk(code, err);
return new CommandResult(result, fongo.getServerAddress());
}

public WriteConcernException writeConcernException(int code, String err) {
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0.0));
if (err != null) {
result.put("err", new BsonString(err));
}
result.put("code", new BsonInt32(code));
final BsonDocument result = bsonResultNotOk(code, err);
return new WriteConcernException(result, fongo.getServerAddress(), WriteConcernResult.unacknowledged());
}

public WriteConcernException duplicateKeyException(int code, String err) {
final BsonDocument result = new BsonDocument("ok", new BsonDouble(0.0));
if (err != null) {
result.put("err", new BsonString(err));
}
result.put("code", new BsonInt32(code));
final BsonDocument result = bsonResultNotOk(code, err);
return new DuplicateKeyException(result, fongo.getServerAddress(), WriteConcernResult.unacknowledged());
}

Expand Down
20 changes: 20 additions & 0 deletions src/test/java/com/github/fakemongo/AbstractFongoV3Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.github.fakemongo;

import com.github.fakemongo.junit.FongoRule;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
Expand Down Expand Up @@ -676,6 +677,25 @@ public void bulkWrite_ordered() {
docId(1).append("x", 2), docId(3).append("x", 4), docId(4), docId(5), docId(6));
}

@Test
public void bulkWrite_duplicatedKey() {
// 2. Ordered bulk operation - order is guaranteed
// Given
MongoCollection collection = newCollection();

// if (serverVersion().equals(Fongo.OLD_SERVER_VERSION)) {
// exception.expect(DuplicateKeyException.class);
// } else {
exception.expect(MongoBulkWriteException.class);
// }
// When
final BulkWriteResult bulkWriteResult = collection.bulkWrite(
Arrays.asList(new InsertOneModel<Document>(new Document("_id", 1)),
new InsertOneModel<Document>(new Document("_id", 2)),
new InsertOneModel<Document>(new Document("_id", 2))
));
}

@Test
public void bulkWrite_unacknowledged() {
MongoCollection collection = newCollection().withWriteConcern(WriteConcern.UNACKNOWLEDGED);
Expand Down

0 comments on commit fea1abc

Please sign in to comment.