Skip to content
Permalink
Browse files
feat: introducing bulk read API through Batcher (#99)
* feat: introducing bulk read API through Batcher

This change introduces BulkReadAPI on BigtableDataClient. This operation
accepts row keys in a batch mode and behind the scene fetch rows based
on configurable batches.

* Added Query.clone test case and some formatting changes

* Address feedback comments

* Address more feedback comments

* Updated QueryTest with asserts

* Moved TODO into getDefaultChannelPoolSize()

* Minor changes to address feedback changes
  • Loading branch information
rahulKQL authored and igorbernstein2 committed Jan 16, 2020
1 parent b375e87 commit e87179ebe1e53b7962a940a9aba761da8047e7e4
@@ -26,6 +26,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
@@ -936,6 +937,85 @@ public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String ta
return stub.newMutateRowsBatcher(tableId);
}

/**
* Reads rows for given tableId in a batch. If the row does not exist, the value will be null.
* This operation should be called with in a single thread.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]")) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
* pending batches until its resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
return newBulkReadRowsBatcher(tableId, null);
}

/**
* Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
* value will be null. This operation should be called with in a single thread.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
*
* // Build the filter expression
* Filter filter = FILTERS.chain()
* .filter(FILTERS.key().regex("prefix.*"))
* .filter(FILTERS.limit().cellsPerRow(10));
*
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", filter)) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
* pending batches until its resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
Query query = Query.create(tableId);
if (filter != null) {
query = query.filter(filter);
}
return stub.newBulkReadRowsBatcher(query);
}

/**
* Convenience method to mutate multiple rows in a batch. Each individual row is mutated
* atomically as in MutateRow, but the entire batch is not executed atomically. This method
@@ -274,6 +274,12 @@ public static Query fromProto(@Nonnull ReadRowsRequest request) {
return query;
}

public Query clone() {
Query query = Query.create(tableId);
query.builder = this.builder.clone();
return query;
}

private static ByteString wrapKey(String key) {
if (key == null) {
return null;
@@ -0,0 +1,150 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;

/**
* This settings holds the batching thresholds as well as retry configuration for bulk read API.
*
* <p>Sample configuration:
*
* <pre>{@code
* BigtableBulkReadRowsCallSettings defaultBulkReadCallSettings =
* bigtableDataCallSettings.getStubSettings().bulkReadRowsSettings();
*
* BigtableBulkReadRowsCallSettings customBulkReadCallSettings = defaultBulkReadCallSettings
* .toBuilder()
* .setBatchingSettings(
* defaultBulkReadCallSettings.getBatchingSettings().toBuilder()
* .setDelayThreshold(Duration.ofSeconds(10))
* .build())
* .setRetryableCodes(Code.DEADLINE_EXCEEDED)
* .build();
* }</pre>
*
* @see BatchingSettings for batching thresholds explantion.
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public class BigtableBulkReadRowsCallSettings extends UnaryCallSettings<Query, List<Row>> {

private final BatchingCallSettings<ByteString, Row, Query, List<Row>> batchingCallSettings;

private BigtableBulkReadRowsCallSettings(Builder builder) {
super(builder);
batchingCallSettings =
BatchingCallSettings.newBuilder(builder.batchingDescriptor)
.setBatchingSettings(builder.batchingSettings)
.setRetrySettings(builder.getRetrySettings())
.setRetryableCodes(builder.getRetryableCodes())
.build();
}

/** Returns batching settings which contains multiple batch threshold levels. */
public BatchingSettings getBatchingSettings() {
return batchingCallSettings.getBatchingSettings();
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<ByteString, Row, Query, List<Row>> getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

static BigtableBulkReadRowsCallSettings.Builder newBuilder(
BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor) {
return new Builder(batchingDescriptor);
}

/**
* Get a builder with the same values as this object. See the class documentation of {@link
* BigtableBatchingCallSettings} for a sample settings configuration.
*/
@Override
public final BigtableBulkReadRowsCallSettings.Builder toBuilder() {
return new BigtableBulkReadRowsCallSettings.Builder(this);
}

public static class Builder extends UnaryCallSettings.Builder<Query, List<Row>> {

private BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor;
private BatchingSettings batchingSettings;

private Builder(
@Nonnull BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}

private Builder(@Nonnull BigtableBulkReadRowsCallSettings settings) {
super(settings);
this.batchingDescriptor = settings.getBatchingDescriptor();
this.batchingSettings = settings.getBatchingSettings();
}

/** Sets the batching settings with various thresholds. */
public Builder setBatchingSettings(@Nonnull BatchingSettings batchingSettings) {
Preconditions.checkNotNull(batchingSettings, "batching settings can't be null");
this.batchingSettings = batchingSettings;
return this;
}

/** Returns the {@link BatchingSettings}. */
public BatchingSettings getBatchingSettings() {
return batchingSettings;
}

/** Sets the rpc failure {@link StatusCode.Code code}, for which retries should be performed. */
@Override
public Builder setRetryableCodes(StatusCode.Code... codes) {
super.setRetryableCodes(codes);
return this;
}

/** Sets the rpc failure {@link StatusCode.Code code}, for which retries should be performed. */
@Override
public Builder setRetryableCodes(Set<StatusCode.Code> retryableCodes) {
super.setRetryableCodes(retryableCodes);
return this;
}

/** Sets the {@link RetrySettings} values for each retry attempts. */
@Override
public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) {
super.setRetrySettings(retrySettings);
return this;
}

/** Builds the {@link BigtableBulkReadRowsCallSettings} object with provided configuration. */
@Override
public BigtableBulkReadRowsCallSettings build() {
return new BigtableBulkReadRowsCallSettings(this);
}
}
}
@@ -52,12 +52,15 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.Tagger;
@@ -381,9 +384,8 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
}

/**
* Creates a {@link com.google.api.gax.batching.BatcherImpl} to handle {@link
* MutateRowsRequest.Entry} mutations. This is meant to be used for automatic batching with flow
* control.
* Creates a {@link BatcherImpl} to handle {@link MutateRowsRequest.Entry} mutations. This is
* meant to be used for automatic batching with flow control.
*
* <ul>
* <li>Uses {@link MutateRowsBatchingDescriptor} to spool the {@link RowMutationEntry} mutations
@@ -409,6 +411,31 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
clientContext.getExecutor());
}

/**
* Creates a {@link BatcherImpl} to handle {@link Query#rowKey(String)}. This is meant for bulk
* read with flow control.
*
* <ul>
* <li>Uses {@link ReadRowsBatchingDescriptor} to merge the row-keys and send them out as {@link
* Query}.
* <li>Uses {@link #readRowsCallable()} to perform RPC.
* <li>Batching thresholds can be configured from {@link
* EnhancedBigtableStubSettings#bulkReadRowsSettings()}.
* <li>Schedule retries for retryable exceptions until there are no more entries or there are no
* more retry attempts left.
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
Preconditions.checkNotNull(query, "query cannot be null");
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
readRowsCallable().all(),
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
}

/**
* Internal helper to create the base MutateRows callable chain. The chain is responsible for
* retrying individual entry in case of error.

0 comments on commit e87179e

Please sign in to comment.