Skip to content

Commit

Permalink
Use ReadModifyWrite for Append and Increment operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
Angus Davis committed Feb 19, 2015
1 parent 8716d52 commit 46b20cc
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 293 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package com.google.cloud.bigtable.hbase;

import com.google.api.client.util.Preconditions;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowResponse;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;
import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.OperationAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.hadoop.hbase.BigtableClient;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -131,9 +129,8 @@ public void onFailure(Throwable throwable) {
protected final OperationAdapter<Delete, MutateRowRequest.Builder> deleteAdapter;
protected final RowMutationsAdapter rowMutationsAdapter;
protected final AppendAdapter appendAdapter;
protected final AppendResponseAdapter appendRespAdapter;
protected final IncrementAdapter incrementAdapter;
protected final IncrementRowResponseAdapter incrRespAdapter;
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> rowToResultAdapter;

public BatchExecutor(
BigtableClient client,
Expand All @@ -146,9 +143,8 @@ public BatchExecutor(
OperationAdapter<Delete, MutateRowRequest.Builder> deleteAdapter,
RowMutationsAdapter rowMutationsAdapter,
AppendAdapter appendAdapter,
AppendResponseAdapter appendRespAdapter,
IncrementAdapter incrementAdapter,
IncrementRowResponseAdapter incrRespAdapter) {
ResponseAdapter<com.google.bigtable.v1.Row, Result> rowToResultAdapter) {
this.client = client;
this.options = options;
this.tableMetadataSetter = tableMetadataSetter;
Expand All @@ -159,9 +155,8 @@ public BatchExecutor(
this.deleteAdapter = deleteAdapter;
this.rowMutationsAdapter = rowMutationsAdapter;
this.appendAdapter = appendAdapter;
this.appendRespAdapter = appendRespAdapter;
this.incrementAdapter = incrementAdapter;
this.incrRespAdapter = incrRespAdapter;
this.rowToResultAdapter = rowToResultAdapter;
}

/**
Expand Down Expand Up @@ -197,37 +192,26 @@ ListenableFuture<GetRowResponse> issueGetRequest(Get get) {
* Adapt and issue a single Append request returning a ListenableFuture
* for the AppendRowResponse.
*/
ListenableFuture<AppendRowResponse> issueAppendRequest(Append append) {
ListenableFuture<com.google.bigtable.v1.Row> issueAppendRequest(Append append) {
LOG.trace("issueAppendRequest(Append)");
AppendRowRequest.Builder builder = appendAdapter.adapt(append);
ReadModifyWriteRowRequest.Builder builder = appendAdapter.adapt(append);
tableMetadataSetter.setMetadata(builder);
AppendRowRequest request = builder.build();
ReadModifyWriteRowRequest request = builder.build();

try {
return client.appendRowAsync(request);
} catch (ServiceException e) {
LOG.error("Immediately failing async issueAppendRequest due to ServiceException %s", e);
return Futures.immediateFailedFuture(e);
}
return client.readModifyWriteRowAsync(request);
}

/**
* Adapt and issue a single Increment request returning a ListenableFuture
* for the IncrementRowResponse.
*/
ListenableFuture<IncrementRowResponse> issueIncrementRequest(
Increment increment) {
ListenableFuture<com.google.bigtable.v1.Row> issueIncrementRequest(Increment increment) {
LOG.trace("issueIncrementRequest(Increment)");
IncrementRowRequest.Builder builder = incrementAdapter.adapt(increment);
ReadModifyWriteRowRequest.Builder builder = incrementAdapter.adapt(increment);
tableMetadataSetter.setMetadata(builder);
IncrementRowRequest request = builder.build();
ReadModifyWriteRowRequest request = builder.build();

try {
return client.incrementRowAsync(request);
} catch (ServiceException e) {
LOG.error("Immediately failing async issueIncrementRequest due to ServiceException %s", e);
return Futures.immediateFailedFuture(e);
}
return client.readModifyWriteRowAsync(request);
}

/**
Expand Down Expand Up @@ -290,27 +274,27 @@ Object adaptResponse(GetRowResponse response) {
}
},
service);
} else if (row instanceof Append) {
ListenableFuture<AppendRowResponse> rpcResponseFuture =
} else if (row instanceof Append) {
ListenableFuture<com.google.bigtable.v1.Row> rpcResponseFuture =
issueAppendRequest((Append) row);
Futures.addCallback(rpcResponseFuture,
new RpcResultFutureCallback<T, AppendRowResponse>(
new RpcResultFutureCallback<T, com.google.bigtable.v1.Row>(
row, callback, index, results, resultFuture) {
@Override
Object adaptResponse(AppendRowResponse response) {
return appendRespAdapter.adaptResponse(response);
Object adaptResponse(com.google.bigtable.v1.Row response) {
return rowToResultAdapter.adaptResponse(response);
}
},
service);
} else if (row instanceof Increment) {
ListenableFuture<IncrementRowResponse> rpcResponseFuture =
ListenableFuture<com.google.bigtable.v1.Row> rpcResponseFuture =
issueIncrementRequest((Increment) row);
Futures.addCallback(rpcResponseFuture,
new RpcResultFutureCallback<T, IncrementRowResponse>(
new RpcResultFutureCallback<T, com.google.bigtable.v1.Row>(
row, callback, index, results, resultFuture) {
@Override
Object adaptResponse(IncrementRowResponse response) {
return incrRespAdapter.adaptResponse(response);
Object adaptResponse(com.google.bigtable.v1.Row response) {
return rowToResultAdapter.adaptResponse(response);
}
},
service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
package com.google.cloud.bigtable.hbase;

import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableRowAdapter;
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
import com.google.cloud.bigtable.hbase.adapters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.MutationAdapter;
import com.google.cloud.bigtable.hbase.adapters.PutAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.bigtable.hbase.adapters.ScanAdapter;
Expand All @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;

Expand All @@ -57,6 +58,8 @@ public class BigtableBufferedMutator implements BufferedMutator {
protected final BatchExecutor batchExecutor;

protected final RowAdapter rowAdapter = new RowAdapter();
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> bigtableRowAdapter =
new BigtableRowAdapter();
protected final GetRowResponseAdapter getRowResponseAdapter =
new GetRowResponseAdapter(rowAdapter);
protected final ScanAdapter scanAdapter = new ScanAdapter(new FilterAdapter());
Expand All @@ -74,10 +77,7 @@ public class BigtableBufferedMutator implements BufferedMutator {
* batchExecutor so that we can remove these.
*/
protected final AppendAdapter appendAdapter = new AppendAdapter();
protected final AppendResponseAdapter appendRespAdapter = new AppendResponseAdapter(rowAdapter);
protected final IncrementAdapter incrementAdapter = new IncrementAdapter();
protected final IncrementRowResponseAdapter incrRespAdapter =
new IncrementRowResponseAdapter(rowAdapter);

public BigtableBufferedMutator(Configuration configuration,
TableName tableName,
Expand All @@ -94,7 +94,9 @@ public BigtableBufferedMutator(Configuration configuration,
this.listener = listener;

putAdapter = new PutAdapter(configuration);
mutationAdapter = new MutationAdapter(deleteAdapter, putAdapter,
mutationAdapter = new MutationAdapter(
deleteAdapter,
putAdapter,
new UnsupportedOperationAdapter<Increment>("increment"),
new UnsupportedOperationAdapter<Append>("append"));
rowMutationsAdapter = new RowMutationsAdapter(mutationAdapter);
Expand All @@ -110,9 +112,8 @@ public BigtableBufferedMutator(Configuration configuration,
deleteAdapter,
rowMutationsAdapter,
appendAdapter,
appendRespAdapter,
incrementAdapter,
incrRespAdapter);
bigtableRowAdapter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,24 @@
*/
package com.google.cloud.bigtable.hbase;

import com.google.bigtable.anviltop.AnviltopData;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.ReadTableRequest;
import com.google.bigtable.v1.CheckAndMutateRowResponse;
import com.google.bigtable.v1.CheckAndMutateRowRequest;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;
import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableResultScannerAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableRowAdapter;
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
import com.google.cloud.bigtable.hbase.adapters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.MutationAdapter;
import com.google.cloud.bigtable.hbase.adapters.PutAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.bigtable.hbase.adapters.ScanAdapter;
Expand Down Expand Up @@ -85,11 +81,11 @@ public class BigtableTable implements Table {
protected final BigtableOptions options;
protected final BigtableClient client;
protected final RowAdapter rowAdapter = new RowAdapter();
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> bigtableRowAdapter =
new BigtableRowAdapter();
protected final PutAdapter putAdapter;
protected final AppendAdapter appendAdapter = new AppendAdapter();
protected final AppendResponseAdapter appendRespAdapter = new AppendResponseAdapter(rowAdapter);
protected final IncrementAdapter incrementAdapter = new IncrementAdapter();
protected final IncrementRowResponseAdapter incrRespAdapter = new IncrementRowResponseAdapter(rowAdapter);
protected final DeleteAdapter deleteAdapter = new DeleteAdapter();
protected final MutationAdapter mutationAdapter;
protected final RowMutationsAdapter rowMutationsAdapter;
Expand Down Expand Up @@ -147,9 +143,8 @@ public BigtableTable(TableName tableName,
deleteAdapter,
rowMutationsAdapter,
appendAdapter,
appendRespAdapter,
incrementAdapter,
incrRespAdapter);
bigtableRowAdapter);
this.metadataSetter = new TableMetadataSetter(
tableName, options.getProjectId(), options.getZone(), options.getCluster());
}
Expand Down Expand Up @@ -422,16 +417,15 @@ public void mutateRow(RowMutations rm) throws IOException {
@Override
public Result append(Append append) throws IOException {
LOG.trace("append(Append)");
AppendRowRequest.Builder appendRowRequest = appendAdapter.adapt(append);
appendRowRequest
.setProjectId(options.getProjectId())
.setTableName(tableName.getQualifierAsString());

ReadModifyWriteRowRequest.Builder appendRowRequest = appendAdapter.adapt(append);
metadataSetter.setMetadata(appendRowRequest);
try {
AppendRowResponse response = client.appendRow(appendRowRequest.build());
return appendRespAdapter.adaptResponse(response);
} catch (ServiceException e) {
LOG.error("Encountered ServiceException when executing append. Exception: %s", e);
com.google.bigtable.v1.Row response =
client.readModifyWriteRow(appendRowRequest.build());
return bigtableRowAdapter.adaptResponse(response);
} catch (RuntimeException e) {
LOG.error("Encountered Exception when executing append. Exception: %s", e);
throw new IOException(
makeGenericExceptionMessage(
"append",
Expand All @@ -445,18 +439,15 @@ public Result append(Append append) throws IOException {
@Override
public Result increment(Increment increment) throws IOException {
LOG.trace("increment(Increment)");
IncrementRowRequest.Builder incrementRowRequest = incrementAdapter.adapt(
increment);
incrementRowRequest
.setProjectId(options.getProjectId())
.setTableName(tableName.getQualifierAsString());
ReadModifyWriteRowRequest.Builder incrementRowRequest =
incrementAdapter.adapt(increment);
metadataSetter.setMetadata(incrementRowRequest);

try {
IncrementRowResponse response = client.incrementRow(
incrementRowRequest.build());
return incrRespAdapter.adaptResponse(response);
} catch (ServiceException e) {
LOG.error("Encountered ServiceException when executing increment. Exception: %s", e);
com.google.bigtable.v1.Row response = client.readModifyWriteRow(incrementRowRequest.build());
return bigtableRowAdapter.adaptResponse(response);
} catch (RuntimeException e) {
LOG.error("Encountered RuntimeException when executing increment. Exception: %s", e);
throw new IOException(
makeGenericExceptionMessage(
"increment",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.v1.CheckAndMutateRowRequest;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;

import org.apache.hadoop.hbase.TableName;

Expand Down Expand Up @@ -34,11 +35,6 @@ public void setMetadata(AnviltopServiceMessages.GetRowRequest.Builder builder) {
builder.setProjectId(projectId);
}

public void setMetadata(AppendRowRequest.Builder builder) {
builder.setTableName(tableName.getQualifierAsString());
builder.setProjectId(projectId);
}

public void setMetadata(IncrementRowRequest.Builder builder) {
builder.setTableName(tableName.getQualifierAsString());
builder.setProjectId(projectId);
Expand All @@ -51,4 +47,8 @@ public void setMetadata(MutateRowRequest.Builder builder) {
public void setMetadata(CheckAndMutateRowRequest.Builder builder) {
builder.setTableName(formattedV1TableName);
}

public void setMetadata(ReadModifyWriteRowRequest.Builder builder) {
builder.setTableName(formattedV1TableName);
}
}
Loading

0 comments on commit 46b20cc

Please sign in to comment.