Skip to content

Commit

Permalink
dcache-xroot: fix memory leak, race and other small issues with recon…
Browse files Browse the repository at this point in the history
…nect to pool

Motivation:

Patch: https://rb.dcache.org/r/12917/
Committed: master@5b1329b31d34184e5266bf24ede5f13c5e9c4f9d

That patch attempted to fix a reconnect issue on the pools.
This patch corrects a slow memory leak and a potential
mishandling of a race condition between client and server.
It also makes a better effort to prevent/clean up
orphaned timers.

Modification:

1.  do not call release ALL on expiration or cancel, only
    on close;
2.  call timer.cancel in the TimerTask run method; otherwise,
    the Timer thread's run method goes into an indefinite wait
    and neither the Timer nor its Thread will be garbage collected;
3.  cancel the timer on close;
4.  do not start the timer if the mover has already been
    released, i.e., removed from the mover map.

Also added a little more (debug) logging and renamed
two methods.

Result:

Bugs eliminated.

Target: master
Request: 7.0
Request: 6.2
Request: 6.1
Request: 6.0
Request: 5.2
Patch: https://rb.dcache.org/r/12964/
Acked-by: Lea
  • Loading branch information
alrossi committed Apr 15, 2021
1 parent bb8db47 commit 3811a75
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx,
/*
* Stop any timer in case this is a reconnect.
*/
_server.cancelReconnectTimerForMover(uuid);
_server.cancelReconnectTimeoutForMover(uuid);
_log.debug("doOnOpen, called cancel on reconnect timers for {}", uuid);

XrootdProtocolInfo protocolInfo = file.getProtocolInfo();

Expand Down Expand Up @@ -773,7 +774,16 @@ protected XrootdResponse<CloseRequest> doOnClose(ChannelHandlerContext ctx, Clos
* The alternative adopted here is to implement a forcible close by releasing
* all references to the mover.
*/
ListenableFuture<Void> future = _descriptors.get(fd).getChannel().releaseAll();
NettyTransferService<XrootdProtocolInfo>.NettyMoverChannel channel
= _descriptors.get(fd).getChannel();

/*
* Stop any timer in case this is a reconnect.
*/
_server.cancelReconnectTimeoutForMover(channel.getMoverUuid());
_log.debug("doOnClose, called cancel on reconnect timers for {}", channel.getMoverUuid());

ListenableFuture<Void> future = channel.releaseAll();
future.addListener(() -> {
try {
Uninterruptibles.getUninterruptibly(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ public void setAccessLogPlugins(List<ChannelHandlerFactory> plugins)
*
* @param uuid of the mover (channel)
*/
public synchronized void cancelReconnectTimerForMover(UUID uuid)
public synchronized void cancelReconnectTimeoutForMover(UUID uuid)
{
Timer timer = reconnectTimers.remove(uuid.toString());
if (timer != null) {
LOGGER.debug("timer for {} cancelled.", uuid);
timer.cancel();
LOGGER.debug("cancelReconnectTimeoutForMover, timer cancelled for {}.", uuid);
}
}

Expand All @@ -216,23 +216,27 @@ public synchronized void scheduleReconnectTimerForMover(FileDescriptor descripto
{
NettyMoverChannel channel = descriptor.getChannel();
UUID key = channel.getMoverUuid();
/*
* Make sure no timer exists associated with this mover.
* This might happen if both channel inactive and exception caught
* calls trigger this method in rapid succession.
*/
cancelReconnectTimerForMover(key);
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
LOGGER.debug("timer for {} expired, releasing channel.", key);
channel.releaseAll();
removeReadReconnectTimer(key);
}
};
reconnectTimers.put(key.toString(), timer);
timer.schedule(task, readReconnectTimeoutUnit.toMillis(readReconnectTimeout));
cancelReconnectTimeoutForMover(key);
if (uuids.containsKey(key)) {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
channel.release();
removeReadReconnectTimer(key);
timer.cancel();
LOGGER.debug("reconnect timer expired for {}; " +
"channel was released and timer cancelled.",
key);
}
};
reconnectTimers.put(key.toString(), timer);
timer.schedule(task, readReconnectTimeoutUnit.toMillis(readReconnectTimeout));
} else {
LOGGER.debug("setReconnectTimeoutForMover for {}; " +
"mover no longer accessible; skipping.",
key);
}
}

@Required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public abstract class NettyTransferService<P extends ProtocolInfo>
private NettyPortRange portRange;

/** UUID to channel map. */
private final ConcurrentMap<UUID, NettyMoverChannel> uuids = Maps.newConcurrentMap();
protected final ConcurrentMap<UUID, NettyMoverChannel> uuids = Maps.newConcurrentMap();

/** Server name. */
private final String name;
Expand Down

0 comments on commit 3811a75

Please sign in to comment.