dcache-xroot: Allow client to reattempt open on pool when I/O stalls
Motivation:

This patch is proposed as a palliative for an issue we
have encountered on our production systems at Fermilab.

What appears to happen is that I/O during a read stalls (this
sometimes reveals itself as a broken pipe exception
after the fact).  After the client waits (default 1 minute),
it attempts to reconnect directly to the pool endpoint
without going back to the door.  Depending on the timing,
the client may find, at its second open attempt,
that the mover has been released (because of either the
late-arriving exception or channel inactivity) and is
no longer available.  In this case, the read then
usually fails with the client receiving a "uuid no longer valid"
message.  In other cases, it may encounter a socket
closed error, and, more rarely, it may actually succeed.

While we are not entirely certain what is causing this
to happen (there are some correlations with disk RAID
degradation and network TX spiking), we would nonetheless
like to consistently allow the client to reconnect
and finish the read when possible.

Modification:

The Netty Mover Channel (inner class in the Netty Transfer Service)
can be shared or exclusive. In HTTP (GET) the mover can evidently
be used to read concurrently at different offsets on different
connections.  In xroot, however, chunked reads are all done on
the same socket, even though the xroot pool opens the channel
in shared mode.

In support of shared mode, the channel uses a reference
count to determine whether it can be closed.  On each
open, the count is incremented, and each open thus
requires a corresponding close.
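
For illustration, the reference-count behavior can be sketched
roughly as follows; the class and member names here are simplified
and hypothetical, not the actual NettyMoverChannel code:

    // Hypothetical simplification of a shared (reference-counted) mover channel.
    class RefCountedMoverChannel {
        private int refCount;      // outstanding opens; guarded by 'this'
        private boolean closed;

        synchronized RefCountedMoverChannel open() {
            refCount++;            // each open must be matched by a release()
            return this;
        }

        synchronized void release() {
            if (--refCount == 0 && !closed) {
                closed = true;
                doClose();         // last reference gone: actually close the mover
            }
        }

        private void doClose() {
            // shut the mover down and remove its uuid from the transfer service map
        }
    }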

Because, as mentioned above, the release of the mover
through channelInactive and/or exceptionCaught
is subject to timing, and the order in which it
occurs is not guaranteed with respect to the client's
requests to log in, open, and end the previous session,
a strategy of releasing without closing the mover in
those methods is not reliable.  Nor can we, in general,
expect the client to trigger multiple closes in the
proper order.

But since a reference count > 1 in xroot (as opposed
to HTTP) always denotes a previous failure and a
subsequent attempt at recovery, we can safely force
closure of the mover on the first close request
received.

To implement this new logic, three things have been done:

If the mover channel IO mode is READ (and if any
associated caught exception is an IOException),
we do not release the mover as before, but allow it
to remain in the map, available for a reconnect attempt.

In doOnClose (in response to the client's kXR_close
request), a new Mover Channel releaseAll() method
is called instead of release().  This forces the
Mover Channel to close (a small method has also been
added to the Mover Channel Sync class in support
of this); see the sketch below.

In order not to rely on the JTM to clean up
(release) orphaned movers, we have also implemented
a timer.  When the channel goes inactive
but we do not release the mover, a timer is started.
If a reconnect occurs before the timeout, the
timer is canceled.  Otherwise, at the timeout,
the mover is released just as it would be on close().

We have added debug logging to track the cases
where timers are started and stopped.
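
The forced close mentioned above can be pictured as one extra method
on the simplified channel sketched earlier.  This is only a hedged
illustration: the real releaseAll() (whose implementation is not part
of the diff shown here) returns a ListenableFuture<Void>, as used in
doOnClose below, and delegates to the Mover Channel Sync class.

    // Added to the hypothetical sketch above; the real method returns a
    // ListenableFuture<Void> and goes through the Sync class.
    synchronized void releaseAll() {
        if (!closed) {
            refCount = 0;          // discard all outstanding references at once
            closed = true;
            doClose();             // force the mover to close now
        }
    }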

Result:

No observable change to the normal semantics
of read or write.  When I/O stalls on the pool and
the client attempts to reconnect, the read succeeds
and the client disconnects cleanly.

Target: master
Request: 7.0
Request: 6.2
Request: 6.1
Request: 6.0
Request: 5.2
Patch: https://rb.dcache.org/r/12917/
Acked-by: Dmitry
alrossi committed Mar 15, 2021
1 parent bd03a8a commit da13dda
Showing 6 changed files with 205 additions and 56 deletions.
@@ -21,12 +21,11 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FileCorruptedCacheException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -38,10 +37,6 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.FileCorruptedCacheException;

import org.dcache.namespace.FileAttribute;
import org.dcache.pool.movers.NettyTransferService;
import org.dcache.pool.repository.OutOfDiskException;
@@ -87,6 +82,8 @@
import org.dcache.xrootd.util.FileStatus;
import org.dcache.xrootd.util.OpaqueStringParser;
import org.dcache.xrootd.util.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.dcache.xrootd.protocol.XrootdProtocol.*;

@@ -126,6 +123,7 @@ public class XrootdPoolRequestHandler extends AbstractXrootdRequestHandler
* the Netty ChannelPipeline, so okay to store stateful information.
*/
private boolean _hasOpenedFiles;

/**
* Address of the door. Enables us to redirect the client back if an
* operation should better be performed at the door.
@@ -170,13 +168,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}

/**
* @throws IOException closing the server socket that handles the
* connection fails
*/
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception
{
/* close leftover descriptors */
for (FileDescriptor descriptor : _descriptors) {
@@ -191,8 +184,20 @@ public void channelInactive(ChannelHandlerContext ctx)
} else if (descriptor.getChannel().getIoMode().contains(StandardOpenOption.WRITE)) {
descriptor.getChannel().release(new CacheException(
"Client disconnected without closing file."));
} else {
} else if (!descriptor.getChannel().getIoMode().contains(StandardOpenOption.READ)) {
descriptor.getChannel().release();
} else {
/*
* Because an I/O stall during a read may trigger the xrootd client
* to attempt, after a timeout, to reconnect by opening another socket,
* we would like not to reject it on the basis of a missing mover.  Thus, in the
* case that the file descriptor maps to a READ mover channel, we leave the
* mover in the map held by the transfer service and start a timer;
* if no reconnect occurs before it expires, the channel is then released.
*/
_server.scheduleReconnectTimerForMover(descriptor);
_log.debug("{} channeInactive, starting timer for reconnect with mover {}.",
ctx.channel(), descriptor.getChannel().getMoverUuid());
}
}
}
@@ -208,10 +213,23 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
if (descriptor != null) {
if (descriptor.isPersistOnSuccessfulClose()) {
descriptor.getChannel().release(new FileCorruptedCacheException(
"File was opened with Persist On Successful Close and client was disconnected due to an error: " +
"File was opened with Persist On Successful Close and client was "
+ "disconnected due to an error: " +
t.getMessage(), t));
} else {
} else if (!(descriptor.getChannel().getIoMode().contains(StandardOpenOption.READ)
&& t instanceof IOException)) {
descriptor.getChannel().release(t);
} else {
/*
* This is analogous to the exclusion of READ channels in the
* channelInactive method (see the explanation above).
*
* Here we limit the exclusion to an actual instance of IOException on READ
* (the stall may eventually present itself as a broken pipe exception).
*/
_server.scheduleReconnectTimerForMover(descriptor);
_log.debug("{} exceptionCaught ({}), starting timer for reconnect with mover {}.",
ctx.channel(), t.toString(), descriptor.getChannel().getMoverUuid());
}

if (descriptor instanceof TpcWriteDescriptor) {
@@ -301,6 +319,11 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx,
});
}

/*
* Stop any timer in case this is a reconnect.
*/
_server.cancelReconnectTimerForMover(uuid);

try {
FileDescriptor descriptor;
boolean isWrite = file.getIoMode().contains(StandardOpenOption.WRITE);
@@ -664,7 +687,37 @@ protected XrootdResponse<CloseRequest> doOnClose(ChannelHandlerContext ctx, Clos
"open file.");
}

ListenableFuture<Void> future = _descriptors.get(fd).getChannel().release();
/*
* While there is currently no provision in the dCache xroot implementation
* for the concurrent use of the same mover channel for file reads on
* separate sockets (i.e., chunked reads from different offsets are all done
* on the same socket/channel, and each new open at the door is mapped to a new
* mover), the mover is nevertheless opened here on the pool in non-exclusive mode.
*
* What this legacy choice implies is that the xroot mover could potentially be reused
* while open, and in fact we wish to provide for this on READs by not releasing the mover
* should one of the channels using it go inactive or catch an IOException.
*
* Since in dCache xrootd (unlike HTTP) multiple sockets using the same mover
* always denote a fail/retry or timeout/retry attempt, the client will have considered
* the preceding attempt null and will send an end session request after logging in on
* a new socket. It will also not request close on the first socket, for obvious
* reasons.
*
* The behavior of the xrootd client thus poses a problem. If we were to map movers to
* session ids and then release them when the client sends the end session request,
* this would still risk closing the mover and removing it from the uuid map before
* the new open request has a chance to increment the mover's reference count (a
* race with the client).
*
* One solution would be to implement "releaseNoClose" semantics on the mover and use
* that in the channelInactive and exceptionCaught methods; but that choice is still
* subject to timing issues and is not reliable.
*
* The alternative adopted here is to implement a forcible close by releasing
* all references to the mover.
*/
ListenableFuture<Void> future = _descriptors.get(fd).getChannel().releaseAll();
future.addListener(() -> {
try {
Uninterruptibles.getUninterruptibly(future);
@@ -18,39 +18,34 @@
package org.dcache.xrootd.pool;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import diskCacheV111.util.CacheException;
import dmg.cells.nucleus.CellCommandListener;
import dmg.cells.nucleus.CellPath;
import dmg.util.command.Argument;
import dmg.util.command.Command;
import dmg.util.command.Option;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import diskCacheV111.util.CacheException;

import dmg.cells.nucleus.CellCommandListener;
import dmg.cells.nucleus.CellPath;
import dmg.util.command.Argument;
import dmg.util.command.Command;
import dmg.util.command.Option;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.dcache.pool.movers.NettyMover;
import org.dcache.pool.movers.NettyTransferService;
import org.dcache.util.CDCThreadFactory;
@@ -64,6 +59,9 @@
import org.dcache.xrootd.protocol.XrootdProtocol;
import org.dcache.xrootd.security.SigningPolicy;
import org.dcache.xrootd.stream.ChunkedResponseWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;

/**
* xrootd transfer service.
@@ -141,9 +139,13 @@ public String call() throws Exception {
private Map<String, String> queryConfig;
private NioEventLoopGroup thirdPartyClientGroup;
private ScheduledExecutorService thirdPartyShutdownExecutor;

private SigningPolicy signingPolicy;
private long tpcServerResponseTimeout;
private TimeUnit tpcServerResponseTimeoutUnit;
private Map<String, Timer> reconnectTimers;
private long readReconnectTimeout;
private TimeUnit readReconnectTimeoutUnit;

public XrootdTransferService()
{
@@ -172,6 +174,7 @@ public synchronized void init() {
.setNameFormat("xrootd-tpc-client-%d")
.build();
thirdPartyClientGroup = new NioEventLoopGroup(0, new CDCThreadFactory(factory));
reconnectTimers = new HashMap<>();
}

@Required
@@ -180,6 +183,65 @@ public void setAccessLogPlugins(List<ChannelHandlerFactory> plugins)
this.accessLogPlugins = plugins;
}

/**
* Stop the timer, presumably because the client has reconnected.
*
* @param uuid of the mover (channel)
*/
public synchronized void cancelReconnectTimerForMover(UUID uuid)
{
Timer timer = reconnectTimers.remove(uuid.toString());
if (timer != null) {
LOGGER.debug("timer for {} cancelled.", uuid);
timer.cancel();
}
}

/**
* Because an I/O stall during a read may trigger the xrootd client
* to attempt, after a timeout, to reconnect by opening another socket,
* we would like not to reject it on the basis of a missing mover. Thus in the
* case that the file descriptor maps to a READ mover channel, we leave the
* mover in the map held by the transfer service and we start a timer.
* If the client fails to reconnect before expiration, the channel is released.
*
* @param descriptor referencing the mover (channel)
*/
public synchronized void scheduleReconnectTimerForMover(FileDescriptor descriptor)
{
NettyMoverChannel channel = descriptor.getChannel();
UUID key = channel.getMoverUuid();
/*
* Make sure no timer already exists for this mover.
* A duplicate could arise if both channelInactive and exceptionCaught
* trigger this method in rapid succession.
*/
cancelReconnectTimerForMover(key);
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
LOGGER.debug("timer for {} expired, releasing channel.", key);
channel.releaseAll();
removeReadReconnectTimer(key);
}
};
reconnectTimers.put(key.toString(), timer);
timer.schedule(task, readReconnectTimeoutUnit.toMillis(readReconnectTimeout));
}

@Required
public void setReadReconnectTimeout(long readReconnectTimeout)
{
this.readReconnectTimeout = readReconnectTimeout;
}

@Required
public void setReadReconnectTimeoutUnit(TimeUnit readReconnectTimeoutUnit)
{
this.readReconnectTimeoutUnit = readReconnectTimeoutUnit;
}

@Required
public void setPlugins(List<ChannelHandlerFactory> plugins)
{
@@ -326,4 +388,9 @@ public synchronized void shutdown() {
Thread.currentThread().interrupt();
}
}

private synchronized void removeReadReconnectTimer(UUID key)
{
reconnectTimers.remove(key.toString());
}
}