Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Commit

Permalink
Merge 451a3a2 into e9edd09
Browse files Browse the repository at this point in the history
  • Loading branch information
deflaux committed Nov 9, 2015
2 parents e9edd09 + 451a3a2 commit 7d98217
Show file tree
Hide file tree
Showing 13 changed files with 941 additions and 119 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@
<artifactId>grpc-all</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-interop-testing</artifactId>
<version>0.9.0</version>
<scope>test</scope>
</dependency>

<!-- Used to test the custom builder logic -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.utils.grpc;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.ClientAuthInterceptor;
Expand All @@ -26,10 +36,13 @@

import javax.net.ssl.SSLException;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class GenomicsChannel extends Channel {
public class GenomicsChannel extends ManagedChannel {
private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
private static final String API_KEY_HEADER = "X-Goog-Api-Key";
Expand All @@ -40,12 +53,11 @@ public class GenomicsChannel extends Channel {
// the shutdown method and the ClientInterceptors do not return the ManagedChannel instance.
private final ManagedChannel managedChannel;
private final Channel delegate;

private ManagedChannel getGenomicsManagedChannel() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> defaultCiphers = GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
Expand All @@ -55,75 +67,96 @@ private ManagedChannel getGenomicsManagedChannel() throws SSLException {

return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()).build();
}

private GenomicsChannel(String apiKey) throws SSLException {
managedChannel = getGenomicsManagedChannel();
Metadata headers = new Metadata();
Metadata.Key<String> apiKeyHeader =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeader, apiKey);
delegate = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(headers));
delegate =
ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
managedChannel = getGenomicsManagedChannel();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
creds = creds.createScoped(Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor =
new ClientAuthInterceptor(creds, Executors.newSingleThreadExecutor());
delegate = ClientInterceptors.intercept(managedChannel, interceptor);
}

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> arg0, CallOptions arg1) {
return delegate.newCall(arg0, arg1);
}

/**
* @see io.grpc.ManagedChannel#shutdownNow()
*/
public void shutdownNow() {
managedChannel.shutdownNow();
}

/**
* @throws InterruptedException
* @throws InterruptedException
* @see io.grpc.ManagedChannel#shutdown()
* @see io.grpc.ManagedChannel#awaitTermination(long, TimeUnit)
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
managedChannel.shutdown().awaitTermination(timeout, unit);
}


@Override
public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
return managedChannel.awaitTermination(time, unit);
}

@Override
public boolean isShutdown() {
return managedChannel.isShutdown();
}

@Override
public boolean isTerminated() {
return managedChannel.isTerminated();
}

@Override
public ManagedChannel shutdown() {
return managedChannel.shutdown();
}

@Override
public ManagedChannel shutdownNow() {
return managedChannel.shutdownNow();
}

@Override
public String authority() {
return managedChannel.authority();
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
* Creates a new gRPC channel to the Google Genomics API, using the application default
* credentials for auth.
*/
public static GenomicsChannel fromDefaultCreds() throws IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
* Creates a new gRPC channel to the Google Genomics API, using the provided api key for auth.
*/
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
* Creates a new gRPC channel to the Google Genomics API, using the provided credentials for auth.
*/
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}

/**
* Initialize auth for a gRPC channel from OfflineAuth or the application default credentials.
*
Expand All @@ -132,23 +165,19 @@ public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOExcept
* https://developers.google.com/identity/protocols/application-default-credentials
*
* @param auth An OfflineAuth object.
* @return The gRPC channel authorized using either the information in the OfflineAuth or application default credentials.
* @return The gRPC channel authorized using either the information in the OfflineAuth or
* application default credentials.
* @throws IOException
* @throws GeneralSecurityException
*/
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth)
throws IOException, GeneralSecurityException {
if (auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
} else if(auth.hasApiKey()) {
} else if (auth.hasApiKey()) {
return fromApiKey(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

@Override
public String authority() {
return delegate.authority();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package com.google.cloud.genomics.utils.grpc;

import io.grpc.ManagedChannel;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand All @@ -33,7 +33,7 @@
* Includes complex retry logic to upon failure resume the stream at the last known good start
* position without returning duplicate data.
*
* TODO: refactor this further to simplify the generic signature.
* TODO: refactor this further to simplify the generic signature.
*
* @param <Request> Streaming request type.
* @param <Response> Streaming response type.
Expand All @@ -43,12 +43,14 @@
public abstract class GenomicsStreamIterator<Request, Response, Item, Stub extends io.grpc.stub.AbstractStub<Stub>>
implements Iterator<Response> {
private static final Logger LOG = Logger.getLogger(GenomicsStreamIterator.class.getName());
private final ExponentialBackOff backoff;
private final GenomicsChannel genomicsChannel;
private final Predicate<Item> shardPredicate;

protected final ManagedChannel genomicsChannel;
protected final Predicate<Item> shardPredicate;
protected final Stub stub;
protected final Request originalRequest;

protected ExponentialBackOff backoff;

// Stateful members used to facilitate complex retry behavior for gRPC streams.
private Iterator<Response> delegate;
private Item lastSuccessfulDataItem;
Expand All @@ -57,20 +59,19 @@ public abstract class GenomicsStreamIterator<Request, Response, Item, Stub exten
/**
* Create a stream iterator that will filter shard data using the predicate, if supplied.
*
* @param channel The channel.
* @param request The request for the shard of data.
* @param auth The OfflineAuth to use for the request.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce
* a shard boundary and/or limit to SNPs only) or null for no filtering.
* @throws IOException
* @throws GeneralSecurityException
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce a
* shard boundary and/or limit to SNPs only) or null for no filtering.
*/
public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth, String fields,
Predicate<Item> shardPredicate) throws IOException, GeneralSecurityException {

protected GenomicsStreamIterator(ManagedChannel channel, Request request, String fields,
Predicate<Item> shardPredicate) {
this.originalRequest = request;
this.shardPredicate = shardPredicate;
genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
this.genomicsChannel = channel;
stub = createStub(genomicsChannel);

// Using default backoff settings. For details, see
Expand All @@ -83,7 +84,7 @@ public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth,
idSentinel = null;
}

abstract Stub createStub(GenomicsChannel genomicsChannel);
abstract Stub createStub(ManagedChannel genomicsChannel);

abstract Iterator<Response> createIteratorFromStub(Request request);

Expand Down Expand Up @@ -168,6 +169,7 @@ private void setStreamStateForRetry() {
// We have never returned any data. No need to set up state needed to filter previously
// returned results.
delegate = createIterator(originalRequest);
return;
}

if (getRequestStart(originalRequest) < getDataItemStart(lastSuccessfulDataItem)) {
Expand Down

0 comments on commit 7d98217

Please sign in to comment.