Skip to content

Commit

Permalink
pool: close dcap accepter thread and server socket after client connects
Browse files Browse the repository at this point in the history
Motivation:

The first time a dcap client elects to transfer a file using "active
mode" (client connecting to pool), the dcap mover will request a server
socket and a thread be created to handle client connections.  These
resources are shared between successive "active" dcap movers, across all
pools in the JVM.

There is currently no mechanism to notify when the shared resources (the
thread and socket) are no longer needed.  This means that a pool that has
been selected for an active dcap transfer will not shut down cleanly when
notified, but requires the CellNucleus to interrupt the accepter thread
using the fall-back mechanism.

Modification:

Update dcap mover to acquire a closeable object that represents the open
server socket and accepter thread.  The mover indicates when this is no
longer needed by closing the object.  A reference counter then allows the
thread and server socket be closed when no longer needed.

Result:

Pools that have had an active transfers will close down without
requiring the fallback.  As a side-effect, a pool will keep the server
socket open only as long as it is needed.  Although the pool(s) will
attempt to reuse the same port number over successive transfers, it is
possible that the port number will change over time.

Target: master
Requires-notes: yes
Requires-book: no
Patch: https://rb.dcache.org/r/9673/
Acked-by: Tigran Mkrtchyan
  • Loading branch information
paulmillar committed Sep 1, 2016
1 parent 5679b20 commit 135522e
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 237 deletions.
Expand Up @@ -32,6 +32,7 @@
import dmg.cells.nucleus.CellPath;

import org.dcache.net.ProtocolConnectionPool;
import org.dcache.net.ProtocolConnectionPool.Listen;
import org.dcache.net.ProtocolConnectionPoolFactory;
import org.dcache.pool.repository.Allocator;
import org.dcache.pool.repository.RepositoryChannel;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArg


// bind passive dcap to port defined as org.dcache.dcap.port
private static ProtocolConnectionPoolFactory protocolConnectionPoolFactory;
private static ProtocolConnectionPoolFactory factory;

static {
int port = 0;
Expand All @@ -92,7 +93,7 @@ public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArg
port = Integer.parseInt(System.getProperty("org.dcache.dcap.port"));
}catch(NumberFormatException e){ /* bad values are ignored */}

protocolConnectionPoolFactory =
factory =
new ProtocolConnectionPoolFactory(port, new DCapChallengeReader());

}
Expand Down Expand Up @@ -398,26 +399,24 @@ public void runIO(FileAttributes fileAttributes,
_bigBuffer.flip();
socketChannel.write(_bigBuffer);
}else{ // passive connection
ProtocolConnectionPool pcp =
protocolConnectionPoolFactory.getConnectionPool(bufferSize.getRecvBufferSize());

InetAddress localAddress = NetworkUtils.
getLocalAddress(dcapProtocolInfo.getSocketAddress().getAddress());
InetSocketAddress socketAddress =
new InetSocketAddress(localAddress,
pcp.getLocalPort());

byte[] challenge = UUID.randomUUID().toString().getBytes();
PoolPassiveIoFileMessage<byte[]> msg = new PoolPassiveIoFileMessage<>("pool",
socketAddress, challenge);
msg.setId(dcapProtocolInfo.getSessionId());
_log.info("waiting for client to connect ({}:{})",
localAddress, pcp.getLocalPort());

CellPath cellpath = dcapProtocolInfo.door();
_cell.sendMessage (new CellMessage(cellpath, msg));
DCapProrocolChallenge dcapChallenge = new DCapProrocolChallenge(_sessionId, challenge);
socketChannel = pcp.getSocket(dcapChallenge);
try (Listen listen = factory.acquireListen(bufferSize.getRecvBufferSize())) {
InetAddress localAddress = NetworkUtils.
getLocalAddress(dcapProtocolInfo.getSocketAddress().getAddress());
InetSocketAddress socketAddress = new InetSocketAddress(localAddress,
listen.getPort());

byte[] challenge = UUID.randomUUID().toString().getBytes();
PoolPassiveIoFileMessage<byte[]> msg = new PoolPassiveIoFileMessage<>("pool",
socketAddress, challenge);
msg.setId(dcapProtocolInfo.getSessionId());
_log.info("waiting for client to connect ({}:{})", localAddress,
listen.getPort());

CellPath cellpath = dcapProtocolInfo.door();
_cell.sendMessage (new CellMessage(cellpath, msg));
DCapProrocolChallenge dcapChallenge = new DCapProrocolChallenge(_sessionId, challenge);
socketChannel = listen.getSocket(dcapChallenge);
}

Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
Expand Down

This file was deleted.

0 comments on commit 135522e

Please sign in to comment.