Skip to content

Commit

Permalink
Refactoring BigtableAsyncRpc (#959)
Browse files Browse the repository at this point in the history
The goal here is to move the read rows functionality into the same
listener model that all of the other RPCs use.  That will help with
metrics gathering and the "observer" read rows functionality described
in #703.
  • Loading branch information
sduskis committed Aug 11, 2016
1 parent 4dcfc59 commit 36f7173
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public ResultScanner<Row> createScanner(ReadRowsRequest request) {
}
};

private final BigtableAsyncUtilities asyncUtilities;
private CallOptionsFactory callOptionsFactory = new CallOptionsFactory.Default();

private final BigtableAsyncRpc<SampleRowKeysRequest, SampleRowKeysResponse> sampleRowKeysAsync;
Expand Down Expand Up @@ -199,7 +198,6 @@ public BigtableDataGrpcClient(
this.retryExecutorService = retryExecutorService;
this.bigtableOptions = bigtableOptions;
this.retryOptions = bigtableOptions.getRetryOptions();
this.asyncUtilities = asyncUtilities;

this.sampleRowKeysAsync =
asyncUtilities.createAsyncRpc(
Expand Down Expand Up @@ -415,25 +413,22 @@ private ResultScanner<Row> streamRows(ReadRowsRequest request) {

expandPoolIfNecessary(this.bigtableOptions.getChannelCount());

ClientCall<ReadRowsRequest, ReadRowsResponse> readRowsCall =
channelPool.newCall(BigtableGrpc.METHOD_READ_ROWS, CallOptions.DEFAULT);
// TODO: This should use getCallOptions(ReqT request, BigtableAsyncRpc<ReqT, ?> rpc) for Gets.
CallOptions callOptions = CallOptions.DEFAULT;
ClientCall<ReadRowsRequest, ReadRowsResponse> readRowsCall = readRowsAsync.newCall(callOptions);

return new StreamingBigtableResultScanner(
createReader(request, readRowsCall),
createCancellationToken(readRowsCall, timerContext));
}

protected ResponseQueueReader createReader(ReadRowsRequest request,
ClientCall<ReadRowsRequest, ReadRowsResponse> readRowsCall) {
ResponseQueueReader responseQueueReader = new ResponseQueueReader(
retryOptions.getReadPartialRowTimeoutMillis(), retryOptions.getStreamingBufferSize());
ResponseQueueReader reader =
new ResponseQueueReader(
retryOptions.getReadPartialRowTimeoutMillis(), retryOptions.getStreamingBufferSize());

StreamObserver<ReadRowsResponse> rowMerger = new RowMerger(responseQueueReader);
StreamObserver<ReadRowsResponse> rowMerger = new RowMerger(reader);
ClientCall.Listener<ReadRowsResponse> listener =
new StreamObserverAdapter<>(readRowsCall, rowMerger);
asyncUtilities.asyncServerStreamingCall(readRowsCall, request, listener,
createMetadata(request.getTableName()));
return responseQueueReader;

readRowsAsync.start(readRowsCall, request, listener, createMetadata(request.getTableName()));

CancellationToken cancellationToken = createCancellationToken(readRowsCall, timerContext);
return new StreamingBigtableResultScanner(reader, cancellationToken);
}

private CancellationToken createCancellationToken(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,17 @@ public ListenableFuture<ResultT> getCompletionFuture() {
/**
* {@inheritDoc}
*
* Calls {@link BigtableAsyncRpc#call} with this as the listener so that
* retries happen correctly.
* <p>Calls {@link BigtableAsyncRpc#newCall(CallOptions)} and {@link
* BigtableAsyncRpc#start(ClientCall, Object, io.grpc.ClientCall.Listener, Metadata)} with this as
* the listener so that retries happen correctly.
*/
@Override
public void run() {
this.rpcTimerContext = this.rpc.getRpcMetrics().timeRpc();
Metadata metadata = new Metadata();
metadata.merge(originalMetadata);
this.call = rpc.call(getRetryRequest(), this, callOptions, metadata);
this.call = rpc.newCall(callOptions);
rpc.start(this.call, getRetryRequest(), this, metadata);
}

protected RequestT getRetryRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,24 @@ public void incrementRetriesExhastedCounter() {
/**
* Creates a {@link io.grpc.ClientCall}.
*
* @param callOptions A set of gRPC options to use on this call.
* @return A ClientCall that represents a new request.
*/
ClientCall<REQUEST, RESPONSE> newCall(CallOptions callOptions);

/**
* Creates a {@link io.grpc.ClientCall}.
*
* @param call The ClientCall to use. See {@link BigtableAsyncRpc#newCall(CallOptions)}
* @param request The request to send.
* @param listener A listener which handles responses.
* @param callOptions A set of gRPC options to use on this call.
* @param metadata A set of predefined headers to use.
* @return A ClientCall that represents a new request.
*/
ClientCall<REQUEST, RESPONSE> call(REQUEST request, ClientCall.Listener<RESPONSE> listener,
CallOptions callOptions, Metadata metadata);
void start(
ClientCall<REQUEST, RESPONSE> call,
REQUEST request,
ClientCall.Listener<RESPONSE> listener,
Metadata metadata);

/**
* Can this request be retried?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientCall.Listener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

Expand All @@ -46,19 +47,6 @@ public interface BigtableAsyncUtilities {
<RequestT, ResponseT> BigtableAsyncRpc<RequestT, ResponseT> createAsyncRpc(
MethodDescriptor<RequestT, ResponseT> method, Predicate<RequestT> isRetryable);

/**
* <p>asyncServerStreamingCall.</p>
*
* @param call a {@link io.grpc.ClientCall} object.
* @param request a RequestT object.
* @param listener a {@link io.grpc.ClientCall.Listener} object.
* @param metadata a {@link io.grpc.Metadata} object.
* @param <RequestT> a RequestT object.
* @param <ResponseT> a ResponseT object.
*/
<RequestT, ResponseT> void asyncServerStreamingCall(ClientCall<RequestT, ResponseT> call,
RequestT request, ClientCall.Listener<ResponseT> listener, Metadata metadata);

public static class Default implements BigtableAsyncUtilities {
private final Channel channel;

Expand All @@ -71,14 +59,6 @@ public <RequestT, ResponseT> BigtableAsyncRpc<RequestT, ResponseT> createAsyncRp
final MethodDescriptor<RequestT, ResponseT> method, final Predicate<RequestT> isRetryable) {
final BigtableAsyncRpc.RpcMetrics metrics = RpcMetrics.createRpcMetrics(method);
return new BigtableAsyncRpc<RequestT, ResponseT>() {
@Override
public ClientCall<RequestT, ResponseT> call(RequestT request,
ClientCall.Listener<ResponseT> listener, CallOptions callOptions, Metadata metadata) {
ClientCall<RequestT, ResponseT> call = channel.newCall(method, callOptions);
start(call, request, listener, 1, metadata);
return call;
}

@Override
public boolean isRetryable(RequestT request) {
return isRetryable.apply(request);
Expand All @@ -93,36 +73,34 @@ public MethodDescriptor<RequestT, ResponseT> getMethodDescriptor() {
public BigtableAsyncRpc.RpcMetrics getRpcMetrics() {
return metrics;
}
};
}

@Override
public <RequestT, ResponseT> void asyncServerStreamingCall(ClientCall<RequestT, ResponseT> call,
RequestT request, ClientCall.Listener<ResponseT> listener, Metadata metadata) {
// gRPC treats streaming and unary calls differently for the number of responses to retrieve.
// See createAsyncUnaryRpc for how unary calls are handled.
//
// See ClientCalls.startCall() for more information.
start(call, request, listener, 1, metadata);
}
@Override
public ClientCall<RequestT, ResponseT> newCall(CallOptions callOptions) {
return channel.newCall(method, callOptions);
}

private static <RequestT, ResponseT> void start(ClientCall<RequestT, ResponseT> call,
RequestT request, ClientCall.Listener<ResponseT> listener, int requestCount,
Metadata metadata) {
call.start(listener, metadata);
call.request(requestCount);
try {
call.sendMessage(request);
} catch (Throwable t) {
call.cancel("Exception in sendMessage.", t);
throw Throwables.propagate(t);
}
try {
call.halfClose();
} catch (Throwable t) {
call.cancel("Exception in halfClose.", t);
throw Throwables.propagate(t);
}
@Override
public void start(
ClientCall<RequestT, ResponseT> call,
RequestT request,
Listener<ResponseT> listener,
Metadata metadata) {
call.start(listener, metadata);
call.request(1);
try {
call.sendMessage(request);
} catch (Throwable t) {
call.cancel("Exception in sendMessage.", t);
throw Throwables.propagate(t);
}
try {
call.halfClose();
} catch (Throwable t) {
call.cancel("Exception in halfClose.", t);
throw Throwables.propagate(t);
}
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public BigtableAsyncRpc answer(InvocationOnMock invocation) throws Throwable {
when(mockAsyncUtilities.createAsyncRpc(any(MethodDescriptor.class), any(Predicate.class)))
.thenAnswer(answer);

when(mockBigtableRpc.newCall(any(CallOptions.class))).thenReturn(mockClientCall);
tableMetadata = new Metadata();
tableMetadata.put(GoogleCloudResourcePrefixInterceptor.GRPC_RESOURCE_PREFIX_KEY, TABLE_NAME);
}
Expand Down Expand Up @@ -231,28 +232,26 @@ public void testCheckAndMutateRowPredicate() {

@Test
public void testSingleRowRead() {
ReadRowsRequest.Builder requestBuilder = ReadRowsRequest.newBuilder();
ReadRowsRequest.Builder requestBuilder = ReadRowsRequest.newBuilder().setTableName(TABLE_NAME);
requestBuilder.getRowsBuilder().addRowKeys(ByteString.copyFrom(new byte[0]));
createClient(false).readRows(requestBuilder.build());
verify(mockChannelPool, times(1)).newCall(same(BigtableGrpc.METHOD_READ_ROWS),
same(CallOptions.DEFAULT));
verifyRequestCalled(requestBuilder.build());
}

@Test
public void testMultiRowRead() {
ReadRowsRequest.Builder requestBuilder = ReadRowsRequest.newBuilder();
ReadRowsRequest.Builder requestBuilder = ReadRowsRequest.newBuilder().setTableName(TABLE_NAME);
requestBuilder.getRowsBuilder().addRowRanges(RowRange.getDefaultInstance());
createClient(false).readRows(requestBuilder.build());
verify(mockChannelPool, times(1)).newCall(same(BigtableGrpc.METHOD_READ_ROWS),
same(CallOptions.DEFAULT));
verifyRequestCalled(requestBuilder.build());
}

private void setResponse(final Object response) {
Answer<Void> answer =
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(1, ClientCall.Listener.class).onMessage(response);
invocation.getArgumentAt(2, ClientCall.Listener.class).onMessage(response);
Metadata metadata = invocation.getArgumentAt(3, Metadata.class);
interceptor.updateHeaders(metadata);
String headerValue =
Expand All @@ -263,15 +262,20 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
};
doAnswer(answer)
.when(mockBigtableRpc)
.call(any(), any(ClientCall.Listener.class), any(CallOptions.class), any(Metadata.class));
.start(same(mockClientCall),
any(),
any(ClientCall.Listener.class),
any(Metadata.class));
}

private void verifyRequestCalled(Object request) {
verify(mockBigtableRpc, times(1))
.call(
.newCall(any(CallOptions.class));
verify(mockBigtableRpc, times(1))
.start(
same(mockClientCall),
eq(request),
any(ClientCall.Listener.class),
any(CallOptions.class),
any(Metadata.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
*/
@RunWith(JUnit4.class)
@SuppressWarnings({ "unchecked", "rawtypes" })
public class RetryingUnaryRpcCallListenerTest {
public class TestRetryingUnaryRpcCallListener {

private static final BigtableAsyncRpc.RpcMetrics metrics =
BigtableAsyncRpc.RpcMetrics.createRpcMetrics(BigtableGrpc.METHOD_READ_ROWS);
Expand Down Expand Up @@ -129,14 +129,19 @@ public void testOK() throws Exception {
Answer<Void> answer = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Listener listener = invocation.getArgumentAt(1, ClientCall.Listener.class);
Listener listener = invocation.getArgumentAt(2, ClientCall.Listener.class);
listener.onMessage(result);
listener.onClose(Status.OK, null);
return null;
}
};
doAnswer(answer).when(readAsync).call(any(ReadRowsRequest.class),
any(ClientCall.Listener.class), any(CallOptions.class), any(Metadata.class));
doAnswer(answer)
.when(readAsync)
.start(
any(ClientCall.class),
any(ReadRowsRequest.class),
any(ClientCall.Listener.class),
any(Metadata.class));
underTest.start();
Assert.assertEquals(result, underTest.getCompletionFuture().get(1, TimeUnit.SECONDS));
verify(nanoClock, times(0)).nanoTime();
Expand All @@ -150,7 +155,7 @@ public void testRecoveredFailure() throws Exception {
Answer<Void> answer = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Listener listener = invocation.getArgumentAt(1, ClientCall.Listener.class);
Listener listener = invocation.getArgumentAt(2, ClientCall.Listener.class);
if (counter.incrementAndGet() < 5) {
listener.onClose(errorStatus, null);
} else {
Expand All @@ -160,8 +165,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
};
doAnswer(answer).when(readAsync).call(any(ReadRowsRequest.class),
any(ClientCall.Listener.class), any(CallOptions.class), any(Metadata.class));
doAnswer(answer).when(readAsync).start(any(ClientCall.class), any(ReadRowsRequest.class),
any(ClientCall.Listener.class), any(Metadata.class));
underTest.start();

Assert.assertEquals(result, underTest.getCompletionFuture().get(1, TimeUnit.HOURS));
Expand All @@ -174,12 +179,12 @@ public void testCompleteFailure() throws Exception {
Answer<Void> answer = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(1, ClientCall.Listener.class).onClose(errorStatus, null);
invocation.getArgumentAt(2, ClientCall.Listener.class).onClose(errorStatus, null);
return null;
}
};
doAnswer(answer).when(readAsync).call(any(ReadRowsRequest.class),
any(ClientCall.Listener.class), any(CallOptions.class), any(Metadata.class));
doAnswer(answer).when(readAsync).start(any(ClientCall.class), any(ReadRowsRequest.class),
any(ClientCall.Listener.class), any(Metadata.class));
try {
underTest.start();
underTest.getCompletionFuture().get(1, TimeUnit.MINUTES);
Expand Down

0 comments on commit 36f7173

Please sign in to comment.