Skip to content

Commit

Permalink
fix: update the accounting of partial batch mutations (#2149)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
- [x] Rollback plan is reviewed and LGTMed
- [x] All new data plane features have a completed end to end testing plan

Fixes #1494 ☕️

If you write sample code, please follow the [samples format](https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
ron-gal committed Mar 27, 2024
1 parent 6945e08 commit 4158094
Show file tree
Hide file tree
Showing 17 changed files with 631 additions and 140 deletions.
7 changes: 7 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -140,6 +140,13 @@
<method>*</method>
<to>*</to>
</difference>
<!-- change method argument type is ok because MutateRowsBatchingDescriptor is InternalApi -->
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor</className>
<method>*</method>
<to>*</to>
</difference>
<!-- Removed methods in an internal class -->
<difference>
<differenceType>7002</differenceType>
Expand Down
Expand Up @@ -33,20 +33,6 @@
* were part of that RPC.
*/
public final class MutateRowsException extends ApiException {
// Synthetic status to use for this ApiException subclass.
private static final StatusCode LOCAL_STATUS =
new StatusCode() {
@Override
public Code getCode() {
return Code.INTERNAL;
}

@Override
public Object getTransportCode() {
return null;
}
};

private final List<FailedMutation> failedMutations;

/**
Expand All @@ -56,22 +42,24 @@ public Object getTransportCode() {
@InternalApi
public static MutateRowsException create(
@Nullable Throwable rpcError,
StatusCode status,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
ErrorDetails errorDetails = null;
if (rpcError instanceof ApiException) {
errorDetails = ((ApiException) rpcError).getErrorDetails();
}

return new MutateRowsException(rpcError, failedMutations, retryable, errorDetails);
return new MutateRowsException(rpcError, status, failedMutations, retryable, errorDetails);
}

private MutateRowsException(
@Nullable Throwable rpcError,
StatusCode status,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable,
@Nullable ErrorDetails errorDetails) {
super(rpcError, LOCAL_STATUS, retryable, errorDetails);
super(rpcError, status, retryable, errorDetails);
Preconditions.checkNotNull(failedMutations);
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
this.failedMutations = failedMutations;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.Set;
Expand Down Expand Up @@ -57,11 +58,12 @@
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public final class BigtableBatchingCallSettings extends UnaryCallSettings<BulkMutation, Void> {
public final class BigtableBatchingCallSettings
extends UnaryCallSettings<BulkMutation, MutateRowsAttemptResult> {

// This settings is just a simple wrapper for BatchingCallSettings to allow us to add
// additional functionality.
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, Void>
private final BatchingCallSettings<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingCallSettings;
private final boolean isLatencyBasedThrottlingEnabled;
private final Long targetRpcLatencyMs;
Expand Down Expand Up @@ -89,7 +91,8 @@ public BatchingSettings getBatchingSettings() {
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> getBatchingDescriptor() {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

Expand Down Expand Up @@ -120,7 +123,8 @@ public boolean isServerInitiatedFlowControlEnabled() {
}

static Builder newBuilder(
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
return new Builder(batchingDescriptor);
}

Expand Down Expand Up @@ -148,9 +152,11 @@ public String toString() {
* A base builder class for {@link BigtableBatchingCallSettings}. See the class documentation of
* {@link BigtableBatchingCallSettings} for a description of the different values that can be set.
*/
public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void> {
public static class Builder
extends UnaryCallSettings.Builder<BulkMutation, MutateRowsAttemptResult> {

private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor;
private BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor;
private BatchingSettings batchingSettings;
private boolean isLatencyBasedThrottlingEnabled;
private Long targetRpcLatencyMs;
Expand All @@ -160,7 +166,8 @@ public static class Builder extends UnaryCallSettings.Builder<BulkMutation, Void

private Builder(
@Nonnull
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, Void> batchingDescriptor) {
BatchingDescriptor<RowMutationEntry, Void, BulkMutation, MutateRowsAttemptResult>
batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}
Expand Down
Expand Up @@ -102,7 +102,9 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
Expand Down Expand Up @@ -165,7 +167,8 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> bulkMutateRowsCallable;
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
Expand Down Expand Up @@ -368,7 +371,9 @@ public EnhancedBigtableStub(
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
bulkMutateRowsCallable = createMutateRowsBaseCallable();
externalBulkMutateRowsCallable =
new MutateRowsErrorConverterUnaryCallable(bulkMutateRowsCallable);
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
generateInitialChangeStreamPartitionsCallable =
Expand Down Expand Up @@ -665,14 +670,24 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
}

/**
* Internal helper to create the base MutateRows callable chain. The chain is responsible for
* retrying individual entry in case of error.
* Creates a callable chain to handle MutateRows RPCs. This is meant to be used for manual
* batching. The chain will:
*
* <p>NOTE: the caller is responsible for adding tracing & metrics.
* <ul>
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
* been applied are filtered from the next attempt. Also, any entries that failed with a
* nontransient error are filtered from the next attempt. This will continue until there
* are no more entries or there are no more retry attempts left.
* <li>Wrap batch failures in a {@link MutateRowsAttemptResult}.
* <li>Add tracing & metrics.
* </ul>
*
* This callable returns an internal type {@link MutateRowsAttemptResult}.
*
* @see MutateRowsRetryingCallable for more details
* <p>This function should not be exposed to external users, as it could cause data loss.
*/
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createMutateRowsBaseCallable() {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
Expand Down Expand Up @@ -706,55 +721,38 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
BasicResultRetryAlgorithm<MutateRowsAttemptResult> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}
MutateRowsPartialErrorRetryAlgorithm mutateRowsPartialErrorRetryAlgorithm =
new MutateRowsPartialErrorRetryAlgorithm(resultRetryAlgorithm);

RetryAlgorithm<Void> retryAlgorithm =
RetryAlgorithm<MutateRowsAttemptResult> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm,
mutateRowsPartialErrorRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
RetryingExecutorWithContext<MutateRowsAttemptResult> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> baseCallable =
new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withBigtableTracer,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes(),
retryAlgorithm);
}

/**
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
* batching. The chain will:
*
* <ul>
* <li>Convert a {@link BulkMutation} into a {@link MutateRowsRequest}.
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
* been applied, are filtered from the next attempt. Also, any entries that failed with a
* nontransient error, are filtered from the next attempt. This will continue until there
* are no more entries or there are no more retry attempts left.
* <li>Wrap batch failures in a {@link
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
* <li>Add tracing & metrics.
* </ul>
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
Expand All @@ -764,16 +762,16 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> traced =
UnaryCallable<BulkMutation, MutateRowsAttemptResult> traced =
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -1171,10 +1169,15 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
}

/**
* Returns the callable chain created in {@link #createBulkMutateRowsCallable()} ()} during stub
* Returns the callable chain created in {@link #createMutateRowsBaseCallable()} during stub
* construction.
*/
public UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable() {
return externalBulkMutateRowsCallable;
}

@InternalApi
public UnaryCallable<BulkMutation, MutateRowsAttemptResult> internalBulkMutateRowsCallable() {
return bulkMutateRowsCallable;
}

Expand Down
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsAttemptResult;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Status;

/**
* This callable converts partial batch failures into an exception. This is necessary to make sure
* that the caller properly handles issues and avoids possible data loss on partial failures
*/
@InternalApi
public class MutateRowsErrorConverterUnaryCallable extends UnaryCallable<BulkMutation, Void> {

private final UnaryCallable<BulkMutation, MutateRowsAttemptResult> innerCallable;

public MutateRowsErrorConverterUnaryCallable(
UnaryCallable<BulkMutation, MutateRowsAttemptResult> callable) {
this.innerCallable = callable;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
ApiFuture<MutateRowsAttemptResult> future = innerCallable.futureCall(request, context);
return ApiFutures.transform(
future,
result -> {
if (!result.getFailedMutations().isEmpty()) {
throw MutateRowsException.create(
null,
GrpcStatusCode.of(Status.Code.OK),
result.getFailedMutations(),
result.getIsRetryable());
}
return null;
},
MoreExecutors.directExecutor());
}
}
Expand Up @@ -30,18 +30,22 @@
* applications.
*/
@InternalApi
public final class BulkMutateRowsUserFacingCallable extends UnaryCallable<BulkMutation, Void> {
private final UnaryCallable<MutateRowsRequest, Void> innerCallable;
public final class BulkMutateRowsUserFacingCallable
extends UnaryCallable<BulkMutation, MutateRowsAttemptResult> {

private final UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable;
private final RequestContext requestContext;

public BulkMutateRowsUserFacingCallable(
UnaryCallable<MutateRowsRequest, Void> innerCallable, RequestContext requestContext) {
UnaryCallable<MutateRowsRequest, MutateRowsAttemptResult> innerCallable,
RequestContext requestContext) {
this.innerCallable = innerCallable;
this.requestContext = requestContext;
}

@Override
public ApiFuture<Void> futureCall(BulkMutation request, ApiCallContext context) {
public ApiFuture<MutateRowsAttemptResult> futureCall(
BulkMutation request, ApiCallContext context) {
return innerCallable.futureCall(request.toProto(requestContext), context);
}
}

0 comments on commit 4158094

Please sign in to comment.