diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java index 5ad786867..fd5e9ba26 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java @@ -69,24 +69,21 @@ ApiFuture wrapResult(int writeIndex) { *

The writes in the batch are not applied atomically and can be applied out of order. */ ApiFuture bulkCommit() { + + // Follows same thread safety logic as `UpdateBuilder::commit`. + committed = true; + BatchWriteRequest request = buildBatchWriteRequest(); + Tracing.getTracer() .getCurrentSpan() .addAnnotation( TraceUtil.SPAN_NAME_BATCHWRITE, - ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size()))); - - final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder(); - request.setDatabase(firestore.getDatabaseName()); - - for (WriteOperation writeOperation : getWrites()) { - request.addWrites(writeOperation.write); - } - - committed = true; + ImmutableMap.of( + "numDocuments", AttributeValue.longAttributeValue(request.getWritesCount()))); ApiFuture response = processExceptions( - firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable())); + firestore.sendRequest(request, firestore.getClient().batchWriteCallable())); return ApiFutures.transformAsync( response, @@ -117,6 +114,13 @@ ApiFuture bulkCommit() { executor); } + private BatchWriteRequest buildBatchWriteRequest() { + BatchWriteRequest.Builder builder = BatchWriteRequest.newBuilder(); + builder.setDatabase(firestore.getDatabaseName()); + forEachWrite(builder::addWrites); + return builder.build(); + } + /** Maps an RPC failure to each individual write's result. */ private ApiFuture processExceptions(ApiFuture response) { return ApiFutures.catching( diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java index a9326a8fc..6ed342b20 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java @@ -30,15 +30,20 @@ import com.google.firestore.v1.CommitResponse; import com.google.firestore.v1.Write; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -48,11 +53,11 @@ */ @InternalExtensionOnly public abstract class UpdateBuilder { - static class WriteOperation { - Write.Builder write; - DocumentReference documentReference; + static final class WriteOperation { + final Write write; + final DocumentReference documentReference; - WriteOperation(DocumentReference documentReference, Write.Builder write) { + WriteOperation(DocumentReference documentReference, Write write) { this.documentReference = documentReference; this.write = write; } @@ -65,13 +70,11 @@ public String toString() { final FirestoreImpl firestore; + // All reads and writes on `writes` must be done in a block that is synchronized on `writes`; + // otherwise, you get undefined behavior. private final List writes = new ArrayList<>(); - protected boolean committed; - - boolean isCommitted() { - return committed; - } + protected volatile boolean committed; UpdateBuilder(FirestoreImpl firestore) { this.firestore = firestore; @@ -141,7 +144,6 @@ public T create( private T performCreate( @Nonnull DocumentReference documentReference, @Nonnull Map fields) { - verifyNotCommitted(); Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT); DocumentSnapshot documentSnapshot = DocumentSnapshot.fromObject( @@ -157,17 +159,7 @@ private T performCreate( write.addAllUpdateTransforms(documentTransform.toPb()); } - writes.add(new WriteOperation(documentReference, write)); - - return wrapResult(writes.size() - 1); - } - - private void verifyNotCommitted() { - Preconditions.checkState( - !isCommitted(), - String.format( - "Cannot modify a %s that has already been committed.", - this.getClass().getSimpleName())); + return addWrite(documentReference, write); } /** @@ -258,7 +250,6 @@ private T performSet( @Nonnull DocumentReference documentReference, @Nonnull Map fields, @Nonnull SetOptions options) { - verifyNotCommitted(); Map documentData; if (options.getFieldMask() != null) { @@ -293,15 +284,28 @@ private T performSet( write.setUpdateMask(documentMask.toPb()); } - writes.add(new WriteOperation(documentReference, write)); + return addWrite(documentReference, write); + } - return wrapResult(writes.size() - 1); + private T addWrite(DocumentReference documentReference, Write.Builder write) { + WriteOperation operation = new WriteOperation(documentReference, write.build()); + int writeIndex; + synchronized (writes) { + Preconditions.checkState( + !committed, + String.format( + "Cannot modify a %s that has already been committed.", + this.getClass().getSimpleName())); + writes.add(operation); + writeIndex = writes.size() - 1; + } + return wrapResult(writeIndex); } /** Removes all values in 'fields' that are not specified in 'fieldMask'. */ - private Map applyFieldMask( + private static Map applyFieldMask( Map fields, List fieldMask) { - List remainingFields = new ArrayList<>(fieldMask); + Set remainingFields = new HashSet<>(fieldMask); Map filteredData = applyFieldMask(fields, remainingFields, FieldPath.empty()); @@ -309,7 +313,7 @@ private Map applyFieldMask( throw new IllegalArgumentException( String.format( "Field masks contains invalid path. No data exist at field '%s'.", - remainingFields.get(0))); + remainingFields.iterator().next())); } return filteredData; @@ -319,8 +323,8 @@ private Map applyFieldMask( * Strips all values in 'fields' that are not specified in 'fieldMask'. Modifies 'fieldMask' * inline and removes all matched fields. */ - private Map applyFieldMask( - Map fields, List fieldMask, FieldPath root) { + private static Map applyFieldMask( + Map fields, Set fieldMask, FieldPath root) { Map filteredMap = new HashMap<>(); for (Entry entry : fields.entrySet()) { @@ -340,7 +344,7 @@ private Map applyFieldMask( return filteredMap; } - private Map convertToFieldPaths( + private static Map convertToFieldPaths( @Nonnull Map fields, boolean splitOnDots) { Map fieldPaths = new HashMap<>(); @@ -532,7 +536,6 @@ private T performUpdate( @Nonnull DocumentReference documentReference, @Nonnull final Map fields, @Nonnull Precondition precondition) { - verifyNotCommitted(); Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty."); Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT); Map deconstructedMap = expandObject(fields); @@ -567,9 +570,8 @@ public boolean allowTransform() { if (!documentTransform.isEmpty()) { write.addAllUpdateTransforms(documentTransform.toPb()); } - writes.add(new WriteOperation(documentReference, write)); - return wrapResult(writes.size() - 1); + return addWrite(documentReference, write); } /** @@ -598,76 +600,98 @@ public T delete(@Nonnull DocumentReference documentReference) { private T performDelete( @Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) { - verifyNotCommitted(); Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT); Write.Builder write = Write.newBuilder().setDelete(documentReference.getName()); if (!precondition.isEmpty()) { write.setCurrentDocument(precondition.toPb()); } - writes.add(new WriteOperation(documentReference, write)); - return wrapResult(writes.size() - 1); + return addWrite(documentReference, write); } /** Commit the current batch. */ ApiFuture> commit(@Nullable ByteString transactionId) { + + // Sequence is thread safe. + // + // 1. Set committed = true + // 2. Build commit request + // + // Step 1 sets uses volatile property to ensure committed is visible to all + // threads immediately. + // + // Step 2 uses `forEach(..)` that is synchronized, therefore will be blocked + // until any writes are complete. + // + // Writes will verify `committed==false` within synchronized block of code + // before appending writes. Since committed is set to true before accessing + // writes, we are ensured that no more writes will be appended after commit + // accesses writes. + committed = true; + CommitRequest request = buildCommitRequest(transactionId); + Tracing.getTracer() .getCurrentSpan() .addAnnotation( TraceUtil.SPAN_NAME_COMMIT, - ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size()))); - - final CommitRequest.Builder request = CommitRequest.newBuilder(); - request.setDatabase(firestore.getDatabaseName()); - - for (WriteOperation writeOperation : writes) { - request.addWrites(writeOperation.write); - } - - if (transactionId != null) { - request.setTransaction(transactionId); - } - - committed = true; + ImmutableMap.of( + "numDocuments", AttributeValue.longAttributeValue(request.getWritesCount()))); ApiFuture response = - firestore.sendRequest(request.build(), firestore.getClient().commitCallable()); + firestore.sendRequest(request, firestore.getClient().commitCallable()); return ApiFutures.transform( response, commitResponse -> { - List writeResults = - commitResponse.getWriteResultsList(); - - List result = new ArrayList<>(); - - for (com.google.firestore.v1.WriteResult writeResult : writeResults) { - result.add(WriteResult.fromProto(writeResult, commitResponse.getCommitTime())); - } - - return result; + Timestamp commitTime = commitResponse.getCommitTime(); + return commitResponse.getWriteResultsList().stream() + .map(writeResult -> WriteResult.fromProto(writeResult, commitTime)) + .collect(Collectors.toList()); }, MoreExecutors.directExecutor()); } + private CommitRequest buildCommitRequest(ByteString transactionId) { + CommitRequest.Builder builder = CommitRequest.newBuilder(); + builder.setDatabase(firestore.getDatabaseName()); + forEachWrite(builder::addWrites); + if (transactionId != null) { + builder.setTransaction(transactionId); + } + return builder.build(); + } + /** Checks whether any updates have been queued. */ boolean isEmpty() { - return writes.isEmpty(); + synchronized (writes) { + return writes.isEmpty(); + } } - List getWrites() { - return writes; + void forEachWrite(Consumer consumer) { + synchronized (writes) { + for (WriteOperation writeOperation : writes) { + consumer.accept(writeOperation.write); + } + } } /** Get the number of writes. */ public int getMutationsSize() { - return writes.size(); + synchronized (writes) { + return writes.size(); + } } @Override public String toString() { + final String writesAsString; + synchronized (writes) { + writesAsString = writes.toString(); + } + return String.format( - "%s{writes=%s, committed=%s}", getClass().getSimpleName(), writes, committed); + "%s{writes=%s, committed=%s}", getClass().getSimpleName(), writesAsString, committed); } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/ToStringTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/ToStringTest.java index 9dca2518e..6779edd18 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/ToStringTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/ToStringTest.java @@ -86,7 +86,8 @@ public void testWriteOperation() { documentReference, Collections.singletonMap("key", "value"), UserDataConverter.NO_DELETES) - .toPb()) + .toPb() + .build()) .toString(); assertThat(toStringResult).startsWith("WriteOperation{"); assertThat(toStringResult)