From 042a40622aea0094b98202fed865364e4836a335 Mon Sep 17 00:00:00 2001 From: Nicole Deflaux Date: Mon, 17 Aug 2015 18:21:03 -0700 Subject: [PATCH 1/2] Facilitate explicit channel shutdown. --- .../cloud/genomics/utils/grpc/Example.java | 28 +++-- .../{Channels.java => GenomicsChannel.java} | 116 ++++++++++++------ .../utils/grpc/ReadStreamIterator.java | 27 +++- .../utils/grpc/VariantStreamIterator.java | 27 +++- 4 files changed, 143 insertions(+), 55 deletions(-) rename src/main/java/com/google/cloud/genomics/utils/grpc/{Channels.java => GenomicsChannel.java} (57%) diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java index 1d97183..3ac0aa7 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java @@ -1,7 +1,5 @@ package com.google.cloud.genomics.utils.grpc; -import io.grpc.Channel; - import java.io.FileNotFoundException; import java.util.Iterator; @@ -18,6 +16,7 @@ import com.google.genomics.v1.StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub; public class Example { + public static void main(String[] args) throws Exception { final String clientSecretsJson = "client_secrets.json"; GenomicsFactory.OfflineAuth auth = null; @@ -35,11 +34,11 @@ public static void main(String[] args) throws Exception { return; } - Channel channel = Channels.fromOfflineAuth(auth); + GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth); // Regular RPC example: list all reference set assembly ids. ReferenceServiceV1BlockingStub refStub = - ReferenceServiceV1Grpc.newBlockingStub(channel); + ReferenceServiceV1Grpc.newBlockingStub(channel.getChannel()); SearchReferenceSetsRequest request = SearchReferenceSetsRequest.newBuilder().build(); SearchReferenceSetsResponse response = refStub.searchReferenceSets(request); @@ -49,7 +48,7 @@ public static void main(String[] args) throws Exception { // Streaming RPC example: request the variants within BRCA1 for the Platinum Genomes variant set. StreamingVariantServiceBlockingStub varStub = - StreamingVariantServiceGrpc.newBlockingStub(channel); + StreamingVariantServiceGrpc.newBlockingStub(channel.getChannel()); StreamVariantsRequest varRequest = StreamVariantsRequest.newBuilder() .setVariantSetId("3049512673186936334") .setReferenceName("chr17") @@ -57,14 +56,17 @@ public static void main(String[] args) throws Exception { .setEnd(41277499) .build(); - Iterator iter = varStub.streamVariants(varRequest); - while (iter.hasNext()) { - StreamVariantsResponse varResponse = iter.next(); - System.out.println("Response:"); - System.out.println(varResponse.toString()); - System.out.println(); + try { + Iterator iter = varStub.streamVariants(varRequest); + while (iter.hasNext()) { + StreamVariantsResponse varResponse = iter.next(); + System.out.println("Response:"); + System.out.println(varResponse.toString()); + System.out.println(); + } + System.out.println("Done"); + } finally { + channel.shutdownNow(); } - System.out.println("Done"); - System.exit(0); } } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java similarity index 57% rename from src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java rename to src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java index e528acc..faaff6c 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/Channels.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java @@ -4,6 +4,7 @@ import com.google.cloud.genomics.utils.GenomicsFactory; import io.grpc.Channel; +import io.grpc.ChannelImpl; import io.grpc.ClientInterceptors; import io.grpc.Metadata; import io.grpc.auth.ClientAuthInterceptor; @@ -18,21 +19,92 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; /** * A convenience class for creating gRPC channels to the Google Genomics API. */ -public class Channels { +public class GenomicsChannel { + private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com"; + private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics"; // TODO: This constant should come from grpc-java. private static final String API_KEY_HEADER = "X-Goog-Api-Key"; + // NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose + // the shutdown method and the ClientInterceptors do not return the ChannelImpl instance. + private final ChannelImpl channelImpl; + private final Channel channel; + + private ChannelImpl getGenomicsChannelImpl() throws SSLException { + // Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable + // them here. + List defaultCiphers = + GrpcSslContexts.forClient().ciphers(null).build().cipherSuites(); + List performantCiphers = new ArrayList<>(); + for (String cipher : defaultCiphers) { + if (!cipher.contains("GCM")) { + performantCiphers.add(cipher); + } + } + + return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443) + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()) + .build(); + } + + private GenomicsChannel(String apiKey) throws SSLException { + channelImpl = getGenomicsChannelImpl(); + Metadata.Headers headers = new Metadata.Headers(); + Metadata.Key apiKeyHeaderKey = + Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER); + headers.put(apiKeyHeaderKey, apiKey); + channel = ClientInterceptors.intercept(channelImpl, + MetadataUtils.newAttachHeadersInterceptor(headers)); + } + + private GenomicsChannel(GoogleCredentials creds) throws SSLException { + channelImpl = getGenomicsChannelImpl(); + creds = creds.createScoped( + Arrays.asList(GENOMICS_SCOPE)); + ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds, + Executors.newSingleThreadExecutor()); + channel = ClientInterceptors.intercept(channelImpl, interceptor); + } + + /** + * Get the underlying gRPC channel. + * + * This is needed to pass to gRPC client factory methods. + * @return The Channel object. + */ + public Channel getChannel() { + return this.channel; + } + + /** + * @see io.grpc.ChannelImpl#shutdownNow() + */ + public void shutdownNow() { + channelImpl.shutdownNow(); + } + + /** + * @throws InterruptedException + * @see io.grpc.ChannelImpl#shutdown() + * @see io.grpc.ChannelImpl#awaitTermination(long, TimeUnit) + */ + public void shutdown(long timeout, TimeUnit unit) throws InterruptedException { + channelImpl.shutdown().awaitTermination(timeout, unit); + } + /** * Creates a new gRPC channel to the Google Genomics API, using the application * default credentials for auth. */ - public static Channel fromDefaultCreds() throws IOException { + public static GenomicsChannel fromDefaultCreds() throws IOException { return fromCreds(GoogleCredentials.getApplicationDefault()); } @@ -40,25 +112,16 @@ public static Channel fromDefaultCreds() throws IOException { * Creates a new gRPC channel to the Google Genomics API, using the provided * api key for auth. */ - public static Channel fromApiKey(String apiKey) throws SSLException { - Metadata.Headers headers = new Metadata.Headers(); - Metadata.Key apiKeyHeaderKey = - Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER); - headers.put(apiKeyHeaderKey, apiKey); - return ClientInterceptors.intercept(getGenomicsChannel(), - MetadataUtils.newAttachHeadersInterceptor(headers)); + public static GenomicsChannel fromApiKey(String apiKey) throws SSLException { + return new GenomicsChannel(apiKey); } /** * Creates a new gRPC channel to the Google Genomics API, using the provided * credentials for auth. */ - public static Channel fromCreds(GoogleCredentials creds) throws IOException { - creds = creds.createScoped( - Arrays.asList("https://www.googleapis.com/auth/genomics")); - ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds, - Executors.newSingleThreadExecutor()); - return ClientInterceptors.intercept(getGenomicsChannel(), interceptor); + public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException { + return new GenomicsChannel(creds); } /** @@ -73,34 +136,15 @@ public static Channel fromCreds(GoogleCredentials creds) throws IOException { * @throws IOException * @throws GeneralSecurityException */ - public static Channel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException { + public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException { if(auth.hasUserCredentials()) { return fromCreds(auth.getUserCredentials()); // TODO: https://github.com/googlegenomics/utils-java/issues/51 // } else if(auth.hasApiKey()) { -// return fromApiKey(auth.apiKey); +// return new Channels(auth.apiKey); } // Fall back to Default Credentials if the user did not specify user credentials or an api key. return fromDefaultCreds(); } - - private static Channel getGenomicsChannel() throws SSLException { - // Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable - // them here. - List defaultCiphers = - GrpcSslContexts.forClient().ciphers(null).build().cipherSuites(); - List performantCiphers = new ArrayList<>(); - for (String cipher : defaultCiphers) { - if (!cipher.contains("GCM")) { - performantCiphers.add(cipher); - } - } - - Channel channel = NettyChannelBuilder.forAddress("genomics.googleapis.com", 443) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()) - .build(); - return channel; - } } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java index e5645d0..dcdf20c 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java @@ -36,7 +36,8 @@ */ public class ReadStreamIterator extends ForwardingIterator { protected final Predicate shardPredicate; - private Iterator delegate; + protected final Iterator delegate; + protected final GenomicsChannel genomicsChannel; /** * Create a stream iterator that can enforce shard boundary semantics. @@ -56,8 +57,9 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut ShardBoundary.getStrictReadPredicate(request.getStart()) : null; + genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub = - StreamingReadServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth)); + StreamingReadServiceGrpc.newBlockingStub(genomicsChannel.getChannel()); delegate = readStub.streamReads(request); } @@ -67,11 +69,30 @@ protected Iterator delegate() { return delegate; } + /** + * @see java.util.Iterator#hasNext() + */ + public boolean hasNext() { + try { + return delegate.hasNext(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + } + /** * @see java.util.Iterator#next() */ public StreamReadsResponse next() { - StreamReadsResponse response = delegate.next(); + StreamReadsResponse response = null; + try { + response = delegate.next(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + if(null == shardPredicate) { return response; } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java index b9d0fc7..0896d40 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java @@ -36,7 +36,8 @@ */ public class VariantStreamIterator extends ForwardingIterator { protected final Predicate shardPredicate; - protected Iterator delegate; + protected final Iterator delegate; + protected final GenomicsChannel genomicsChannel; /** * Create a stream iterator that can enforce shard boundary semantics. @@ -56,8 +57,9 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl ShardBoundary.getStrictVariantPredicate(request.getStart()) : null; + genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub = - StreamingVariantServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth)); + StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel.getChannel()); delegate = variantStub.streamVariants(request); @@ -68,11 +70,30 @@ protected Iterator delegate() { return delegate; } + /** + * @see java.util.Iterator#hasNext() + */ + public boolean hasNext() { + try { + return delegate.hasNext(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + } + /** * @see java.util.Iterator#next() */ public StreamVariantsResponse next() { - StreamVariantsResponse response = delegate.next(); + StreamVariantsResponse response = null; + try { + response = delegate.next(); + } catch (RuntimeException e) { + genomicsChannel.shutdownNow(); + throw e; + } + if(null == shardPredicate) { return response; } From 0e8d4ab245c48ba5b6402c555dc5d6f1c46f1642 Mon Sep 17 00:00:00 2001 From: Nicole Deflaux Date: Tue, 18 Aug 2015 10:46:38 -0700 Subject: [PATCH 2/2] Extend Channel. --- .../cloud/genomics/utils/grpc/Example.java | 4 +-- .../genomics/utils/grpc/GenomicsChannel.java | 25 +++++++++---------- .../utils/grpc/ReadStreamIterator.java | 2 +- .../utils/grpc/VariantStreamIterator.java | 2 +- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java index 3ac0aa7..fec04fb 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/Example.java @@ -38,7 +38,7 @@ public static void main(String[] args) throws Exception { // Regular RPC example: list all reference set assembly ids. ReferenceServiceV1BlockingStub refStub = - ReferenceServiceV1Grpc.newBlockingStub(channel.getChannel()); + ReferenceServiceV1Grpc.newBlockingStub(channel); SearchReferenceSetsRequest request = SearchReferenceSetsRequest.newBuilder().build(); SearchReferenceSetsResponse response = refStub.searchReferenceSets(request); @@ -48,7 +48,7 @@ public static void main(String[] args) throws Exception { // Streaming RPC example: request the variants within BRCA1 for the Platinum Genomes variant set. StreamingVariantServiceBlockingStub varStub = - StreamingVariantServiceGrpc.newBlockingStub(channel.getChannel()); + StreamingVariantServiceGrpc.newBlockingStub(channel); StreamVariantsRequest varRequest = StreamVariantsRequest.newBuilder() .setVariantSetId("3049512673186936334") .setReferenceName("chr17") diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java index faaff6c..6a261d4 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsChannel.java @@ -3,10 +3,13 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.genomics.utils.GenomicsFactory; +import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ChannelImpl; +import io.grpc.ClientCall; import io.grpc.ClientInterceptors; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.auth.ClientAuthInterceptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -26,7 +29,7 @@ /** * A convenience class for creating gRPC channels to the Google Genomics API. */ -public class GenomicsChannel { +public class GenomicsChannel extends Channel { private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com"; private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics"; // TODO: This constant should come from grpc-java. @@ -35,7 +38,7 @@ public class GenomicsChannel { // NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose // the shutdown method and the ClientInterceptors do not return the ChannelImpl instance. private final ChannelImpl channelImpl; - private final Channel channel; + private final Channel delegate; private ChannelImpl getGenomicsChannelImpl() throws SSLException { // Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable @@ -61,7 +64,7 @@ private GenomicsChannel(String apiKey) throws SSLException { Metadata.Key apiKeyHeaderKey = Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER); headers.put(apiKeyHeaderKey, apiKey); - channel = ClientInterceptors.intercept(channelImpl, + delegate = ClientInterceptors.intercept(channelImpl, MetadataUtils.newAttachHeadersInterceptor(headers)); } @@ -71,19 +74,15 @@ private GenomicsChannel(GoogleCredentials creds) throws SSLException { Arrays.asList(GENOMICS_SCOPE)); ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds, Executors.newSingleThreadExecutor()); - channel = ClientInterceptors.intercept(channelImpl, interceptor); + delegate = ClientInterceptors.intercept(channelImpl, interceptor); } - /** - * Get the underlying gRPC channel. - * - * This is needed to pass to gRPC client factory methods. - * @return The Channel object. - */ - public Channel getChannel() { - return this.channel; + @Override + public ClientCall newCall( + MethodDescriptor arg0, CallOptions arg1) { + return delegate.newCall(arg0, arg1); } - + /** * @see io.grpc.ChannelImpl#shutdownNow() */ diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java index dcdf20c..c62f0e1 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/ReadStreamIterator.java @@ -59,7 +59,7 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub = - StreamingReadServiceGrpc.newBlockingStub(genomicsChannel.getChannel()); + StreamingReadServiceGrpc.newBlockingStub(genomicsChannel); delegate = readStub.streamReads(request); } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java index 0896d40..75aa401 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/VariantStreamIterator.java @@ -59,7 +59,7 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl genomicsChannel = GenomicsChannel.fromOfflineAuth(auth); StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub = - StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel.getChannel()); + StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel); delegate = variantStub.streamVariants(request);