Skip to content

Commit

Permalink
core, netty: server builders extend a public API class
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Sep 2, 2020
1 parent c76b2bc commit 5f23337
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 113 deletions.
41 changes: 33 additions & 8 deletions core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SharedResourcePool;
import java.io.File;
import java.util.Collections;
Expand Down Expand Up @@ -67,8 +72,7 @@
* </pre>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1783")
public final class InProcessServerBuilder
extends AbstractServerImplBuilder<InProcessServerBuilder> {
public final class InProcessServerBuilder extends ForwardingServerBuilder<InProcessServerBuilder> {
/**
* Create a server builder that will bind with the given name.
*
Expand All @@ -93,22 +97,40 @@ public static String generateName() {
return UUID.randomUUID().toString();
}

private final ServerImplBuilder serverImplBuilder;
final String name;
int maxInboundMetadataSize = Integer.MAX_VALUE;
ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

private InProcessServerBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name");

final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
public List<? extends InternalServer> buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return buildTransportServers(streamTracerFactories);
}
}

serverImplBuilder = new ServerImplBuilder(new InProcessClientTransportServersBuilder());

// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
serverImplBuilder.setStatsRecordStartedRpcs(false);
serverImplBuilder.setStatsRecordFinishedRpcs(false);
// Disable handshake timeout because it is unnecessary, and can trigger Thread creation that can
// break some environments (like tests).
handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS);
}

@Internal
@Override
protected ServerBuilder<?> delegate() {
return serverImplBuilder;
}

/**
* Provides a custom scheduled executor service.
*
Expand Down Expand Up @@ -140,7 +162,7 @@ public InProcessServerBuilder scheduledExecutorService(
* @since 1.24.0
*/
public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
setDeadlineTicker(ticker);
serverImplBuilder.setDeadlineTicker(ticker);
return this;
}

Expand All @@ -164,8 +186,7 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
return this;
}

