@@ -60,7 +60,6 @@
import com .google .cloud .spanner .admin .instance .v1 .stub .GrpcInstanceAdminStub ;
import com .google .cloud .spanner .admin .instance .v1 .stub .InstanceAdminStub ;
import com .google .cloud .spanner .admin .instance .v1 .stub .InstanceAdminStubSettings ;
import com .google .cloud .spanner .spi .v1 .SpannerRpc .Option ;
import com .google .cloud .spanner .v1 .stub .GrpcSpannerStub ;
import com .google .cloud .spanner .v1 .stub .SpannerStub ;
import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
@@ -171,11 +170,13 @@
/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@ InternalApi
public class GapicSpannerRpc implements SpannerRpc {
/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {
// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16 ;
private final List <ScheduledExecutorService > executors = new LinkedList <>();
@@ -317,7 +318,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMessageSize (MAX_MESSAGE_SIZE )
.setMaxInboundMetadataSize (MAX_METADATA_SIZE )
.setPoolSize (options .getNumChannels ())
.setExecutor (executorProvider .getExecutor ())
// Before updating this method to setExecutor, please verify with a code owner on
// the lowest version of gax-grpc that needs to be supported. Currently v1.47.17,
// which doesn't support the setExecutor variant.
.setExecutorProvider (executorProvider )
// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
@@ -480,6 +485,7 @@ private static void checkEmulatorConnection(
private static final class OperationFutureRetryAlgorithm <ResultT , MetadataT >
implements ResultRetryAlgorithm <OperationFuture <ResultT , MetadataT >> {
private static final ImmutableList <StatusCode .Code > RETRYABLE_CODES =
ImmutableList .of (StatusCode .Code .DEADLINE_EXCEEDED , StatusCode .Code .UNAVAILABLE );
@@ -519,6 +525,7 @@ public boolean shouldRetry(
private final class OperationFutureCallable <RequestT , ResponseT , MetadataT extends Message >
implements Callable <OperationFuture <ResponseT , MetadataT >> {
final OperationCallable <RequestT , ResponseT , MetadataT > operationCallable ;
final RequestT initialRequest ;
final MethodDescriptor <RequestT , Operation > method ;
@@ -575,6 +582,7 @@ public OperationFuture<ResponseT, MetadataT> call() throws Exception {
}
private interface OperationsLister {
Paginated <Operation > listOperations (String nextPageToken );
}
@@ -610,6 +618,7 @@ private Operation mostRecentOperation(
}
private static final class TimestampComparator implements Comparator <Timestamp > {
private static final TimestampComparator INSTANCE = new TimestampComparator ();
@ Override
@@ -1458,6 +1467,7 @@ public boolean isClosed() {
* the {@link ResultStreamConsumer}.
*/
private static class SpannerResponseObserver implements ResponseObserver <PartialResultSet > {
private StreamController controller ;
private final ResultStreamConsumer consumer ;