Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Facilitate explicit channel shutdown. #58

Merged
merged 2 commits into from
Aug 18, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/main/java/com/google/cloud/genomics/utils/grpc/Example.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.google.cloud.genomics.utils.grpc;

import io.grpc.Channel;

import java.io.FileNotFoundException;
import java.util.Iterator;

Expand All @@ -18,6 +16,7 @@
import com.google.genomics.v1.StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub;

public class Example {

public static void main(String[] args) throws Exception {
final String clientSecretsJson = "client_secrets.json";
GenomicsFactory.OfflineAuth auth = null;
Expand All @@ -35,11 +34,11 @@ public static void main(String[] args) throws Exception {
return;
}

Channel channel = Channels.fromOfflineAuth(auth);
GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth);

// Regular RPC example: list all reference set assembly ids.
ReferenceServiceV1BlockingStub refStub =
ReferenceServiceV1Grpc.newBlockingStub(channel);
ReferenceServiceV1Grpc.newBlockingStub(channel.getChannel());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channel.getChannel() in particular feels pretty awkward. Maybe it would be better to have GenomicsChannel extend Channel so it could be used directly as a gRPC channel. There is only one abstract method which you can delegate to the gRPC channel you are wrapping.

SearchReferenceSetsRequest request =
SearchReferenceSetsRequest.newBuilder().build();
SearchReferenceSetsResponse response = refStub.searchReferenceSets(request);
Expand All @@ -49,22 +48,25 @@ public static void main(String[] args) throws Exception {

// Streaming RPC example: request the variants within BRCA1 for the Platinum Genomes variant set.
StreamingVariantServiceBlockingStub varStub =
StreamingVariantServiceGrpc.newBlockingStub(channel);
StreamingVariantServiceGrpc.newBlockingStub(channel.getChannel());
StreamVariantsRequest varRequest = StreamVariantsRequest.newBuilder()
.setVariantSetId("3049512673186936334")
.setReferenceName("chr17")
.setStart(41196311)
.setEnd(41277499)
.build();

Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
try {
Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
}
System.out.println("Done");
} finally {
channel.shutdownNow();
}
System.out.println("Done");
System.exit(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.auth.ClientAuthInterceptor;
Expand All @@ -18,47 +19,109 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class Channels {
public class GenomicsChannel {
private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
// TODO: This constant should come from grpc-java.
private static final String API_KEY_HEADER = "X-Goog-Api-Key";

// NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose
// the shutdown method and the ClientInterceptors do not return the ChannelImpl instance.
private final ChannelImpl channelImpl;
private final Channel channel;

private ChannelImpl getGenomicsChannelImpl() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
}

private GenomicsChannel(String apiKey) throws SSLException {
channelImpl = getGenomicsChannelImpl();
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
channel = ClientInterceptors.intercept(channelImpl,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
channelImpl = getGenomicsChannelImpl();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
channel = ClientInterceptors.intercept(channelImpl, interceptor);
}

/**
* Get the underlying gRPC channel.
*
* This is needed to pass to gRPC client factory methods.
* @return The Channel object.
*/
public Channel getChannel() {
return this.channel;
}

/**
* @see io.grpc.ChannelImpl#shutdownNow()
*/
public void shutdownNow() {
channelImpl.shutdownNow();
}

/**
* @throws InterruptedException
* @see io.grpc.ChannelImpl#shutdown()
* @see io.grpc.ChannelImpl#awaitTermination(long, TimeUnit)
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
channelImpl.shutdown().awaitTermination(timeout, unit);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
*/
public static Channel fromDefaultCreds() throws IOException {
public static GenomicsChannel fromDefaultCreds() throws IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
*/
public static Channel fromApiKey(String apiKey) throws SSLException {
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
return ClientInterceptors.intercept(getGenomicsChannel(),
MetadataUtils.newAttachHeadersInterceptor(headers));
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
*/
public static Channel fromCreds(GoogleCredentials creds) throws IOException {
creds = creds.createScoped(
Arrays.asList("https://www.googleapis.com/auth/genomics"));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
return ClientInterceptors.intercept(getGenomicsChannel(), interceptor);
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}

/**
Expand All @@ -73,34 +136,15 @@ public static Channel fromCreds(GoogleCredentials creds) throws IOException {
* @throws IOException
* @throws GeneralSecurityException
*/
public static Channel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
// TODO: https://github.com/googlegenomics/utils-java/issues/51
// } else if(auth.hasApiKey()) {
// return fromApiKey(auth.apiKey);
// return new Channels(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

private static Channel getGenomicsChannel() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

Channel channel = NettyChannelBuilder.forAddress("genomics.googleapis.com", 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
return channel;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class ReadStreamIterator extends ForwardingIterator<StreamReadsResponse> {
protected final Predicate<Read> shardPredicate;
private Iterator<StreamReadsResponse> delegate;
protected final Iterator<StreamReadsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut
ShardBoundary.getStrictReadPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub =
StreamingReadServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingReadServiceGrpc.newBlockingStub(genomicsChannel.getChannel());

delegate = readStub.streamReads(request);
}
Expand All @@ -67,11 +69,30 @@ protected Iterator<StreamReadsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamReadsResponse next() {
StreamReadsResponse response = delegate.next();
StreamReadsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class VariantStreamIterator extends ForwardingIterator<StreamVariantsResponse> {
protected final Predicate<Variant> shardPredicate;
protected Iterator<StreamVariantsResponse> delegate;
protected final Iterator<StreamVariantsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl
ShardBoundary.getStrictVariantPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub =
StreamingVariantServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel.getChannel());


delegate = variantStub.streamVariants(request);
Expand All @@ -68,11 +70,30 @@ protected Iterator<StreamVariantsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamVariantsResponse next() {
StreamVariantsResponse response = delegate.next();
StreamVariantsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down