@Override
protected List<InProcessServer> buildTransportServers(
List<InProcessServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
}
Expand All @@ -174,4 +195,8 @@ protected List<InProcessServer> buildTransportServers(
public InProcessServerBuilder useTransportSecurity(File certChain, File privateKey) {
throw new UnsupportedOperationException("TLS not supported in InProcessServer");
}

void setStatsEnabled(boolean value) {
this.serverImplBuilder.setStatsEnabled(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.inprocess;

import io.grpc.Internal;

/**
* Internal {@link InProcessServerBuilder} accessor. This is intended for usage internal to
* the gRPC team. If you *really* think you need to use this, contact the gRPC team first.
*/
@Internal
public class InternalInProcessServerBuilder {
public static void setStatsEnabled(InProcessServerBuilder builder, boolean value) {
builder.setStatsEnabled(value);
}

private InternalInProcessServerBuilder() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import io.grpc.auth.MoreCallCredentials;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StatsTestUtils;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
Expand Down Expand Up @@ -171,7 +170,6 @@ public abstract class AbstractInteropTest {

private ScheduledExecutorService testServiceExecutor;
private Server server;
private boolean customCensusModulePresent;

private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -245,21 +243,7 @@ private void startServer() {
new TestServiceImpl(testServiceExecutor),
allInterceptors))
.addStreamTracerFactory(serverStreamTracerFactory);
if (builder instanceof AbstractServerImplBuilder) {
customCensusModulePresent = true;
ServerStreamTracer.Factory censusTracerFactory =
InternalCensusStatsAccessor
.getServerStreamTracerFactory(
tagger, tagContextBinarySerializer, serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true, false /* real-time metrics */);
AbstractServerImplBuilder<?> sb = (AbstractServerImplBuilder<?>) builder;
io.grpc.internal.TestingAccessor.setStatsEnabled(sb, false);
sb.addStreamTracerFactory(censusTracerFactory);
}
if (metricsExpected()) {
assertThat(builder).isInstanceOf(AbstractServerImplBuilder.class);
}

try {
server = builder.build().start();
} catch (IOException ex) {
Expand Down Expand Up @@ -373,6 +357,20 @@ protected final ClientInterceptor createCensusStatsClientInterceptor() {
true, true, true, false /* real-time metrics */);
}

protected final ServerStreamTracer.Factory createCustomCensusTracerFactory() {
return InternalCensusStatsAccessor.getServerStreamTracerFactory(
tagger, tagContextBinarySerializer, serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true, false /* real-time metrics */);
}

/**
* Return {@code true} when custom census module used.
*/
protected boolean customCensusModulePresent() {
return false;
}

/**
* Return true if exact metric values should be checked.
*/
Expand Down Expand Up @@ -1510,7 +1508,7 @@ public void customMetadata() throws Exception {
@Test(timeout = 10000)
public void censusContextsPropagated() {
Assume.assumeTrue("Skip the test because server is not in the same process.", server != null);
Assume.assumeTrue(customCensusModulePresent);
Assume.assumeTrue(customCensusModulePresent());
Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan();
// A valid ID is guaranteed to be unique, so we can verify it is actually propagated.
assertTrue(clientParentSpan.getContext().getTraceId().isValid());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.testing.integration;

import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyServerBuilder;

public abstract class AbstractNettyInteropTest extends AbstractInteropTest {
protected NettyServerBuilder withCustomCensusModule(NettyServerBuilder builder) {
InternalNettyServerBuilder.setStatsEnabled(builder, false);
builder.addStreamTracerFactory(createCustomCensusTracerFactory());
return builder;
}

@Override
protected boolean customCensusModulePresent() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.grpc.testing.integration;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -25,12 +25,12 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AutoWindowSizingOnTest extends AbstractInteropTest {
public class AutoWindowSizingOnTest extends AbstractNettyInteropTest {

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
protected ServerBuilder<?> getServerBuilder() {
return withCustomCensusModule(
NettyServerBuilder.forPort(0).maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.grpc.testing.integration;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -33,19 +33,20 @@
* Run transport tests over the Netty in-process channel.
*/
@RunWith(JUnit4.class)
public class Http2NettyLocalChannelTest extends AbstractInteropTest {
public class Http2NettyLocalChannelTest extends AbstractNettyInteropTest {

private DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup);
protected ServerBuilder<?> getServerBuilder() {
return withCustomCensusModule(
NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
Expand All @@ -38,21 +38,22 @@
* Integration tests for GRPC over HTTP2 using the Netty framework.
*/
@RunWith(JUnit4.class)
public class Http2NettyTest extends AbstractInteropTest {
public class Http2NettyTest extends AbstractNettyInteropTest {

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ServerBuilder<?> getServerBuilder() {
// Starts the server with HTTPS.
try {
return NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(GrpcSslContexts
.forServer(TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"))
.clientAuth(ClientAuth.REQUIRE)
.trustManager(TestUtils.loadCert("ca.pem"))
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
.build());
return withCustomCensusModule(
NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(GrpcSslContexts
.forServer(TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"))
.clientAuth(ClientAuth.REQUIRE)
.trustManager(TestUtils.loadCert("ca.pem"))
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
.build()));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.base.Throwables;
import com.squareup.okhttp.ConnectionSpec;
import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.internal.testing.TestUtils;
Expand Down Expand Up @@ -52,7 +52,7 @@
* Integration tests for GRPC over Http2 using the OkHttp framework.
*/
@RunWith(JUnit4.class)
public class Http2OkHttpTest extends AbstractInteropTest {
public class Http2OkHttpTest extends AbstractNettyInteropTest {

private static final String BAD_HOSTNAME = "I.am.a.bad.hostname";

Expand All @@ -64,7 +64,7 @@ public static void loadConscrypt() throws Exception {
}

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ServerBuilder<?> getServerBuilder() {
// Starts the server with HTTPS.
try {
SslProvider sslProvider = SslContext.defaultServerProvider();
Expand All @@ -77,10 +77,11 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
.forServer(TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"));
GrpcSslContexts.configure(contextBuilder, sslProvider);
contextBuilder.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE);
return NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(contextBuilder.build());
return withCustomCensusModule(
NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(contextBuilder.build()));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand Down
Loading

0 comments on commit 5f23337

Please sign in to comment.