diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java index 1d97183..fec04fb 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java @@ -1,7 +1,5 @@ package com.google.cloud.genomics.utils.grpc; -import io.grpc.Channel; - import java.io.FileNotFoundException; import java.util.Iterator; @@ -18,6 +16,7 @@ import com.google.genomics.v1.StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub; public class Example { + public static void main(String[] args) throws Exception { final String clientSecretsJson = "client_secrets.json"; GenomicsFactory.OfflineAuth auth = null; @@ -35,7 +34,7 @@ public static void main(String[] args) throws Exception { return; } - Channel channel = Channels.fromOfflineAuth(auth); + GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth); // Regular RPC example: list all reference set assembly ids. ReferenceServiceV1BlockingStub refStub = @@ -57,14 +56,17 @@ public static void main(String[] args) throws Exception { .setEnd(41277499) .build(); - Iterator iter = varStub.streamVariants(varRequest); - while (iter.hasNext()) { - StreamVariantsResponse varResponse = iter.next(); - System.out.println("Response:"); - System.out.println(varResponse.toString()); - System.out.println(); + try { + Iterator iter = varStub.streamVariants(varRequest); + while (iter.hasNext()) { + StreamVariantsResponse varResponse = iter.next(); + System.out.println("Response:"); + System.out.println(varResponse.toString()); + System.out.println(); + } + System.out.println("Done"); + } finally { + channel.shutdownNow(); } - System.out.println("Done"); - System.exit(0); } } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java similarity index 56% rename from src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java rename to src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java index e528acc..6a261d4 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java @@ -3,9 +3,13 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.genomics.utils.GenomicsFactory; +import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelImpl; +import io.grpc.ClientCall; import io.grpc.ClientInterceptors; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.auth.ClientAuthInterceptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -18,21 +22,88 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; /** * A convenience class for creating gRPC channels to the Google Genomics API. */ -public class Channels { +public class GenomicsChannel extends Channel { + private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com"; + private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics"; // TODO: This constant should come from grpc-java. private static final String API_KEY_HEADER = "X-Goog-Api-Key"; + // NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose + // the shutdown method and the ClientInterceptors do not return the ChannelImpl instance. + private final ChannelImpl channelImpl; + private final Channel delegate; + + private ChannelImpl getGenomicsChannelImpl() throws SSLException { + // Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable + // them here. + List defaultCiphers = + GrpcSslContexts.forClient().ciphers(null).build().cipherSuites(); + List performantCiphers = new ArrayList<>(); + for (String cipher : defaultCiphers) { + if (!cipher.contains("GCM")) { + performantCiphers.add(cipher); + } + } + + return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()) + .build(); + } + + private GenomicsChannel(String apiKey) throws SSLException { + channelImpl = getGenomicsChannelImpl(); + Metadata.Headers headers = new Metadata.Headers(); + Metadata.Key apiKeyHeaderKey = + Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER); + headers.put(apiKeyHeaderKey, apiKey); + delegate = ClientInterceptors.intercept(channelImpl, + MetadataUtils.newAttachHeadersInterceptor(headers)); + } + + private GenomicsChannel(GoogleCredentials creds) throws SSLException { + channelImpl = getGenomicsChannelImpl(); + creds = creds.createScoped( + Arrays.asList(GENOMICS_SCOPE)); + ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds, + Executors.newSingleThreadExecutor()); + delegate = ClientInterceptors.intercept(channelImpl, interceptor); + } + + @Override + public ClientCall newCall( + MethodDescriptor arg0, CallOptions arg1) { + return delegate.newCall(arg0, arg1); + } + + /** + * @see io.grpc.ChannelImpl#shutdownNow() + */ + public void shutdownNow() { + channelImpl.shutdownNow(); + } + + /** + * @throws InterruptedException + * @see io.grpc.ChannelImpl#shutdown() + * @see io.grpc.ChannelImpl#awaitTermination(long, TimeUnit) + */ + public void shutdown(long timeout, TimeUnit unit) throws InterruptedException { + channelImpl.shutdown().awaitTermination(timeout, unit); + } + /** * Creates a new gRPC channel to the Google Genomics API, using the application * default credentials for auth. */ - public static Channel fromDefaultCreds() throws IOException { + public static GenomicsChannel fromDefaultCreds() throws IOException { return fromCreds(GoogleCredentials.getApplicationDefault()); } @@ -40,25 +111,16 @@ public static Channel fromDefaultCreds() throws IOException { * Creates a new gRPC channel to the Google Genomics API, using the provided * api key for auth. */ - public static Channel fromApiKey(String apiKey) throws SSLException { - Metadata.Headers headers = new Metadata.Headers(); - Metadata.Key apiKeyHeaderKey = - Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER); - headers.put(apiKeyHeaderKey, apiKey); - return ClientInterceptors.intercept(getGenomicsChannel(), - MetadataUtils.newAttachHeadersInterceptor(headers)); + public static GenomicsChannel fromApiKey(String apiKey) throws SSLException { + return new GenomicsChannel(apiKey); } /** * Creates a new gRPC channel to the Google Genomics API, using the provided * credentials for auth. */ - public static Channel fromCreds(GoogleCredentials creds) throws IOException { - creds = creds.createScoped( - Arrays.asList("https://www.googleapis.com/auth/genomics")); - ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds, - Executors.newSingleThreadExecutor()); - return ClientInterceptors.intercept(getGenomicsChannel(), interceptor); + public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException { + return new GenomicsChannel(creds); } /** @@ -73,34 +135,15 @@ public static Channel fromCreds(GoogleCredentials creds) throws IOException { * @throws IOException * @throws GeneralSecurityException */ - public static Channel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException { + public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException { if(auth.hasUserCredentials()) { return fromCreds(auth.getUserCredentials()); // TODO: https://github.com/googlegenomics/utils-java/issues/51 // } else if(auth.hasApiKey()) { -// return fromApiKey(auth.apiKey); +// return new Channels(auth.apiKey); } // Fall back to Default Credentials if the user did not specify user credentials or an api key. return fromDefaultCreds(); } - - private static Channel getGenomicsChannel() throws SSLException { - // Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable - // them here. - List defaultCiphers = - GrpcSslContexts.forClient().ciphers(null).build().cipherSuites(); - List performantCiphers = new ArrayList<>(); - for (String cipher : defaultCiphers) { - if (!cipher.contains("GCM")) { - performantCiphers.add(cipher); - } - } - - Channel channel = NettyChannelBuilder.forAddress("genomics.googleapis.com", 443) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()) - .build(); - return channel; - } } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java index e5645d0..c62f0e1 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java @@ -36,7 +36,8 @@ */ public class ReadStreamIterator extends ForwardingIterator { protected final Predicate shardPredicate; - private Iterator delegate; + protected final Iterator delegate; + protected final GenomicsChannel genomicsChannel; /** * Create a stream iterator that can enforce shard boundary semantics. @@ -56,8 +57,9 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut ShardBoundary.getStrictReadPredicate(request.getStart()) : null; + genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub = - StreamingReadServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth)); + StreamingReadServiceGrpc.newBlockingStub(genomicsChannel); delegate = readStub.streamReads(request); } @@ -67,11 +69,30 @@ protected Iterator delegate() { return delegate; } + /** + * @see java.util.Iterator#hasNext() + */ + public boolean hasNext() { + try { + return delegate.hasNext(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + } + /** * @see java.util.Iterator#next() */ public StreamReadsResponse next() { - StreamReadsResponse response = delegate.next(); + StreamReadsResponse response = null; + try { + response = delegate.next(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + if(null == shardPredicate) { return response; } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java index b9d0fc7..75aa401 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java @@ -36,7 +36,8 @@ */ public class VariantStreamIterator extends ForwardingIterator { protected final Predicate shardPredicate; - protected Iterator delegate; + protected final Iterator delegate; + protected final GenomicsChannel genomicsChannel; /** * Create a stream iterator that can enforce shard boundary semantics. @@ -56,8 +57,9 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl ShardBoundary.getStrictVariantPredicate(request.getStart()) : null; + genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub = - StreamingVariantServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth)); + StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel); delegate = variantStub.streamVariants(request); @@ -68,11 +70,30 @@ protected Iterator delegate() { return delegate; } + /** + * @see java.util.Iterator#hasNext() + */ + public boolean hasNext() { + try { + return delegate.hasNext(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + } + /** * @see java.util.Iterator#next() */ public StreamVariantsResponse next() { - StreamVariantsResponse response = delegate.next(); + StreamVariantsResponse response = null; + try { + response = delegate.next(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + if(null == shardPredicate) { return response; }