Skip to content

Commit

Permalink
fix: Transactions with readTime will omit begin and commit transact…
Browse files Browse the repository at this point in the history
…ion requests, and instead pass `readTime` on individual read requests. (#1565)

* Optimize ReadOnly transactions.

* Pretty

* Refactor

* Handle null readTime

* Consistent error messages

* Pretty

* Refactor

* Make more backward compatible

* Clirr

* Feedback
  • Loading branch information
tom-andersen committed Feb 14, 2024
1 parent 71e053e commit d5fb01a
Show file tree
Hide file tree
Showing 10 changed files with 553 additions and 174 deletions.
6 changes: 6 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- ReadTimeTransaction - added abstract modifier to Transaction class-->
<difference>
<differenceType>3005</differenceType>
<className>com/google/cloud/firestore/Transaction</className>
</difference>

<!-- Shutdown/Shutdown Now -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,22 @@ public Query getQuery() {
*/
@Nonnull
public ApiFuture<AggregateQuerySnapshot> get() {
return get(null);
return get(null, null);
}

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
ApiFuture<AggregateQuerySnapshot> get(
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
transactionId, readTime, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
}

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
RunAggregationQueryRequest request =
toProto(responseDeliverer.transactionId, responseDeliverer.readTime);
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
Expand All @@ -96,28 +98,24 @@ private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private final class AggregateQueryResponseDeliverer {

@Nullable private final ByteString transactionId;
@Nullable private final com.google.protobuf.Timestamp readTime;
private final long startTimeNanos;
private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
private final AtomicBoolean isFutureCompleted = new AtomicBoolean(false);

AggregateQueryResponseDeliverer(@Nullable ByteString transactionId, long startTimeNanos) {
AggregateQueryResponseDeliverer(
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
long startTimeNanos) {
this.transactionId = transactionId;
this.readTime = readTime;
this.startTimeNanos = startTimeNanos;
}

ApiFuture<AggregateQuerySnapshot> getFuture() {
return future;
}

@Nullable
ByteString getTransactionId() {
return transactionId;
}

long getStartTimeNanos() {
return startTimeNanos;
}

void deliverResult(@Nonnull Map<String, Value> data, Timestamp readTime) {
if (isFutureCompleted.compareAndSet(false, true)) {
Map<String, Value> mappedData = new HashMap<>();
Expand Down Expand Up @@ -176,8 +174,8 @@ private boolean shouldRetry(Throwable throwable) {
FirestoreSettings.newBuilder().runAggregationQuerySettings().getRetryableCodes();
return query.shouldRetryQuery(
throwable,
responseDeliverer.getTransactionId(),
responseDeliverer.getStartTimeNanos(),
responseDeliverer.transactionId,
responseDeliverer.startTimeNanos,
retryableCodes);
}

Expand All @@ -193,18 +191,23 @@ public void onComplete() {}
*/
@Nonnull
public RunAggregationQueryRequest toProto() {
return toProto(null);
return toProto(null, null);
}

@Nonnull
RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) {
RunAggregationQueryRequest toProto(
@Nullable final ByteString transactionId,
@Nullable final com.google.protobuf.Timestamp readTime) {
RunQueryRequest runQueryRequest = query.toProto();

RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
request.setParent(runQueryRequest.getParent());
if (transactionId != null) {
request.setTransaction(transactionId);
}
if (readTime != null) {
request.setReadTime(readTime);
}

StructuredAggregationQuery.Builder structuredAggregationQuery =
request.getStructuredAggregationQueryBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ public void getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nonnull final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
this.getAll(documentReferences, fieldMask, null, apiStreamObserver);
this.getAll(documentReferences, fieldMask, null, null, apiStreamObserver);
}

void getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {

ResponseObserver<BatchGetDocumentsResponse> responseObserver =
Expand Down Expand Up @@ -304,6 +305,10 @@ public void onComplete() {
request.setTransaction(transactionId);
}

if (readTime != null) {
request.setReadTime(readTime);
}

for (DocumentReference docRef : documentReferences) {
request.addDocuments(docRef.getName());
}
Expand All @@ -318,17 +323,33 @@ public void onComplete() {
streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable());
}

final ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable com.google.protobuf.Timestamp readTime) {
return getAll(documentReferences, fieldMask, null, readTime);
}

private ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId) {
return getAll(documentReferences, fieldMask, transactionId, null);
}

/** Internal getAll() method that accepts an optional transaction id. */
ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId) {
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime) {
final SettableApiFuture<List<DocumentSnapshot>> futureList = SettableApiFuture.create();
final Map<DocumentReference, DocumentSnapshot> documentSnapshotMap = new HashMap<>();
getAll(
documentReferences,
fieldMask,
transactionId,
readTime,
new ApiStreamObserver<DocumentSnapshot>() {
@Override
public void onNext(DocumentSnapshot documentSnapshot) {
Expand Down Expand Up @@ -390,9 +411,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {

TransactionRunner<T> transactionRunner =
new TransactionRunner<>(this, updateFunction, transactionOptions);
return transactionRunner.run();
if (transactionOptions.getReadTime() != null) {
// READ_ONLY transactions with readTime have no retry, nor transaction state, so we don't need
// a runner.
return updateFunction.updateCallback(
new ReadTimeTransaction(this, transactionOptions.getReadTime()));
} else {
// For READ_ONLY transactions without readTime, there is still strong consistency applied,
// that cannot be tracked client side.
return new ServerSideTransactionRunner<>(this, updateFunction, transactionOptions).run();
}
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,7 @@ boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
*/
@Nonnull
public ApiFuture<QuerySnapshot> get() {
return get(null);
return get(null, null);
}

/**
Expand All @@ -1811,7 +1811,7 @@ public ListenerRegistration addSnapshotListener(
return Watch.forQuery(this).runWatch(executor, listener);
}

ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId, @Nullable Timestamp readTime) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();

internalStream(
Expand Down Expand Up @@ -1843,7 +1843,7 @@ public void onCompleted() {
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);
readTime);

return result;
}
Expand Down
Loading

0 comments on commit d5fb01a

Please sign in to comment.