Skip to content

Commit

Permalink
fix: check that all bulk mutation entries are accounted for (#1907)
Browse files Browse the repository at this point in the history
Add a fail safe that marks missing entries in a response as permanent errors. Previously the client assumed that all entries were present and only looked for errors

Change-Id: Ie3f294fd6bb19ec17662b58bfe9c75a3eed81097

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
igorbernstein2 committed Sep 12, 2023
1 parent 6208c90 commit 9ad8a00
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 5 deletions.
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.rpc.Code;
import java.util.List;
Expand Down Expand Up @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {

Builder builder = lastRequest.toBuilder().clearEntries();
List<Integer> newOriginalIndexes = Lists.newArrayList();
boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()];

for (MutateRowsResponse response : responses) {
for (Entry entry : response.getEntriesList()) {
seenIndices[Ints.checkedCast(entry.getIndex())] = true;

if (entry.getStatus().getCode() == Code.OK_VALUE) {
continue;
}
Expand All @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List<MutateRowsResponse> responses) {
}
}

// Handle missing mutations
for (int i = 0; i < seenIndices.length; i++) {
if (seenIndices[i]) {
continue;
}

int origIndex = getOriginalIndex(i);
FailedMutation failedMutation =
FailedMutation.create(
origIndex,
ApiExceptionFactory.createException(
"Missing entry response for entry " + origIndex,
null,
GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL),
false));

allFailures.add(failedMutation);
permanentFailures.add(failedMutation);
}

currentRequest = builder.build();
originalIndexes = newOriginalIndexes;

Expand Down
Expand Up @@ -403,7 +403,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver<MutateRowResponse

@Override
public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsResponse> observer) {
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i));
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Expand Up @@ -648,7 +648,11 @@ public void mutateRows(
Thread.sleep(SERVER_LATENCY);
} catch (InterruptedException e) {
}
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

Expand Down
Expand Up @@ -422,10 +422,15 @@ public void testBatchMutateRowsThrottledTime() throws Exception {
new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
StreamObserver<MutateRowsResponse> observer =
(StreamObserver<MutateRowsResponse>) invocation.getArguments()[1];
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
return null;
}
Expand Down
Expand Up @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver<MutateRowsRespo
observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
return;
}
observer.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
observer.onNext(builder.build());
observer.onCompleted();
}

Expand Down
Expand Up @@ -41,6 +41,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -92,6 +94,37 @@ public void singleEntrySuccessTest() throws Exception {
assertThat(innerCallable.lastRequest).isEqualTo(request);
}

@Test
public void missingEntry() {
MutateRowsRequest request =
MutateRowsRequest.newBuilder()
.addEntries(Entry.getDefaultInstance())
.addEntries(Entry.getDefaultInstance())
.build();
innerCallable.response.add(
MutateRowsResponse.newBuilder()
.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(0))
.build());

MutateRowsAttemptCallable attemptCallable =
new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes);
attemptCallable.setExternalFuture(parentFuture);
attemptCallable.call();

ExecutionException executionException =
Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get());
assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class);
MutateRowsException e = (MutateRowsException) executionException.getCause();

assertThat(e).hasMessageThat().contains("Some mutations failed to apply");
assertThat(e.getFailedMutations()).hasSize(1);
FailedMutation failedMutation = e.getFailedMutations().get(0);
assertThat(failedMutation.getIndex()).isEqualTo(1);
assertThat(failedMutation.getError())
.hasMessageThat()
.contains("Missing entry response for entry 1");
}

@Test
public void testNoRpcTimeout() {
parentFuture.timedAttemptSettings =
Expand Down
Expand Up @@ -107,7 +107,11 @@ public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
attemptCounter.incrementAndGet();
if (expectations.isEmpty()) {
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
for (int i = 0; i < request.getEntriesCount(); i++) {
builder.addEntriesBuilder().setIndex(i);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
Expand Down

0 comments on commit 9ad8a00

Please sign in to comment.