Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// NIO and Netty Parameters
protected static final String SERVER_TCP_NODELAY = "serverTcpNoDelay";
protected static final String SERVER_SOCK_KEEPALIVE = "serverSockKeepalive";
protected static final String SERVER_TCP_KEEPIDLE = "serverTcpKeepIdle";
protected static final String SERVER_TCP_KEEPINTVL = "serverTcpKeepIntvl";
protected static final String SERVER_TCP_KEEPCNT = "serverTcpKeepCnt";
protected static final String SERVER_SOCK_LINGER = "serverTcpLinger";
protected static final String SERVER_WRITEBUFFER_LOW_WATER_MARK = "serverWriteBufferLowWaterMark";
protected static final String SERVER_WRITEBUFFER_HIGH_WATER_MARK = "serverWriteBufferHighWaterMark";
Expand Down Expand Up @@ -1627,6 +1630,69 @@ public ServerConfiguration setServerSockKeepalive(boolean keepalive) {
return this;
}

/**
* Set TCP_KEEPIDLE value for SO_KEEPALIVE.
*
* @param keepIdle
* TCP_KEEPIDLE value in seconds
* @return server configuration
*/
public ServerConfiguration setServerTcpKeepIdle(int keepIdle) {
setProperty(SERVER_TCP_KEEPIDLE, keepIdle);
return this;
}

/**
* Get TCP_KEEPIDLE value for SO_KEEPALIVE.
*
* @return TCP_KEEPIDLE value in seconds, default -1
*/
public int getServerTcpKeepIdle() {
return getInt(SERVER_TCP_KEEPIDLE, -1);
}

/**
* Set TCP_KEEPINTVL value for SO_KEEPALIVE.
*
* @param keepIntvl
* TCP_KEEPINTVL value in seconds
* @return server configuration
*/
public ServerConfiguration setServerTcpKeepIntvl(int keepIntvl) {
setProperty(SERVER_TCP_KEEPINTVL, keepIntvl);
return this;
}

/**
* Get TCP_KEEPINTVL value for SO_KEEPALIVE.
*
* @return TCP_KEEPINTVL value in seconds, default -1
*/
public int getServerTcpKeepIntvl() {
return getInt(SERVER_TCP_KEEPINTVL, -1);
}

/**
* Set TCP_KEEPCNT value for SO_KEEPALIVE.
*
* @param keepCnt
* TCP_KEEPCNT value
* @return server configuration
*/
public ServerConfiguration setServerTcpKeepCnt(int keepCnt) {
setProperty(SERVER_TCP_KEEPCNT, keepCnt);
return this;
}

/**
* Get TCP_KEEPCNT value for SO_KEEPALIVE.
*
* @return TCP_KEEPCNT value, default -1
*/
public int getServerTcpKeepCnt() {
return getInt(SERVER_TCP_KEEPCNT, -1);
}

