From 135522e3b97a059d021b29a9e07b7445aa7b6e4d Mon Sep 17 00:00:00 2001 From: Paul Millar Date: Tue, 23 Aug 2016 16:57:30 +0200 Subject: [PATCH] pool: close dcap accepter thread and server socket after client connects Motivation: The first time a dcap client elects to transfer a file using "active mode" (client connecting to pool), the dcap mover will request a server socket and a thread be created to handle client connections. These resources are shared between successive "active" dcap movers, across all pools in the JVM. There is currently no mechanism to notify when the shared resources (the thread and socket) are no longer needed. This means that a pool that has been selected for an active dcap transfer will not shut down cleanly when notified, but requires the CellNucleus to interrupt the accepter thread using the fall-back mechanism. Modification: Update dcap mover to acquire a closeable object that represents the open server socket and accepter thread. The mover indicates when this is no longer needed by closing the object. A reference counter then allows the thread and server socket be closed when no longer needed. Result: Pools that have had an active transfers will close down without requiring the fallback. As a side-effect, a pool will keep the server socket open only as long as it is needed. Although the pool(s) will attempt to reuse the same port number over successive transfers, it is possible that the port number will change over time. Target: master Requires-notes: yes Requires-book: no Patch: https://rb.dcache.org/r/9673/ Acked-by: Tigran Mkrtchyan --- .../pool/movers/DCapProtocol_3_nio.java | 43 ++- .../net/DummyProtocolConnectionPool.java | 81 ----- .../dcache/net/ProtocolConnectionPool.java | 295 +++++++++++------- .../net/ProtocolConnectionPoolFactory.java | 24 +- 4 files changed, 206 insertions(+), 237 deletions(-) delete mode 100644 modules/dcache/src/main/java/org/dcache/net/DummyProtocolConnectionPool.java diff --git a/modules/dcache-dcap/src/main/java/org/dcache/pool/movers/DCapProtocol_3_nio.java b/modules/dcache-dcap/src/main/java/org/dcache/pool/movers/DCapProtocol_3_nio.java index 9d86c271466..c87420984b6 100644 --- a/modules/dcache-dcap/src/main/java/org/dcache/pool/movers/DCapProtocol_3_nio.java +++ b/modules/dcache-dcap/src/main/java/org/dcache/pool/movers/DCapProtocol_3_nio.java @@ -32,6 +32,7 @@ import dmg.cells.nucleus.CellPath; import org.dcache.net.ProtocolConnectionPool; +import org.dcache.net.ProtocolConnectionPool.Listen; import org.dcache.net.ProtocolConnectionPoolFactory; import org.dcache.pool.repository.Allocator; import org.dcache.pool.repository.RepositoryChannel; @@ -83,7 +84,7 @@ public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArg // bind passive dcap to port defined as org.dcache.dcap.port - private static ProtocolConnectionPoolFactory protocolConnectionPoolFactory; + private static ProtocolConnectionPoolFactory factory; static { int port = 0; @@ -92,7 +93,7 @@ public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArg port = Integer.parseInt(System.getProperty("org.dcache.dcap.port")); }catch(NumberFormatException e){ /* bad values are ignored */} - protocolConnectionPoolFactory = + factory = new ProtocolConnectionPoolFactory(port, new DCapChallengeReader()); } @@ -398,26 +399,24 @@ public void runIO(FileAttributes fileAttributes, _bigBuffer.flip(); socketChannel.write(_bigBuffer); }else{ // passive connection - ProtocolConnectionPool pcp = - protocolConnectionPoolFactory.getConnectionPool(bufferSize.getRecvBufferSize()); - - InetAddress localAddress = NetworkUtils. - getLocalAddress(dcapProtocolInfo.getSocketAddress().getAddress()); - InetSocketAddress socketAddress = - new InetSocketAddress(localAddress, - pcp.getLocalPort()); - - byte[] challenge = UUID.randomUUID().toString().getBytes(); - PoolPassiveIoFileMessage msg = new PoolPassiveIoFileMessage<>("pool", - socketAddress, challenge); - msg.setId(dcapProtocolInfo.getSessionId()); - _log.info("waiting for client to connect ({}:{})", - localAddress, pcp.getLocalPort()); - - CellPath cellpath = dcapProtocolInfo.door(); - _cell.sendMessage (new CellMessage(cellpath, msg)); - DCapProrocolChallenge dcapChallenge = new DCapProrocolChallenge(_sessionId, challenge); - socketChannel = pcp.getSocket(dcapChallenge); + try (Listen listen = factory.acquireListen(bufferSize.getRecvBufferSize())) { + InetAddress localAddress = NetworkUtils. + getLocalAddress(dcapProtocolInfo.getSocketAddress().getAddress()); + InetSocketAddress socketAddress = new InetSocketAddress(localAddress, + listen.getPort()); + + byte[] challenge = UUID.randomUUID().toString().getBytes(); + PoolPassiveIoFileMessage msg = new PoolPassiveIoFileMessage<>("pool", + socketAddress, challenge); + msg.setId(dcapProtocolInfo.getSessionId()); + _log.info("waiting for client to connect ({}:{})", localAddress, + listen.getPort()); + + CellPath cellpath = dcapProtocolInfo.door(); + _cell.sendMessage (new CellMessage(cellpath, msg)); + DCapProrocolChallenge dcapChallenge = new DCapProrocolChallenge(_sessionId, challenge); + socketChannel = listen.getSocket(dcapChallenge); + } Socket socket = socketChannel.socket(); socket.setKeepAlive(true); diff --git a/modules/dcache/src/main/java/org/dcache/net/DummyProtocolConnectionPool.java b/modules/dcache/src/main/java/org/dcache/net/DummyProtocolConnectionPool.java deleted file mode 100644 index 8c9d1f800db..00000000000 --- a/modules/dcache/src/main/java/org/dcache/net/DummyProtocolConnectionPool.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.dcache.net; - -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.nio.channels.SocketChannel; - - -public class DummyProtocolConnectionPool implements ChallengeReader { - - - @Override - public Object getChallenge(SocketChannel socketChannel) { - - Object challenge = null; - - try { - Socket socket = socketChannel.socket(); - - InputStream is = socket.getInputStream(); - byte[] buf = new byte[64]; - int b; - int i = 0; - while(i _acceptedSockets = new HashMap<>(); private final ChallengeReader _challengeReader; - private boolean _stop; + private final int _receiveBufferSize; + private final PortRange _portRange; + + private int _port; + private ServerSocketChannel _serverChannel; + private long _activity; + private Thread _thread; + + /** + * Represent a "promise" to listen for incoming connections until closed. + * This object is not thread-safe. + */ + public class Listen implements Closeable + { + private boolean _released; + + /** + * Indicate that promise is no longer needed. + */ + @Override + public void close() + { + if (!_released) { + release(); + _released = true; + } + } + + /** + * Get TCP port number used by this connection pool. + * @return port number + */ + public int getPort() + { + return ProtocolConnectionPool.this.getPort(); + } + + /** + * Get a {@link SocketChannel} identified by challenge. The + * caller will block until client is connected and challenge exchange is done. + * + * @param challenge the identifier the client is required to present + * @return {@link SocketChannel} connected to client + * @throws InterruptedException if current thread was interrupted + */ + public SocketChannel getSocket(Object challenge) throws InterruptedException + { + checkState(!_released); + + assert _activity > 0; + assert _thread != null; + assert _serverChannel != null; + + synchronized (_acceptedSockets) { + while (!_acceptedSockets.containsKey(challenge)) { + _acceptedSockets.wait(); + } + return _acceptedSockets.remove(challenge); + } + } + } /** * Create a new ProtocolConnectionPool on specified TCP port. If listenPort @@ -29,157 +96,147 @@ public class ProtocolConnectionPool extends Thread { * property is set. The {@link ChallengeReader} is used to associate connections * with clients. * - * @param listenPort - * @param challengeReader - * @throws IOException + * @param port the port on which to listen; 0 use a default range + * @param bufferSize the size of the receive buffer; 0 implies a default value. + * @param reader the ChallengeReader to extract challenges. */ - ProtocolConnectionPool(int listenPort, int receiveBufferSize, - ChallengeReader challengeReader) - throws IOException + ProtocolConnectionPool(int port, int bufferSize, ChallengeReader reader) { - super("ProtocolConnectionPool"); - _challengeReader = challengeReader; - _serverSocketChannel = ServerSocketChannel.open(); - if (receiveBufferSize > 0) { - _serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize); - } + _challengeReader = reader; + _receiveBufferSize = bufferSize; - PortRange portRange; - if (listenPort != 0) { - portRange = new PortRange(listenPort); + if (port != 0) { + _portRange = new PortRange(port); } else { String dcachePorts = System.getProperty("org.dcache.net.tcp.portrange"); if (dcachePorts != null) { - portRange = PortRange.valueOf(dcachePorts); + _portRange = PortRange.valueOf(dcachePorts); } else { - portRange = new PortRange(0); + _portRange = new PortRange(0); + } + } + } + + private ServerSocketChannel open() throws IOException + { + ServerSocketChannel channel = ServerSocketChannel.open(); + ServerSocket socket = channel.socket(); + + if (_receiveBufferSize > 0) { + socket.setReceiveBufferSize(_receiveBufferSize); + } + + if (_port != 0) { + try { + socket.bind(new InetSocketAddress((InetAddress)null, _port)); + } catch (IOException e) { + _logSocketIO.debug("Failed to bind to existing port: {}", e.toString()); } } - portRange.bind(_serverSocketChannel.socket()); - if (_logSocketIO.isDebugEnabled()) { - _logSocketIO.debug("Socket BIND local = " + _serverSocketChannel.socket().getInetAddress() + ":" + _serverSocketChannel.socket().getLocalPort()); + if (!socket.isBound()) { + _port = _portRange.bind(socket); + } + + _logSocketIO.debug("Socket BIND local = {}:{}", socket.getInetAddress(), + _port); + + return channel; + } + + private void close(ServerSocketChannel channel) + { + _logSocketIO.debug("Socket SHUTDOWN local = {}:{}", + channel.socket().getInetAddress(), + channel.socket().getLocalPort()); + try { + channel.close(); + } catch (IOException e) { + _logSocketIO.warn("Failed to close socket: {}", e.toString()); } + } + private synchronized int getPort() + { + return _port; } /** - * Get a {@link SocketChannel} identified by chllenge. The - * caller will block until client is connected and challenge exchange is done. - * - * @param challenge - * @return {@link SocketChannel} connected to client - * @throws InterruptedException if current thread was interrupted + * Acquire a "promise" to accept an incoming connection. This may trigger + * opening a TCP port for incoming connections and starting a thread that + * will handle incoming connections. */ - public SocketChannel getSocket(Object challenge) throws InterruptedException { + public synchronized Listen acquire() throws IOException + { + if (_serverChannel == null) { + _serverChannel = open(); + } - synchronized (_acceptedSockets) { + if (_thread == null) { + _thread = new Thread(this, "ProtocolConnectionPool"); + _thread.start(); + } + _activity++; + return new Listen(); + } + + private synchronized void release() + { + if (_activity == 1) { + if (_thread != null) { + _thread.interrupt(); + _thread = null; + } - while (_acceptedSockets.isEmpty() || !_acceptedSockets.containsKey(challenge)) { - _acceptedSockets.wait(); + if (_serverChannel != null) { + close(_serverChannel); + _serverChannel = null; } - return _acceptedSockets.remove(challenge); } - } - /** - * Get TCP port number used by this connection pool. - * @return port number - */ - public int getLocalPort() { - return _serverSocketChannel.socket().getLocalPort(); + if (_activity > 0) { + _activity--; + } } @Override public void run() { + ServerSocketChannel serverChannel; + synchronized (this) { + serverChannel = _serverChannel; + } try { - while (!_stop) { - SocketChannel newSocketChannel = _serverSocketChannel.accept(); - if (_logSocketIO.isDebugEnabled()) { - _logSocketIO.debug("Socket OPEN (ACCEPT) remote = " + newSocketChannel.socket().getInetAddress() + ":" + newSocketChannel.socket().getPort() + - " local = " + newSocketChannel.socket().getLocalAddress() + ":" + newSocketChannel.socket().getLocalPort()); - } - Object challenge = _challengeReader.getChallenge(newSocketChannel); + while (true) { + SocketChannel channel = serverChannel.accept(); + _logSocketIO.debug("Socket OPEN (ACCEPT) remote = {}:{} local = {}:{}", + channel.socket().getInetAddress(), channel.socket().getPort(), + channel.socket().getLocalAddress(), channel.socket().getLocalPort()); + Object challenge = _challengeReader.getChallenge(channel); if (challenge == null) { // Unable to read challenge....skip connection - if (_logSocketIO.isDebugEnabled()) { - _logSocketIO.debug("Socket CLOSE (no challenge) remote = " + newSocketChannel.socket().getInetAddress() + ":" + newSocketChannel.socket().getPort() + - " local = " + newSocketChannel.socket().getLocalAddress() + ":" + newSocketChannel.socket().getLocalPort()); + _logSocketIO.debug("Socket CLOSE (no challenge) remote = {}:{} local = {}:{}", + channel.socket().getInetAddress(), channel.socket().getPort(), + channel.socket().getLocalAddress(), channel.socket().getLocalPort()); + try { + channel.close(); + } catch (IOException e) { + _logSocketIO.info("Failed to close client socket: {}", + channel.socket()); + } + } else { + synchronized (_acceptedSockets) { + _acceptedSockets.put(challenge, channel); + _acceptedSockets.notifyAll(); } - newSocketChannel.close(); - continue; - } - - synchronized (_acceptedSockets) { - _acceptedSockets.put(challenge, newSocketChannel); - _acceptedSockets.notifyAll(); - Thread.yield(); } - } - } catch (ClosedByInterruptException e) { - // Shutdown while waiting for client to connect. + } catch (AsynchronousCloseException e) { + // Ignore thread stopped by interrupting or by closing the channel } catch (IOException e) { _logSocketIO.error("Accept loop", e); - try { - _logSocketIO.debug("Socket SHUTDOWN local = {}:{}", - _serverSocketChannel.socket().getInetAddress(), - _serverSocketChannel.socket().getLocalPort()); - _serverSocketChannel.close(); - } catch (IOException ignored) { - } } } -} -/* - * $Log: not supported by cvs2svn $ - * Revision 1.4 2007/05/24 13:51:12 tigran - * merge of 1.7.1 and the head - * - * Revision 1.1.2.3.2.2 2007/03/01 14:02:40 tigran - * fixed local port in debug message - * - * Revision 1.1.2.3.2.1 2007/02/16 22:20:35 tigran - * paranoid network traceing: - * - * all binds, accepts, connects and closees in dcap and FTP (nio code only) - * used logging category: - * - * private static Logger _logSocketIO = LoggerFactory.getLogger("logger.dev.org.dcache.io.socket"); - * - * TODO: find and trace others as well - * all calls surrounded with - * if ( _logSocketIO.isDebugEnabled ){ - * .... - * } - * so no performance penalty if debug switched off - * log example (passive dcap): - * - * 16 Feb 2007 22:51:33 logger.dev.org.dcache.io.socket org.dcache.net.ProtocolConnectionPool.(ProtocolConnectionPool.java:66) Socket BIND local = /0.0.0.0:33115 - * 16 Feb 2007 22:51:33 logger.dev.org.dcache.io.socket org.dcache.net.ProtocolConnectionPool.run(ProtocolConnectionPool.java:118) Socket OPEN (ACCEPT) remote = /127.0.0.2:11930 local = /127.0.0.2:11930 - * 16 Feb 2007 22:51:33 logger.dev.org.dcache.io.socket diskCacheV111.movers.DCapProtocol_3_nio.runIO(DCapProtocol_3_nio.java:766) Socket CLOSE remote = /127.0.0.2:11930 local = /127.0.0.2:11930 - * 16 Feb 2007 22:51:57 logger.dev.org.dcache.io.socket org.dcache.net.ProtocolConnectionPool.run(ProtocolConnectionPool.java:118) Socket OPEN (ACCEPT) remote = /127.0.0.2:11934 local = /127.0.0.2:11934 - * 16 Feb 2007 22:51:57 logger.dev.org.dcache.io.socket diskCacheV111.movers.DCapProtocol_3_nio.runIO(DCapProtocol_3_nio.java:766) Socket CLOSE remote = /127.0.0.2:11934 local = /127.0.0.2:11934 - * 16 Feb 2007 22:52:06 logger.dev.org.dcache.io.socket org.dcache.net.ProtocolConnectionPool.run(ProtocolConnectionPool.java:144) Socket SHUTDOWN local = /0.0.0.0:33115 - * - * Revision 1.1.2.3 2006/10/04 09:57:06 tigran - * fixed first/last port range index - * - * Revision 1.1.2.2 2006/08/22 13:43:34 tigran - * added port range for passive DCAP - * rmoved System.out - * - * Revision 1.2 2006/07/21 12:07:53 tigran - * added port range support for multple pools on one host - * to enable port range follofing java properies have to be defined: - * - * i) org.dcache.dcap.port=0 - * ii) org.dcache.net.tcp.portrange=: - * - * Revision 1.1 2006/07/18 09:06:04 tigran - * added protocol connection pool - * - */ +} \ No newline at end of file diff --git a/modules/dcache/src/main/java/org/dcache/net/ProtocolConnectionPoolFactory.java b/modules/dcache/src/main/java/org/dcache/net/ProtocolConnectionPoolFactory.java index 7ee7f735435..c5a781a036e 100644 --- a/modules/dcache/src/main/java/org/dcache/net/ProtocolConnectionPoolFactory.java +++ b/modules/dcache/src/main/java/org/dcache/net/ProtocolConnectionPoolFactory.java @@ -5,6 +5,8 @@ import java.io.IOException; +import org.dcache.net.ProtocolConnectionPool.Listen; + public class ProtocolConnectionPoolFactory { @@ -19,21 +21,13 @@ public ProtocolConnectionPoolFactory(int port, ChallengeReader challengeReader) _challengeReader = challengeReader; } - public ProtocolConnectionPool getConnectionPool(int receiveBufferSize) - throws IOException + public Listen acquireListen(int receiveBufferSize) throws IOException { - synchronized(_initLock){ - if( _protocolConnectionPool == null ) { - _protocolConnectionPool = new ProtocolConnectionPool(_port, receiveBufferSize, _challengeReader); - _protocolConnectionPool.start(); - } - } - - return _protocolConnectionPool; + synchronized (_initLock) { + if( _protocolConnectionPool == null ) { + _protocolConnectionPool = new ProtocolConnectionPool(_port, receiveBufferSize, _challengeReader); + } + } + return _protocolConnectionPool.acquire(); } - - } -/* - * $Log: not supported by cvs2svn $ - */