Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Thread safe UpdateBuilder #1537

Merged
merged 23 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
138ebb8
Thread safe UpdateBuilder
tom-andersen Jan 19, 2024
facf7cf
Add comment
tom-andersen Jan 19, 2024
bb0bf77
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 19, 2024
a306c0e
chore: add an unmanaged dependency check (#1532)
JoeWang1127 Jan 19, 2024
763cdf5
refactor: Optimize FieldMask instantiation (#1536)
tom-andersen Jan 19, 2024
96175e3
Use synchronize
tom-andersen Jan 22, 2024
c790513
Merge remote-tracking branch 'origin/main' into tomandersen/threadSaf…
tom-andersen Jan 22, 2024
8967cb7
Update comment
tom-andersen Jan 22, 2024
7b3f9a8
Make sure commit prevents writes.
tom-andersen Jan 22, 2024
2e66019
Pretty
tom-andersen Jan 22, 2024
17a7ae6
Refactor
tom-andersen Jan 22, 2024
ae56dec
Add comments and make committed volatile.
tom-andersen Jan 23, 2024
81002e0
Make WriteOperation immutable.
tom-andersen Jan 23, 2024
b2ff585
Refactor
tom-andersen Jan 23, 2024
5cb6c52
fix(deps): Update the Java code generator (gapic-generator-java) to 2…
gcf-owl-bot[bot] Jan 22, 2024
bcc366d
test(deps): update dependency com.google.truth:truth to v1.3.0 (#1538)
renovate-bot Jan 22, 2024
396c15f
make some methods static: applyFieldMask() and convertToFieldPaths()
dconeybe Jan 23, 2024
e1c701a
Comment
tom-andersen Jan 23, 2024
b54f0d0
Inline
tom-andersen Jan 23, 2024
f6cc718
use explicit synchronization
dconeybe Jan 23, 2024
917e473
Merge remote-tracking branch 'origin/main' into tomandersen/threadSaf…
dconeybe Jan 23, 2024
a885696
Review feedback
tom-andersen Jan 24, 2024
e010693
Merge remote-tracking branch 'origin/tomandersen/threadSafeUpdateBuil…
tom-andersen Jan 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,40 +69,37 @@ ApiFuture<WriteResult> wrapResult(int writeIndex) {
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<Void> bulkCommit() {

// Follows same thread safety logic as `UpdateBuilder::commit`.
committed = true;
BatchWriteRequest request = buildBatchWriteRequest();

Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

for (WriteOperation writeOperation : getWrites()) {
request.addWrites(writeOperation.write);
}

committed = true;
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));

ApiFuture<BatchWriteResponse> response =
processExceptions(
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable()));
firestore.sendRequest(request, firestore.getClient().batchWriteCallable()));

return ApiFutures.transformAsync(
response,
batchWriteResponse -> {
List<ApiFuture<Void>> pendingUserCallbacks = new ArrayList<>();

List<com.google.firestore.v1.WriteResult> writeResults =
batchWriteResponse.getWriteResultsList();
List<com.google.rpc.Status> statuses = batchWriteResponse.getStatusList();

for (int i = 0; i < writeResults.size(); ++i) {
com.google.firestore.v1.WriteResult writeResult = writeResults.get(i);
int size = writeResults.size();
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
List<ApiFuture<Void>> pendingUserCallbacks = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
com.google.rpc.Status status = statuses.get(i);
BulkWriterOperation operation = pendingOperations.get(i);
Status code = Status.fromCodeValue(status.getCode());
if (code == Status.OK) {
com.google.firestore.v1.WriteResult writeResult = writeResults.get(i);
pendingUserCallbacks.add(
operation.onSuccess(
new WriteResult(Timestamp.fromProto(writeResult.getUpdateTime()))));
Expand All @@ -117,6 +114,13 @@ ApiFuture<Void> bulkCommit() {
executor);
}

private BatchWriteRequest buildBatchWriteRequest() {
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
BatchWriteRequest.Builder builder = BatchWriteRequest.newBuilder();
builder.setDatabase(firestore.getDatabaseName());
forEach(builder::addWrites);
return builder.build();
}

/** Maps an RPC failure to each individual write's result. */
private ApiFuture<BatchWriteResponse> processExceptions(ApiFuture<BatchWriteResponse> response) {
return ApiFutures.catching(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.firestore.v1.CommitResponse;
import com.google.firestore.v1.Write;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
Expand All @@ -39,6 +40,8 @@
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -48,13 +51,13 @@
*/
@InternalExtensionOnly
public abstract class UpdateBuilder<T> {
static class WriteOperation {
Write.Builder write;
DocumentReference documentReference;
static final class WriteOperation {
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
final Write write;
final DocumentReference documentReference;

WriteOperation(DocumentReference documentReference, Write.Builder write) {
this.documentReference = documentReference;
this.write = write;
this.write = write.build();
}

@Override
Expand All @@ -65,13 +68,11 @@ public String toString() {

final FirestoreImpl firestore;

// All reads and writes on `writes` must be done in a block that is synchronized on `writes`;
// otherwise, you get undefined behavior.
private final List<WriteOperation> writes = new ArrayList<>();

protected boolean committed;

boolean isCommitted() {
return committed;
}
protected volatile boolean committed;

UpdateBuilder(FirestoreImpl firestore) {
this.firestore = firestore;
Expand Down Expand Up @@ -141,7 +142,6 @@ public T create(

private T performCreate(
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT);
DocumentSnapshot documentSnapshot =
DocumentSnapshot.fromObject(
Expand All @@ -157,17 +157,7 @@ private T performCreate(
write.addAllUpdateTransforms(documentTransform.toPb());
}

writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
}

private void verifyNotCommitted() {
Preconditions.checkState(
!isCommitted(),
String.format(
"Cannot modify a %s that has already been committed.",
this.getClass().getSimpleName()));
return writesAdd(new WriteOperation(documentReference, write));
}

/**
Expand Down Expand Up @@ -258,7 +248,6 @@ private T performSet(
@Nonnull DocumentReference documentReference,
@Nonnull Map<String, Object> fields,
@Nonnull SetOptions options) {
verifyNotCommitted();
Map<FieldPath, Object> documentData;

if (options.getFieldMask() != null) {
Expand Down Expand Up @@ -293,13 +282,25 @@ private T performSet(
write.setUpdateMask(documentMask.toPb());
}

writes.add(new WriteOperation(documentReference, write));
return writesAdd(new WriteOperation(documentReference, write));
}

return wrapResult(writes.size() - 1);
private T writesAdd(WriteOperation operation) {
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
int writeIndex;
synchronized (writes) {
Preconditions.checkState(
!committed,
String.format(
"Cannot modify a %s that has already been committed.",
this.getClass().getSimpleName()));
writes.add(operation);
writeIndex = writes.size() - 1;
}
return wrapResult(writeIndex);
}

/** Removes all values in 'fields' that are not specified in 'fieldMask'. */
private Map<FieldPath, Object> applyFieldMask(
private static Map<FieldPath, Object> applyFieldMask(
Map<String, Object> fields, List<FieldPath> fieldMask) {
List<FieldPath> remainingFields = new ArrayList<>(fieldMask);
Map<FieldPath, Object> filteredData =
Expand All @@ -319,7 +320,7 @@ private Map<FieldPath, Object> applyFieldMask(
* Strips all values in 'fields' that are not specified in 'fieldMask'. Modifies 'fieldMask'
* inline and removes all matched fields.
*/
private Map<FieldPath, Object> applyFieldMask(
private static Map<FieldPath, Object> applyFieldMask(
Map<String, Object> fields, List<FieldPath> fieldMask, FieldPath root) {
Map<FieldPath, Object> filteredMap = new HashMap<>();

Expand All @@ -340,7 +341,7 @@ private Map<FieldPath, Object> applyFieldMask(
return filteredMap;
}

private Map<FieldPath, Object> convertToFieldPaths(
private static Map<FieldPath, Object> convertToFieldPaths(
@Nonnull Map<String, Object> fields, boolean splitOnDots) {
Map<FieldPath, Object> fieldPaths = new HashMap<>();

Expand Down Expand Up @@ -529,7 +530,6 @@ private T performUpdate(
@Nonnull DocumentReference documentReference,
@Nonnull final Map<FieldPath, Object> fields,
@Nonnull Precondition precondition) {
verifyNotCommitted();
Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty.");
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT);
Map<String, Object> deconstructedMap = expandObject(fields);
Expand Down Expand Up @@ -564,9 +564,8 @@ public boolean allowTransform() {
if (!documentTransform.isEmpty()) {
write.addAllUpdateTransforms(documentTransform.toPb());
}
writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
return writesAdd(new WriteOperation(documentReference, write));
}

/**
Expand Down Expand Up @@ -595,76 +594,98 @@ public T delete(@Nonnull DocumentReference documentReference) {

private T performDelete(
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
verifyNotCommitted();
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT);
Write.Builder write = Write.newBuilder().setDelete(documentReference.getName());

if (!precondition.isEmpty()) {
write.setCurrentDocument(precondition.toPb());
}
writes.add(new WriteOperation(documentReference, write));

return wrapResult(writes.size() - 1);
return writesAdd(new WriteOperation(documentReference, write));
}

/** Commit the current batch. */
ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {

// Sequence is thread safe.
//
// 1. Set committed = true
// 2. Build commit request
//
// Step 1 sets uses volatile property to ensure committed is visible to all
// threads immediately.
//
// Step 2 uses `forEach(..)` that is synchronized, therefore will be blocked
// until any writes are complete.
//
// Writes will verify `committed==false` within synchronized block of code
// before appending writes. Since committed is set to true before accessing
// writes, we are ensured that no more writes will be appended after commit
// accesses writes.
committed = true;
CommitRequest request = buildCommitRequest(transactionId);

Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_COMMIT,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));

final CommitRequest.Builder request = CommitRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

for (WriteOperation writeOperation : writes) {
request.addWrites(writeOperation.write);
}

if (transactionId != null) {
request.setTransaction(transactionId);
}

committed = true;
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));

ApiFuture<CommitResponse> response =
firestore.sendRequest(request.build(), firestore.getClient().commitCallable());
firestore.sendRequest(request, firestore.getClient().commitCallable());

return ApiFutures.transform(
response,
commitResponse -> {
List<com.google.firestore.v1.WriteResult> writeResults =
commitResponse.getWriteResultsList();

List<WriteResult> result = new ArrayList<>();

for (com.google.firestore.v1.WriteResult writeResult : writeResults) {
result.add(WriteResult.fromProto(writeResult, commitResponse.getCommitTime()));
}

return result;
Timestamp commitTime = commitResponse.getCommitTime();
return commitResponse.getWriteResultsList().stream()
.map(writeResult -> WriteResult.fromProto(writeResult, commitTime))
.collect(Collectors.toList());
},
MoreExecutors.directExecutor());
}

private CommitRequest buildCommitRequest(ByteString transactionId) {
CommitRequest.Builder builder = CommitRequest.newBuilder();
builder.setDatabase(firestore.getDatabaseName());
forEach(builder::addWrites);
if (transactionId != null) {
builder.setTransaction(transactionId);
}
return builder.build();
}

/** Checks whether any updates have been queued. */
boolean isEmpty() {
return writes.isEmpty();
synchronized (writes) {
return writes.isEmpty();
}
}

List<WriteOperation> getWrites() {
return writes;
void forEach(Consumer<Write> consumer) {
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
synchronized (writes) {
for (WriteOperation writeOperation : writes) {
consumer.accept(writeOperation.write);
}
}
}

/** Get the number of writes. */
public int getMutationsSize() {
return writes.size();
synchronized (writes) {
return writes.size();
}
}

@Override
public String toString() {
final String writesAsString;
synchronized (writes) {
writesAsString = writes.toString();
}

return String.format(
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writes, committed);
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writesAsString, committed);
}
}