Skip to content
Permalink
Browse files
feat: Add experimental DirectPath support (#396)
* feat: Add experimental DirectPath support
  • Loading branch information
WeiranFang committed Sep 9, 2020
1 parent 45d8419 commit 46264d11529accde7b520638264732937b2feb03
@@ -15,8 +15,10 @@
</parent>
<properties>
<site.installationModule>google-cloud-spanner</site.installationModule>
<skipUTs>false</skipUTs>
</properties>


<build>
<plugins>
<plugin>
@@ -49,6 +51,7 @@
<id>default-test</id>
<configuration>
<excludedGroups>com.google.cloud.spanner.TracerTest,com.google.cloud.spanner.IntegrationTest</excludedGroups>
<skipTests>${skipUTs}</skipTests>
</configuration>
</execution>
<execution>
@@ -360,5 +363,26 @@
</plugins>
</build>
</profile>
<profile>
<id>spanner-directpath-it</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<spanner.testenv.config.class>com.google.cloud.spanner.GceTestEnvConfig</spanner.testenv.config.class>
<spanner.testenv.instance>projects/directpath-prod-manual-testing/instances/spanner-testing</spanner.testenv.instance>
<spanner.gce.config.project_id>directpath-prod-manual-testing</spanner.gce.config.project_id>
<spanner.attempt_directpath>true</spanner.attempt_directpath>
<spanner.directpath_test_scenario>ipv4</spanner.directpath_test_scenario>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>3000</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
@@ -230,6 +230,9 @@ private void awaitTermination() throws InterruptedException {
private static final int DEFAULT_PERIOD_SECONDS = 10;
private static final int GRPC_KEEPALIVE_SECONDS = 2 * 60;

// TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta.
private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
@@ -307,31 +310,37 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build());
// First check if SpannerOptions provides a TransportChannerProvider. Create one
// with information gathered from SpannerOptions if none is provided
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider);

// TODO(weiranf): Set to true by default once DirectPath goes to public beta.
if (shouldAttemptDirectPath()) {
defaultChannelProviderBuilder.setEndpoint(DIRECT_PATH_ENDPOINT).setAttemptDirectPath(true);
}

TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(),
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider)
.build());
options.getChannelProvider(), defaultChannelProviderBuilder.build());

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);
@@ -422,6 +431,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
}
}

// TODO(weiranf): Remove this once DirectPath goes to public beta.
private static boolean shouldAttemptDirectPath() {
return Boolean.getBoolean("spanner.attempt_directpath");
}

private static void checkEmulatorConnection(
SpannerOptions options,
TransportChannelProvider channelProvider,
@@ -26,11 +26,15 @@
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -41,6 +45,12 @@ public class GceTestEnvConfig implements TestEnvConfig {
public static final String GCE_CREDENTIALS_FILE = "spanner.gce.config.credentials_file";
public static final String GCE_STREAM_BROKEN_PROBABILITY =
"spanner.gce.config.stream_broken_probability";
public static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath";
public static final String DIRECT_PATH_TEST_SCENARIO = "spanner.directpath_test_scenario";

// IP address prefixes allocated for DirectPath backends.
public static final String DP_IPV6_PREFIX = "2001:4860:8040";
public static final String DP_IPV4_PREFIX = "34.126";

private final SpannerOptions options;

@@ -51,6 +61,8 @@ public GceTestEnvConfig() {
double errorProbability =
Double.parseDouble(System.getProperty(GCE_STREAM_BROKEN_PROBABILITY, "0.0"));
checkState(errorProbability <= 1.0);
boolean attemptDirectPath = Boolean.getBoolean(ATTEMPT_DIRECT_PATH);
String directPathTestScenario = System.getProperty(DIRECT_PATH_TEST_SCENARIO, "");
SpannerOptions.Builder builder =
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
if (!projectId.isEmpty()) {
@@ -66,12 +78,14 @@ public GceTestEnvConfig() {
throw new RuntimeException(e);
}
}
options =
builder
.setInterceptorProvider(
SpannerInterceptorProvider.createDefault()
.with(new GrpcErrorInjector(errorProbability)))
.build();
SpannerInterceptorProvider interceptorProvider =
SpannerInterceptorProvider.createDefault().with(new GrpcErrorInjector(errorProbability));
if (attemptDirectPath) {
interceptorProvider =
interceptorProvider.with(new DirectPathAddressCheckInterceptor(directPathTestScenario));
}
builder.setInterceptorProvider(interceptorProvider);
options = builder.build();
}

@Override
@@ -87,6 +101,7 @@ public void tearDown() {}

/** Injects errors in streaming calls to simulate call restarts */
private static class GrpcErrorInjector implements ClientInterceptor {

private final double errorProbability;
private final Random random = new Random();

@@ -140,4 +155,64 @@ private boolean mayInjectError() {
return random.nextDouble() < errorProbability;
}
}

/**
* Captures the request attributes "Grpc.TRANSPORT_ATTR_REMOTE_ADDR" when connection is
* established and verifies if the remote address is a DirectPath address. This is only used for
* DirectPath testing. {@link ClientCall#getAttributes()}
*/
private static class DirectPathAddressCheckInterceptor implements ClientInterceptor {
private final String directPathTestScenario;

DirectPathAddressCheckInterceptor(String directPathTestScenario) {
this.directPathTestScenario = directPathTestScenario;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
// Check peer IP after connection is established.
SocketAddress remoteAddr =
clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (!verifyRemoteAddress(remoteAddr)) {
throw new RuntimeException(
String.format(
"Synthetically aborting the current request because it did not adhere"
+ " to the test environment's requirement for DirectPath."
+ " Expected test for DirectPath %s scenario,"
+ " but RPC was destined for %s",
directPathTestScenario, remoteAddr.toString()));
}
super.onHeaders(headers);
}
},
headers);
}
};
}

private boolean verifyRemoteAddress(SocketAddress remoteAddr) {
if (remoteAddr instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress();
String addr = inetAddress.getHostAddress();
if (directPathTestScenario.equals("ipv4")) {
// For ipv4-only VM, client should connect to ipv4 DirectPath addresses.
return addr.startsWith(DP_IPV4_PREFIX);
} else if (directPathTestScenario.equals("ipv6")) {
// For ipv6-enabled VM, client could connect to either ipv4 or ipv6 DirectPath addresses.
return addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
}
}
// For all other scenarios(e.g. fallback), we should allow both DirectPath and CFE addresses.
return true;
}
}
}

0 comments on commit 46264d1

Please sign in to comment.