fix: Thread safe UpdateBuilder (#1537)
* Thread safe UpdateBuilder

* Add comment

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* chore: add an unmanaged dependency check (#1532)

* refactor: Optimize FieldMask instantiation (#1536)

* Optimize FieldMask instantiation

* Pretty

* Use synchronize

* Update comment

* Make sure commit prevents writes.

* Pretty

* Refactor

* Add comments and make committed volatile.

* Make WriteOperation immutable.

* Refactor

* fix(deps): Update the Java code generator (gapic-generator-java) to 2.32.0 (#1534)

* chore: Add FindNearest API to the preview branch
docs: Improve the documentation on Document.fields

PiperOrigin-RevId: 599602467

Source-Link: googleapis/googleapis@d32bd97

Source-Link: googleapis/googleapis-gen@0545ffc
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiMDU0NWZmYzQ4OGI4MmQzYTQ3NzExMThjOTIzZDY0Y2QwYjc1OTk1MyJ9

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix(deps): Update the Java code generator (gapic-generator-java) to 2.32.0

PiperOrigin-RevId: 599914188

Source-Link: googleapis/googleapis@17e6661

Source-Link: googleapis/googleapis-gen@d86ba5b
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiZDg2YmE1YmU1MzdlNDg5NDM1MTA1Y2E4NTU2NmNjNDEwMzMwMWFiYSJ9

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

* test(deps): update dependency com.google.truth:truth to v1.3.0 (#1538)

* make some methods static: applyFieldMask() and convertToFieldPaths()

* Comment

* Inline

* use explicit synchronization

* Review feedback

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Joe Wang <106995533+JoeWang1127@users.noreply.github.com>
Co-authored-by: gcf-owl-bot[bot] <78513119+gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Mend Renovate <bot@renovateapp.com>
Co-authored-by: Denver Coneybeare <dconeybe@google.com>
6 people committed Jan 24, 2024
1 parent 46e09aa commit f9cdab5
Showing 3 changed files with 107 additions and 78 deletions.
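The theme of this commit, visible throughout the UpdateBuilder diff below, is a volatile `committed` flag combined with blocks synchronized on the `writes` list, so that no write can be appended once a commit has started. The following simplified sketch (hypothetical class and names, not the library's actual API) illustrates that pattern:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified sketch of the thread-safety pattern (hypothetical, for illustration only). */
final class SafeWriteBuilder {
  // Volatile so a commit started on one thread is immediately visible to writer threads.
  private volatile boolean committed = false;

  // All reads and writes of `writes` happen inside blocks synchronized on `writes`.
  private final List<String> writes = new ArrayList<>();

  /** Appends a write, failing if the builder has already been committed. */
  int addWrite(String write) {
    synchronized (writes) {
      if (committed) {
        throw new IllegalStateException("Cannot modify a builder that has already been committed.");
      }
      writes.add(write);
      return writes.size() - 1; // index used to map the eventual server result back to this write
    }
  }

  /** Iterates the queued writes while holding the lock, analogous to forEachWrite in the diff. */
  void forEachWrite(Consumer<String> consumer) {
    synchronized (writes) {
      writes.forEach(consumer);
    }
  }

  /** Marks the builder committed, then snapshots the queued writes. */
  List<String> commit() {
    // Flipping the volatile flag first means any writer that subsequently enters the
    // synchronized block in addWrite() observes committed == true and fails fast.
    committed = true;
    List<String> snapshot = new ArrayList<>();
    forEachWrite(snapshot::add);
    return snapshot;
  }
}

A concurrent addWrite() either completes before the flag flips (and its write is included in the commit snapshot) or observes committed == true and throws, which is the ordering argument spelled out in the comment block inside commit() below.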
@@ -69,24 +69,21 @@ ApiFuture<WriteResult> wrapResult(int writeIndex) {
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<Void> bulkCommit() {

// Follows same thread safety logic as `UpdateBuilder::commit`.
committed = true;
BatchWriteRequest request = buildBatchWriteRequest();

Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

for (WriteOperation writeOperation : getWrites()) {
request.addWrites(writeOperation.write);
}

committed = true;
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));

ApiFuture<BatchWriteResponse> response =
processExceptions(
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable()));
firestore.sendRequest(request, firestore.getClient().batchWriteCallable()));

return ApiFutures.transformAsync(
response,
@@ -117,6 +114,13 @@ ApiFuture<Void> bulkCommit() {
executor);
}

private BatchWriteRequest buildBatchWriteRequest() {
BatchWriteRequest.Builder builder = BatchWriteRequest.newBuilder();
builder.setDatabase(firestore.getDatabaseName());
forEachWrite(builder::addWrites);
return builder.build();
}

/** Maps an RPC failure to each individual write's result. */
private ApiFuture<BatchWriteResponse> processExceptions(ApiFuture<BatchWriteResponse> response) {
return ApiFutures.catching(
@@ -30,15 +30,20 @@
import com.google.firestore.v1.CommitResponse;
import com.google.firestore.v1.Write;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

@@ -48,11 +53,11 @@
*/
@InternalExtensionOnly
public abstract class UpdateBuilder<T> {
static class WriteOperation {
Write.Builder write;
DocumentReference documentReference;
static final class WriteOperation {
final Write write;
final DocumentReference documentReference;

WriteOperation(DocumentReference documentReference, Write.Builder write) {
WriteOperation(DocumentReference documentReference, Write write) {
this.documentReference = documentReference;
this.write = write;
}
@@ -65,13 +70,11 @@ public String toString() {

final FirestoreImpl firestore;

// All reads and writes on `writes` must be done in a block that is synchronized on `writes`;
// otherwise, you get undefined behavior.
private final List<WriteOperation> writes = new ArrayList<>();

protected boolean committed;

boolean isCommitted() {
return committed;
}
protected volatile boolean committed;

UpdateBuilder(FirestoreImpl firestore) {
this.firestore = firestore;
@@ -141,7 +144,6 @@ public T create(

private T performCreate(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
@@ -157,17 +159,7 @@ private T performCreate(
write.addAllUpdateTransforms(documentTransform.toPb());
}

writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
}

private void verifyNotCommitted() {
Preconditions.checkState(
!isCommitted(),
String.format(
"Cannot modify a %s that has already been committed.",
this.getClass().getSimpleName()));
return addWrite(documentReference, write);
}

/**
@@ -258,7 +250,6 @@ private T performSet(
@Nonnull DocumentReference documentReference,
@Nonnull Map<String, Object> fields,
@Nonnull SetOptions options) {
verifyNotCommitted();
Map<FieldPath, Object> documentData;

if (options.getFieldMask() != null) {
@@ -293,23 +284,36 @@ private T performSet(
write.setUpdateMask(documentMask.toPb());
}

writes.add(new WriteOperation(documentReference, write));
return addWrite(documentReference, write);
}

return wrapResult(writes.size() - 1);
private T addWrite(DocumentReference documentReference, Write.Builder write) {
WriteOperation operation = new WriteOperation(documentReference, write.build());
int writeIndex;
synchronized (writes) {
Preconditions.checkState(
!committed,
String.format(
"Cannot modify a %s that has already been committed.",
this.getClass().getSimpleName()));
writes.add(operation);
writeIndex = writes.size() - 1;
}
return wrapResult(writeIndex);
}

/** Removes all values in 'fields' that are not specified in 'fieldMask'. */
private Map<FieldPath, Object> applyFieldMask(
private static Map<FieldPath, Object> applyFieldMask(
Map<String, Object> fields, List<FieldPath> fieldMask) {
List<FieldPath> remainingFields = new ArrayList<>(fieldMask);
Set<FieldPath> remainingFields = new HashSet<>(fieldMask);
Map<FieldPath, Object> filteredData =
applyFieldMask(fields, remainingFields, FieldPath.empty());

if (!remainingFields.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Field masks contains invalid path. No data exist at field '%s'.",
remainingFields.get(0)));
remainingFields.iterator().next()));
}

return filteredData;
@@ -319,8 +323,8 @@ private Map<FieldPath, Object> applyFieldMask(
* Strips all values in 'fields' that are not specified in 'fieldMask'. Modifies 'fieldMask'
* inline and removes all matched fields.
*/
private Map<FieldPath, Object> applyFieldMask(
Map<String, Object> fields, List<FieldPath> fieldMask, FieldPath root) {
private static Map<FieldPath, Object> applyFieldMask(
Map<String, Object> fields, Set<FieldPath> fieldMask, FieldPath root) {
Map<FieldPath, Object> filteredMap = new HashMap<>();

for (Entry<String, Object> entry : fields.entrySet()) {
@@ -340,7 +344,7 @@ private Map<FieldPath, Object> applyFieldMask(
return filteredMap;
}

private Map<FieldPath, Object> convertToFieldPaths(
private static Map<FieldPath, Object> convertToFieldPaths(
@Nonnull Map<String, Object> fields, boolean splitOnDots) {
Map<FieldPath, Object> fieldPaths = new HashMap<>();

@@ -532,7 +536,6 @@ private T performUpdate(
@Nonnull DocumentReference documentReference,
@Nonnull final Map<FieldPath, Object> fields,
@Nonnull Precondition precondition) {
verifyNotCommitted();
Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty.");
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT);
Map<String, Object> deconstructedMap = expandObject(fields);
@@ -567,9 +570,8 @@ public boolean allowTransform() {
if (!documentTransform.isEmpty()) {
write.addAllUpdateTransforms(documentTransform.toPb());
}
writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
return addWrite(documentReference, write);
}

/**
@@ -598,76 +600,98 @@ public T delete(@Nonnull DocumentReference documentReference) {

private T performDelete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT);
Write.Builder write = Write.newBuilder().setDelete(documentReference.getName());

if (!precondition.isEmpty()) {
write.setCurrentDocument(precondition.toPb());
}
writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
return addWrite(documentReference, write);
}

/** Commit the current batch. */
ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {

// Sequence is thread safe.
//
// 1. Set committed = true
// 2. Build commit request
//
// Step 1 uses a volatile field so that the new value of `committed` is
// immediately visible to all threads.
//
// Step 2 iterates the writes through a synchronized method, so it blocks
// until any in-flight additions are complete.
//
// Writers verify `committed == false` inside a synchronized block before
// appending a write. Because `committed` is set to true before the commit
// reads `writes`, no further writes can be appended once the commit has
// started reading them.
committed = true;
CommitRequest request = buildCommitRequest(transactionId);

Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_COMMIT,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final CommitRequest.Builder request = CommitRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

for (WriteOperation writeOperation : writes) {
request.addWrites(writeOperation.write);
}

if (transactionId != null) {
request.setTransaction(transactionId);
}

committed = true;
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));

ApiFuture<CommitResponse> response =
firestore.sendRequest(request.build(), firestore.getClient().commitCallable());
firestore.sendRequest(request, firestore.getClient().commitCallable());

return ApiFutures.transform(
response,
commitResponse -> {
List<com.google.firestore.v1.WriteResult> writeResults =
commitResponse.getWriteResultsList();

List<WriteResult> result = new ArrayList<>();

for (com.google.firestore.v1.WriteResult writeResult : writeResults) {
result.add(WriteResult.fromProto(writeResult, commitResponse.getCommitTime()));
}

return result;
Timestamp commitTime = commitResponse.getCommitTime();
return commitResponse.getWriteResultsList().stream()
.map(writeResult -> WriteResult.fromProto(writeResult, commitTime))
.collect(Collectors.toList());
},
MoreExecutors.directExecutor());
}

private CommitRequest buildCommitRequest(ByteString transactionId) {
CommitRequest.Builder builder = CommitRequest.newBuilder();
builder.setDatabase(firestore.getDatabaseName());
forEachWrite(builder::addWrites);
if (transactionId != null) {
builder.setTransaction(transactionId);
}
return builder.build();
}

/** Checks whether any updates have been queued. */
boolean isEmpty() {
return writes.isEmpty();
synchronized (writes) {
return writes.isEmpty();
}
}

List<WriteOperation> getWrites() {
return writes;
void forEachWrite(Consumer<Write> consumer) {
synchronized (writes) {
for (WriteOperation writeOperation : writes) {
consumer.accept(writeOperation.write);
}
}
}

/** Get the number of writes. */
public int getMutationsSize() {
return writes.size();
synchronized (writes) {
return writes.size();
}
}

@Override
public String toString() {
final String writesAsString;
synchronized (writes) {
writesAsString = writes.toString();
}

return String.format(
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writes, committed);
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writesAsString, committed);
}
}
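The applyFieldMask changes above also swap the mutable `remainingFields` scratch collection from a List to a HashSet. Matched paths are removed as the mask is applied, and membership checks and removals are constant-time on a hash set but linear scans on a list. A tiny standalone illustration (hypothetical field paths, not the library's code) of the operations involved:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MaskScratchCollections {
  public static void main(String[] args) {
    // The same logical mask held two ways; paths here are made up for illustration.
    List<String> maskAsList = new ArrayList<>(List.of("a", "b.c", "d"));
    Set<String> maskAsSet = new HashSet<>(maskAsList);

    // Removing a matched path: O(n) scan on the list, expected O(1) on the set.
    maskAsList.remove("b.c");
    maskAsSet.remove("b.c");

    // Reporting an unmatched path: a Set has no positional get(0), hence the
    // switch to remainingFields.iterator().next() in the diff.
    System.out.println(maskAsList.get(0));            // "a"
    System.out.println(maskAsSet.iterator().next());  // some remaining path
  }
}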
@@ -86,7 +86,8 @@ public void testWriteOperation() {
documentReference,
Collections.singletonMap("key", "value"),
UserDataConverter.NO_DELETES)
.toPb())
.toPb()
.build())
.toString();
assertThat(toStringResult).startsWith("WriteOperation{");
assertThat(toStringResult)
