New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BulkWriteOperation.execute() now throws BulkWriteException #334

Merged
merged 1 commit into from Apr 23, 2018
File filter...
Filter file types
Jump to file or symbol
Failed to load files and symbols.
+63 −7
Diff settings

Always

Just for now

@@ -3,6 +3,7 @@
import com.mongodb.AggregationOutput;
import com.mongodb.BasicDBObject;
import com.mongodb.BulkUpdateRequestBuilder;
import com.mongodb.BulkWriteException;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteRequestBuilder;
import com.mongodb.DB;
@@ -227,6 +228,16 @@ public BulkWriteResult insertCommand(MongoNamespace namespace, boolean ordered,
indexMap.add(writeError.getIndex(), writeError.getIndex());
bulkWriteBatchCombiner.addWriteErrorResult(getBulkWriteError(writeError.getException(), writeError.getIndex()), indexMap);
}
} catch (BulkWriteException bulkWriteException) {
// we need to pull the information out of the BulkWriteException and package it up in bulkWriteBatchCombiner
// bulkWriteBatchCombiner.getResult() will end up throwing a MongoBulkWriteException (NOT a BulkWriteException!)
// because that's what mongo does??
BulkWriteResult transformed = FongoDBCollection.translateBulkWriteResultToNew(bulkWriteException.getWriteResult());
bulkWriteBatchCombiner.addResult(transformed, indexMap);
for (com.mongodb.bulk.BulkWriteError writeError : FongoDBCollection.translateWriteErrorsToNew(bulkWriteException.getWriteErrors())) {
indexMap.add(writeError.getIndex(), writeError.getIndex());
bulkWriteBatchCombiner.addWriteErrorResult(writeError, indexMap);
}
} catch (WriteConcernException writeException) {
if (writeException.getResponse().get("wtimeout") != null) {
bulkWriteBatchCombiner.addWriteConcernErrorResult(getWriteConcernError(writeException));
@@ -669,7 +680,7 @@ private BulkWriteResult bulkWriteResult(com.mongodb.BulkWriteResult bulkWriteRes
if (!bulkWriteResult.isAcknowledged()) {
return BulkWriteResult.unacknowledged();
}
return BulkWriteResult.acknowledged(bulkWriteResult.getInsertedCount(), bulkWriteResult.getMatchedCount(), bulkWriteResult.getRemovedCount(), bulkWriteResult.getModifiedCount(), FongoDBCollection.translateBulkWriteUpsertsToNew(bulkWriteResult.getUpserts(), null));
return BulkWriteResult.acknowledged(bulkWriteResult.getInsertedCount(), bulkWriteResult.getMatchedCount(), bulkWriteResult.getRemovedCount(), bulkWriteResult.getModifiedCount(), FongoDBCollection.translateBulkWriteUpsertsToNew(bulkWriteResult.getUpserts()));
}

}
@@ -2,10 +2,10 @@

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.bson.BsonDocument;
import static java.util.Collections.*;

public class FongoBulkWriteCombiner {

@@ -70,10 +70,14 @@ public BulkWriteResult getBulkWriteResult(WriteConcern writeConcern) {
return new AcknowledgedBulkWriteResult(insertedCount, matchedCount, removedCount, modifiedCount, new ArrayList<BulkWriteUpsert>(upserts));
}

public void throwOnError() {
public void throwOnError(ServerAddress address) {
if (!errors.isEmpty()) {
List<BulkWriteError> bwErrors = new ArrayList<BulkWriteError>();
for (WriteError e: errors) {
bwErrors.add(new BulkWriteError(e.code, e.message, FongoDBCollection.dbObject(e.details), e.index));
}
BulkWriteResult bulkWriteResult = getBulkWriteResult(writeConcern);
throw new InsertManyWriteConcernException(bulkWriteResult, unmodifiableList(new ArrayList<WriteError>(errors)));
throw new BulkWriteException(bulkWriteResult, bwErrors, null, address);
}
}

@@ -1215,7 +1215,7 @@ BulkWriteResult executeBulkWriteOperation(final boolean ordered, final Boolean b
}
idx++;
}
combiner.throwOnError();
combiner.throwOnError(this.fongoDb.fongo.getServerAddress());
return combiner.getBulkWriteResult(writeConcern);
}

@@ -1559,6 +1559,15 @@ public static BsonDocument bsonDocument(DBObject dbObject) {
return new UnacknowledgedBulkWriteResult();
}
}

