Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Facilitate explicit channel shutdown. #58

Merged
merged 2 commits into from
Aug 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions src/main/java/com/google/cloud/genomics/utils/grpc/Example.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.google.cloud.genomics.utils.grpc;

import io.grpc.Channel;

import java.io.FileNotFoundException;
import java.util.Iterator;

Expand All @@ -18,6 +16,7 @@
import com.google.genomics.v1.StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub;

public class Example {

public static void main(String[] args) throws Exception {
final String clientSecretsJson = "client_secrets.json";
GenomicsFactory.OfflineAuth auth = null;
Expand All @@ -35,7 +34,7 @@ public static void main(String[] args) throws Exception {
return;
}

Channel channel = Channels.fromOfflineAuth(auth);
GenomicsChannel channel = GenomicsChannel.fromOfflineAuth(auth);

// Regular RPC example: list all reference set assembly ids.
ReferenceServiceV1BlockingStub refStub =
Expand All @@ -57,14 +56,17 @@ public static void main(String[] args) throws Exception {
.setEnd(41277499)
.build();

Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
try {
Iterator<StreamVariantsResponse> iter = varStub.streamVariants(varRequest);
while (iter.hasNext()) {
StreamVariantsResponse varResponse = iter.next();
System.out.println("Response:");
System.out.println(varResponse.toString());
System.out.println();
}
System.out.println("Done");
} finally {
channel.shutdownNow();
}
System.out.println("Done");
System.exit(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand All @@ -18,47 +22,105 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLException;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class Channels {
public class GenomicsChannel extends Channel {
private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
// TODO: This constant should come from grpc-java.
private static final String API_KEY_HEADER = "X-Goog-Api-Key";

// NOTE: Unfortunately we need to keep a handle to both of these since Channel does not expose
// the shutdown method and the ClientInterceptors do not return the ChannelImpl instance.
private final ChannelImpl channelImpl;
private final Channel delegate;

private ChannelImpl getGenomicsChannelImpl() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
}

private GenomicsChannel(String apiKey) throws SSLException {
channelImpl = getGenomicsChannelImpl();
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
delegate = ClientInterceptors.intercept(channelImpl,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
channelImpl = getGenomicsChannelImpl();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
delegate = ClientInterceptors.intercept(channelImpl, interceptor);
}

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> arg0, CallOptions arg1) {
return delegate.newCall(arg0, arg1);
}

/**
* @see io.grpc.ChannelImpl#shutdownNow()
*/
public void shutdownNow() {
channelImpl.shutdownNow();
}

/**
* @throws InterruptedException
* @see io.grpc.ChannelImpl#shutdown()
* @see io.grpc.ChannelImpl#awaitTermination(long, TimeUnit)
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
channelImpl.shutdown().awaitTermination(timeout, unit);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
*/
public static Channel fromDefaultCreds() throws IOException {
public static GenomicsChannel fromDefaultCreds() throws IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
*/
public static Channel fromApiKey(String apiKey) throws SSLException {
Metadata.Headers headers = new Metadata.Headers();
Metadata.Key<String> apiKeyHeaderKey =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeaderKey, apiKey);
return ClientInterceptors.intercept(getGenomicsChannel(),
MetadataUtils.newAttachHeadersInterceptor(headers));
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
*/
public static Channel fromCreds(GoogleCredentials creds) throws IOException {
creds = creds.createScoped(
Arrays.asList("https://www.googleapis.com/auth/genomics"));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
return ClientInterceptors.intercept(getGenomicsChannel(), interceptor);
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}

/**
Expand All @@ -73,34 +135,15 @@ public static Channel fromCreds(GoogleCredentials creds) throws IOException {
* @throws IOException
* @throws GeneralSecurityException
*/
public static Channel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
// TODO: https://github.com/googlegenomics/utils-java/issues/51
// } else if(auth.hasApiKey()) {
// return fromApiKey(auth.apiKey);
// return new Channels(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

private static Channel getGenomicsChannel() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
performantCiphers.add(cipher);
}
}

Channel channel = NettyChannelBuilder.forAddress("genomics.googleapis.com", 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
return channel;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class ReadStreamIterator extends ForwardingIterator<StreamReadsResponse> {
protected final Predicate<Read> shardPredicate;
private Iterator<StreamReadsResponse> delegate;
protected final Iterator<StreamReadsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public ReadStreamIterator(StreamReadsRequest request, GenomicsFactory.OfflineAut
ShardBoundary.getStrictReadPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingReadServiceGrpc.StreamingReadServiceBlockingStub readStub =
StreamingReadServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingReadServiceGrpc.newBlockingStub(genomicsChannel);

delegate = readStub.streamReads(request);
}
Expand All @@ -67,11 +69,30 @@ protected Iterator<StreamReadsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamReadsResponse next() {
StreamReadsResponse response = delegate.next();
StreamReadsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public class VariantStreamIterator extends ForwardingIterator<StreamVariantsResponse> {
protected final Predicate<Variant> shardPredicate;
protected Iterator<StreamVariantsResponse> delegate;
protected final Iterator<StreamVariantsResponse> delegate;
protected final GenomicsChannel genomicsChannel;

/**
* Create a stream iterator that can enforce shard boundary semantics.
Expand All @@ -56,8 +57,9 @@ public VariantStreamIterator(StreamVariantsRequest request, GenomicsFactory.Offl
ShardBoundary.getStrictVariantPredicate(request.getStart()) :
null;

genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
StreamingVariantServiceGrpc.StreamingVariantServiceBlockingStub variantStub =
StreamingVariantServiceGrpc.newBlockingStub(Channels.fromOfflineAuth(auth));
StreamingVariantServiceGrpc.newBlockingStub(genomicsChannel);


delegate = variantStub.streamVariants(request);
Expand All @@ -68,11 +70,30 @@ protected Iterator<StreamVariantsResponse> delegate() {
return delegate;
}

/**
* @see java.util.Iterator#hasNext()
*/
public boolean hasNext() {
try {
return delegate.hasNext();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}
}

/**
* @see java.util.Iterator#next()
*/
public StreamVariantsResponse next() {
StreamVariantsResponse response = delegate.next();
StreamVariantsResponse response = null;
try {
response = delegate.next();
} catch (RuntimeException e) {
genomicsChannel.shutdownNow();
throw e;
}

if(null == shardPredicate) {
return response;
}
Expand Down