Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #58 from deflaux/master
Browse files Browse the repository at this point in the history
Facilitate explicit channel shutdown.
  • Loading branch information
ssgross committed Aug 18, 2015
2 parents ee0c927 + 0e8d4ab commit fedc4a2
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 53 deletions.
24 changes: 13 additions & 11 deletions src/main/java/com/google/cloud/genomics/utils/grpc/Example.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.google.cloud.genomics.utils.grpc;

import io.grpc.Channel;

import java.io.FileNotFoundException;
import java.util.Iterator;

Expand All @@ -18,6 +16,7 @@
import com.google.genomics.v1.StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub;

public class Example {

public static void main(String[] args) throws Exception {
final String clientSecretsJson = "client_secrets.json";
GenomicsFactory.OfflineAuth auth = null;
Expand All @@ -35,7 +34,7 @@ public static void main(String[] args) throws Exception {
return;
}

Channel channel = Channels.fromOfflineAuth(auth);
GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth);

// Regular RPC example: list all reference set assembly ids.
ReferenceServiceV1BlockingStub refStub =
Expand All @@ -57,14 +56,17 @@ public static void main(String[] args) throws Exception {
.setEnd(41277499)
.build();

Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
try {
Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
}
System.out.println("Done");
} finally {
channel.shutdownNow();
}
System.out.println("Done");
System.exit(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand All @@ -18,47 +22,105 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class Channels {
public class GenomicsChannel extends Channel {
private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
// TODO: This constant should come from grpc-java.
private static final String API_KEY_HEADER = "X-Goog-Api-Key";

// NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose
// the shutdown method and the ClientInterceptors do not return the ChannelImpl instance.
private final ChannelImpl channelImpl;
private final Channel delegate;

private ChannelImpl getGenomicsChannelImpl() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
}

private GenomicsChannel(String apiKey) throws SSLException {
channelImpl = getGenomicsChannelImpl();
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
delegate = ClientInterceptors.intercept(channelImpl,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
channelImpl = getGenomicsChannelImpl();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
delegate = ClientInterceptors.intercept(channelImpl, interceptor);
}

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> arg0, CallOptions arg1) {
return delegate.newCall(arg0, arg1);
}

/**
* @see io.grpc.ChannelImpl#shutdownNow()
*/
public void shutdownNow() {
channelImpl.shutdownNow();
}

/**
* @throws InterruptedException
* @see io.grpc.ChannelImpl#shutdown()
* @see io.grpc.ChannelImpl#awaitTermination(long, TimeUnit)
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
channelImpl.shutdown().awaitTermination(timeout, unit);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
*/
public static Channel fromDefaultCreds() throws IOException {
public static GenomicsChannel fromDefaultCreds() throws IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
*/
public static Channel fromApiKey(String apiKey) throws SSLException {
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
return ClientInterceptors.intercept(getGenomicsChannel(),
MetadataUtils.newAttachHeadersInterceptor(headers));
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
*/
public static Channel fromCreds(GoogleCredentials creds) throws IOException {
creds = creds.createScoped(
Arrays.asList("https://www.googleapis.com/auth/genomics"));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
return ClientInterceptors.intercept(getGenomicsChannel(), interceptor);
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}

/**
Expand All @@ -73,34 +135,15 @@ public static Channel fromCreds(GoogleCredentials creds) throws IOException {
* @throws IOException
* @throws GeneralSecurityException
*/
public static Channel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
// TODO: https://github.com/googlegenomics/utils-java/issues/51
// } else if(auth.hasApiKey()) {
// return fromApiKey(auth.apiKey);
// return new Channels(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

private static Channel getGenomicsChannel() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

Channel channel = NettyChannelBuilder.forAddress("genomics.googleapis.com", 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
return channel;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class ReadStreamIterator extends ForwardingIterator<StreamReadsResponse> {
protected final Predicate<Read> shardPredicate;
private Iterator<StreamReadsResponse> delegate;
protected final Iterator<StreamReadsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut
ShardBoundary.getStrictReadPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub =
StreamingReadServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingReadServiceGrpc.newBlockingStub(genomicsChannel);

delegate = readStub.streamReads(request);
}
Expand All @@ -67,11 +69,30 @@ protected Iterator<StreamReadsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamReadsResponse next() {
StreamReadsResponse response = delegate.next();
StreamReadsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class VariantStreamIterator extends ForwardingIterator<StreamVariantsResponse> {
protected final Predicate<Variant> shardPredicate;
protected Iterator<StreamVariantsResponse> delegate;
protected final Iterator<StreamVariantsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl
ShardBoundary.getStrictVariantPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub =
StreamingVariantServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel);


delegate = variantStub.streamVariants(request);
Expand All @@ -68,11 +70,30 @@ protected Iterator<StreamVariantsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamVariantsResponse next() {
StreamVariantsResponse response = delegate.next();
StreamVariantsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down

0 comments on commit fedc4a2

Please sign in to comment.