extract(MutateRowRequest mutateRowRequest) {
}
/**
- * Internal helper to create the base MutateRows callable chain. The chain is responsible for
- * retrying individual entry in case of error.
+ * Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
+ * batching. The chain will:
*
- * NOTE: the caller is responsible for adding tracing & metrics.
+ *
+ * - Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
+ *
- Process the response and schedule retries. At the end of each attempt, entries that have
+ * been applied, are filtered from the next attempt. Also, any entries that failed with a
+ * nontransient error, are filtered from the next attempt. This will continue until there
+ * are no more entries or there are no more retry attempts left.
+ *
- Wrap batch failures in a {@link MutateRowsAttemptResult}.
+ *
- Add tracing & metrics.
+ *
+ *
+ * This callable returns an internal type {@link MutateRowsAttemptResult}.
*
- * @see MutateRowsRetryingCallable for more details
+ * This function should not be exposed to external users, as it could cause a data loss.
*/
- private UnaryCallable createMutateRowsBaseCallable() {
+ private UnaryCallable createMutateRowsBaseCallable() {
ServerStreamingCallable base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.newBuilder()
@@ -706,55 +721,38 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);
- BasicResultRetryAlgorithm resultRetryAlgorithm;
+ BasicResultRetryAlgorithm resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}
+ MutateRowsPartialErrorRetryAlgorithm mutateRowsPartialErrorRetryAlgorithm =
+ new MutateRowsPartialErrorRetryAlgorithm(resultRetryAlgorithm);
- RetryAlgorithm retryAlgorithm =
+ RetryAlgorithm retryAlgorithm =
new RetryAlgorithm<>(
- resultRetryAlgorithm,
+ mutateRowsPartialErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
- RetryingExecutorWithContext retryingExecutor =
+ RetryingExecutorWithContext retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
+ UnaryCallable baseCallable =
+ new MutateRowsRetryingCallable(
+ clientContext.getDefaultCallContext(),
+ withBigtableTracer,
+ retryingExecutor,
+ settings.bulkMutateRowsSettings().getRetryableCodes(),
+ retryAlgorithm);
- return new MutateRowsRetryingCallable(
- clientContext.getDefaultCallContext(),
- withBigtableTracer,
- retryingExecutor,
- settings.bulkMutateRowsSettings().getRetryableCodes(),
- retryAlgorithm);
- }
-
- /**
- * Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
- * batching. The chain will:
- *
- *
- * - Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
- *
- Process the response and schedule retries. At the end of each attempt, entries that have
- * been applied, are filtered from the next attempt. Also, any entries that failed with a
- * nontransient error, are filtered from the next attempt. This will continue until there
- * are no more entries or there are no more retry attempts left.
- *
- Wrap batch failures in a {@link
- * com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
- *
- Add tracing & metrics.
- *
- */
- private UnaryCallable createBulkMutateRowsCallable() {
- UnaryCallable baseCallable = createMutateRowsBaseCallable();
-
- UnaryCallable withCookie = baseCallable;
+ UnaryCallable withCookie = baseCallable;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}
- UnaryCallable flowControlCallable = null;
+ UnaryCallable flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
@@ -764,16 +762,16 @@ private UnaryCallable createBulkMutateRowsCallable() {
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
- UnaryCallable userFacing =
+ UnaryCallable userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);
SpanName spanName = getSpanName("MutateRows");
- UnaryCallable tracedBatcherUnaryCallable =
+ UnaryCallable tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);
- UnaryCallable traced =
+ UnaryCallable traced =
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);
@@ -1171,10 +1169,15 @@ public UnaryCallable mutateRowCallable() {
}
/**
- * Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
+ * Returns the callable chain created in {@link #createMutateRowsBaseCallable()} during stub
* construction.
*/
public UnaryCallable bulkMutateRowsCallable() {
+ return externalBulkMutateRowsCallable;
+ }
+
+ @InternalApi
+ public UnaryCallable internalBulkMutateRowsCallable() {
return bulkMutateRowsCallable;
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowsErrorConverterUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowsErrorConverterUnaryCallable.java
new file mode 100644
index 0000000000..2b118df61e
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowsErrorConverterUnaryCallable.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
+import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Status;
+
+/**
+ * This callable converts partial batch failures into an exception. This is necessary to make sure
+ * that the caller properly handles issues and avoids possible data loss on partial failures
+ */
+@InternalApi
+public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable {
+
+ private final UnaryCallable innerCallable;
+
+ public MutateRowsErrorConverterUnaryCallable(
+ UnaryCallable callable) {
+ this.innerCallable = callable;
+ }
+
+ @Override
+ public ApiFuture futureCall(BulkMutation request, ApiCallContext context) {
+ ApiFuture future = innerCallable.futureCall(request, context);
+ return ApiFutures.transform(
+ future,
+ result -> {
+ if (!result.getFailedMutations().isEmpty()) {
+ throw MutateRowsException.create(
+ null,
+ GrpcStatusCode.of(Status.Code.OK),
+ result.getFailedMutations(),
+ result.getIsRetryable());
+ }
+ return null;
+ },
+ MoreExecutors.directExecutor());
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsUserFacingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsUserFacingCallable.java
index 8048cceaad..94980a80a2 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsUserFacingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsUserFacingCallable.java
@@ -30,18 +30,22 @@
* applications.
*/
@InternalApi
-public final class BulkMutateRowsUserFacingCallable extends UnaryCallable {
- private final UnaryCallable innerCallable;
+public final class BulkMutateRowsUserFacingCallable
+ extends UnaryCallable {
+
+ private final UnaryCallable innerCallable;
private final RequestContext requestContext;
public BulkMutateRowsUserFacingCallable(
- UnaryCallable innerCallable, RequestContext requestContext) {
+ UnaryCallable innerCallable,
+ RequestContext requestContext) {
this.innerCallable = innerCallable;
this.requestContext = requestContext;
}
@Override
- public ApiFuture futureCall(BulkMutation request, ApiCallContext context) {
+ public ApiFuture futureCall(
+ BulkMutation request, ApiCallContext context) {
return innerCallable.futureCall(request.toProto(requestContext), context);
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
index 155ea43211..b07e67ba94 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
@@ -87,7 +87,7 @@
*
* Package-private for internal use.
*/
-class MutateRowsAttemptCallable implements Callable {
+class MutateRowsAttemptCallable implements Callable {
// Synthetic status for Mutations that didn't get a result (because the whole RPC failed). It will
// be exposed in MutateRowsException's FailedMutations.
private static final StatusCode LOCAL_UNKNOWN_STATUS =
@@ -116,17 +116,17 @@ public Object getTransportCode() {
@Nonnull private TimedAttemptSettings attemptSettings;
// Parent controller
- private RetryingFuture externalFuture;
+ private RetryingFuture externalFuture;
// Simple wrappers for handling result futures
- private final ApiFunction, Void> attemptSuccessfulCallback =
- new ApiFunction, Void>() {
- @Override
- public Void apply(List responses) {
- handleAttemptSuccess(responses);
- return null;
- }
- };
+ private final ApiFunction, MutateRowsAttemptResult>
+ attemptSuccessfulCallback =
+ new ApiFunction, MutateRowsAttemptResult>() {
+ @Override
+ public MutateRowsAttemptResult apply(List responses) {
+ return handleAttemptSuccess(responses);
+ }
+ };
private final ApiFunction> attemptFailedCallback =
new ApiFunction>() {
@@ -153,7 +153,7 @@ public List apply(Throwable throwable) {
permanentFailures = Lists.newArrayList();
}
- public void setExternalFuture(RetryingFuture externalFuture) {
+ public void setExternalFuture(RetryingFuture externalFuture) {
this.externalFuture = externalFuture;
}
@@ -166,7 +166,7 @@ public void setExternalFuture(RetryingFuture externalFuture) {
* return of this method should just be ignored.
*/
@Override
- public Void call() {
+ public MutateRowsAttemptResult call() {
try {
// externalFuture is set from MutateRowsRetryingCallable before invoking this method. It
// shouldn't be null unless the code changed
@@ -192,7 +192,7 @@ public Void call() {
}
// Handle concurrent cancellation
- externalFuture.setAttemptFuture(new NonCancellableFuture());
+ externalFuture.setAttemptFuture(new NonCancellableFuture<>());
if (externalFuture.isDone()) {
return null;
}
@@ -208,13 +208,13 @@ public Void call() {
// Inspect the results and either propagate the success, or prepare to retry the failed
// mutations
- ApiFuture transformed =
+ ApiFuture transformed =
ApiFutures.transform(catching, attemptSuccessfulCallback, MoreExecutors.directExecutor());
// Notify the parent of the attempt
externalFuture.setAttemptFuture(transformed);
} catch (Throwable e) {
- externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e));
+ externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e));
}
return null;
@@ -257,7 +257,8 @@ private void handleAttemptError(Throwable rpcError) {
currentRequest = builder.build();
originalIndexes = newOriginalIndexes;
- throw MutateRowsException.create(rpcError, allFailures.build(), builder.getEntriesCount() > 0);
+ throw MutateRowsException.create(
+ rpcError, entryError.getStatusCode(), allFailures.build(), builder.getEntriesCount() > 0);
}
/**
@@ -267,7 +268,7 @@ private void handleAttemptError(Throwable rpcError) {
* {@link MutateRowsException}. If no errors exist, then the attempt future is successfully
* completed. We don't currently handle RetryInfo on entry level failures.
*/
- private void handleAttemptSuccess(List responses) {
+ private MutateRowsAttemptResult handleAttemptSuccess(List responses) {
List allFailures = Lists.newArrayList(permanentFailures);
MutateRowsRequest lastRequest = currentRequest;
@@ -326,8 +327,9 @@ private void handleAttemptSuccess(List responses) {
if (!allFailures.isEmpty()) {
boolean isRetryable = builder.getEntriesCount() > 0;
- throw MutateRowsException.create(null, allFailures, isRetryable);
+ return MutateRowsAttemptResult.create(allFailures, isRetryable);
}
+ return MutateRowsAttemptResult.success();
}
/**
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptResult.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptResult.java
new file mode 100644
index 0000000000..d668c2a50f
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptResult.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.mutaterows;
+
+import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nonnull;
+
+/**
+ * This class represents the result of a MutateRows attempt. It contains a potentially empty list of
+ * failed mutations, along with an indicator whether these errors are retryable.
+ */
+@InternalApi
+@AutoValue
+public abstract class MutateRowsAttemptResult {
+
+ public abstract List getFailedMutations();
+
+ public abstract boolean getIsRetryable();
+
+ @InternalApi
+ @Nonnull
+ public static MutateRowsAttemptResult create(
+ List failedMutations, boolean isRetryable) {
+ return new AutoValue_MutateRowsAttemptResult(failedMutations, isRetryable);
+ }
+
+ @InternalApi
+ @Nonnull
+ public static MutateRowsAttemptResult success() {
+ return new AutoValue_MutateRowsAttemptResult(new ArrayList<>(), false);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java
index 65cc781169..87f5c88d3e 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java
@@ -37,7 +37,7 @@
*/
@InternalApi("For internal use only")
public class MutateRowsBatchingDescriptor
- implements BatchingDescriptor {
+ implements BatchingDescriptor {
@Override
public BatchingRequestBuilder newRequestBuilder(
@@ -46,7 +46,15 @@ public BatchingRequestBuilder newRequestBuilder(
}
@Override
- public void splitResponse(Void response, List> entries) {
+ public void splitResponse(
+ MutateRowsAttemptResult response, List> entries) {
+ // For every failed mutation in the response, we set the exception on the matching requested
+ // mutation. It is important to set the correct error on the correct mutation. When the entry is
+ // later read, it resolves the exception first, and only later it goes to the value set by
+ // set().
+ for (FailedMutation mutation : response.getFailedMutations()) {
+ entries.get(mutation.getIndex()).getResultFuture().setException(mutation.getError());
+ }
for (BatchEntry batchResponse : entries) {
batchResponse.getResultFuture().set(null);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsPartialErrorRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsPartialErrorRetryAlgorithm.java
new file mode 100644
index 0000000000..9c7035db96
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsPartialErrorRetryAlgorithm.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.mutaterows;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
+import com.google.api.gax.retrying.RetryingContext;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This algorithm will retry if there was a retryable failed mutation, or if there wasn't but the
+ * underlying algorithm allows a retry.
+ */
+@InternalApi
+public class MutateRowsPartialErrorRetryAlgorithm
+ implements ResultRetryAlgorithmWithContext {
+ private final ResultRetryAlgorithmWithContext retryAlgorithm;
+
+ public MutateRowsPartialErrorRetryAlgorithm(
+ ResultRetryAlgorithmWithContext retryAlgorithm) {
+ this.retryAlgorithm = retryAlgorithm;
+ }
+
+ @Override
+ public boolean shouldRetry(
+ Throwable previousThrowable, MutateRowsAttemptResult previousResponse) {
+ // handle partial retryable failures
+ if (previousResponse != null && !previousResponse.getFailedMutations().isEmpty()) {
+ return previousResponse.getIsRetryable();
+ }
+ // business as usual
+ return retryAlgorithm.shouldRetry(previousThrowable, previousResponse);
+ }
+
+ @Override
+ public boolean shouldRetry(
+ @Nullable RetryingContext context,
+ Throwable previousThrowable,
+ MutateRowsAttemptResult previousResponse) {
+ // handle partial retryable failures
+ if (previousResponse != null && !previousResponse.getFailedMutations().isEmpty()) {
+ return previousResponse.getIsRetryable();
+ }
+ // business as usual
+ return retryAlgorithm.shouldRetry(context, previousThrowable, previousResponse);
+ }
+
+ @Override
+ public TimedAttemptSettings createNextAttempt(
+ Throwable previousThrowable,
+ MutateRowsAttemptResult previousResponse,
+ TimedAttemptSettings previousSettings) {
+ return retryAlgorithm.createNextAttempt(previousThrowable, previousResponse, previousSettings);
+ }
+
+ @Override
+ public TimedAttemptSettings createNextAttempt(
+ RetryingContext context,
+ Throwable previousThrowable,
+ MutateRowsAttemptResult previousResponse,
+ TimedAttemptSettings previousSettings) {
+ return retryAlgorithm.createNextAttempt(
+ context, previousThrowable, previousResponse, previousSettings);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java
index 8ad1db258d..354a5ea54a 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java
@@ -40,17 +40,18 @@
* @see MutateRowsAttemptCallable for more details.
*/
@InternalApi
-public class MutateRowsRetryingCallable extends UnaryCallable {
+public class MutateRowsRetryingCallable
+ extends UnaryCallable {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable callable;
- private final RetryingExecutorWithContext executor;
+ private final RetryingExecutorWithContext executor;
private final ImmutableSet retryCodes;
private final RetryAlgorithm retryAlgorithm;
public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable callable,
- @Nonnull RetryingExecutorWithContext executor,
+ @Nonnull RetryingExecutorWithContext executor,
@Nonnull Set retryCodes,
@Nonnull RetryAlgorithm retryAlgorithm) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
@@ -61,12 +62,14 @@ public MutateRowsRetryingCallable(
}
@Override
- public RetryingFuture futureCall(MutateRowsRequest request, ApiCallContext inputContext) {
+ public RetryingFuture futureCall(
+ MutateRowsRequest request, ApiCallContext inputContext) {
ApiCallContext context = callContextPrototype.nullToSelf(inputContext);
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes, retryAlgorithm);
- RetryingFuture retryingFuture = executor.createFuture(retryCallable, context);
+ RetryingFuture retryingFuture =
+ executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
index eacf145bcb..2eb0700488 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
@@ -16,11 +16,13 @@
package com.google.cloud.bigtable.data.v2.stub;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.client.json.webtoken.JsonWebSignature;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
+import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
@@ -51,13 +53,14 @@
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.*;
-import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Queues;
import com.google.common.io.BaseEncoding;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
@@ -94,6 +97,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
import org.threeten.bp.Duration;
@RunWith(JUnit4.class)
@@ -118,7 +122,7 @@ public class EnhancedBigtableStubTest {
public void setUp() throws IOException, IllegalAccessException, InstantiationException {
metadataInterceptor = new MetadataInterceptor();
contextInterceptor = new ContextInterceptor();
- fakeDataService = new FakeDataService();
+ fakeDataService = Mockito.spy(new FakeDataService());
server =
FakeServiceBuilder.create(fakeDataService)
@@ -592,6 +596,69 @@ public void testReadChangeStreamWaitTimeoutIsSet() throws Exception {
}
}
+ @Test
+ public void testBatchMutationsPartialFailure() {
+ Batcher batcher =
+ enhancedBigtableStub.newMutateRowsBatcher("table1", GrpcCallContext.createDefault());
+
+ batcher.add(RowMutationEntry.create("key0").deleteRow());
+ batcher.add(RowMutationEntry.create("key1").deleteRow());
+
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ StreamObserver observer = invocationOnMock.getArgument(1);
+ observer.onNext(
+ MutateRowsResponse.newBuilder()
+ .addEntries(
+ MutateRowsResponse.Entry.newBuilder()
+ .setIndex(0)
+ .setStatus(Status.newBuilder().setCode(Code.OK_VALUE))
+ .build())
+ .addEntries(
+ MutateRowsResponse.Entry.newBuilder()
+ .setIndex(1)
+ .setStatus(
+ Status.newBuilder()
+ .setCode(Code.PERMISSION_DENIED_VALUE)
+ .setMessage("fake partial error"))
+ .build())
+ .build());
+ observer.onCompleted();
+ return null;
+ })
+ .when(fakeDataService)
+ .mutateRows(Mockito.any(MutateRowsRequest.class), Mockito.any(StreamObserver.class));
+ BatchingException batchingException =
+ assertThrows(BatchingException.class, () -> batcher.close());
+ assertThat(batchingException.getMessage())
+ .contains(
+ "Batching finished with 1 partial failures. The 1 partial failures contained 1 entries that failed with: 1 ApiException(1 PERMISSION_DENIED).");
+ assertThat(batchingException.getMessage()).contains("fake partial error");
+ assertThat(batchingException.getMessage()).doesNotContain("INTERNAL");
+ }
+
+ @Test
+ public void testBatchMutationRPCErrorCode() {
+ Batcher batcher =
+ enhancedBigtableStub.newMutateRowsBatcher("table1", GrpcCallContext.createDefault());
+
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ StreamObserver observer = invocationOnMock.getArgument(1);
+ observer.onError(io.grpc.Status.PERMISSION_DENIED.asException());
+ return null;
+ })
+ .when(fakeDataService)
+ .mutateRows(Mockito.any(MutateRowsRequest.class), Mockito.any(StreamObserver.class));
+
+ batcher.add(RowMutationEntry.create("key0").deleteRow());
+ BatchingException batchingException =
+ assertThrows(BatchingException.class, () -> batcher.close());
+ assertThat(batchingException.getMessage())
+ .contains(
+ "Batching finished with 1 batches failed to apply due to: 1 ApiException(1 PERMISSION_DENIED) and 0 partial failures");
+ }
+
private static class MetadataInterceptor implements ServerInterceptor {
final BlockingQueue headers = Queues.newLinkedBlockingDeque();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
index c2be1ea0ff..8f62060c97 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java
@@ -17,6 +17,7 @@
import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
@@ -27,6 +28,7 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -74,6 +76,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -449,6 +452,55 @@ public void testMutateRowAttemptsTagValues() {
assertThat(tableId.getAllValues()).containsExactly(TABLE_ID, TABLE_ID, TABLE_ID);
}
+ @Test
+ public void testMutateRowsPartialError() throws InterruptedException {
+ int numMutations = 6;
+ when(mockFactory.newTracer(any(), any(), any()))
+ .thenReturn(
+ new BuiltinMetricsTracer(
+ OperationType.Unary, SpanName.of("Bigtable", "MutateRows"), statsRecorderWrapper));
+
+ Batcher batcher = stub.newMutateRowsBatcher(TABLE_ID, null);
+ for (int i = 0; i < numMutations; i++) {
+ String key = i % 2 == 0 ? "key" : "fail-key";
+ batcher.add(RowMutationEntry.create(key).setCell("f", "q", "v"));
+ }
+
+ assertThrows(BatchingException.class, () -> batcher.close());
+
+ int expectedNumRequests = numMutations / batchElementCount;
+ verify(statsRecorderWrapper, timeout(100).times(expectedNumRequests))
+ .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+
+ assertThat(zone.getAllValues()).containsExactly(ZONE, ZONE, ZONE);
+ assertThat(cluster.getAllValues()).containsExactly(CLUSTER, CLUSTER, CLUSTER);
+ assertThat(status.getAllValues()).containsExactly("OK", "OK", "OK");
+ }
+
+ @Test
+ public void testMutateRowsRpcError() {
+ int numMutations = 6;
+ when(mockFactory.newTracer(any(), any(), any()))
+ .thenReturn(
+ new BuiltinMetricsTracer(
+ OperationType.Unary, SpanName.of("Bigtable", "MutateRows"), statsRecorderWrapper));
+
+ Batcher batcher = stub.newMutateRowsBatcher(BAD_TABLE_ID, null);
+ for (int i = 0; i < numMutations; i++) {
+ batcher.add(RowMutationEntry.create("key").setCell("f", "q", "v"));
+ }
+
+ assertThrows(BatchingException.class, () -> batcher.close());
+
+ int expectedNumRequests = numMutations / batchElementCount;
+ verify(statsRecorderWrapper, timeout(100).times(expectedNumRequests))
+ .recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
+
+ assertThat(zone.getAllValues()).containsExactly("global", "global", "global");
+ assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", "unspecified");
+ assertThat(status.getAllValues()).containsExactly("NOT_FOUND", "NOT_FOUND", "NOT_FOUND");
+ }
+
@Test
public void testReadRowsAttemptsTagValues() {
when(mockFactory.newTracer(any(), any(), any()))
@@ -644,12 +696,30 @@ public void mutateRow(
@Override
public void mutateRows(
MutateRowsRequest request, StreamObserver responseObserver) {
+ if (request.getTableName().contains(BAD_TABLE_ID)) {
+ responseObserver.onError(new StatusRuntimeException(Status.NOT_FOUND));
+ return;
+ }
try {
Thread.sleep(SERVER_LATENCY);
} catch (InterruptedException e) {
}
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
+ if (request
+ .getEntries(i)
+ .getRowKey()
+ .toString(Charset.availableCharsets().get("UTF-8"))
+ .startsWith("fail")) {
+ builder
+ .addEntriesBuilder()
+ .setIndex(i)
+ .setStatus(
+ com.google.rpc.Status.newBuilder()
+ .setCode(com.google.rpc.Code.PERMISSION_DENIED_VALUE)
+ .build());
+ continue;
+ }
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
index 2894568f27..15bd9171f0 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
@@ -448,7 +448,7 @@ public Object answer(InvocationOnMock invocation) {
try (Batcher batcher =
new BatcherImpl<>(
batchingDescriptor,
- stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext),
+ stub.internalBulkMutateRowsCallable().withDefaultCallContext(defaultContext),
BulkMutation.create(TABLE_ID),
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
Executors.newSingleThreadScheduledExecutor(),
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java
index e5d12ccaeb..6dd1ff9bd0 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java
@@ -44,8 +44,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -101,14 +99,18 @@ public void singleEntrySuccessTest() throws Exception {
attemptCallable.setExternalFuture(parentFuture);
attemptCallable.call();
- // Attempt completed successfully and the useless response has been suppressed
- assertThat(parentFuture.attemptFuture.get()).isNull();
+ // Attempt completed successfully
+ MutateRowsAttemptResult result = parentFuture.attemptFuture.get();
+
+ assertThat(result).isNotNull();
+ assertThat(result.getFailedMutations()).hasSize(0);
+ assertThat(result.getIsRetryable()).isFalse();
// innerCallable received the request
assertThat(innerCallable.lastRequest).isEqualTo(request);
}
@Test
- public void missingEntry() {
+ public void missingEntry() throws Exception {
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
.addEntries(Entry.getDefaultInstance())
@@ -125,14 +127,10 @@ public void missingEntry() {
attemptCallable.setExternalFuture(parentFuture);
attemptCallable.call();
- ExecutionException executionException =
- Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get());
- assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class);
- MutateRowsException e = (MutateRowsException) executionException.getCause();
+ MutateRowsAttemptResult result = parentFuture.attemptFuture.get();
- assertThat(e).hasMessageThat().contains("Some mutations failed to apply");
- assertThat(e.getFailedMutations()).hasSize(1);
- FailedMutation failedMutation = e.getFailedMutations().get(0);
+ assertThat(result.getFailedMutations()).hasSize(1);
+ FailedMutation failedMutation = result.getFailedMutations().get(0);
assertThat(failedMutation.getIndex()).isEqualTo(1);
assertThat(failedMutation.getError())
.hasMessageThat()
@@ -163,7 +161,7 @@ public void testNoRpcTimeout() {
}
@Test
- public void mixedTest() {
+ public void mixedTest() throws Exception {
// Setup the request & response
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
@@ -194,20 +192,11 @@ public void mixedTest() {
// Make the only call
attemptCallable.call();
- // Overall error expectations
- Throwable actualError = null;
- try {
- parentFuture.attemptFuture.get();
- } catch (Throwable t) {
- actualError = t.getCause();
- }
-
- assertThat(actualError).isInstanceOf(MutateRowsException.class);
- assertThat(((MutateRowsException) actualError).isRetryable()).isTrue();
+ MutateRowsAttemptResult result = parentFuture.attemptFuture.get();
// Entry expectations
@SuppressWarnings("ConstantConditions")
- List failedMutations = ((MutateRowsException) actualError).getFailedMutations();
+ List failedMutations = result.getFailedMutations();
assertThat(failedMutations).hasSize(2);
assertThat(failedMutations.get(0).getIndex()).isEqualTo(1);
@@ -222,7 +211,7 @@ public void mixedTest() {
}
@Test
- public void nextAttemptTest() {
+ public void nextAttemptTest() throws Exception {
// Setup the request & response for the first call
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
@@ -267,19 +256,11 @@ public void nextAttemptTest() {
assertThat(innerCallable.lastRequest.getEntries(0).getRowKey())
.isEqualTo(ByteString.copyFromUtf8("1-unavailable"));
- // Overall error expectations
- Throwable actualError = null;
- try {
- parentFuture.attemptFuture.get();
- } catch (Throwable t) {
- actualError = t.getCause();
- }
- assertThat(actualError).isInstanceOf(MutateRowsException.class);
- assertThat(((MutateRowsException) actualError).isRetryable()).isFalse();
+ MutateRowsAttemptResult result = parentFuture.attemptFuture.get();
// Entry expectations
@SuppressWarnings("ConstantConditions")
- List failedMutations = ((MutateRowsException) actualError).getFailedMutations();
+ List failedMutations = result.getFailedMutations();
assertThat(failedMutations).hasSize(1);
assertThat(failedMutations.get(0).getIndex()).isEqualTo(2);
@@ -411,8 +392,9 @@ public ApiFuture> futureCall(
}
}
- static class MockRetryingFuture extends AbstractApiFuture implements RetryingFuture {
- ApiFuture attemptFuture;
+ static class MockRetryingFuture extends AbstractApiFuture
+ implements RetryingFuture {
+ ApiFuture attemptFuture;
TimedAttemptSettings timedAttemptSettings;
@@ -433,7 +415,7 @@ static class MockRetryingFuture extends AbstractApiFuture implements Retry
}
@Override
- public void setAttemptFuture(ApiFuture attemptFuture) {
+ public void setAttemptFuture(ApiFuture attemptFuture) {
this.attemptFuture = attemptFuture;
}
@@ -443,17 +425,17 @@ public TimedAttemptSettings getAttemptSettings() {
}
@Override
- public Callable getCallable() {
+ public Callable getCallable() {
throw new UnsupportedOperationException("not used");
}
@Override
- public ApiFuture peekAttemptResult() {
+ public ApiFuture peekAttemptResult() {
throw new UnsupportedOperationException("not used");
}
@Override
- public ApiFuture getAttemptResult() {
+ public ApiFuture getAttemptResult() {
throw new UnsupportedOperationException("not used");
}
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java
index 237444ba84..c5f11d91d5 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java
@@ -16,21 +16,26 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchEntry;
import com.google.api.gax.batching.BatchResource;
import com.google.api.gax.batching.BatchingRequestBuilder;
import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.DeadlineExceededException;
+import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
+import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
@@ -92,11 +97,58 @@ public void splitResponseTest() {
assertThat(batchResponse.get(1).getResultFuture().isDone()).isFalse();
MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
- underTest.splitResponse(null, batchResponse);
+ underTest.splitResponse(MutateRowsAttemptResult.success(), batchResponse);
assertThat(batchResponse.get(0).getResultFuture().isDone()).isTrue();
assertThat(batchResponse.get(1).getResultFuture().isDone()).isTrue();
}
+ @Test
+ public void splitResponsePartialErrorsTest() {
+ BatchEntry batchEntry1 =
+ BatchEntry.create(
+ RowMutationEntry.create("key1").deleteRow(), SettableApiFuture.create());
+ BatchEntry batchEntry2 =
+ BatchEntry.create(
+ RowMutationEntry.create("key2").deleteRow(), SettableApiFuture.create());
+
+ List> batchResponse =
+ ImmutableList.of(batchEntry1, batchEntry2);
+ assertThat(batchResponse.get(0).getResultFuture().isDone()).isFalse();
+ assertThat(batchResponse.get(1).getResultFuture().isDone()).isFalse();
+
+ MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
+ underTest.splitResponse(
+ MutateRowsAttemptResult.create(
+ Arrays.asList(
+ FailedMutation.create(
+ 0,
+ ApiExceptionFactory.createException(
+ "error message",
+ null,
+ GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL),
+ false))),
+ true),
+ batchResponse);
+ assertThat(batchResponse.get(0).getResultFuture().isDone()).isTrue();
+ assertThat(batchResponse.get(1).getResultFuture().isDone()).isTrue();
+
+ Throwable unexpectedError = null;
+ try {
+ batchResponse.get(1).getResultFuture().get();
+
+ } catch (Throwable t) {
+ unexpectedError = t;
+ }
+ assertThat(unexpectedError).isNull();
+
+ Throwable actualError =
+ assertThrows(ExecutionException.class, () -> batchResponse.get(0).getResultFuture().get())
+ .getCause();
+
+ assertThat(actualError).isInstanceOf(InternalException.class);
+ assertThat(actualError).hasMessageThat().contains("error message");
+ }
+
@Test
public void splitExceptionTest() {
BatchEntry batchEntry1 =
@@ -140,6 +192,7 @@ public void splitExceptionWithFailedMutationsTest() {
MutateRowsException serverError =
MutateRowsException.create(
null,
+ GrpcStatusCode.of(Status.Code.UNAVAILABLE),
ImmutableList.of(
MutateRowsException.FailedMutation.create(
0,
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsErrorConverterUnaryCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsErrorConverterUnaryCallableTest.java
new file mode 100644
index 0000000000..170aa66188
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsErrorConverterUnaryCallableTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.mutaterows;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiExceptionFactory;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
+import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
+import com.google.cloud.bigtable.data.v2.stub.MutateRowsErrorConverterUnaryCallable;
+import java.util.Arrays;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class MutateRowsErrorConverterUnaryCallableTest {
+ @Mock private UnaryCallable innerCallable;
+ @Captor private ArgumentCaptor innerMutation;
+ private SettableApiFuture innerResult;
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Before
+ public void setUp() {
+ innerResult = SettableApiFuture.create();
+ Mockito.when(innerCallable.futureCall(innerMutation.capture(), Mockito.any()))
+ .thenReturn(innerResult);
+ }
+
+ @Test
+ public void testSuccess() {
+ MutateRowsErrorConverterUnaryCallable callable =
+ new MutateRowsErrorConverterUnaryCallable(innerCallable);
+
+ innerResult.set(MutateRowsAttemptResult.success());
+
+ Throwable unexpectedError = null;
+ try {
+ callable.call(BulkMutation.create("fake-table"));
+ } catch (Throwable t) {
+ unexpectedError = t;
+ }
+ assertThat(unexpectedError).isNull();
+ }
+
+ @Test
+ public void testPartialFailure() {
+ MutateRowsErrorConverterUnaryCallable callable =
+ new MutateRowsErrorConverterUnaryCallable(innerCallable);
+
+ innerResult.set(
+ MutateRowsAttemptResult.create(
+ Arrays.asList(
+ FailedMutation.create(
+ 0,
+ ApiExceptionFactory.createException(
+ null, GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), false))),
+ true));
+
+ MutateRowsException exception =
+ Assert.assertThrows(
+ MutateRowsException.class, () -> callable.call(BulkMutation.create("fake-table")));
+
+ assertThat(exception).isInstanceOf(MutateRowsException.class);
+ assertThat((exception).isRetryable()).isTrue();
+ }
+
+ @Test
+ public void testRPCFailure() {
+ MutateRowsErrorConverterUnaryCallable callable =
+ new MutateRowsErrorConverterUnaryCallable(innerCallable);
+
+ innerResult.setException(new Exception("RPC error"));
+
+ Exception exception =
+ Assert.assertThrows(
+ Exception.class, () -> callable.call(BulkMutation.create("fake-table")));
+
+ assertThat(exception).isInstanceOf(Exception.class);
+ }
+}