Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions alts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ java_library(
"@io_netty_netty_buffer//jar",
"@io_netty_netty_codec//jar",
"@io_netty_netty_common//jar",
"@io_netty_netty_handler//jar",
"@io_netty_netty_transport//jar",
],
)
Expand All @@ -35,10 +36,13 @@ java_library(
":handshaker_java_grpc",
"//core",
"//core:internal",
"//auth",
"//netty",
"@com_google_auth_google_auth_library_oauth2_http//jar",
"@com_google_code_findbugs_jsr305//jar",
"@com_google_guava_guava//jar",
"@io_netty_netty_common//jar",
"@io_netty_netty_handler//jar",
"@io_netty_netty_transport//jar",
"@org_apache_commons_commons_lang3//jar",
],
Expand Down
11 changes: 10 additions & 1 deletion alts/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@ buildscript {
}

dependencies {
compile project(':grpc-core'),
compile project(':grpc-auth'),
project(':grpc-core'),
project(':grpc-netty'),
project(':grpc-protobuf'),
project(':grpc-stub'),
libraries.lang,
libraries.protobuf
compile (libraries.google_auth_oauth2_http) {
// prefer 3.0.0 from libraries instead of 1.3.9
exclude group: 'com.google.code.findbugs', module: 'jsr305'
// prefer 20.0 from libraries instead of 19.0
exclude group: 'com.google.guava', module: 'guava'
}
runtime project(':grpc-grpclb')
testCompile libraries.guava,
libraries.guava_testlib,
libraries.junit,
libraries.mockito,
libraries.truth
testRuntime libraries.netty_tcnative,
libraries.conscrypt
signature 'org.codehaus.mojo.signature:java17:1.0@signature'
}

Expand Down
218 changes: 218 additions & 0 deletions alts/src/main/java/io/grpc/alts/GoogleDefaultChannelBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.alts;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.alts.internal.AltsClientOptions;
import io.grpc.alts.internal.AltsTsiHandshaker;
import io.grpc.alts.internal.GoogleDefaultProtocolNegotiator;
import io.grpc.alts.internal.HandshakerServiceGrpc;
import io.grpc.alts.internal.RpcProtocolVersionsUtil;
import io.grpc.alts.internal.TsiHandshaker;
import io.grpc.alts.internal.TsiHandshakerFactory;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ProxyParameters;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilter;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilterFactory;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;

/**
* Google default version of {@code ManagedChannelBuilder}. This class sets up a secure channel
* using ALTS if applicable and using TLS as fallback.
*/
public final class GoogleDefaultChannelBuilder
extends ForwardingChannelBuilder<GoogleDefaultChannelBuilder> {

private final NettyChannelBuilder delegate;
private final TcpfFactory tcpfFactory = new TcpfFactory();

private GoogleDefaultChannelBuilder(String target) {
delegate = NettyChannelBuilder.forTarget(target);
InternalNettyChannelBuilder.setDynamicTransportParamsFactory(delegate(), tcpfFactory);
}

/** "Overrides" the static method in {@link ManagedChannelBuilder}. */
public static final GoogleDefaultChannelBuilder forTarget(String target) {
return new GoogleDefaultChannelBuilder(target);
}

/** "Overrides" the static method in {@link ManagedChannelBuilder}. */
public static GoogleDefaultChannelBuilder forAddress(String name, int port) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this version necessary/helpful? It seems just a target string would be more than enough. People wouldn't need to pass in ipv6 addresses here, nor would it even be common to specify a port.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let remove this forAddress()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, if I remove this forAddress(), tests will fail on https://github.com/grpc/grpc-java/blob/master/interop-testing/src/test/java/io/grpc/ChannelAndServerBuilderTest.java#L91, because GoogleDefaultChannelBuilder extends ForwardingChannelBuilder and it supposes to define forAddress().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's right, ManagedChannelBuilder.forAddress exists. Yeah, we will want this function to exist here.

return forTarget(GrpcUtil.authorityFromHostAndPort(name, port));
}

@Override
protected NettyChannelBuilder delegate() {
return delegate;
}

@Override
public ManagedChannel build() {
@Nullable CallCredentials credentials = null;
Status status = Status.OK;
try {
credentials = MoreCallCredentials.from(GoogleCredentials.getApplicationDefault());
} catch (IOException e) {
status =
Status.FAILED_PRECONDITION
.withDescription("Failed to get Google default credentials")
.withCause(e);
}
return delegate().intercept(new GoogleDefaultInterceptor(credentials, status)).build();
}

@VisibleForTesting
TransportCreationParamsFilterFactory getTcpfFactoryForTest() {
return tcpfFactory;
}

