Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #69 from deflaux/retry-grpc-stream
Browse files Browse the repository at this point in the history
Added tests for gRPC retries.
  • Loading branch information
deflaux committed Nov 13, 2015
2 parents e9edd09 + 2d7a9b3 commit 711c10a
Show file tree
Hide file tree
Showing 13 changed files with 942 additions and 157 deletions.
@@ -1,5 +1,7 @@
package com.google.cloud.genomics.utils.grpc;

import io.grpc.ManagedChannel;

import java.io.FileNotFoundException;
import java.util.Iterator;

Expand Down Expand Up @@ -34,7 +36,7 @@ public static void main(String[] args) throws Exception {
return;
}

GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth);
ManagedChannel channel = GenomicsChannel.fromOfflineAuth(auth);

// Regular RPC example: list all reference set assembly ids.
ReferenceServiceV1BlockingStub refStub =
Expand Down
@@ -1,15 +1,21 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.utils.grpc;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand All @@ -20,32 +26,30 @@
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class GenomicsChannel extends Channel {
public class GenomicsChannel {
private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
private static final String API_KEY_HEADER = "X-Goog-Api-Key";
// TODO https://github.com/googlegenomics/utils-java/issues/48
private static final String PARTIAL_RESPONSE_HEADER = "X-Goog-FieldMask";

// NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose
// the shutdown method and the ClientInterceptors do not return the ManagedChannel instance.
private final ManagedChannel managedChannel;
private final Channel delegate;

private ManagedChannel getGenomicsManagedChannel() throws SSLException {
private static ManagedChannel getGenomicsManagedChannel(List<ClientInterceptor> interceptors)
throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> defaultCiphers = GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
Expand All @@ -56,99 +60,74 @@ private ManagedChannel getGenomicsManagedChannel() throws SSLException {
return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
.intercept(interceptors).build();
}

private GenomicsChannel(String apiKey) throws SSLException {
managedChannel = getGenomicsManagedChannel();

/**
* Create a new gRPC channel to the Google Genomics API, using the provided api key for auth.
*
* @param apiKey the api key
* @return the ManagedChannel
* @throws SSLException
*/
public static ManagedChannel fromApiKey(String apiKey) throws SSLException {
Metadata headers = new Metadata();
Metadata.Key<String> apiKeyHeader =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeader, apiKey);
delegate = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
managedChannel = getGenomicsManagedChannel();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
delegate = ClientInterceptors.intercept(managedChannel, interceptor);
}

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> arg0, CallOptions arg1) {
return delegate.newCall(arg0, arg1);
// TODO https://github.com/googlegenomics/utils-java/issues/48
return getGenomicsManagedChannel(Collections.singletonList(MetadataUtils
.newAttachHeadersInterceptor(headers)));
}

/**
* @see io.grpc.ManagedChannel#shutdownNow()
*/
public void shutdownNow() {
managedChannel.shutdownNow();
}

/**
* @throws InterruptedException
* @see io.grpc.ManagedChannel#shutdown()
* @see io.grpc.ManagedChannel#awaitTermination(long, TimeUnit)
* Create a new gRPC channel to the Google Genomics API, using the provided credentials for auth.
*
* @param creds the credential
* @return the ManagedChannel
* @throws SSLException
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
managedChannel.shutdown().awaitTermination(timeout, unit);
public static ManagedChannel fromCreds(GoogleCredentials creds) throws SSLException {
ClientInterceptor interceptor =
new ClientAuthInterceptor(creds.createScoped(Arrays.asList(GENOMICS_SCOPE)),
Executors.newSingleThreadExecutor());
// TODO https://github.com/googlegenomics/utils-java/issues/48
return getGenomicsManagedChannel(Collections.singletonList(interceptor));
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
* Create a new gRPC channel to the Google Genomics API, using the application default credentials
* for auth.
*
* @return the ManagedChannel
* @throws SSLException
* @throws IOException
*/
public static GenomicsChannel fromDefaultCreds() throws IOException {
public static ManagedChannel fromDefaultCreds() throws SSLException, IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
*/
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
*/
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}


/**
* Initialize auth for a gRPC channel from OfflineAuth or the application default credentials.
* Create a new gRPC channel to the Google Genomics API, using OfflineAuth or the application
* default credentials.
*
* This library works with both the older and newer support for OAuth2 clients.
*
* https://developers.google.com/identity/protocols/application-default-credentials
*
* @param auth An OfflineAuth object.
* @return The gRPC channel authorized using either the information in the OfflineAuth or application default credentials.
* @param auth the OfflineAuth object
* @return the ManagedChannel
* @throws IOException
* @throws GeneralSecurityException
*/
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
public static ManagedChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth)
throws IOException, GeneralSecurityException {
if (auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
} else if(auth.hasApiKey()) {
} else if (auth.hasApiKey()) {
return fromApiKey(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

@Override
public String authority() {
return delegate.authority();
}
}

Expand Up @@ -13,16 +13,16 @@
*/
package com.google.cloud.genomics.utils.grpc;

import io.grpc.ManagedChannel;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand All @@ -33,7 +33,7 @@
* Includes complex retry logic to upon failure resume the stream at the last known good start
* position without returning duplicate data.
*
* TODO: refactor this further to simplify the generic signature.
* TODO: refactor this further to simplify the generic signature.
*
* @param <Request> Streaming request type.
* @param <Response> Streaming response type.
Expand All @@ -43,12 +43,14 @@
public abstract class GenomicsStreamIterator<Request, Response, Item, Stub extends io.grpc.stub.AbstractStub<Stub>>
implements Iterator<Response> {
private static final Logger LOG = Logger.getLogger(GenomicsStreamIterator.class.getName());
private final ExponentialBackOff backoff;
private final GenomicsChannel genomicsChannel;
private final Predicate<Item> shardPredicate;

protected final ManagedChannel genomicsChannel;
protected final Predicate<Item> shardPredicate;
protected final Stub stub;
protected final Request originalRequest;

protected ExponentialBackOff backoff;

// Stateful members used to facilitate complex retry behavior for gRPC streams.
private Iterator<Response> delegate;
private Item lastSuccessfulDataItem;
Expand All @@ -57,20 +59,19 @@ public abstract class GenomicsStreamIterator<Request, Response, Item, Stub exten
/**
* Create a stream iterator that will filter shard data using the predicate, if supplied.
*
* @param channel The channel.
* @param request The request for the shard of data.
* @param auth The OfflineAuth to use for the request.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce
* a shard boundary and/or limit to SNPs only) or null for no filtering.
* @throws IOException
* @throws GeneralSecurityException
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce a
* shard boundary and/or limit to SNPs only) or null for no filtering.
*/
public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth, String fields,
Predicate<Item> shardPredicate) throws IOException, GeneralSecurityException {

protected GenomicsStreamIterator(ManagedChannel channel, Request request, String fields,
Predicate<Item> shardPredicate) {
this.originalRequest = request;
this.shardPredicate = shardPredicate;
genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
this.genomicsChannel = channel;
stub = createStub(genomicsChannel);

// Using default backoff settings. For details, see
Expand All @@ -83,7 +84,7 @@ public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth,
idSentinel = null;
}

abstract Stub createStub(GenomicsChannel genomicsChannel);
abstract Stub createStub(ManagedChannel genomicsChannel);

abstract Iterator<Response> createIteratorFromStub(Request request);

Expand Down Expand Up @@ -168,6 +169,7 @@ private void setStreamStateForRetry() {
// We have never returned any data. No need to set up state needed to filter previously
// returned results.
delegate = createIterator(originalRequest);
return;
}

if (getRequestStart(originalRequest) < getDataItemStart(lastSuccessfulDataItem)) {
Expand Down

0 comments on commit 711c10a

Please sign in to comment.