/**
* Get zookeeper client backoff retry start time in millis.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
Expand All @@ -48,6 +49,7 @@
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.uring.IoUringChannelOption;
import io.netty.channel.uring.IoUringServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.flush.FlushConsolidationHandler;
Expand Down Expand Up @@ -316,13 +318,40 @@ private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddre
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
bootstrap.group(acceptorGroup, eventLoopGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive());
bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));

// Set TCP keepalive parameters if configured. Use childOption() so that these
// options are applied to each accepted SocketChannel (not the listening
// ServerSocketChannel), which is the Netty-idiomatic approach and avoids
// relying on OS-level socket option inheritance behavior.
if (EventLoopUtil.isIoUringGroup(eventLoopGroup)) {
if (conf.getServerTcpKeepIdle() > 0) {
bootstrap.childOption(IoUringChannelOption.TCP_KEEPIDLE, conf.getServerTcpKeepIdle());
}
if (conf.getServerTcpKeepIntvl() > 0) {
bootstrap.childOption(IoUringChannelOption.TCP_KEEPINTVL, conf.getServerTcpKeepIntvl());
}
if (conf.getServerTcpKeepCnt() > 0) {
bootstrap.childOption(IoUringChannelOption.TCP_KEEPCNT, conf.getServerTcpKeepCnt());
}
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
if (conf.getServerTcpKeepIdle() > 0) {
bootstrap.childOption(EpollChannelOption.TCP_KEEPIDLE, conf.getServerTcpKeepIdle());
}
if (conf.getServerTcpKeepIntvl() > 0) {
bootstrap.childOption(EpollChannelOption.TCP_KEEPINTVL, conf.getServerTcpKeepIntvl());
}
if (conf.getServerTcpKeepCnt() > 0) {
bootstrap.childOption(EpollChannelOption.TCP_KEEPCNT, conf.getServerTcpKeepCnt());
}
}

if (EventLoopUtil.isIoUringGroup(eventLoopGroup)) {
bootstrap.channel(IoUringServerSocketChannel.class);
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
Expand Down Expand Up @@ -382,24 +411,16 @@ protected void initChannel(SocketChannel ch) throws Exception {
ServerBootstrap jvmBootstrap = new ServerBootstrap();
jvmBootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
jvmBootstrap.group(jvmEventLoopGroup, jvmEventLoopGroup);
jvmBootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
jvmBootstrap.childOption(ChannelOption.SO_KEEPALIVE, conf.getServerSockKeepalive());
jvmBootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
jvmBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(conf.getRecvByteBufAllocatorSizeMin(),
conf.getRecvByteBufAllocatorSizeInitial(), conf.getRecvByteBufAllocatorSizeMax()));
jvmBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getServerWriteBufferLowWaterMark(), conf.getServerWriteBufferHighWaterMark()));

if (jvmEventLoopGroup instanceof DefaultEventLoopGroup) {
jvmBootstrap.channel(LocalServerChannel.class);
} else if (EventLoopUtil.isIoUringGroup(jvmEventLoopGroup)) {
jvmBootstrap.channel(IoUringServerSocketChannel.class);
} else if (jvmEventLoopGroup instanceof EpollEventLoopGroup) {
jvmBootstrap.channel(EpollServerSocketChannel.class);
} else {
jvmBootstrap.channel(NioServerSocketChannel.class);
}
// Local transport (BOOKKEEPER-896) is an in-VM transport that does not involve
// the network stack, so network-related socket options such as TCP_NODELAY,
// SO_KEEPALIVE, SO_LINGER and TCP_KEEP* are not applicable and must not be set here.
jvmBootstrap.channel(LocalServerChannel.class);

jvmBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,17 @@ protected ChannelFuture connect() {
if (conf.getTcpKeepCnt() > 0) {
bootstrap.option(EpollChannelOption.TCP_KEEPCNT, conf.getTcpKeepCnt());
}
} else if (EventLoopUtil.isIoUringGroup(eventLoopGroup)) {
// Set TCP keepalive parameters for IoUring if configured
if (conf.getTcpKeepIdle() > 0) {
bootstrap.option(IoUringChannelOption.TCP_KEEPIDLE, conf.getTcpKeepIdle());
}
if (conf.getTcpKeepIntvl() > 0) {
bootstrap.option(IoUringChannelOption.TCP_KEEPINTVL, conf.getTcpKeepIntvl());
}
if (conf.getTcpKeepCnt() > 0) {
bootstrap.option(IoUringChannelOption.TCP_KEEPCNT, conf.getTcpKeepCnt());
}
}
// if buffer sizes are 0, let OS auto-tune it
if (conf.getClientSendBufferSize() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.proto;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.uring.IoUring;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.junit.Test;

/**
* Unit tests for TCP keepalive configuration in BookieNettyServer.
* This test focuses on server-side TCP keepalive configuration logic
* without requiring actual network connections.
*/
public class TestBookieNettyServerTcpKeepalive {

/**
* Test ServerConfiguration TCP keepalive getter methods.
*/
@Test
public void testServerConfigurationGetters() {
ServerConfiguration conf = new ServerConfiguration();

// Test default values (should be -1 as per implementation)
assertEquals("Default server TCP keep idle should be -1", -1, conf.getServerTcpKeepIdle());
assertEquals("Default server TCP keep interval should be -1", -1, conf.getServerTcpKeepIntvl());
assertEquals("Default server TCP keep count should be -1", -1, conf.getServerTcpKeepCnt());

// Test setter and getter methods
conf.setServerTcpKeepIdle(60);
conf.setServerTcpKeepIntvl(30);
conf.setServerTcpKeepCnt(5);

assertEquals("Server TCP keep idle should be 60", 60, conf.getServerTcpKeepIdle());
assertEquals("Server TCP keep interval should be 30", 30, conf.getServerTcpKeepIntvl());
assertEquals("Server TCP keep count should be 5", 5, conf.getServerTcpKeepCnt());

// Test that -1 means use system default (as per implementation comments)
conf.setServerTcpKeepIdle(-1);
conf.setServerTcpKeepIntvl(-1);
conf.setServerTcpKeepCnt(-1);

assertEquals("Server TCP keep idle should be -1 for system default", -1, conf.getServerTcpKeepIdle());
assertEquals("Server TCP keep interval should be -1 for system default", -1, conf.getServerTcpKeepIntvl());
assertEquals("Server TCP keep count should be -1 for system default", -1, conf.getServerTcpKeepCnt());
}

/**
* Test TCP keepalive configuration conditional logic for server side.
*/
@Test
public void testServerTcpKeepaliveConditionalLogic() {
ServerConfiguration conf = new ServerConfiguration();

// Test with positive values
conf.setServerTcpKeepIdle(60);
conf.setServerTcpKeepIntvl(30);
conf.setServerTcpKeepCnt(5);

assertTrue("Server TCP keep idle should be > 0", conf.getServerTcpKeepIdle() > 0);
assertTrue("Server TCP keep interval should be > 0", conf.getServerTcpKeepIntvl() > 0);
assertTrue("Server TCP keep count should be > 0", conf.getServerTcpKeepCnt() > 0);

// Test with zero values
conf.setServerTcpKeepIdle(0);
conf.setServerTcpKeepIntvl(0);
conf.setServerTcpKeepCnt(0);

assertFalse("Server TCP keep idle should not be configured for zero", conf.getServerTcpKeepIdle() > 0);
assertFalse("Server TCP keep interval should not be configured for zero", conf.getServerTcpKeepIntvl() > 0);
assertFalse("Server TCP keep count should not be configured for zero", conf.getServerTcpKeepCnt() > 0);

// Test with negative values (system default)
conf.setServerTcpKeepIdle(-1);
conf.setServerTcpKeepIntvl(-1);
conf.setServerTcpKeepCnt(-1);

assertFalse("Server TCP keep idle should not be configured for negative", conf.getServerTcpKeepIdle() > 0);
assertFalse("Server TCP keep interval should not be configured for negative", conf.getServerTcpKeepIntvl() > 0);
assertFalse("Server TCP keep count should not be configured for negative", conf.getServerTcpKeepCnt() > 0);

// Test partial configuration
conf.setServerTcpKeepIdle(60);
conf.setServerTcpKeepIntvl(0);
conf.setServerTcpKeepCnt(5);

assertTrue("Server TCP keep idle should be configured", conf.getServerTcpKeepIdle() > 0);
assertFalse("Server TCP keep interval should not be configured", conf.getServerTcpKeepIntvl() > 0);
assertTrue("Server TCP keep count should be configured", conf.getServerTcpKeepCnt() > 0);
}

/**
* Test EventLoopGroup type detection logic.
*/
@Test
public void testEventLoopGroupTypeDetection() {
// Test NIO event loop group detection
NioEventLoopGroup nioGroup = new NioEventLoopGroup(1);
assertFalse("NIO event loop group should not be detected as Epoll",
nioGroup.getClass().getName().contains("Epoll"));
assertFalse("NIO event loop group should not be detected as IoUring",
EventLoopUtil.isIoUringGroup(nioGroup));
nioGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);

// Test DefaultEventLoopGroup detection
DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup(1);
assertFalse("Default event loop group should not be detected as Epoll",
defaultGroup.getClass().getName().contains("Epoll"));
assertFalse("Default event loop group should not be detected as IoUring",
EventLoopUtil.isIoUringGroup(defaultGroup));
defaultGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);