private static final class TcpfFactory implements TransportCreationParamsFilterFactory {

private final SslContext sslContext;
private final AltsClientOptions handshakerOptions =
new AltsClientOptions.Builder()
.setRpcProtocolVersions(RpcProtocolVersionsUtil.getRpcProtocolVersions())
.build();

private final TsiHandshakerFactory altsHandshakerFactory =
new TsiHandshakerFactory() {
@Override
public TsiHandshaker newHandshaker() {
// Used the shared grpc channel to connecting to the ALTS handshaker service.
ManagedChannel channel = HandshakerServiceChannel.get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be referenced counted. We can't have resources/threads live forever. That'll cause memory leaks in environments with custom ClassLoaders. You can create a SharedResourceHolder.Resource for it.

Many environments are quite temperamental when it comes to creating new threads, so it looks like we'll also want it to be lazily initialized, the first time ALTS is used in this channel.

Copy link
Contributor Author

@jiangtaoli2016 jiangtaoli2016 Aug 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created #4755. Let fix this quarter in a separate PR. It would be fairly easy to unref the channel if nobody is using it. I like to keep the thread open until application closes. Otherwise keep creating/closing thread could also be expensive. Is there concept of fiber (i.e., lightweight thread) in Java?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to keep the thread open until application closes.

No. The threads are actually the primary problem.

Otherwise keep creating/closing thread could also be expensive.

This is what our SharedResourceHolder deals with, to a degree. It waits a second before actually freeing, in case there will be another use soon. In general though, we would expect users to have a long-lived Channel, so there shouldn't be rampant thread creation.

Is there concept of fiber (i.e., lightweight thread) in Java?

Nope. I wish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me take a look at SharedResourceHolder. I may schedule a mtg with you and @carl-mastrangelo. I remember old time ago, @carl-mastrangelo and I thought about SharedResourceHolder, but we did not use it. I don't remember the exact reason.

return AltsTsiHandshaker.newClient(
HandshakerServiceGrpc.newStub(channel), handshakerOptions);
}
};

private TcpfFactory() {
try {
sslContext = GrpcSslContexts.forClient().build();
} catch (SSLException ex) {
throw new RuntimeException(ex);
}
}

@Override
public TransportCreationParamsFilter create(
final SocketAddress serverAddress,
final String authority,
final String userAgent,
final ProxyParameters proxy) {
checkArgument(
serverAddress instanceof InetSocketAddress,
"%s must be a InetSocketAddress",
serverAddress);
final GoogleDefaultProtocolNegotiator negotiator =
new GoogleDefaultProtocolNegotiator(altsHandshakerFactory, sslContext, authority);
return new TransportCreationParamsFilter() {
@Override
public SocketAddress getTargetServerAddress() {
return serverAddress;
}

@Override
public String getAuthority() {
return authority;
}

@Override
public String getUserAgent() {
return userAgent;
}

@Override
public GoogleDefaultProtocolNegotiator getProtocolNegotiator() {
return negotiator;
}
};
}
}

/**
* An implementation of {@link ClientInterceptor} that adds Google call credentials on each call.
*/
static final class GoogleDefaultInterceptor implements ClientInterceptor {

@Nullable private final CallCredentials credentials;
private final Status status;

public GoogleDefaultInterceptor(@Nullable CallCredentials credentials, Status status) {
this.credentials = credentials;
this.status = status;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (!status.isOk()) {
return new FailingClientCall<>(status);
}
return next.newCall(method, callOptions.withCallCredentials(credentials));
}
}

/** An implementation of {@link ClientCall} that fails when started. */
static final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {

private final Status error;

public FailingClientCall(Status error) {
this.error = error;
}

@Override
public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
listener.onClose(error, new Metadata());
}

@Override
public void request(int numMessages) {}

@Override
public void cancel(String message, Throwable cause) {}

@Override
public void halfClose() {}

@Override
public void sendMessage(ReqT message) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* the handshaker service is local and is over plaintext. Each application will have at most one
* connection to the handshaker service.
*
* <p>TODO: Release the channel if it is not used.
* <p>TODO: Release the channel if it is not used. https://github.com/grpc/grpc-java/issues/4755.
*/
final class HandshakerServiceChannel {
// Default handshaker service address.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.alts.internal;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.GrpcAttributes;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.ProtocolNegotiator;
import io.grpc.netty.ProtocolNegotiators;
import io.netty.handler.ssl.SslContext;

/** A client-side GPRC {@link ProtocolNegotiator} for Google Default Channel. */
public final class GoogleDefaultProtocolNegotiator implements ProtocolNegotiator {
private final ProtocolNegotiator altsProtocolNegotiator;
private final ProtocolNegotiator tlsProtocolNegotiator;

public GoogleDefaultProtocolNegotiator(
TsiHandshakerFactory altsFactory, SslContext sslContext, String authority) {
altsProtocolNegotiator = AltsProtocolNegotiator.create(altsFactory);
tlsProtocolNegotiator = ProtocolNegotiators.tls(sslContext, authority);
}

@VisibleForTesting
GoogleDefaultProtocolNegotiator(
ProtocolNegotiator altsProtocolNegotiator, ProtocolNegotiator tlsProtocolNegotiator) {
this.altsProtocolNegotiator = altsProtocolNegotiator;
this.tlsProtocolNegotiator = tlsProtocolNegotiator;
}

@Override
public Handler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
if (grpcHandler.getEagAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null
|| grpcHandler.getEagAttributes().get(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND) != null) {
return altsProtocolNegotiator.newHandler(grpcHandler);
} else {
return tlsProtocolNegotiator.newHandler(grpcHandler);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.alts;

import static com.google.common.truth.Truth.assertThat;

import io.grpc.alts.internal.GoogleDefaultProtocolNegotiator;
import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilterFactory;
import io.grpc.netty.ProtocolNegotiator;
import java.net.InetSocketAddress;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public final class GoogleDefaultChannelBuilderTest {

@Test
public void buildsNettyChannel() throws Exception {
GoogleDefaultChannelBuilder builder = GoogleDefaultChannelBuilder.forTarget("localhost:8080");

TransportCreationParamsFilterFactory tcpfFactory = builder.getTcpfFactoryForTest();
assertThat(tcpfFactory).isNotNull();
ProtocolNegotiator protocolNegotiator =
tcpfFactory
.create(new InetSocketAddress(8080), "fakeAuthority", "fakeUserAgent", null)
.getProtocolNegotiator();
assertThat(protocolNegotiator).isInstanceOf(GoogleDefaultProtocolNegotiator.class);
}
}
Loading