Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.google.cloud.bigtable.grpc.async.BulkMutation;
import com.google.cloud.bigtable.grpc.async.BulkRead;
import com.google.cloud.bigtable.grpc.async.ResourceLimiter;
import com.google.cloud.bigtable.grpc.async.OperationAccountant;
import com.google.cloud.bigtable.grpc.io.ChannelPool;
import com.google.cloud.bigtable.grpc.io.CredentialInterceptorCache;
import com.google.cloud.bigtable.grpc.io.GoogleCloudResourcePrefixInterceptor;
Expand Down Expand Up @@ -333,7 +332,7 @@ public BigtableDataClient getDataClient() {
* @return a {@link com.google.cloud.bigtable.grpc.async.AsyncExecutor} object.
*/
public AsyncExecutor createAsyncExecutor() {
return new AsyncExecutor(dataClient, new OperationAccountant(resourceLimiter));
return new AsyncExecutor(dataClient, resourceLimiter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.bigtable.config.Logger;
import com.google.cloud.bigtable.grpc.BigtableDataClient;
import com.google.cloud.bigtable.grpc.scanner.FlatRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -125,26 +126,48 @@ public ListenableFuture<List<FlatRow>> call(BigtableDataClient client, ReadRowsR

private final BigtableDataClient client;
private final OperationAccountant operationsAccountant;
private final ResourceLimiter resourceLimiter;

/**
* <p>Constructor for AsyncExecutor.</p>
*
* @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object.
* @param operationAccountant a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant} object.
* <p>
* Constructor for AsyncExecutor.
* </p>
* @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object for executing
* RPCs.
* @param resourceLimiter a {@link ResourceLimiter} for ensuring that we don't run too many RPCs
* globally.
*/
public AsyncExecutor(BigtableDataClient client, ResourceLimiter resourceLimiter) {
this(client, resourceLimiter, new OperationAccountant());
}

/**
* <p>
* Constructor for AsyncExecutor.
* </p>
* @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object for executing
* RPCs.
* @param resourceLimiter a {@link ResourceLimiter} for ensuring that we don't run too many RPCs
* globally.
* @param operationAccountant a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant}
* object for tracking the RPCs initiated by this instance.
*/
public AsyncExecutor(BigtableDataClient client, OperationAccountant operationAccountant) {
@VisibleForTesting
AsyncExecutor(BigtableDataClient client, ResourceLimiter resourceLimiter,
OperationAccountant operationAccountant) {
this.client = client;
this.resourceLimiter = resourceLimiter;
this.operationsAccountant = operationAccountant;
}

/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowAsync(MutateRowRequest)} on the
* {@link com.google.bigtable.v2.MutateRowRequest} given an operationId generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}.
* {@link #registerOperation(long)}.
*
* @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send.
* @param operationId The Id generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when
* {@link #registerOperation(int)} that will be released when
* the mutate operation is completed.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
*/
Expand All @@ -156,11 +179,11 @@ public ListenableFuture<MutateRowResponse> mutateRowAsync(MutateRowRequest reque
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowsAsync(MutateRowsRequest)} on the
* {@link com.google.bigtable.v2.MutateRowsRequest} given an operationId generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}.
* {@link #registerOperation(long)}.
*
* @param request The {@link com.google.bigtable.v2.MutateRowsRequest} to send.
* @param operationId The Id generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when
* {@link #registerOperation(long)} that will be released when
* the mutate operation is completed.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
*/
Expand All @@ -172,11 +195,11 @@ public ListenableFuture<List<MutateRowsResponse>> mutateRowsAsync(MutateRowsRequ
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#checkAndMutateRowAsync(CheckAndMutateRowRequest)} on the
* {@link com.google.bigtable.v2.CheckAndMutateRowRequest} given an operationId generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}.
* {@link #registerOperation(long)}.
*
* @param request The {@link com.google.bigtable.v2.CheckAndMutateRowRequest} to send.
* @param operationId The Id generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when
* {@link #registerOperation(long)} that will be released when
* the checkAndMutateRow operation is completed.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
*/
Expand All @@ -188,11 +211,11 @@ public ListenableFuture<CheckAndMutateRowResponse> checkAndMutateRowAsync(
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readModifyWriteRowAsync(ReadModifyWriteRowRequest)} on the
* {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} given an operationId generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}.
* {@link #registerOperation(long)}.
*
* @param request The {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} to send.
* @param operationId The Id generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when
* {@link #registerOperation(long)} that will be released when
* the readModifyWriteRowAsync operation is completed.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
*/
Expand All @@ -204,7 +227,7 @@ public ListenableFuture<CheckAndMutateRowResponse> checkAndMutateRowAsync(
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the
* {@link com.google.bigtable.v2.ReadRowsRequest} given an operationId generated from
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}.
* {@link #registerOperation(long)}.
*
* @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -217,7 +240,7 @@ public ListenableFuture<List<Row>> readRowsAsync(ReadRowsRequest request, long o
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowAsync(MutateRowRequest)} on the
* {@link com.google.bigtable.v2.MutateRowRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -231,7 +254,7 @@ public ListenableFuture<MutateRowResponse> mutateRowAsync(MutateRowRequest reque
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowsAsync(MutateRowsRequest)} on the
* {@link com.google.bigtable.v2.MutateRowsRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -245,7 +268,7 @@ public ListenableFuture<List<MutateRowsResponse>> mutateRowsAsync(MutateRowsRequ
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#checkAndMutateRowAsync(CheckAndMutateRowRequest)} on the
* {@link com.google.bigtable.v2.CheckAndMutateRowRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.CheckAndMutateRowRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -259,7 +282,7 @@ public ListenableFuture<CheckAndMutateRowResponse> checkAndMutateRowAsync(
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readModifyWriteRow(ReadModifyWriteRowRequest)} on the
* {@link com.google.bigtable.v2.ReadModifyWriteRowRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -273,7 +296,7 @@ public ListenableFuture<CheckAndMutateRowResponse> checkAndMutateRowAsync(
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the
* {@link com.google.bigtable.v2.ReadRowsRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -288,7 +311,7 @@ public ListenableFuture<List<Row>> readRowsAsync(ReadRowsRequest request)
/**
* Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the
* {@link com.google.bigtable.v2.ReadRowsRequest}. This method may block if
* {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks.
* {@link #registerOperation(long)} blocks.
*
* @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send.
* @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events.
Expand All @@ -303,8 +326,17 @@ private <RequestT extends GeneratedMessageV3, ResponseT> ListenableFuture<Respon
AsyncCall<RequestT, ResponseT> rpc, RequestT request) throws InterruptedException {
// Wait until both the memory and rpc count maximum requirements are achieved before getting a
// unique id used to track this request.
long id = operationsAccountant.registerOperationWithHeapSize(request.getSerializedSize());
return call(rpc, request, id);
return call(rpc, request, registerOperation(request));
}

public long registerOperation(GeneratedMessageV3 request) throws InterruptedException {
return registerOperation(request.getSerializedSize());
}

public long registerOperation(long size) throws InterruptedException {
long id = resourceLimiter.registerOperationWithHeapSize(size);
operationsAccountant.registerOperation(id);
return id;
}

private <ResponseT, RequestT> ListenableFuture<ResponseT> call(AsyncCall<RequestT, ResponseT> rpc,
Expand All @@ -318,11 +350,13 @@ private <ResponseT, RequestT> ListenableFuture<ResponseT> call(AsyncCall<Request
Futures.addCallback(future, new FutureCallback<ResponseT>() {
@Override
public void onSuccess(ResponseT result) {
resourceLimiter.markCanBeCompleted(id);
operationsAccountant.onOperationCompletion(id);
}

@Override
public void onFailure(Throwable t) {
resourceLimiter.markCanBeCompleted(id);
operationsAccountant.onOperationCompletion(id);
}
});
Expand Down Expand Up @@ -362,7 +396,7 @@ public boolean hasInflightRequests() {
* @return a long.
*/
public long getMaxHeapSize() {
return operationsAccountant.getMaxHeapSize();
return resourceLimiter.getMaxHeapSize();
}

/**
Expand All @@ -379,7 +413,7 @@ public BigtableDataClient getClient() {
*
* @return a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant} object.
*/
public OperationAccountant getOperationAccountant() {
OperationAccountant getOperationAccountant() {
return operationsAccountant;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -72,6 +73,7 @@ public class BulkMutation {
static Logger LOG = new Logger(BulkMutation.class);

public static final long MAX_RPC_WAIT_TIME_NANOS = TimeUnit.MINUTES.toNanos(5);
private final AtomicLong batchIdGenerator = new AtomicLong();

private static StatusRuntimeException toException(Status status) {
io.grpc.Status grpcStatus = io.grpc.Status
Expand Down Expand Up @@ -145,12 +147,13 @@ public boolean wasSent() {

@VisibleForTesting
class Batch implements Runnable {

private final Meter mutationMeter =
BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.added");
private final Meter mutationRetryMeter =
BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.retried");

private Long retryId;
private Long batchId;
private RequestManager currentRequestManager;
private BackOff currentBackoff;
private int failedCount;
Expand Down Expand Up @@ -204,7 +207,8 @@ public void onFailure(Throwable t) {
});
}

private synchronized void handleResult(List<MutateRowsResponse> results) {
@VisibleForTesting
synchronized void handleResult(List<MutateRowsResponse> results) {
BulkMutationsStats.getInstance().markMutationsRpcCompletion(
clock.nanoTime() - currentRequestManager.lastRpcSentTimeNanos);

Expand Down Expand Up @@ -263,9 +267,9 @@ private synchronized void performFullRetry(AtomicReference<Long> backoff, Throwa
failedCount++;
if (backoffMs == BackOff.STOP) {
setFailure(
new BigtableRetriesExhaustedException("Batch #" + retryId + " Exhausted retries.", t));
new BigtableRetriesExhaustedException("Batch #" + batchId + " Exhausted retries.", t));
} else {
LOG.info("Retrying failed call for batch #%d. Failure #%d, got: %s", t, retryId, failedCount,
LOG.info("Retrying failed call for batch #%d. Failure #%d, got: %s", t, batchId, failedCount,
io.grpc.Status.fromThrowable(t));
mutationRetryMeter.mark(getRequestCount());
retryExecutorService.schedule(this, backoffMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -380,14 +384,14 @@ public synchronized void run() {
return;
}
try {
if (retryId == null) {
retryId = Long.valueOf(
asyncExecutor.getOperationAccountant().registerComplexOperation(createRetryHandler()));
if (batchId == null) {
batchId = batchIdGenerator.incrementAndGet();
asyncExecutor.getOperationAccountant().registerComplexOperation(batchId,
createRetryHandler());
}
MutateRowsRequest request = currentRequestManager.build();
long start = clock.nanoTime();
long operationId = asyncExecutor.getOperationAccountant()
.registerOperationWithHeapSize(request.getSerializedSize());
long operationId = asyncExecutor.registerOperation(request);
long now = clock.nanoTime();
BulkMutationsStats.getInstance().markThrottling(now - start);
mutateRowsFuture = asyncExecutor.mutateRowsAsync(request, operationId);
Expand All @@ -411,7 +415,7 @@ private ComplexOperationStalenessHandler createRetryHandler() {
@Override
public void performRetryIfStale() {
synchronized(Batch.this) {
// If the retryId is null, it means that the operation somehow fails partially,
// If the batchId is null, it means that the operation somehow fails partially,
// cleanup the retry.
if (currentRequestManager == null || currentRequestManager.isEmpty()) {
setRetryComplete();
Expand Down Expand Up @@ -445,12 +449,12 @@ private synchronized void setRetryComplete() {
mutateRowsFuture.cancel(true);
}
mutateRowsFuture = null;
if (retryId != null) {
asyncExecutor.getOperationAccountant().onComplexOperationCompletion(retryId);
if (batchId != null) {
asyncExecutor.getOperationAccountant().onComplexOperationCompletion(batchId);
if (failedCount > 0) {
LOG.info("Batch #%d recovered from the failure and completed.", retryId);
LOG.info("Batch #%d recovered from the failure and completed.", batchId);
}
retryId = null;
batchId = null;
}
currentRequestManager = null;
}
Expand Down
Loading