// Test Epoll support detection: use Epoll.isAvailable() to verify that the native
// library can actually be loaded, not just that the class is on the classpath.
boolean isEpollSupported = Epoll.isAvailable();

// Test IoUring support detection via runtime availability check.
boolean isIoUringSupported = IoUring.isAvailable();

System.out.println("Epoll support detected: " + isEpollSupported);
System.out.println("IoUring support detected: " + isIoUringSupported);
System.out.println("Operating system: " + System.getProperty("os.name"));

// The important thing is that the detection logic works correctly
// The actual result depends on the platform
assertTrue("EventLoopUtil should be available", EventLoopUtil.class != null);
}

/**
* Test TCP keepalive parameter validation logic.
*/
@Test
public void testTcpKeepaliveParameterValidation() {
ServerConfiguration conf = new ServerConfiguration();

// Test valid parameter ranges
conf.setServerTcpKeepIdle(1);
conf.setServerTcpKeepIntvl(1);
conf.setServerTcpKeepCnt(1);

assertTrue("Minimum TCP keep idle should be valid", conf.getServerTcpKeepIdle() > 0);
assertTrue("Minimum TCP keep interval should be valid", conf.getServerTcpKeepIntvl() > 0);
assertTrue("Minimum TCP keep count should be valid", conf.getServerTcpKeepCnt() > 0);

// Test typical production values
conf.setServerTcpKeepIdle(300);
conf.setServerTcpKeepIntvl(60);
conf.setServerTcpKeepCnt(3);

assertEquals("Production TCP keep idle should be 300", 300, conf.getServerTcpKeepIdle());
assertEquals("Production TCP keep interval should be 60", 60, conf.getServerTcpKeepIntvl());
assertEquals("Production TCP keep count should be 3", 3, conf.getServerTcpKeepCnt());

// Test boundary values
conf.setServerTcpKeepIdle(Integer.MAX_VALUE);
conf.setServerTcpKeepIntvl(Integer.MAX_VALUE);
conf.setServerTcpKeepCnt(Integer.MAX_VALUE);

assertTrue("Maximum TCP keep idle should be valid", conf.getServerTcpKeepIdle() > 0);
assertTrue("Maximum TCP keep interval should be valid", conf.getServerTcpKeepIntvl() > 0);
assertTrue("Maximum TCP keep count should be valid", conf.getServerTcpKeepCnt() > 0);
}
}
Loading
Loading