pool: Unify channel handling between xrootd and http
Moves the tracking of Netty channels into the base class. This changes
the behaviour of the HTTP mover such that the server doesn't stop
listening until all clients have disconnected.

I renamed some of the methods again to avoid overloading the channel
concept (mover channel vs Netty channel).

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: https://rb.dcache.org/r/7823/
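
The base class carries the interesting part of this change, but its diff is not among the hunks shown below. As a rough illustration only, here is one plausible shape for the unified tracking, assuming the accepted Netty channels are kept in a ChannelGroup and the shutdown condition is re-checked when a client disconnects. The class name and the hasNoOpenFiles()/stopServer() helpers are hypothetical stand-ins, not dCache API.

    import io.netty.channel.Channel;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;

    public abstract class ExampleNettyTransferService
    {
        // Accepted client channels; DefaultChannelGroup removes a channel
        // automatically once it is closed.
        private final ChannelGroup openChannels =
                new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        // Subclasses override this and call super.initChannel(ch) first,
        // as both services do in the diff below.
        protected void initChannel(Channel ch) throws Exception
        {
            openChannels.add(ch);
            // Re-check the shutdown condition whenever a client disconnects.
            ch.closeFuture().addListener(future -> conditionallyStopServer());
        }

        // Stop listening only when no file is open and no client is still
        // connected: the behavioural change described above.
        protected synchronized void conditionallyStopServer()
        {
            if (hasNoOpenFiles() && openChannels.isEmpty()) {
                stopServer();
            }
        }

        protected abstract boolean hasNoOpenFiles(); // hypothetical helper
        protected abstract void stopServer();        // hypothetical helper
    }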
gbehrmann committed Feb 18, 2015
1 parent f356892 commit 7f3fae9
Showing 6 changed files with 140 additions and 151 deletions.
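
The renames the message refers to are visible at every call site below: openChannel and closeChannel on the transfer service become openFile and closeFile, leaving the word "channel" free for Netty channels, and the xrootd handler now receives the maximum frame size through its constructor instead of calling _server.getMaxFrameSize(). As a reading aid, this is the call surface the handlers use after the commit; the signatures are inferred from the call sites in the diff, and the interface name is invented for illustration (the real declarations live in AbstractNettyTransferService).

    import java.util.UUID;

    import org.dcache.pool.movers.MoverChannel;

    // Illustrative only; signatures inferred from call sites in this commit.
    interface TransferServiceCallSurface<P>
    {
        // Looks up the mover channel (the open file) registered under the
        // door-supplied UUID, or returns null if no such mover exists.
        MoverChannel<P> openFile(UUID uuid, boolean exclusive);

        // Releases a mover channel after a normal close.
        void closeFile(MoverChannel<P> channel);

        // Fails a mover channel with a cause, for example a
        // FileCorruptedCacheException on premature disconnect.
        void closeFile(MoverChannel<P> channel, Exception cause);
    }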
XrootdPoolRequestHandler.java
@@ -35,6 +35,7 @@
 import diskCacheV111.util.FileCorruptedCacheException;
 
 import org.dcache.namespace.FileAttribute;
+import org.dcache.pool.movers.AbstractNettyTransferService;
 import org.dcache.pool.movers.IoMode;
 import org.dcache.pool.movers.MoverChannel;
 import org.dcache.pool.repository.RepositoryChannel;
@@ -120,21 +121,17 @@ public class XrootdPoolRequestHandler extends AbstractXrootdRequestHandler
     /**
      * The server on which this request handler is running.
      */
-    private XrootdTransferService _server;
-
-    public XrootdPoolRequestHandler(XrootdTransferService server) {
-        _server = server;
-    }
+    private AbstractNettyTransferService<XrootdProtocolInfo> _server;
 
     /**
-     * @throws IOException opening a server socket to handle the connection
-     *         fails
+     * Maximum size of frame used for xrootd replies.
      */
-    @Override
-    public void channelActive(ChannelHandlerContext ctx)
-        throws Exception
+    private final int _maxFrameSize;
+
+    public XrootdPoolRequestHandler(AbstractNettyTransferService<XrootdProtocolInfo> server, int maxFrameSize)
     {
-        _server.clientConnected();
+        _server = server;
+        _maxFrameSize = maxFrameSize;
     }
 
     @Override
@@ -163,16 +160,14 @@ public void channelInactive(ChannelHandlerContext ctx)
         for (FileDescriptor descriptor : _descriptors) {
             if (descriptor != null) {
                 if (descriptor.isPersistOnSuccessfulClose()) {
-                    _server.closeChannel(descriptor.getChannel(), new FileCorruptedCacheException(
+                    _server.closeFile(descriptor.getChannel(), new FileCorruptedCacheException(
                             "File was opened with Persist On Successful Close and not closed."));
                 } else {
-                    _server.closeChannel(descriptor.getChannel(), new CacheException(
+                    _server.closeFile(descriptor.getChannel(), new CacheException(
                             "Client disconnected without closing file."));
                 }
             }
         }
-
-        _server.clientDisconnected();
     }
 
     @Override
@@ -184,11 +179,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
         for (FileDescriptor descriptor : _descriptors) {
             if (descriptor != null) {
                 if (descriptor.isPersistOnSuccessfulClose()) {
-                    _server.closeChannel(descriptor.getChannel(), new FileCorruptedCacheException(
+                    _server.closeFile(descriptor.getChannel(), new FileCorruptedCacheException(
                             "File was opened with Persist On Successful Close and client was disconnected due to an error: " +
                             t.getMessage(), t));
                 } else {
-                    _server.closeChannel(descriptor.getChannel(), (Exception) t);
+                    _server.closeFile(descriptor.getChannel(), (Exception) t);
                 }
             }
         }
@@ -230,7 +225,7 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx, OpenRe
             throw new XrootdException(kXR_NotAuthorized, "Request lacks the " + UUID_PREFIX + " property.");
         }
 
-        MoverChannel<XrootdProtocolInfo> file = _server.openChannel(uuid, false);
+        MoverChannel<XrootdProtocolInfo> file = _server.openFile(uuid, false);
         if (file == null) {
             _log.warn("No mover found for {}", msg);
             throw new XrootdException(kXR_NotAuthorized, UUID_PREFIX + " is no longer valid.");
@@ -262,7 +257,7 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx, OpenRe
             return new OpenResponse(msg, fd, null, null, stat);
         } finally {
             if (file != null) {
-                _server.closeChannel(file);
+                _server.closeFile(file);
             }
         }
     } catch (IOException e) {
@@ -389,7 +384,7 @@ protected Object doOnRead(ChannelHandlerContext ctx, ReadRequest msg)
         if (msg.bytesToRead() == 0) {
             return withOk(msg);
         } else {
-            return new ChunkedFileDescriptorReadResponse(msg, _server.getMaxFrameSize(), _descriptors.get(fd));
+            return new ChunkedFileDescriptorReadResponse(msg, _maxFrameSize, _descriptors.get(fd));
         }
     }
 
@@ -425,15 +420,15 @@ protected Object doOnReadV(ChannelHandlerContext ctx, ReadVRequest msg)
             int totalBytesToRead = req.BytesToRead() +
                 ReadVResponse.READ_LIST_HEADER_SIZE;
 
-            if (totalBytesToRead > _server.getMaxFrameSize()) {
+            if (totalBytesToRead > _maxFrameSize) {
                 _log.warn("Vector read of {} bytes requested, exceeds " +
                           "maximum frame size of {} bytes!", totalBytesToRead,
-                          _server.getMaxFrameSize());
+                          _maxFrameSize);
                 throw new XrootdException(kXR_ArgInvalid, "Single readv transfer is too large.");
             }
         }
 
-        return new ChunkedFileDescriptorReadvResponse(msg, _server.getMaxFrameSize(), new ArrayList<>(_descriptors));
+        return new ChunkedFileDescriptorReadvResponse(msg, _maxFrameSize, new ArrayList<>(_descriptors));
     }
 
     /**
@@ -525,7 +520,7 @@ protected XrootdResponse<CloseRequest> doOnClose(ChannelHandlerContext ctx, Clos
                                       "open file.");
         }
 
-        _server.closeChannel(_descriptors.set(fd, null).getChannel());
+        _server.closeFile(_descriptors.set(fd, null).getChannel());
         return withOk(msg);
     }
 
@@ -541,7 +536,7 @@ protected XrootdResponse<QueryRequest> doOnQuery(ChannelHandlerContext ctx, Quer
                 s.append(0);
                 break;
             case "readv_ior_max":
-                s.append(_server.getMaxFrameSize() - ReadVResponse.READ_LIST_HEADER_SIZE);
+                s.append(_maxFrameSize - ReadVResponse.READ_LIST_HEADER_SIZE);
                 break;
             case "readv_iov_max":
                 s.append(READV_IOV_MAX);
XrootdTransferService.java
@@ -18,7 +18,6 @@
 package org.dcache.xrootd.pool;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -31,7 +30,6 @@
 import java.net.SocketException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import diskCacheV111.util.CacheException;
 
@@ -90,8 +88,6 @@ public class XrootdTransferService extends AbstractNettyTransferService<XrootdPr
     private int maxFrameSize;
     private List<ChannelHandlerFactory> plugins;
 
-    private int numberClientConnections;
-
     public XrootdTransferService()
     {
         super("xrootd");
@@ -141,54 +137,28 @@ protected void sendAddressToDoor(NettyMover<XrootdProtocolInfo> mover, int port,
     }
 
     @Override
-    protected ChannelInitializer newChannelInitializer() {
-        return new XrootdPoolChannelInitializer();
-    }
-
-    /**
-     * Only shutdown the server if no client connection left.
-     */
-    @Override
-    protected synchronized void conditionallyStopServer() {
-        if (numberClientConnections == 0) {
-            super.conditionallyStopServer();
-        }
-    }
-
-    public synchronized void clientConnected()
-    {
-        numberClientConnections++;
-    }
-
-    public synchronized void clientDisconnected() {
-        numberClientConnections--;
-        conditionallyStopServer();
-    }
-
-    private class XrootdPoolChannelInitializer extends ChannelInitializer
-    {
-        @Override
-        protected void initChannel(Channel ch) throws Exception
-        {
-            ChannelPipeline pipeline = ch.pipeline();
-
-            pipeline.addLast("encoder", new XrootdEncoder());
-            pipeline.addLast("decoder", new XrootdDecoder());
-            if (LOGGER.isDebugEnabled()) {
-                pipeline.addLast("logger", new LoggingHandler());
-            }
-            pipeline.addLast("handshake",
-                             new XrootdHandshakeHandler(XrootdProtocol.DATA_SERVER));
-            for (ChannelHandlerFactory plugin: plugins) {
-                pipeline.addLast("plugin:" + plugin.getName(),
-                                 plugin.createHandler());
-            }
-            pipeline.addLast("timeout", new IdleStateHandler(0,
-                                                             0,
-                                                             clientIdleTimeout,
-                                                             clientIdleTimeoutUnit));
-            pipeline.addLast("chunkedWriter", new ChunkedResponseWriteHandler());
-            pipeline.addLast("transfer", new XrootdPoolRequestHandler(XrootdTransferService.this));
-        }
-    }
+    protected void initChannel(Channel ch) throws Exception
+    {
+        super.initChannel(ch);
+
+        ChannelPipeline pipeline = ch.pipeline();
+
+        pipeline.addLast("encoder", new XrootdEncoder());
+        pipeline.addLast("decoder", new XrootdDecoder());
+        if (LOGGER.isDebugEnabled()) {
+            pipeline.addLast("logger", new LoggingHandler());
+        }
+        pipeline.addLast("handshake",
+                         new XrootdHandshakeHandler(XrootdProtocol.DATA_SERVER));
+        for (ChannelHandlerFactory plugin: plugins) {
+            pipeline.addLast("plugin:" + plugin.getName(),
+                             plugin.createHandler());
+        }
+        pipeline.addLast("timeout", new IdleStateHandler(0,
+                                                         0,
+                                                         clientIdleTimeout,
+                                                         clientIdleTimeoutUnit));
+        pipeline.addLast("chunkedWriter", new ChunkedResponseWriteHandler());
+        pipeline.addLast("transfer", new XrootdPoolRequestHandler(this, maxFrameSize));
+    }
 }
HttpPoolRequestHandler.java
@@ -193,9 +193,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception
         _logger.debug("HTTP connection from {} closed", ctx.channel().remoteAddress());
         for (MoverChannel<HttpProtocolInfo> file: _files) {
             if (file == _writeChannel) {
-                _server.closeChannel(file, new FileCorruptedCacheException("Connection lost before end of file."));
+                _server.closeFile(file, new FileCorruptedCacheException("Connection lost before end of file."));
             } else {
-                _server.closeChannel(file);
+                _server.closeFile(file);
             }
         }
         _files.clear();
@@ -213,7 +213,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
             } else {
                 cause = new CacheException(t.toString(), t);
             }
-            _server.closeChannel(file, cause);
+            _server.closeFile(file, cause);
         }
         _files.clear();
         ctx.channel().close();
@@ -473,7 +473,7 @@ private MoverChannel<HttpProtocolInfo> open(HttpRequest request, boolean exclusi
         }
 
         UUID uuid = UUID.fromString(uuidList.get(0));
-        MoverChannel<HttpProtocolInfo> file = _server.openChannel(uuid, exclusive);
+        MoverChannel<HttpProtocolInfo> file = _server.openFile(uuid, exclusive);
         if (file == null) {
             throw new IllegalArgumentException("Request is no longer valid. " +
                     "Please resubmit to door.");
@@ -499,9 +499,9 @@ private MoverChannel<HttpProtocolInfo> open(HttpRequest request, boolean exclusi
     private void close(MoverChannel<HttpProtocolInfo> channel, Exception exception)
     {
         if (exception == null) {
-            _server.closeChannel(channel);
+            _server.closeFile(channel);
         } else {
-            _server.closeChannel(channel, exception);
+            _server.closeFile(channel, exception);
         }
         _files.remove(channel);
     }
HttpTransferService.java
@@ -18,7 +18,6 @@
 package org.dcache.http;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
@@ -136,39 +135,24 @@ private URI getUri(HttpProtocolInfo protocolInfo, int port, UUID uuid)
     }
 
     @Override
-    protected ChannelInitializer newChannelInitializer()
-    {
-        return new HttpChannelInitializer();
-    }
-
-    /**
-     * Factory that creates new server handler.
-     *
-     * The pipeline can handle HTTP compression and chunked transfers.
-     *
-     * @author tzangerl
-     *
-     */
-    class HttpChannelInitializer extends ChannelInitializer
-    {
-        @Override
-        protected void initChannel(Channel ch) throws Exception
-        {
-            ChannelPipeline pipeline = ch.pipeline();
-
-            pipeline.addLast("decoder", new HttpRequestDecoder());
-            pipeline.addLast("encoder", new HttpResponseEncoder());
-
-            if (LOGGER.isDebugEnabled()) {
-                pipeline.addLast("logger", new LoggingHandler());
-            }
-            pipeline.addLast("idle-state-handler",
-                             new IdleStateHandler(0,
-                                                  0,
-                                                  clientIdleTimeout,
-                                                  clientIdleTimeoutUnit));
-            pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-            pipeline.addLast("transfer", new HttpPoolRequestHandler(HttpTransferService.this, chunkSize));
-        }
-    }
+    protected void initChannel(Channel ch) throws Exception
+    {
+        super.initChannel(ch);
+
+        ChannelPipeline pipeline = ch.pipeline();
+
+        pipeline.addLast("decoder", new HttpRequestDecoder());
+        pipeline.addLast("encoder", new HttpResponseEncoder());
+
+        if (LOGGER.isDebugEnabled()) {
+            pipeline.addLast("logger", new LoggingHandler());
+        }
+        pipeline.addLast("idle-state-handler",
+                         new IdleStateHandler(0,
+                                              0,
+                                              clientIdleTimeout,
+                                              clientIdleTimeoutUnit));
+        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+        pipeline.addLast("transfer", new HttpPoolRequestHandler(this, chunkSize));
+    }
 }
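
Taken together, the two service diffs replace the nested ChannelInitializer classes with an initChannel override on the service itself; calling super.initChannel(ch) before adding handlers is what lets the base class observe every client channel. A hypothetical further subclass, building on the ExampleNettyTransferService sketched after the commit message, would follow the same pattern; every Example* name here is invented.

    import java.util.concurrent.TimeUnit;

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelPipeline;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;

    public class ExampleTransferService extends ExampleNettyTransferService
    {
        @Override
        protected void initChannel(Channel ch) throws Exception
        {
            // Register the channel with the base class first, so the server
            // keeps listening until this client disconnects.
            super.initChannel(ch);

            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("logger", new LoggingHandler());
            pipeline.addLast("timeout", new IdleStateHandler(0, 0, 300, TimeUnit.SECONDS));
            // A real service would add its protocol codecs here and hand the
            // request handler a reference to itself plus any settings, as the
            // xrootd service does with maxFrameSize above.
        }

        @Override
        protected boolean hasNoOpenFiles()
        {
            return true; // stub for the sketch
        }

        @Override
        protected void stopServer()
        {
            // stub for the sketch
        }
    }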
