diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index 36c2930bd..b049219a9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -35,6 +35,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.rpc.Code; import java.util.List; @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List responses) { Builder builder = lastRequest.toBuilder().clearEntries(); List newOriginalIndexes = Lists.newArrayList(); + boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()]; for (MutateRowsResponse response : responses) { for (Entry entry : response.getEntriesList()) { + seenIndices[Ints.checkedCast(entry.getIndex())] = true; + if (entry.getStatus().getCode() == Code.OK_VALUE) { continue; } @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List responses) { } } + // Handle missing mutations + for (int i = 0; i < seenIndices.length; i++) { + if (seenIndices[i]) { + continue; + } + + int origIndex = getOriginalIndex(i); + FailedMutation failedMutation = + FailedMutation.create( + origIndex, + ApiExceptionFactory.createException( + "Missing entry response for entry " + origIndex, + null, + GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), + false)); + + allFailures.add(failedMutation); + permanentFailures.add(failedMutation); + } + currentRequest = builder.build(); originalIndexes = newOriginalIndexes; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index e783352bf..d8e3402b8 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -403,7 +403,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver observer) { - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i)); + } + observer.onNext(builder.build()); observer.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index c7a47942f..c2be1ea0f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -648,7 +648,11 @@ public void mutateRows( Thread.sleep(SERVER_LATENCY); } catch (InterruptedException e) { } - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index da3dd0770..da989b65d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -422,10 +422,15 @@ public void testBatchMutateRowsThrottledTime() throws Exception { new Answer() { @Override public Object answer(InvocationOnMock invocation) { + MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0]; @SuppressWarnings("unchecked") StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + observer.onNext(builder.build()); observer.onCompleted(); return null; } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java index 538d4fc24..88a874b8c 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver parentFuture.attemptFuture.get()); + assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class); + MutateRowsException e = (MutateRowsException) executionException.getCause(); + + assertThat(e).hasMessageThat().contains("Some mutations failed to apply"); + assertThat(e.getFailedMutations()).hasSize(1); + FailedMutation failedMutation = e.getFailedMutations().get(0); + assertThat(failedMutation.getIndex()).isEqualTo(1); + assertThat(failedMutation.getError()) + .hasMessageThat() + .contains("Missing entry response for entry 1"); + } + @Test public void testNoRpcTimeout() { parentFuture.timedAttemptSettings = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java index 5d15dd521..86a94d34e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java @@ -107,7 +107,11 @@ public void mutateRows( MutateRowsRequest request, StreamObserver responseObserver) { attemptCounter.incrementAndGet(); if (expectations.isEmpty()) { - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } else { Exception expectedRpc = expectations.poll();