dcache-xroot: interrupt doOnOpen thread in the event of channel inactive
Motivation:

Since June at FNAL we have encountered a situation
where, after the xroot door has been running for a
while, clients begin to be refused connections.  We
have also noticed during this period that some
requests hang on open: the client quits (the channel
goes inactive), but the thread processing the Pool
Manager selection waits for an hour before timing
out.  Our conjecture is that the latter situation
contributes to the rejection of other client
connections: if the client that initiated the
transfer keeps timing out and retrying, it will use
up all the threads available to service open
requests (alternatively, several clients could be
doing something similar).

Whatever the cause, whether the client is
requesting a file which has not been prestaged or
the Pool Manager is slow, the transfer request
should not be allowed to continue once the channel
goes inactive.

Modification:

In the request handler, record the thread that is
servicing the open request and null it out when
the request returns.  If the channel goes inactive
while the thread is still recorded, the handler
interrupts it.  This breaks any synchronous 'get'
on the futures from pool selection and mover start.
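
The pattern described above can be sketched in isolation as follows.
This is a minimal illustration, not the dCache code: the class
OpenInterruptSketch, the methods open() and demo(), and the use of a
CompletableFuture that never completes as a stand-in for a slow pool
selection are all assumptions made for the example.  The field is
guarded only by the synchronized methods here, which is a
simplification.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenInterruptSketch {

    // Thread currently servicing the open call; guarded by 'this'.
    private Thread onOpenThread;

    private synchronized void setOnOpenThread() {
        onOpenThread = Thread.currentThread();
    }

    private synchronized void unsetOnOpenThread() {
        onOpenThread = null;
    }

    private synchronized void interruptOnOpenThread() {
        if (onOpenThread != null) {
            onOpenThread.interrupt();
        }
    }

    // Hypothetical stand-in for doOnOpen: blocks on a future, as the
    // synchronous 'get' on pool selection would.
    String open(CompletableFuture<String> poolSelection, CountDownLatch started) {
        setOnOpenThread();
        try {
            started.countDown();
            return poolSelection.get(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            return "interrupted";
        } catch (ExecutionException | TimeoutException e) {
            return "failed";
        } finally {
            // Always clear the registration so that a later inactive
            // event cannot interrupt a thread doing unrelated work.
            unsetOnOpenThread();
        }
    }

    // Hypothetical stand-in for the channelInactive callback.
    void channelInactive() {
        interruptOnOpenThread();
    }

    // Runs open() on a worker thread, then simulates the client quitting.
    static String demo() throws InterruptedException {
        OpenInterruptSketch handler = new OpenInterruptSketch();
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        String[] result = new String[1];

        Thread worker = new Thread(() -> result[0] = handler.open(neverCompletes, started));
        worker.start();

        started.await();            // the worker has registered itself
        handler.channelInactive();  // interrupt instead of waiting an hour
        worker.join(5000);
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints "interrupted"
    }
}
```

Because set, unset and interrupt are all synchronized on the same
object, the interrupt can only be delivered while the thread is still
registered, that is, between the start of open() and its finally block.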

Result:

The client may retry, but the previous thread
should be released and made available again,
preventing thread starvation and the resulting
refusal of new client connections.

Target: master
Patch: https://rb.dcache.org/r/13638/
Request: 8.1
Request: 8.0
Request: 7.2
Requires-book: no
Acked-by: Dmitry
alrossi authored and mksahakyan committed Sep 6, 2022
1 parent 4122a3b commit 6c1b92b
Showing 1 changed file with 37 additions and 0 deletions.
@@ -24,6 +24,7 @@
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_ArgInvalid;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_ArgMissing;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_FileNotOpen;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_IOError;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_InvalidRequest;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_NotAuthorized;
import static org.dcache.xrootd.protocol.XrootdProtocol.kXR_Qcksum;
@@ -202,6 +203,13 @@ public boolean isLoggedIn() {
     */
    private final Map<String, String> _queryConfig;

    /**
     * The thread associated with the open call.
     * This is held here in case an inactive event occurs on the channel
     * and the thread is waiting for the redirect.
     */
    private volatile Thread onOpenThread;

    public XrootdRedirectHandler(XrootdDoor door, FsPath rootPath, ExecutorService executor,
          Map<String, String> queryConfig,
          Map<String, String> appIoQueues) {
@@ -243,6 +251,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        _log.info("channel inactive event received on {}.", ctx.channel());

        /**
         * If the doOnOpen call has not yet returned, interrupt its thread.
         */
        interruptOnOpenThread();
        ctx.fireChannelInactive();
    }

@@ -292,6 +305,13 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx, OpenRe
         */

        InetSocketAddress localAddress = getDestinationAddress();

        /**
         * Register this thread, so that it can be interrupted.
         * (If and when the above suggestion is implemented, this will no longer be necessary.)
         */
        setOnOpenThread();

        InetSocketAddress remoteAddress = getSourceAddress();
        LoginSessionInfo loginSessionInfo = sessionInfo();

@@ -476,6 +496,8 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx, OpenRe
            return withError(ctx, req, kXR_ServerError, "Server shutdown");
        } catch (XrootdException e) {
            return withError(ctx, req, e.getError(), e.getMessage());
        } finally {
            unsetOnOpenThread();
        }
    }

@@ -1317,4 +1339,19 @@ private LoginSessionInfo sessionInfo() {
        return _logins.peek();
    }

    private synchronized void setOnOpenThread() {
        onOpenThread = Thread.currentThread();
    }

    private synchronized void unsetOnOpenThread() {
        onOpenThread = null;
    }

    private synchronized void interruptOnOpenThread() {
        if (onOpenThread != null) {
            _log.info("{} called interruptOnOpenThread; interrupting {}.", Thread.currentThread(),
                  onOpenThread);
            onOpenThread.interrupt();
        }
    }
}
