Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ReadModifyWrite for Append and Increment operations. #132

Merged
merged 1 commit into from
Feb 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package com.google.cloud.bigtable.hbase;

import com.google.api.client.util.Preconditions;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowResponse;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;
import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.OperationAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.hadoop.hbase.BigtableClient;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -131,9 +129,8 @@ public void onFailure(Throwable throwable) {
protected final OperationAdapter<Delete, MutateRowRequest.Builder> deleteAdapter;
protected final RowMutationsAdapter rowMutationsAdapter;
protected final AppendAdapter appendAdapter;
protected final AppendResponseAdapter appendRespAdapter;
protected final IncrementAdapter incrementAdapter;
protected final IncrementRowResponseAdapter incrRespAdapter;
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> rowToResultAdapter;

public BatchExecutor(
BigtableClient client,
Expand All @@ -146,9 +143,8 @@ public BatchExecutor(
OperationAdapter<Delete, MutateRowRequest.Builder> deleteAdapter,
RowMutationsAdapter rowMutationsAdapter,
AppendAdapter appendAdapter,
AppendResponseAdapter appendRespAdapter,
IncrementAdapter incrementAdapter,
IncrementRowResponseAdapter incrRespAdapter) {
ResponseAdapter<com.google.bigtable.v1.Row, Result> rowToResultAdapter) {
this.client = client;
this.options = options;
this.tableMetadataSetter = tableMetadataSetter;
Expand All @@ -159,9 +155,8 @@ public BatchExecutor(
this.deleteAdapter = deleteAdapter;
this.rowMutationsAdapter = rowMutationsAdapter;
this.appendAdapter = appendAdapter;
this.appendRespAdapter = appendRespAdapter;
this.incrementAdapter = incrementAdapter;
this.incrRespAdapter = incrRespAdapter;
this.rowToResultAdapter = rowToResultAdapter;
}

/**
Expand Down Expand Up @@ -197,37 +192,26 @@ ListenableFuture<GetRowResponse> issueGetRequest(Get get) {
* Adapt and issue a single Append request returning a ListenableFuture
* for the AppendRowResponse.
*/
ListenableFuture<AppendRowResponse> issueAppendRequest(Append append) {
ListenableFuture<com.google.bigtable.v1.Row> issueAppendRequest(Append append) {
LOG.trace("issueAppendRequest(Append)");
AppendRowRequest.Builder builder = appendAdapter.adapt(append);
ReadModifyWriteRowRequest.Builder builder = appendAdapter.adapt(append);
tableMetadataSetter.setMetadata(builder);
AppendRowRequest request = builder.build();
ReadModifyWriteRowRequest request = builder.build();

try {
return client.appendRowAsync(request);
} catch (ServiceException e) {
LOG.error("Immediately failing async issueAppendRequest due to ServiceException %s", e);
return Futures.immediateFailedFuture(e);
}
return client.readModifyWriteRowAsync(request);
}

/**
* Adapt and issue a single Increment request returning a ListenableFuture
* for the IncrementRowResponse.
*/
ListenableFuture<IncrementRowResponse> issueIncrementRequest(
Increment increment) {
ListenableFuture<com.google.bigtable.v1.Row> issueIncrementRequest(Increment increment) {
LOG.trace("issueIncrementRequest(Increment)");
IncrementRowRequest.Builder builder = incrementAdapter.adapt(increment);
ReadModifyWriteRowRequest.Builder builder = incrementAdapter.adapt(increment);
tableMetadataSetter.setMetadata(builder);
IncrementRowRequest request = builder.build();
ReadModifyWriteRowRequest request = builder.build();

try {
return client.incrementRowAsync(request);
} catch (ServiceException e) {
LOG.error("Immediately failing async issueIncrementRequest due to ServiceException %s", e);
return Futures.immediateFailedFuture(e);
}
return client.readModifyWriteRowAsync(request);
}

/**
Expand Down Expand Up @@ -290,27 +274,27 @@ Object adaptResponse(GetRowResponse response) {
}
},
service);
} else if (row instanceof Append) {
ListenableFuture<AppendRowResponse> rpcResponseFuture =
} else if (row instanceof Append) {
ListenableFuture<com.google.bigtable.v1.Row> rpcResponseFuture =
issueAppendRequest((Append) row);
Futures.addCallback(rpcResponseFuture,
new RpcResultFutureCallback<T, AppendRowResponse>(
new RpcResultFutureCallback<T, com.google.bigtable.v1.Row>(
row, callback, index, results, resultFuture) {
@Override
Object adaptResponse(AppendRowResponse response) {
return appendRespAdapter.adaptResponse(response);
Object adaptResponse(com.google.bigtable.v1.Row response) {
return rowToResultAdapter.adaptResponse(response);
}
},
service);
} else if (row instanceof Increment) {
ListenableFuture<IncrementRowResponse> rpcResponseFuture =
ListenableFuture<com.google.bigtable.v1.Row> rpcResponseFuture =
issueIncrementRequest((Increment) row);
Futures.addCallback(rpcResponseFuture,
new RpcResultFutureCallback<T, IncrementRowResponse>(
new RpcResultFutureCallback<T, com.google.bigtable.v1.Row>(
row, callback, index, results, resultFuture) {
@Override
Object adaptResponse(IncrementRowResponse response) {
return incrRespAdapter.adaptResponse(response);
Object adaptResponse(com.google.bigtable.v1.Row response) {
return rowToResultAdapter.adaptResponse(response);
}
},
service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
package com.google.cloud.bigtable.hbase;

import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableRowAdapter;
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
import com.google.cloud.bigtable.hbase.adapters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.MutationAdapter;
import com.google.cloud.bigtable.hbase.adapters.PutAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.bigtable.hbase.adapters.ScanAdapter;
Expand All @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;

Expand All @@ -57,6 +58,8 @@ public class BigtableBufferedMutator implements BufferedMutator {
protected final BatchExecutor batchExecutor;

protected final RowAdapter rowAdapter = new RowAdapter();
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> bigtableRowAdapter =
new BigtableRowAdapter();
protected final GetRowResponseAdapter getRowResponseAdapter =
new GetRowResponseAdapter(rowAdapter);
protected final ScanAdapter scanAdapter = new ScanAdapter(new FilterAdapter());
Expand All @@ -74,10 +77,7 @@ public class BigtableBufferedMutator implements BufferedMutator {
* batchExecutor so that we can remove these.
*/
protected final AppendAdapter appendAdapter = new AppendAdapter();
protected final AppendResponseAdapter appendRespAdapter = new AppendResponseAdapter(rowAdapter);
protected final IncrementAdapter incrementAdapter = new IncrementAdapter();
protected final IncrementRowResponseAdapter incrRespAdapter =
new IncrementRowResponseAdapter(rowAdapter);

public BigtableBufferedMutator(Configuration configuration,
TableName tableName,
Expand All @@ -94,7 +94,9 @@ public BigtableBufferedMutator(Configuration configuration,
this.listener = listener;

putAdapter = new PutAdapter(configuration);
mutationAdapter = new MutationAdapter(deleteAdapter, putAdapter,
mutationAdapter = new MutationAdapter(
deleteAdapter,
putAdapter,
new UnsupportedOperationAdapter<Increment>("increment"),
new UnsupportedOperationAdapter<Append>("append"));
rowMutationsAdapter = new RowMutationsAdapter(mutationAdapter);
Expand All @@ -110,9 +112,8 @@ public BigtableBufferedMutator(Configuration configuration,
deleteAdapter,
rowMutationsAdapter,
appendAdapter,
appendRespAdapter,
incrementAdapter,
incrRespAdapter);
bigtableRowAdapter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,24 @@
*/
package com.google.cloud.bigtable.hbase;

import com.google.bigtable.anviltop.AnviltopData;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.AppendRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.GetRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowResponse;
import com.google.bigtable.anviltop.AnviltopServiceMessages.ReadTableRequest;
import com.google.bigtable.v1.CheckAndMutateRowResponse;
import com.google.bigtable.v1.CheckAndMutateRowRequest;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;
import com.google.cloud.bigtable.hbase.adapters.AppendAdapter;
import com.google.cloud.bigtable.hbase.adapters.AppendResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableResultScannerAdapter;
import com.google.cloud.bigtable.hbase.adapters.BigtableRowAdapter;
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
import com.google.cloud.bigtable.hbase.adapters.FilterAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetAdapter;
import com.google.cloud.bigtable.hbase.adapters.GetRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementAdapter;
import com.google.cloud.bigtable.hbase.adapters.IncrementRowResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.MutationAdapter;
import com.google.cloud.bigtable.hbase.adapters.PutAdapter;
import com.google.cloud.bigtable.hbase.adapters.ResponseAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowAdapter;
import com.google.cloud.bigtable.hbase.adapters.RowMutationsAdapter;
import com.google.cloud.bigtable.hbase.adapters.ScanAdapter;
Expand Down Expand Up @@ -85,11 +81,11 @@ public class BigtableTable implements Table {
protected final BigtableOptions options;
protected final BigtableClient client;
protected final RowAdapter rowAdapter = new RowAdapter();
protected final ResponseAdapter<com.google.bigtable.v1.Row, Result> bigtableRowAdapter =
new BigtableRowAdapter();
protected final PutAdapter putAdapter;
protected final AppendAdapter appendAdapter = new AppendAdapter();
protected final AppendResponseAdapter appendRespAdapter = new AppendResponseAdapter(rowAdapter);
protected final IncrementAdapter incrementAdapter = new IncrementAdapter();
protected final IncrementRowResponseAdapter incrRespAdapter = new IncrementRowResponseAdapter(rowAdapter);
protected final DeleteAdapter deleteAdapter = new DeleteAdapter();
protected final MutationAdapter mutationAdapter;
protected final RowMutationsAdapter rowMutationsAdapter;
Expand Down Expand Up @@ -147,9 +143,8 @@ public BigtableTable(TableName tableName,
deleteAdapter,
rowMutationsAdapter,
appendAdapter,
appendRespAdapter,
incrementAdapter,
incrRespAdapter);
bigtableRowAdapter);
this.metadataSetter = new TableMetadataSetter(
tableName, options.getProjectId(), options.getZone(), options.getCluster());
}
Expand Down Expand Up @@ -422,16 +417,21 @@ public void mutateRow(RowMutations rm) throws IOException {
@Override
public Result append(Append append) throws IOException {
LOG.trace("append(Append)");
AppendRowRequest.Builder appendRowRequest = appendAdapter.adapt(append);
appendRowRequest
.setProjectId(options.getProjectId())
.setTableName(tableName.getQualifierAsString());

ReadModifyWriteRowRequest.Builder appendRowRequest = appendAdapter.adapt(append);
metadataSetter.setMetadata(appendRowRequest);
try {
AppendRowResponse response = client.appendRow(appendRowRequest.build());
return appendRespAdapter.adaptResponse(response);
} catch (ServiceException e) {
LOG.error("Encountered ServiceException when executing append. Exception: %s", e);
com.google.bigtable.v1.Row response =
client.readModifyWriteRow(appendRowRequest.build());
// The bigtable API will always return the mutated results. In order to maintain
// compatibility, simply return null when results were not requested.
if (append.isReturnResults()) {
return bigtableRowAdapter.adaptResponse(response);
} else {
return null;
}
} catch (RuntimeException e) {
LOG.error("Encountered Exception when executing append. Exception: %s", e);
throw new IOException(
makeGenericExceptionMessage(
"append",
Expand All @@ -445,18 +445,15 @@ public Result append(Append append) throws IOException {
@Override
public Result increment(Increment increment) throws IOException {
LOG.trace("increment(Increment)");
IncrementRowRequest.Builder incrementRowRequest = incrementAdapter.adapt(
increment);
incrementRowRequest
.setProjectId(options.getProjectId())
.setTableName(tableName.getQualifierAsString());
ReadModifyWriteRowRequest.Builder incrementRowRequest =
incrementAdapter.adapt(increment);
metadataSetter.setMetadata(incrementRowRequest);

try {
IncrementRowResponse response = client.incrementRow(
incrementRowRequest.build());
return incrRespAdapter.adaptResponse(response);
} catch (ServiceException e) {
LOG.error("Encountered ServiceException when executing increment. Exception: %s", e);
com.google.bigtable.v1.Row response = client.readModifyWriteRow(incrementRowRequest.build());
return bigtableRowAdapter.adaptResponse(response);
} catch (RuntimeException e) {
LOG.error("Encountered RuntimeException when executing increment. Exception: %s", e);
throw new IOException(
makeGenericExceptionMessage(
"increment",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.bigtable.anviltop.AnviltopServiceMessages.IncrementRowRequest;
import com.google.bigtable.v1.CheckAndMutateRowRequest;
import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.ReadModifyWriteRowRequest;

import org.apache.hadoop.hbase.TableName;

Expand Down Expand Up @@ -34,11 +35,6 @@ public void setMetadata(AnviltopServiceMessages.GetRowRequest.Builder builder) {
builder.setProjectId(projectId);
}

public void setMetadata(AppendRowRequest.Builder builder) {
builder.setTableName(tableName.getQualifierAsString());
builder.setProjectId(projectId);
}

public void setMetadata(IncrementRowRequest.Builder builder) {
builder.setTableName(tableName.getQualifierAsString());
builder.setProjectId(projectId);
Expand All @@ -51,4 +47,8 @@ public void setMetadata(MutateRowRequest.Builder builder) {
public void setMetadata(CheckAndMutateRowRequest.Builder builder) {
builder.setTableName(formattedV1TableName);
}

public void setMetadata(ReadModifyWriteRowRequest.Builder builder) {
builder.setTableName(formattedV1TableName);
}
}
Loading