Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand All @@ -42,7 +43,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -56,6 +59,7 @@ class NettyServer implements InternalServer, WithLogId {
private final LogId logId = LogId.allocate(getClass().getName());
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final boolean usingSharedBossGroup;
Expand All @@ -80,6 +84,7 @@ class NettyServer implements InternalServer, WithLogId {

NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
Map<ChannelOption<?>, ?> channelOptions,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
Expand All @@ -90,6 +95,8 @@ class NettyServer implements InternalServer, WithLogId {
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
Expand Down Expand Up @@ -136,6 +143,15 @@ public void start(ServerListener serverListener) throws IOException {
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}

if (channelOptions != null) {
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
@SuppressWarnings("unchecked")
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
b.childOption(key, entry.getValue());
}
}

b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
Expand Down
18 changes: 17 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
Expand All @@ -65,6 +68,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer

private final SocketAddress address;
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<ChannelOption<?>, Object>();
@Nullable
private EventLoopGroup bossEventLoopGroup;
@Nullable
Expand Down Expand Up @@ -123,6 +128,17 @@ public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType
return this;
}

/**
* Specifies a channel option. As the underlying channel as well as network implementation may
* ignore this value applications should consider it a hint.
*
* @since 1.9.0
*/
public <T> NettyServerBuilder withChildOption(ChannelOption<T> option, T value) {
this.channelOptions.put(option, value);
return this;
}

/**
* Provides the boss EventGroupLoop to the server.
*
Expand Down Expand Up @@ -403,7 +419,7 @@ protected NettyServer buildTransportServer(
}

return new NettyServer(
address, channelType, bossEventLoopGroup, workerEventLoopGroup,
address, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be a copy of the map. If someone modifies the map later, existing servers would accidentally see the changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. How about coping the map in NettyServer constructor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fine to me.

negotiator, streamTracerFactories, transportTracerFactory,
maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,9 @@ private void startServer() throws IOException {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(0),
NioServerSocketChannel.class, group, group, negotiator,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
group, group, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
Expand Down
72 changes: 72 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -39,6 +48,7 @@ public void getPort() throws Exception {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
Expand Down Expand Up @@ -75,6 +85,7 @@ public void getPort_notStarted() throws Exception {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
Expand All @@ -91,4 +102,65 @@ public void getPort_notStarted() throws Exception {

assertThat(ns.getPort()).isEqualTo(-1);
}

@Test(timeout = 60000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use timeout here, and instead use a JUnit4 timeout rule:

@Rule public final Timeout globalTimeout = Timeout.seconds(30);

The reason is that the timeout rule works correctly when debugging / stepping through code.

public void childChannelOptions() throws Exception {
final int originalLowWaterMark = 2097169;
final int originalHighWaterMark = 2097211;

Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();

channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark));

final AtomicInteger lowWaterMark = new AtomicInteger(0);
final AtomicInteger highWaterMark = new AtomicInteger(0);

final CountDownLatch countDownLatch = new CountDownLatch(1);

NettyServer ns = new NettyServer(
new InetSocketAddress(9999),
NioServerSocketChannel.class,
channelOptions,
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
1, // ignore
1, // ignore
1, // ignore
1, // ignore
1, // ignore
1, 1, // ignore
1, 1, // ignore
true, 0); // ignore
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
Channel channel = ((NettyServerTransport)transport).channel();
WriteBufferWaterMark writeBufferWaterMark = channel.config()
.getOption(ChannelOption.WRITE_BUFFER_WATER_MARK);
lowWaterMark.set(writeBufferWaterMark.low());
highWaterMark.set(writeBufferWaterMark.high());

countDownLatch.countDown();

return null;
}

@Override
public void serverShutdown() {}
});

Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9999), 8000);
countDownLatch.await();
socket.close();

assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark);
assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark);

ns.shutdown();
}
}