public static com.mongodb.bulk.BulkWriteResult translateBulkWriteResultToNew(final com.mongodb.BulkWriteResult bulkWriteResult) {
if (bulkWriteResult.isAcknowledged()) {
List<com.mongodb.bulk.BulkWriteUpsert> upserts = translateBulkWriteUpsertsToNew(bulkWriteResult.getUpserts());
return com.mongodb.bulk.BulkWriteResult.acknowledged(bulkWriteResult.getInsertedCount(), bulkWriteResult.getMatchedCount(), bulkWriteResult.getRemovedCount(), bulkWriteResult.getModifiedCount(), upserts);
} else {
return com.mongodb.bulk.BulkWriteResult.unacknowledged();
}
}

public static List<com.mongodb.BulkWriteUpsert> translateBulkWriteUpserts(final List<com.mongodb.bulk.BulkWriteUpsert> upserts,
final Decoder<DBObject> decoder) {
@@ -1569,8 +1578,7 @@ public static BsonDocument bsonDocument(DBObject dbObject) {
return retVal;
}

public static List<com.mongodb.bulk.BulkWriteUpsert> translateBulkWriteUpsertsToNew(final List<com.mongodb.BulkWriteUpsert> upserts,
final Decoder<BsonValue> decoder) {
public static List<com.mongodb.bulk.BulkWriteUpsert> translateBulkWriteUpsertsToNew(final List<com.mongodb.BulkWriteUpsert> upserts) {
List<com.mongodb.bulk.BulkWriteUpsert> retVal = new ArrayList<com.mongodb.bulk.BulkWriteUpsert>(upserts.size());
for (com.mongodb.BulkWriteUpsert cur : upserts) {
final BsonDocument document = bsonDocument(new BasicDBObject("_id", cur.getId()));
@@ -1601,6 +1609,16 @@ public static WriteConcernError translateWriteConcernError(final com.mongodb.bul
return retVal;
}

public static List<com.mongodb.bulk.BulkWriteError> translateWriteErrorsToNew(final List<BulkWriteError> errors) {
List<com.mongodb.bulk.BulkWriteError> retVal = new ArrayList<com.mongodb.bulk.BulkWriteError>(errors.size());
for (BulkWriteError cur : errors) {
// TODO I _think_ this should use `bsonDocument(cur.getDetails())` for the details instead of `new BsonDocument()` but the unit tests check for `new BsonDocument()`...
com.mongodb.bulk.BulkWriteError e = new com.mongodb.bulk.BulkWriteError(cur.getCode(), cur.getMessage(), new BsonDocument(), cur.getIndex());
retVal.add(e);
}
return retVal;
}

public static List<com.mongodb.bulk.WriteRequest> translateWriteRequestsToNew(final List<com.mongodb.WriteRequest> writeRequests,
final Codec<DBObject> objectCodec) {
List<com.mongodb.bulk.WriteRequest> retVal = new ArrayList<com.mongodb.bulk.WriteRequest>(writeRequests.size());
@@ -9,6 +9,7 @@
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.BulkWriteException;
import com.mongodb.BulkWriteOperation;
import com.mongodb.BulkWriteResult;
import com.mongodb.CommandResult;
@@ -2808,6 +2809,28 @@ public void test_bulk_update() {
assertEquals(new BasicDBObject("_id", 100).append("hi", 2), result);
}

// https://github.com/fakemongo/fongo/issues/328
@Test
public void test_bulk_update_withDupes() {
// Given
DBCollection collection = newCollection();
collection.insert(new BasicDBObject("_id", 100).append("hi", 1));

// When
BulkWriteOperation bulkWriteOperation = collection.initializeUnorderedBulkOperation();
// these are all safe
bulkWriteOperation.insert(new BasicDBObject("_id", 101).append("hi", 1));
bulkWriteOperation.insert(new BasicDBObject("_id", 102).append("hi", 1));
bulkWriteOperation.insert(new BasicDBObject("_id", 103).append("hi", 1));
bulkWriteOperation.insert(new BasicDBObject("_id", 104).append("hi", 1));
// this one should break it
bulkWriteOperation.insert(new BasicDBObject("_id", 100).append("hi", 1));

// Then
exception.expect(BulkWriteException.class);
bulkWriteOperation.execute().isAcknowledged();
}

@Test
public void test_bulk_insert() {
// Given
ProTip! Use n and p to navigate between commits in a pull request.