Skip to content

Commit

Permalink
pool: always bind to TCP ports for netty based movers
Browse files Browse the repository at this point in the history
Motivation:
Currently netty-based movers use late port binding, i.e., the bind
happens when the first mover is started and the port is unbound when there
are no more active movers. This has two issues (both resulting in "No free port within range"):

1) In a situation where a pool has a limited number of ports reserved for
xroot and http movers, it might be that no free ports are available. Thus
some movers will fail to start.

2) The movers might start and stop in an unfortunate sequence, so that
   after the port is unbound, the next client arrives before the TCP port
   is released by the kernel (typically 1m).

Thus early binding has the advantage of either failing fast, if no
ports are available, or keeping the port for the pool's lifetime.

Modification:

Update NettyTransferService to bind the port on startup. Dropped the dependency on
Spring's SmartLifecycle interface as it gives us no additional value.

Result:
more predictable pool operation

Fixes: #7148
Acked-by: Dmitry Litvintsev
Target: master
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed Jun 13, 2023
1 parent 326de63 commit 9057f64
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 56 deletions.
Expand Up @@ -41,6 +41,8 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.EnumSet;
Expand Down Expand Up @@ -185,7 +187,7 @@ public ScheduledExecutorService getThirdPartyShutdownExecutor() {
}

@Override
public synchronized void start() {
public synchronized void start() throws IOException {
super.start();
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("xrootd-tpc-client-%d")
Expand Down
Expand Up @@ -68,15 +68,16 @@
import org.dcache.util.ChannelCdcSessionHandlerWrapper;
import org.dcache.util.Checksum;
import org.dcache.util.ChecksumType;
import org.dcache.util.FireAndForgetTask;
import org.dcache.util.NettyPortRange;
import org.dcache.util.NetworkUtils;
import org.dcache.util.TryCatchTemplate;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.SmartLifecycle;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
* Abstract base class for Netty based transfer services. This class provides most methods needed by
Expand All @@ -87,7 +88,7 @@
* the Netty channel to close.
*/
public abstract class NettyTransferService<P extends ProtocolInfo>
implements TransferService<NettyMover<P>>, MoverFactory, CellIdentityAware, SmartLifecycle {
implements TransferService<NettyMover<P>>, MoverFactory, CellIdentityAware {

private static final Logger LOGGER =
LoggerFactory.getLogger(NettyTransferService.class);
Expand Down Expand Up @@ -250,7 +251,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
openChannels.remove(ctx.channel());
conditionallyStopServer();
}
});
}
Expand Down Expand Up @@ -296,8 +296,8 @@ protected synchronized void stopServer() {
/**
* Method used by Spring to tell this bean to start.
*/
@Override
public synchronized void start() {
@PostConstruct
public synchronized void start() throws IOException {
timeoutScheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-connect-timeout").build());
Expand All @@ -306,48 +306,22 @@ public synchronized void start() {
socketGroup = new NioEventLoopGroup(threads,
new CDCThreadFactory(new ThreadFactoryBuilder().setNameFormat(
name + "-net-%d").build()));
}

@Override
public synchronized boolean isRunning() {
return timeoutScheduler != null;
}

startServer();
}

/**
* Method used by Spring to tell this bean to shutdown synchronously.
*/
@Override
@PreDestroy
public synchronized void stop() {
LOGGER.debug("NettyTransferService#stop started");
initialiseShutdown();
awaitShutdownCompletion();
LOGGER.debug("NettyTransferService#stop completed");
}

/**
* Method used by Spring to tell this bean to shutdown asynchronously.
*
* @param callback The callback that must be called.
*/
@Override
public synchronized void stop(final Runnable callback) {
LOGGER.debug("NettyTransferService#stop (with callback) started");
initialiseShutdown();
LOGGER.debug("NettyTransferService#stop (with callback) shutdown initialised");
Runnable reportShutdownCompleted = new FireAndForgetTask(() ->
{
LOGGER.debug("NettyTransferService#stop (with callback) waiting thread started");
try {
awaitShutdownCompletion();
LOGGER.debug("NettyTransferService#stop (with callback) shutdown completed");
} finally {
callback.run();
}
});
new Thread(reportShutdownCompleted, name + "-async-shutdown").start();
}

protected void initialiseShutdown() {
stopServer();
timeoutScheduler.shutdown();
Expand All @@ -374,24 +348,6 @@ private void awaitShutdownCompletion() {
}
}

/**
* Start server if there are any registered channels.
*/
protected synchronized void conditionallyStartServer() throws IOException {
if (!uuids.isEmpty()) {
startServer();
}
}

/**
* Stop server if there are no channels.
*/
protected synchronized void conditionallyStopServer() {
if (openChannels.isEmpty() && uuids.isEmpty()) {
stopServer();
}
}

/**
* @return The address to which the server channel was last bound.
*/
Expand Down Expand Up @@ -424,7 +380,7 @@ public void execute()
if (uuids.putIfAbsent(uuid, channel) != null) {
throw new IllegalStateException("UUID conflict");
}
conditionallyStartServer();

setCancellable(channel);
InetAddress localIP =
NetworkUtils.getLocalAddress(((IpProtocolInfo)mover.getProtocolInfo()).getSocketAddress().getAddress());
Expand Down Expand Up @@ -673,7 +629,6 @@ protected void closeMoverChannel(NettyMover<P> mover, Optional<Throwable> error)
} else {
channel.done();
}
conditionallyStopServer();
}
}

Expand Down

0 comments on commit 9057f64

Please sign in to comment.