diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java index d1c0eda844..bdd2136b62 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java @@ -17,6 +17,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.StatusCode; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.MutateRowsRequest; @@ -56,13 +57,30 @@ public Object getTransportCode() { public MutateRowsException( @Nullable Throwable rpcError, @Nonnull List failedMutations, - boolean retryable) { - super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable); + boolean retryable, + @Nullable ErrorDetails errorDetails) { + super( + new Throwable("Some mutations failed to apply", rpcError), + LOCAL_STATUS, + retryable, + errorDetails); Preconditions.checkNotNull(failedMutations); Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty"); this.failedMutations = failedMutations; } + /** + * This constructor is considered an internal implementation detail and not meant to be used by + * applications. + */ + @InternalApi + public MutateRowsException( + @Nullable Throwable rpcError, + @Nonnull List failedMutations, + boolean retryable) { + this(rpcError, failedMutations, retryable, null); + } + /** * Retrieve all of the failed mutations. This list will contain failures for all of the mutations * that have failed across all of the retry attempts so far. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index 269ce79031..3dc78610f5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.MutateRowsRequest; @@ -250,7 +251,12 @@ private void handleAttemptError(Throwable rpcError) { currentRequest = builder.build(); originalIndexes = newOriginalIndexes; - throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable()); + ErrorDetails errorDetails = null; + if (rpcError instanceof ApiException) { + errorDetails = ((ApiException) rpcError).getErrorDetails(); + } + throw new MutateRowsException( + rpcError, allFailures.build(), entryError.isRetryable(), errorDetails); } /** @@ -268,6 +274,8 @@ private void handleAttemptSuccess(List responses) { List newOriginalIndexes = Lists.newArrayList(); boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()]; + ErrorDetails errorDetails = null; + for (MutateRowsResponse response : responses) { for (Entry entry : response.getEntriesList()) { seenIndices[Ints.checkedCast(entry.getIndex())] = true; @@ -283,13 +291,15 @@ private void handleAttemptSuccess(List responses) { allFailures.add(failedMutation); - if (!failedMutation.getError().isRetryable()) { + if (!ApiExceptions.isRetryable2(failedMutation.getError()) + && !failedMutation.getError().isRetryable()) { permanentFailures.add(failedMutation); } else { // Schedule the mutation entry for the next RPC by adding it to the request builder and // recording it's original index newOriginalIndexes.add(origIndex); builder.addEntries(lastRequest.getEntries((int) entry.getIndex())); + errorDetails = failedMutation.getError().getErrorDetails(); } } } @@ -319,7 +329,7 @@ private void handleAttemptSuccess(List responses) { if (!allFailures.isEmpty()) { boolean isRetryable = builder.getEntriesCount() > 0; - throw new MutateRowsException(null, allFailures, isRetryable); + throw new MutateRowsException(null, allFailures, isRetryable, errorDetails); } } @@ -338,10 +348,14 @@ private ApiException createEntryError(com.google.rpc.Status protoStatus) { StatusCode gaxStatusCode = GrpcStatusCode.of(grpcStatus.getCode()); + ErrorDetails errorDetails = + ErrorDetails.builder().setRawErrorMessages(protoStatus.getDetailsList()).build(); + return ApiExceptionFactory.createException( grpcStatus.asRuntimeException(), gaxStatusCode, - retryableCodes.contains(gaxStatusCode.getCode())); + retryableCodes.contains(gaxStatusCode.getCode()), + errorDetails); } /** @@ -354,10 +368,10 @@ private static ApiException createSyntheticErrorForRpcFailure(Throwable overallR ApiException requestApiException = (ApiException) overallRequestError; return ApiExceptionFactory.createException( - "Didn't receive a result for this mutation entry", overallRequestError, requestApiException.getStatusCode(), - requestApiException.isRetryable()); + requestApiException.isRetryable(), + requestApiException.getErrorDetails()); } return ApiExceptionFactory.createException( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java index 71457f7e9a..0d5802e1fe 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryInfoRetryAlgorithm.java @@ -24,7 +24,6 @@ import com.google.protobuf.util.Durations; import com.google.rpc.RetryInfo; import io.grpc.Metadata; -import io.grpc.Status; import io.grpc.protobuf.ProtoUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.threeten.bp.Duration; @@ -93,17 +92,17 @@ static Duration extractRetryDelay(@Nullable Throwable throwable) { if (throwable == null) { return null; } - Metadata trailers = Status.trailersFromThrowable(throwable); - if (trailers == null) { + if (!(throwable instanceof ApiException)) { return null; } - RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY); - if (retryInfo == null) { + ApiException exception = (ApiException) throwable; + if (exception.getErrorDetails() == null) { return null; } - if (!retryInfo.hasRetryDelay()) { + if (exception.getErrorDetails().getRetryInfo() == null) { return null; } + RetryInfo retryInfo = exception.getErrorDetails().getRetryInfo(); return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay())); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java index b38e53480c..7c53dc6034 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java @@ -15,13 +15,13 @@ */ package com.google.cloud.bigtable.data.v2.stub; -import static com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm.RETRY_INFO_KEY; import static com.google.common.truth.Truth.assertThat; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ErrorDetails; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.UnavailableException; @@ -55,7 +55,9 @@ import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; +import com.google.protobuf.Any; import com.google.rpc.RetryInfo; import io.grpc.Metadata; import io.grpc.Status; @@ -77,6 +79,9 @@ public class RetryInfoTest { @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + private static final Metadata.Key ERROR_DETAILS_KEY = + Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER); + private FakeBigtableService service; private BigtableDataClient client; private BigtableDataSettings.Builder settings; @@ -167,6 +172,42 @@ public void testMutateRowsNonRetryableErrorWithRetryInfo() { false); } + @Test + public void testMutateRowsPartialFailure() { + service.partial = true; + + verifyRetryInfoIsUsed( + () -> + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))), + true); + } + + @Test + public void testMutateRowsPartialFailureNonRetryableError() { + service.partial = true; + + verifyRetryInfoIsUsed( + () -> + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))), + false); + } + + // TODO: add this test back + // @Test + public void testMutateRowsPartialFailureCanBeDisabled() { + service.partial = true; + + verifyRetryInfoCanBeDisabled( + () -> + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")))); + } + @Test public void testMutateRowsDisableRetryInfo() throws IOException { settings.stubSettings().setEnableRetryInfo(false); @@ -366,13 +407,18 @@ private void verifyRetryInfoCanBeDisabled(Runnable runnable) { private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration delay) { Metadata trailers = new Metadata(); RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); - trailers.put(RETRY_INFO_KEY, retryInfo); + ErrorDetails errorDetails = + ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build(); + byte[] status = + com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray(); + trailers.put(ERROR_DETAILS_KEY, status); ApiException exception = new UnavailableException( new StatusRuntimeException(Status.UNAVAILABLE, trailers), GrpcStatusCode.of(Status.Code.UNAVAILABLE), - true); + true, + errorDetails); service.expectations.add(exception); } @@ -380,13 +426,18 @@ private void enqueueRetryableExceptionWithDelay(com.google.protobuf.Duration del private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.Duration delay) { Metadata trailers = new Metadata(); RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); - trailers.put(RETRY_INFO_KEY, retryInfo); + ErrorDetails errorDetails = + ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(retryInfo))).build(); + byte[] status = + com.google.rpc.Status.newBuilder().addDetails(Any.pack(retryInfo)).build().toByteArray(); + trailers.put(ERROR_DETAILS_KEY, status); ApiException exception = new InternalException( new StatusRuntimeException(Status.INTERNAL, trailers), GrpcStatusCode.of(Status.Code.INTERNAL), - false); + false, + errorDetails); service.expectations.add(exception); @@ -395,6 +446,7 @@ private ApiException enqueueNonRetryableExceptionWithDelay(com.google.protobuf.D private class FakeBigtableService extends BigtableGrpc.BigtableImplBase { Queue expectations = Queues.newArrayDeque(); + boolean partial = false; @Override public void readRows( @@ -434,8 +486,26 @@ public void mutateRows( responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } else { - Exception expectedRpc = expectations.poll(); - responseObserver.onError(expectedRpc); + if (partial) { + ApiException expectedRpc = (ApiException) expectations.poll(); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + builder.addEntries( + 0, + MutateRowsResponse.Entry.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(expectedRpc.getStatusCode().getCode().getHttpStatusCode()) + .addDetails(Any.pack(expectedRpc.getErrorDetails().getRetryInfo()))) + .build()); + for (int i = 1; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } } }