Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrong use of getRetryDelayInMillis() / 1000 in documentation and retry loops #885

Merged
merged 5 commits into from Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -321,19 +321,19 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* long singerId = my_singer_id;
* try (TransactionManager manager = dbClient.transactionManager()) {
* TransactionContext txn = manager.begin();
* TransactionContext transaction = manager.begin();
* while (true) {
* String column = "FirstName";
* Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* Struct row = transaction.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* String name = row.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
* try {
* manager.commit();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txn = manager.resetForRetry();
* Thread.sleep(e.getRetryDelayInMillis());
* transaction = manager.resetForRetry();
* }
* }
* }
Expand Down Expand Up @@ -385,19 +385,19 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* long singerId = 1L;
* try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
* TransactionContextFuture txnFut = manager.beginAsync();
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* txnFut
* transactionFuture
* .then(
* (txn, __) ->
* txn.readRowAsync(
* (transaction, __) ->
* transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column)))
* .then(
* (txn, row) -> {
* (transaction, row) -> {
* String name = row.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
Expand All @@ -409,8 +409,8 @@ CommitResponse writeAtLeastOnceWithOptions(
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txnFut = manager.resetForRetryAsync();
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
Expand All @@ -421,26 +421,26 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture txn = manager.beginAsync();
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* txn.then(
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return txn.readRowAsync(
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
Expand All @@ -453,8 +453,8 @@ CommitResponse writeAtLeastOnceWithOptions(
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txn = manager.resetForRetryAsync();
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
Expand Down
Expand Up @@ -816,13 +816,21 @@ private TransactionContext internalBegin() {
}

@Override
public SpannerException handleSessionNotFound(SessionNotFoundException notFound) {
session = sessionPool.replaceSession(notFound, session);
public SpannerException handleSessionNotFound(SessionNotFoundException notFoundException) {
session = sessionPool.replaceSession(notFoundException, session);
PooledSession pooledSession = session.get();
delegate = pooledSession.delegate.transactionManager(options);
restartedAfterSessionNotFound = true;
return createAbortedExceptionWithMinimalRetryDelay(notFoundException);
}

private static SpannerException createAbortedExceptionWithMinimalRetryDelay(
SessionNotFoundException notFoundException) {
return SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, notFound.getMessage(), notFound);
ErrorCode.ABORTED,
notFoundException.getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
notFoundException.getMessage(), notFoundException, 0, 1));
}

@Override
Expand Down
Expand Up @@ -24,9 +24,11 @@
import com.google.common.base.Predicate;
import com.google.rpc.ErrorInfo;
import com.google.rpc.ResourceInfo;
import com.google.rpc.RetryInfo;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -226,6 +228,28 @@ private static ErrorInfo extractErrorInfo(Throwable cause) {
return null;
}

/**
 * Builds an {@code ABORTED} {@link StatusRuntimeException} whose trailers carry a {@link
 * RetryInfo} entry holding the requested retry delay.
 *
 * @param message the description set on the resulting status
 * @param cause the cause attached to the resulting status
 * @param retryDelaySeconds the seconds component of the retry delay
 * @param retryDelayNanos the nanoseconds component of the retry delay
 * @return the exception created from the {@code ABORTED} status and the trailers
 */
static StatusRuntimeException createAbortedExceptionWithRetryDelay(
    String message, Throwable cause, long retryDelaySeconds, int retryDelayNanos) {
  // Assemble the delay first, then wrap it in the RetryInfo message.
  com.google.protobuf.Duration delay =
      com.google.protobuf.Duration.newBuilder()
          .setSeconds(retryDelaySeconds)
          .setNanos(retryDelayNanos)
          .build();
  RetryInfo delayInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();

  // The RetryInfo is stored in the trailers under the key derived from the RetryInfo proto type.
  Metadata.Key<RetryInfo> retryInfoKey = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
  Metadata metadata = new Metadata();
  metadata.put(retryInfoKey, delayInfo);

  io.grpc.Status abortedStatus =
      io.grpc.Status.ABORTED.withDescription(message).withCause(cause);
  return abortedStatus.asRuntimeException(metadata);
}

static SpannerException newSpannerExceptionPreformatted(
ErrorCode code, @Nullable String message, @Nullable Throwable cause) {
// This is the one place in the codebase that is allowed to call constructors directly.
Expand Down
Expand Up @@ -531,7 +531,10 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Aborted due to failed initial statement", e));
ErrorCode.ABORTED,
"Aborted due to failed initial statement",
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
Expand Down Expand Up @@ -684,6 +687,19 @@ public void run() {
return updateCount;
}

/**
 * Creates the exception to throw for a BatchDML response that carries an Aborted status instead
 * of having raised an AbortedException. The cause that is attached is built with a 10ms retry
 * delay.
 *
 * @param response the BatchDML response whose status code and message are used
 * @return the {@link SpannerException} built from the response status
 */
private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlResponse response) {
  final String statusMessage = response.getStatus().getMessage();
  // 10 milliseconds expressed as the nanos component of the retry delay; seconds stay at zero.
  final int retryDelayNanos = (int) TimeUnit.MILLISECONDS.toNanos(10L);
  return newSpannerException(
      ErrorCode.fromRpcStatus(response.getStatus()),
      statusMessage,
      SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
          statusMessage, /* cause = */ null, /* retryDelaySeconds = */ 0, retryDelayNanos));
}

@Override
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... options) {
beforeReadOrQuery();
Expand All @@ -705,8 +721,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(response.getStatus()), response.getStatus().getMessage());
throw createAbortedExceptionForBatchDml(response);
} else if (response.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(response.getStatus()),
Expand Down Expand Up @@ -741,25 +756,24 @@ public ApiFuture<long[]> batchUpdateAsync(
response,
new ApiFunction<ExecuteBatchDmlResponse, long[]>() {
@Override
public long[] apply(ExecuteBatchDmlResponse input) {
long[] results = new long[input.getResultSetsCount()];
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
long[] results = new long[batchDmlResponse.getResultSetsCount()];
for (int i = 0; i < batchDmlResponse.getResultSetsCount(); ++i) {
results[i] = batchDmlResponse.getResultSets(i).getStats().getRowCountExact();
if (batchDmlResponse.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(
input.getResultSets(i).getMetadata().getTransaction(),
batchDmlResponse.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (input.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(input.getStatus()), input.getStatus().getMessage());
} else if (input.getStatus().getCode() != 0) {
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
throw createAbortedExceptionForBatchDml(batchDmlResponse);
} else if (batchDmlResponse.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(input.getStatus()),
input.getStatus().getMessage(),
ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()),
batchDmlResponse.getStatus().getMessage(),
results);
}
return results;
Expand Down
Expand Up @@ -725,8 +725,11 @@ private void handleAborted(AbortedException aborted) {
logger.fine(toString() + ": Starting internal transaction retry");
while (true) {
// First back off and then restart the transaction.
long delay = aborted.getRetryDelayInMillis();
try {
Thread.sleep(aborted.getRetryDelayInMillis() / 1000);
if (delay > 0L) {
Thread.sleep(delay);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw SpannerExceptionFactory.newSpannerException(
Expand Down
Expand Up @@ -1137,7 +1137,7 @@ public ApiFuture<Void> apply(TransactionContext txn, Struct input)
commitTimestamp.get();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
txn = manager.resetForRetryAsync();
}
}
Expand Down
Expand Up @@ -662,7 +662,7 @@ public void transactionManagerIsNonBlocking() throws Exception {
txManager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
tx = txManager.resetForRetry();
}
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
txManager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
tx = txManager.resetForRetry();
}
}
Expand Down
Expand Up @@ -1732,7 +1732,7 @@ private void simulateAbort(Session session, ByteString transactionId) {

public StatusRuntimeException createAbortedException(ByteString transactionId) {
RetryInfo retryInfo =
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build();
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(1).build()).build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
Expand Down