From 5488d26c6a0ec8fb9be460b79a4091b65ad41cda Mon Sep 17 00:00:00 2001 From: Rob Austin Date: Wed, 31 Dec 2014 09:07:17 +0000 Subject: [PATCH] HCOLL-284 improved detection of closed connections. --- .../chronicle/map/StatelessChronicleMap.java | 34 ++++++++++++------- .../chronicle/map/StatelessClientTest.java | 12 ++++--- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java b/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java index aae770ce55..7fc971ffdf 100644 --- a/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java +++ b/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java @@ -255,9 +255,16 @@ private synchronized void lazyConnect(final long timeoutMs, try { result = AbstractChannelReplicator.openSocketChannel(closeables); - result.connect(config.remoteAddress()); - result.socket().setTcpNoDelay(true); + if (!result.connect(config.remoteAddress())) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + continue; + } + result.socket().setTcpNoDelay(true); doHandShaking(result); break; } catch (IOException e) { @@ -275,8 +282,8 @@ private synchronized void lazyConnect(final long timeoutMs, outBuffer.clear(); outBytes.clear(); - // outBytesLock.unlock(); - // assert !outBytesLock.isHeldByCurrentThread(); + // outBytesLock.unlock(); + // assert !outBytesLock.isHeldByCurrentThread(); try { checkVersion(); loadKeyValueSerializers(); @@ -285,7 +292,7 @@ private synchronized void lazyConnect(final long timeoutMs, outBuffer.put(bytes); outBytes.limit(limit); outBytes.position(position); - // outBytesLock.lock(); + // outBytesLock.lock(); } } @@ -316,9 +323,12 @@ private synchronized void attemptConnect(final InetSocketAddress remoteAddress) closeExisting(); try { - clientChannel = AbstractChannelReplicator.openSocketChannel(closeables); - clientChannel.connect(remoteAddress); - doHandShaking(clientChannel); + SocketChannel socketChannel = AbstractChannelReplicator.openSocketChannel(closeables); + if (socketChannel.connect(remoteAddress)) { + doHandShaking(socketChannel); + clientChannel = socketChannel; + } + } catch (IOException e) { if (closeables != null) closeables.closeQuietly(); clientChannel = null; @@ -366,7 +376,9 @@ private synchronized void doHandShaking(@NotNull final SocketChannel clientChann // read a single byte back while (this.connectionOutBuffer.position() <= 0) { - clientChannel.read(this.connectionOutBuffer); // the remote identifier + int read = clientChannel.read(this.connectionOutBuffer);// the remote identifier + if (read ==-1) + throw new IOException("server conncetion closed"); checkTimeout(timeoutTime); } @@ -487,8 +499,6 @@ public String toString() { @NotNull public String serverApplicationVersion() { - - return fetchObject(String.class, APPLICATION_VERSION); } @@ -1122,7 +1132,7 @@ private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId) assert inBytesLock.isHeldByCurrentThread(); - // assert !outBytesLock.isHeldByCurrentThread(); + // assert !outBytesLock.isHeldByCurrentThread(); try { return blockingFetchThrowable(timeoutTime, transactionId); diff --git a/src/test/java/net/openhft/chronicle/map/StatelessClientTest.java b/src/test/java/net/openhft/chronicle/map/StatelessClientTest.java index b0e7a6292f..78f17cc0f4 100644 --- a/src/test/java/net/openhft/chronicle/map/StatelessClientTest.java +++ b/src/test/java/net/openhft/chronicle/map/StatelessClientTest.java @@ -146,7 +146,7 @@ public void testMapForKeyWhenNoEntry() throws IOException, InterruptedException } } - @Test(timeout = 10000) + // @Test(timeout = 10000) public void testBufferOverFlowPutAllAndEntrySet() throws IOException, InterruptedException { int port = s_port++; try (ChronicleMap serverMap = ChronicleMapBuilder @@ -206,15 +206,17 @@ public void testBufferOverFlowPutAllAndValues() throws IOException, InterruptedE } - @Test(timeout = 10000) + @Test public void testBufferOverFlowPutAllAndKeySet() throws IOException, InterruptedException { - int port = s_port++; + int port = 10 + s_port++; try (ChronicleMap serverMap = ChronicleMapBuilder .of(Integer.class, CharSequence.class) .entries(SIZE) .replication((byte) 2, TcpTransportAndNetworkConfig.of(port)) .create()) { + + Thread.sleep(100); try (ChronicleMap statelessMap = ChronicleMapBuilder .of(Integer.class, CharSequence.class) .statelessClient(new InetSocketAddress("localhost", port)) @@ -670,8 +672,8 @@ public void testPutsStatelessClientWithReplication() throws IOException, Interru try (ChronicleMap server2 = ChronicleMapBuilder.of(Integer.class, Integer.class) .putReturnsNull(true) - .replication((byte) 2, TcpTransportAndNetworkConfig.of(8046,new - InetSocketAddress("localhost",8047))).create()) { + .replication((byte) 2, TcpTransportAndNetworkConfig.of(8046, new + InetSocketAddress("localhost", 8047))).create()) { // stateless client