From 7254bc9ae4e32e5daed392509d9a3b079a5217b2 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Fri, 19 May 2017 18:52:09 -0400 Subject: [PATCH] Removing the link between OperationAccountant and ResourceLimiter --- .../cloud/bigtable/grpc/BigtableSession.java | 3 +- .../bigtable/grpc/async/AsyncExecutor.java | 82 +++++++++++++------ .../bigtable/grpc/async/BulkMutation.java | 32 ++++---- .../grpc/async/OperationAccountant.java | 33 ++------ .../grpc/async/OperationAccountantPerf.java | 9 +- .../grpc/async/TestAsyncExecutor.java | 6 +- .../bigtable/grpc/async/TestBulkMutation.java | 4 +- .../TestBulkMutationAwaitCompletion.java | 7 +- .../grpc/async/TestOperationAccountant.java | 62 ++++++-------- .../hbase/BigtableBufferedMutator.java | 3 +- .../hbase/TestBigtableBufferedMutator.java | 9 +- 11 files changed, 126 insertions(+), 124 deletions(-) diff --git a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java index 720e71ec7d..224ed64a41 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java @@ -42,7 +42,6 @@ import com.google.cloud.bigtable.grpc.async.BulkMutation; import com.google.cloud.bigtable.grpc.async.BulkRead; import com.google.cloud.bigtable.grpc.async.ResourceLimiter; -import com.google.cloud.bigtable.grpc.async.OperationAccountant; import com.google.cloud.bigtable.grpc.io.ChannelPool; import com.google.cloud.bigtable.grpc.io.CredentialInterceptorCache; import com.google.cloud.bigtable.grpc.io.GoogleCloudResourcePrefixInterceptor; @@ -333,7 +332,7 @@ public BigtableDataClient getDataClient() { * @return a {@link com.google.cloud.bigtable.grpc.async.AsyncExecutor} object. */ public AsyncExecutor createAsyncExecutor() { - return new AsyncExecutor(dataClient, new OperationAccountant(resourceLimiter)); + return new AsyncExecutor(dataClient, resourceLimiter); } /** diff --git a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/AsyncExecutor.java b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/AsyncExecutor.java index c0027258b9..eb38d5f987 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/AsyncExecutor.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/AsyncExecutor.java @@ -31,6 +31,7 @@ import com.google.cloud.bigtable.config.Logger; import com.google.cloud.bigtable.grpc.BigtableDataClient; import com.google.cloud.bigtable.grpc.scanner.FlatRow; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -125,26 +126,48 @@ public ListenableFuture> call(BigtableDataClient client, ReadRowsR private final BigtableDataClient client; private final OperationAccountant operationsAccountant; + private final ResourceLimiter resourceLimiter; /** - *

Constructor for AsyncExecutor.

- * - * @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object. - * @param operationAccountant a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant} object. + *

+ * Constructor for AsyncExecutor. + *

+ * @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object for executing + * RPCs. + * @param resourceLimiter a {@link ResourceLimiter} for ensuring that we don't run too many RPCs + * globally. + */ + public AsyncExecutor(BigtableDataClient client, ResourceLimiter resourceLimiter) { + this(client, resourceLimiter, new OperationAccountant()); + } + + /** + *

+ * Constructor for AsyncExecutor. + *

+ * @param client a {@link com.google.cloud.bigtable.grpc.BigtableDataClient} object for executing + * RPCs. + * @param resourceLimiter a {@link ResourceLimiter} for ensuring that we don't run too many RPCs + * globally. + * @param operationAccountant a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant} + * object for tracking the RPCs initiated by this instance. */ - public AsyncExecutor(BigtableDataClient client, OperationAccountant operationAccountant) { + @VisibleForTesting + AsyncExecutor(BigtableDataClient client, ResourceLimiter resourceLimiter, + OperationAccountant operationAccountant) { this.client = client; + this.resourceLimiter = resourceLimiter; this.operationsAccountant = operationAccountant; } /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowAsync(MutateRowRequest)} on the * {@link com.google.bigtable.v2.MutateRowRequest} given an operationId generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}. + * {@link #registerOperation(long)}. * * @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send. * @param operationId The Id generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when + * {@link #registerOperation(int)} that will be released when * the mutate operation is completed. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. */ @@ -156,11 +179,11 @@ public ListenableFuture mutateRowAsync(MutateRowRequest reque /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowsAsync(MutateRowsRequest)} on the * {@link com.google.bigtable.v2.MutateRowsRequest} given an operationId generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}. + * {@link #registerOperation(long)}. * * @param request The {@link com.google.bigtable.v2.MutateRowsRequest} to send. * @param operationId The Id generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when + * {@link #registerOperation(long)} that will be released when * the mutate operation is completed. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. */ @@ -172,11 +195,11 @@ public ListenableFuture> mutateRowsAsync(MutateRowsRequ /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#checkAndMutateRowAsync(CheckAndMutateRowRequest)} on the * {@link com.google.bigtable.v2.CheckAndMutateRowRequest} given an operationId generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}. + * {@link #registerOperation(long)}. * * @param request The {@link com.google.bigtable.v2.CheckAndMutateRowRequest} to send. * @param operationId The Id generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when + * {@link #registerOperation(long)} that will be released when * the checkAndMutateRow operation is completed. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. */ @@ -188,11 +211,11 @@ public ListenableFuture checkAndMutateRowAsync( /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readModifyWriteRowAsync(ReadModifyWriteRowRequest)} on the * {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} given an operationId generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}. + * {@link #registerOperation(long)}. * * @param request The {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} to send. * @param operationId The Id generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} that will be released when + * {@link #registerOperation(long)} that will be released when * the readModifyWriteRowAsync operation is completed. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. */ @@ -204,7 +227,7 @@ public ListenableFuture checkAndMutateRowAsync( /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the * {@link com.google.bigtable.v2.ReadRowsRequest} given an operationId generated from - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)}. + * {@link #registerOperation(long)}. * * @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -217,7 +240,7 @@ public ListenableFuture> readRowsAsync(ReadRowsRequest request, long o /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowAsync(MutateRowRequest)} on the * {@link com.google.bigtable.v2.MutateRowRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -231,7 +254,7 @@ public ListenableFuture mutateRowAsync(MutateRowRequest reque /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#mutateRowsAsync(MutateRowsRequest)} on the * {@link com.google.bigtable.v2.MutateRowsRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.MutateRowRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -245,7 +268,7 @@ public ListenableFuture> mutateRowsAsync(MutateRowsRequ /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#checkAndMutateRowAsync(CheckAndMutateRowRequest)} on the * {@link com.google.bigtable.v2.CheckAndMutateRowRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.CheckAndMutateRowRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -259,7 +282,7 @@ public ListenableFuture checkAndMutateRowAsync( /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readModifyWriteRow(ReadModifyWriteRowRequest)} on the * {@link com.google.bigtable.v2.ReadModifyWriteRowRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.ReadModifyWriteRowRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -273,7 +296,7 @@ public ListenableFuture checkAndMutateRowAsync( /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the * {@link com.google.bigtable.v2.ReadRowsRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -288,7 +311,7 @@ public ListenableFuture> readRowsAsync(ReadRowsRequest request) /** * Performs a {@link com.google.cloud.bigtable.grpc.BigtableDataClient#readRowsAsync(ReadRowsRequest)} on the * {@link com.google.bigtable.v2.ReadRowsRequest}. This method may block if - * {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter#registerOperationWithHeapSize(long)} blocks. + * {@link #registerOperation(long)} blocks. * * @param request The {@link com.google.bigtable.v2.ReadRowsRequest} to send. * @return a {@link com.google.common.util.concurrent.ListenableFuture} which can be listened to for completion events. @@ -303,8 +326,17 @@ private ListenableFuture rpc, RequestT request) throws InterruptedException { // Wait until both the memory and rpc count maximum requirements are achieved before getting a // unique id used to track this request. - long id = operationsAccountant.registerOperationWithHeapSize(request.getSerializedSize()); - return call(rpc, request, id); + return call(rpc, request, registerOperation(request)); + } + + public long registerOperation(GeneratedMessageV3 request) throws InterruptedException { + return registerOperation(request.getSerializedSize()); + } + + public long registerOperation(long size) throws InterruptedException { + long id = resourceLimiter.registerOperationWithHeapSize(size); + operationsAccountant.registerOperation(id); + return id; } private ListenableFuture call(AsyncCall rpc, @@ -318,11 +350,13 @@ private ListenableFuture call(AsyncCall() { @Override public void onSuccess(ResponseT result) { + resourceLimiter.markCanBeCompleted(id); operationsAccountant.onOperationCompletion(id); } @Override public void onFailure(Throwable t) { + resourceLimiter.markCanBeCompleted(id); operationsAccountant.onOperationCompletion(id); } }); @@ -362,7 +396,7 @@ public boolean hasInflightRequests() { * @return a long. */ public long getMaxHeapSize() { - return operationsAccountant.getMaxHeapSize(); + return resourceLimiter.getMaxHeapSize(); } /** @@ -379,7 +413,7 @@ public BigtableDataClient getClient() { * * @return a {@link com.google.cloud.bigtable.grpc.async.OperationAccountant} object. */ - public OperationAccountant getOperationAccountant() { + OperationAccountant getOperationAccountant() { return operationsAccountant; } } diff --git a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/BulkMutation.java b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/BulkMutation.java index 9359814f64..670d460cc8 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/BulkMutation.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/BulkMutation.java @@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** @@ -72,6 +73,7 @@ public class BulkMutation { static Logger LOG = new Logger(BulkMutation.class); public static final long MAX_RPC_WAIT_TIME_NANOS = TimeUnit.MINUTES.toNanos(5); + private final AtomicLong batchIdGenerator = new AtomicLong(); private static StatusRuntimeException toException(Status status) { io.grpc.Status grpcStatus = io.grpc.Status @@ -145,12 +147,13 @@ public boolean wasSent() { @VisibleForTesting class Batch implements Runnable { + private final Meter mutationMeter = BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.added"); private final Meter mutationRetryMeter = BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.retried"); - private Long retryId; + private Long batchId; private RequestManager currentRequestManager; private BackOff currentBackoff; private int failedCount; @@ -204,7 +207,8 @@ public void onFailure(Throwable t) { }); } - private synchronized void handleResult(List results) { + @VisibleForTesting + synchronized void handleResult(List results) { BulkMutationsStats.getInstance().markMutationsRpcCompletion( clock.nanoTime() - currentRequestManager.lastRpcSentTimeNanos); @@ -263,9 +267,9 @@ private synchronized void performFullRetry(AtomicReference backoff, Throwa failedCount++; if (backoffMs == BackOff.STOP) { setFailure( - new BigtableRetriesExhaustedException("Batch #" + retryId + " Exhausted retries.", t)); + new BigtableRetriesExhaustedException("Batch #" + batchId + " Exhausted retries.", t)); } else { - LOG.info("Retrying failed call for batch #%d. Failure #%d, got: %s", t, retryId, failedCount, + LOG.info("Retrying failed call for batch #%d. Failure #%d, got: %s", t, batchId, failedCount, io.grpc.Status.fromThrowable(t)); mutationRetryMeter.mark(getRequestCount()); retryExecutorService.schedule(this, backoffMs, TimeUnit.MILLISECONDS); @@ -380,14 +384,14 @@ public synchronized void run() { return; } try { - if (retryId == null) { - retryId = Long.valueOf( - asyncExecutor.getOperationAccountant().registerComplexOperation(createRetryHandler())); + if (batchId == null) { + batchId = batchIdGenerator.incrementAndGet(); + asyncExecutor.getOperationAccountant().registerComplexOperation(batchId, + createRetryHandler()); } MutateRowsRequest request = currentRequestManager.build(); long start = clock.nanoTime(); - long operationId = asyncExecutor.getOperationAccountant() - .registerOperationWithHeapSize(request.getSerializedSize()); + long operationId = asyncExecutor.registerOperation(request); long now = clock.nanoTime(); BulkMutationsStats.getInstance().markThrottling(now - start); mutateRowsFuture = asyncExecutor.mutateRowsAsync(request, operationId); @@ -411,7 +415,7 @@ private ComplexOperationStalenessHandler createRetryHandler() { @Override public void performRetryIfStale() { synchronized(Batch.this) { - // If the retryId is null, it means that the operation somehow fails partially, + // If the batchId is null, it means that the operation somehow fails partially, // cleanup the retry. if (currentRequestManager == null || currentRequestManager.isEmpty()) { setRetryComplete(); @@ -445,12 +449,12 @@ private synchronized void setRetryComplete() { mutateRowsFuture.cancel(true); } mutateRowsFuture = null; - if (retryId != null) { - asyncExecutor.getOperationAccountant().onComplexOperationCompletion(retryId); + if (batchId != null) { + asyncExecutor.getOperationAccountant().onComplexOperationCompletion(batchId); if (failedCount > 0) { - LOG.info("Batch #%d recovered from the failure and completed.", retryId); + LOG.info("Batch #%d recovered from the failure and completed.", batchId); } - retryId = null; + batchId = null; } currentRequestManager = null; } diff --git a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/OperationAccountant.java b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/OperationAccountant.java index 9ec89f858b..5dd95cf4a8 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/OperationAccountant.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/OperationAccountant.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; @@ -54,10 +53,8 @@ public static interface ComplexOperationStalenessHandler { // will still wait to complete. private static final long INTERVAL_NO_SUCCESS_WARNING_NANOS = TimeUnit.SECONDS.toNanos(30); - private final ResourceLimiter resourceLimiter; private final NanoClock clock; private final long finishWaitMillis; - private final AtomicLong complexOperationIdGenerator = new AtomicLong(); private ReentrantLock lock = new ReentrantLock(); private Condition flushedCondition = lock.newCondition(); @@ -75,13 +72,12 @@ public static interface ComplexOperationStalenessHandler { * * @param resourceLimiter a {@link com.google.cloud.bigtable.grpc.async.ResourceLimiter} object. */ - public OperationAccountant(ResourceLimiter resourceLimiter) { - this(resourceLimiter, NanoClock.SYSTEM, DEFAULT_FINISH_WAIT_MILLIS); + public OperationAccountant() { + this(NanoClock.SYSTEM, DEFAULT_FINISH_WAIT_MILLIS); } @VisibleForTesting - OperationAccountant(ResourceLimiter resourceLimiter, NanoClock clock, long finishWaitMillis) { - this.resourceLimiter = resourceLimiter; + OperationAccountant(NanoClock clock, long finishWaitMillis) { this.clock = clock; this.finishWaitMillis = finishWaitMillis; resetNoSuccessWarningDeadline(); @@ -90,14 +86,12 @@ public OperationAccountant(ResourceLimiter resourceLimiter) { /** * Register a new RPC operation. Blocks until the requested resources are available. This method * must be paired with a call to {@link #onOperationCompletion(long)}. - * @param heapSize The serialized size of the RPC + * @param id The id of the RPC * @return An operation id * @throws java.lang.InterruptedException if any. */ - public long registerOperationWithHeapSize(long heapSize) + public long registerOperation(long id) throws InterruptedException { - long id = resourceLimiter.registerOperationWithHeapSize(heapSize); - lock.lock(); try { operations.add(id); @@ -117,16 +111,13 @@ public long registerOperationWithHeapSize(long heapSize) */ // TODO: This functionality should be moved to BulkMutation where the functionality is used. The // abstraction. - public long registerComplexOperation(ComplexOperationStalenessHandler handler) { - final long id = complexOperationIdGenerator.incrementAndGet(); - + public void registerComplexOperation(long id, ComplexOperationStalenessHandler handler) { lock.lock(); try { complexOperations.put(id, handler); } finally { lock.unlock(); } - return id; } /** @@ -184,16 +175,6 @@ private void logNoSuccessWarning(long now) { noSuccessWarningCount++; } - /** - *

getMaxHeapSize.

- * - * @return The maximum allowed number of bytes across all across all outstanding RPCs - */ - public long getMaxHeapSize() { - return resourceLimiter.getMaxHeapSize(); - } - - /** *

* hasInflightRequests. @@ -232,8 +213,6 @@ int getNoSuccessWarningCount() { @VisibleForTesting void onOperationCompletion(long id) { - resourceLimiter.markCanBeCompleted(id); - lock.lock(); try { operations.remove(id); diff --git a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/OperationAccountantPerf.java b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/OperationAccountantPerf.java index 99235d2384..12e9d15499 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/OperationAccountantPerf.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/OperationAccountantPerf.java @@ -56,8 +56,8 @@ public static void main(String[] args) throws Exception { */ private static void test(ListeningExecutorService pool) throws InterruptedException, ExecutionException, TimeoutException { - final OperationAccountant underTest = - new OperationAccountant(new ResourceLimiter(SIZE, (int) SIZE)); + final ResourceLimiter resourceLimiter = new ResourceLimiter(SIZE, (int) SIZE); + final OperationAccountant underTest = new OperationAccountant(); final LinkedBlockingQueue registeredEvents = new LinkedBlockingQueue<>(); final int writerCount = 1; @@ -70,7 +70,9 @@ public void run() { int offerCount = REGISTER_COUNT / writerCount; try { for (int i = 0; i < offerCount; i++) { - registeredEvents.add(underTest.registerOperationWithHeapSize(1)); + long id = resourceLimiter.registerOperationWithHeapSize(1); + underTest.registerOperation(id); + registeredEvents.add(id); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -100,6 +102,7 @@ public void run() { if (registeredId == null) { i--; } else { + resourceLimiter.markCanBeCompleted(registeredId); underTest.onOperationCompletion(registeredId); } } diff --git a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestAsyncExecutor.java b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestAsyncExecutor.java index 54e77b73ab..d887b4eefd 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestAsyncExecutor.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestAsyncExecutor.java @@ -42,7 +42,6 @@ import com.google.cloud.bigtable.grpc.BigtableDataClient; import com.google.cloud.bigtable.grpc.async.AsyncExecutor; import com.google.cloud.bigtable.grpc.async.ResourceLimiter; -import com.google.cloud.bigtable.grpc.async.OperationAccountant; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; @@ -60,14 +59,11 @@ public class TestAsyncExecutor { private SettableFuture future; private AsyncExecutor underTest; - private OperationAccountant operationAccountant; @Before public void setUp() { MockitoAnnotations.initMocks(this); - operationAccountant = new OperationAccountant(new ResourceLimiter(1000, 10)); - - underTest = new AsyncExecutor(client, operationAccountant); + underTest = new AsyncExecutor(client, new ResourceLimiter(1000, 10)); future = SettableFuture.create(); } diff --git a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutation.java b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutation.java index bdb8849b8e..0e4ca879b8 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutation.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutation.java @@ -96,8 +96,8 @@ public void setup() throws InterruptedException { future = SettableFuture.create(); when(client.mutateRowsAsync(any(MutateRowsRequest.class))).thenReturn(future); ResourceLimiter resourceLimiter = new ResourceLimiter(1000, 10); - operationAccountant = new OperationAccountant(resourceLimiter); - asyncExecutor = new AsyncExecutor(client, operationAccountant); + operationAccountant = new OperationAccountant(); + asyncExecutor = new AsyncExecutor(client, resourceLimiter, operationAccountant); underTest = createBulkMutation(); } diff --git a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutationAwaitCompletion.java b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutationAwaitCompletion.java index a8ab9e800b..ef6b54a2b8 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutationAwaitCompletion.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestBulkMutationAwaitCompletion.java @@ -256,9 +256,10 @@ private void runOneBulkMutation() { * Creates a fully formed {@link BulkMutation} */ private BulkMutation createBulkMutation() { - OperationAccountant operationAccountant = new OperationAccountant( - resourceLimiter, clock, OperationAccountant.DEFAULT_FINISH_WAIT_MILLIS); - AsyncExecutor asyncExecutor = new AsyncExecutor(mockClient, operationAccountant); + OperationAccountant operationAccountant = + new OperationAccountant(clock, OperationAccountant.DEFAULT_FINISH_WAIT_MILLIS); + AsyncExecutor asyncExecutor = + new AsyncExecutor(mockClient, resourceLimiter, operationAccountant); BulkMutation bulkMutation = new BulkMutation( TestBulkMutation.TABLE_NAME, diff --git a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestOperationAccountant.java b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestOperationAccountant.java index b9d7631ccd..3279d3cf63 100644 --- a/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestOperationAccountant.java +++ b/bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/TestOperationAccountant.java @@ -72,10 +72,9 @@ public void setup(){ @Test public void testOnOperationCompletion() throws InterruptedException { - ResourceLimiter resourceLimiter = new ResourceLimiter(10l, 1000); - OperationAccountant underTest = new OperationAccountant(resourceLimiter); + OperationAccountant underTest = new OperationAccountant(); int size = (int) (100 * Math.random()); - long id = underTest.registerOperationWithHeapSize(size); + long id = underTest.registerOperation(size); assertTrue(underTest.hasInflightOperations()); underTest.onOperationCompletion(id); assertFalse(underTest.hasInflightOperations()); @@ -122,14 +121,14 @@ public void testFlush() throws Exception { final int registerCount = 1000; ExecutorService pool = Executors.newCachedThreadPool(); try { - final ResourceLimiter resourceLimiter = new ResourceLimiter(100l, 100); - final OperationAccountant underTest = new OperationAccountant(resourceLimiter); + final OperationAccountant underTest = new OperationAccountant(); final LinkedBlockingQueue registeredEvents = new LinkedBlockingQueue<>(); Future writeFuture = pool.submit(new Callable() { @Override public Boolean call() throws InterruptedException { - for (int i = 0; i < registerCount; i++) { - registeredEvents.offer(underTest.registerOperationWithHeapSize(i)); + for (long i = 0; i < registerCount; i++) { + underTest.registerOperation(i); + registeredEvents.offer(i); } underTest.awaitCompletion(); return true; @@ -161,8 +160,7 @@ public void testFlushWithRetries() throws Exception { final int registerCount = 1000; ExecutorService pool = Executors.newCachedThreadPool(); try { - ResourceLimiter resourceLimiter = new ResourceLimiter(100l, 100); - final OperationAccountant underTest = new OperationAccountant(resourceLimiter); + final OperationAccountant underTest = new OperationAccountant(); final AtomicBoolean allOperationsDone = new AtomicBoolean(); final LinkedBlockingQueue registeredEvents = new LinkedBlockingQueue<>(); final List> retryFutures = new ArrayList<>(); @@ -173,10 +171,11 @@ public void testFlushWithRetries() throws Exception { public void run() { try { for (int i = 0; i < registerCount; i++) { - registeredEvents.offer(underTest.registerOperationWithHeapSize(1)); + registeredEvents.offer(underTest.registerOperation(1)); // Add a retry for each rpc - final long id = underTest.registerComplexOperation(handler); + final long id = i + 10000; + underTest.registerComplexOperation(id, handler); SettableFuture future = SettableFuture.create(); Futures.addCallback(future, new FutureCallback(){ @@ -207,22 +206,18 @@ public void onFailure(Throwable t) { } } }); - Future readFuture = pool.submit(new Runnable() { + Future readFuture = pool.submit(new Callable() { @Override - public void run() { - try { - for (int i = 0; i < registerCount; i++) { - Long registeredId = registeredEvents.poll(1, TimeUnit.SECONDS); - if (registeredId == null){ - i--; - } else { - underTest.onOperationCompletion(registeredId); - } + public Void call() throws InterruptedException { + for (int i = 0; i < registerCount; i++) { + Long registeredId = registeredEvents.poll(1, TimeUnit.SECONDS); + if (registeredId == null){ + i--; + } else { + underTest.onOperationCompletion(registeredId); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); } + return null; } }); @@ -261,24 +256,19 @@ public Long answer(InvocationOnMock invocation) throws Throwable { }); long finishWaitTime = 100; - ResourceLimiter resourceLimiter = new ResourceLimiter(100l, 100); - final OperationAccountant underTest = - new OperationAccountant(resourceLimiter, clock, finishWaitTime); + final OperationAccountant underTest = new OperationAccountant(clock, finishWaitTime); - long complexOpId = underTest.registerComplexOperation(handler); + long complexOpId = 1000; + underTest.registerComplexOperation(complexOpId, handler); final int iterations = 4; ExecutorService pool = Executors.newCachedThreadPool(); try { - pool.submit(new Runnable() { + pool.submit(new Callable() { @Override - public void run() { - try { - underTest.awaitCompletion(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + public Void call() throws Exception { + underTest.awaitCompletion(); + return null; } }); // Sleep a multiple of the finish wait time to force a few iterations diff --git a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/BigtableBufferedMutator.java b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/BigtableBufferedMutator.java index 41189cc74d..03813a3311 100644 --- a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/BigtableBufferedMutator.java +++ b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/BigtableBufferedMutator.java @@ -305,8 +305,7 @@ private void offer(Mutation mutation) throws IOException { // TODO: Do this logic asynchronously. addExceptionCallback(bulkMutation.add(adapt(mutation)), mutation); } else { - long operationId = asyncExecutor.getOperationAccountant() - .registerOperationWithHeapSize(mutation.heapSize()); + long operationId = asyncExecutor.registerOperation(mutation.heapSize()); operation = new MutationOperation(mutation, operationId); if (executorService != null && bulkOptions.getAsyncMutatorCount() > 0) { initializeAsyncMutators(); diff --git a/bigtable-hbase-parent/bigtable-hbase/src/test/java/com/google/cloud/bigtable/hbase/TestBigtableBufferedMutator.java b/bigtable-hbase-parent/bigtable-hbase/src/test/java/com/google/cloud/bigtable/hbase/TestBigtableBufferedMutator.java index ab1aa53753..3aa9fa93bf 100644 --- a/bigtable-hbase-parent/bigtable-hbase/src/test/java/com/google/cloud/bigtable/hbase/TestBigtableBufferedMutator.java +++ b/bigtable-hbase-parent/bigtable-hbase/src/test/java/com/google/cloud/bigtable/hbase/TestBigtableBufferedMutator.java @@ -61,7 +61,6 @@ import com.google.cloud.bigtable.grpc.BigtableTableName; import com.google.cloud.bigtable.grpc.async.AsyncExecutor; import com.google.cloud.bigtable.grpc.async.BulkMutation; -import com.google.cloud.bigtable.grpc.async.OperationAccountant; import com.google.cloud.bigtable.grpc.async.ResourceLimiter; import com.google.cloud.bigtable.hbase.adapters.HBaseRequestAdapter; import com.google.common.util.concurrent.SettableFuture; @@ -102,11 +101,9 @@ public void setUp() { new Answer() { @Override public AsyncExecutor answer(InvocationOnMock invocation) throws Throwable { - OperationAccountant operationAccountant = - new OperationAccountant(new ResourceLimiter(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, - BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT)); - - return new AsyncExecutor(mockClient, operationAccountant); + ResourceLimiter resourceLimiter = new ResourceLimiter(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, + BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT); + return new AsyncExecutor(mockClient, resourceLimiter); } }); when(mockSession.createBulkMutation(any(BigtableTableName.class), any(AsyncExecutor.class)))