diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/ClientConnectionManagerImpl.java b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/ClientConnectionManagerImpl.java index d016989a72241..2f3ac2979f140 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/ClientConnectionManagerImpl.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/ClientConnectionManagerImpl.java @@ -19,6 +19,7 @@ import com.hazelcast.client.AuthenticationException; import com.hazelcast.client.ClientExtension; import com.hazelcast.client.ClientTypes; +import com.hazelcast.client.HazelcastClientNotActiveException; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.client.config.ClientNetworkConfig; import com.hazelcast.client.config.ClientProperties; @@ -63,6 +64,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -190,6 +192,12 @@ public synchronized void shutdown() { connection.close(); } shutdownSelectors(); + Iterator iterator = connectionsInProgress.values().iterator(); + while (iterator.hasNext()) { + AuthenticationFuture future = iterator.next(); + future.onFailure(new HazelcastClientNotActiveException("Client is shutting down")); + iterator.remove(); + } connectionListeners.clear(); heartbeatListeners.clear(); } @@ -288,6 +296,10 @@ private Connection getConnection(Address target, boolean asOwner) { } private AuthenticationFuture triggerConnect(Address target, boolean asOwner) { + if (!alive) { + throw new HazelcastException("ConnectionManager is not active!!!"); + } + AuthenticationFuture callback = new AuthenticationFuture(); AuthenticationFuture firstCallback = connectionsInProgress.putIfAbsent(target, callback); if (firstCallback == null) { @@ -537,7 +549,10 @@ private void authenticated(Address target, ClientConnection connection) { fireConnectionAddedEvent(connection); } assert oldConnection == null || connection.equals(oldConnection); - connectionsInProgress.remove(target); + if (connectionsInProgress.remove(target) == null) { + destroyConnection(connection, new HazelcastClientNotActiveException("Authentication is cancelled")); + } + } private void failed(Address target, ClientConnection connection, Throwable throwable) { diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/impl/HazelcastClientInstanceImpl.java b/hazelcast-client/src/main/java/com/hazelcast/client/impl/HazelcastClientInstanceImpl.java index 93abaa1b425f9..c9fdfecdebf55 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/impl/HazelcastClientInstanceImpl.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/impl/HazelcastClientInstanceImpl.java @@ -582,11 +582,11 @@ public void shutdown() { public void doShutdown() { proxyManager.destroy(); + connectionManager.shutdown(); clusterService.shutdown(); executionService.shutdown(); partitionService.stop(); transactionManager.shutdown(); - connectionManager.shutdown(); invocationService.shutdown(); listenerService.shutdown(); serializationService.destroy(); diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocationServiceSupport.java b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocationServiceSupport.java index fdc25934c1979..4b739a14ef8d1 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocationServiceSupport.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocationServiceSupport.java @@ -168,8 +168,12 @@ public boolean isShutdown() { public void shutdown() { isShutdown = true; responseThread.interrupt(); - CleanResourcesTask cleanResourcesTask = new CleanResourcesTask(); - cleanResourcesTask.run(); + Iterator iterator = callIdMap.values().iterator(); + while (iterator.hasNext()) { + ClientInvocation invocation = iterator.next(); + iterator.remove(); + invocation.notifyException(new HazelcastClientNotActiveException("Client is shutting down")); + } assert callIdMap.isEmpty(); } @@ -191,8 +195,7 @@ public void run() { continue; } - int pendingPacketCount = connection.getPendingPacketCount(); - if (pendingPacketCount != 0) { + if (connection.getPendingPacketCount() != 0) { long closedTime = connection.getClosedTime(); long elapsed = System.currentTimeMillis() - closedTime; if (elapsed < WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED_THRESHOLD) { diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClusterListenerSupport.java b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClusterListenerSupport.java index e6b543a67306b..fcf1f60b448d0 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClusterListenerSupport.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClusterListenerSupport.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import static com.hazelcast.client.config.ClientProperty.SHUFFLE_MEMBER_LIST; @@ -52,6 +53,7 @@ public abstract class ClusterListenerSupport implements ConnectionListener, ConnectionHeartbeatListener, ClientClusterService { private static final ILogger LOGGER = Logger.getLogger(ClusterListenerSupport.class); + private static final long TERMINATE_TIMEOUT_SECONDS = 30; protected final HazelcastClientInstanceImpl client; private final Collection addressProviders; @@ -91,6 +93,15 @@ public Address getOwnerConnectionAddress() { public void shutdown() { clusterExecutor.shutdown(); + try { + boolean success = clusterExecutor.awaitTermination(TERMINATE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!success) { + LOGGER.warning("cluster executor awaitTermination could not completed in " + + TERMINATE_TIMEOUT_SECONDS + " seconds"); + } + } catch (InterruptedException e) { + LOGGER.warning("cluster executor await termination is interrupted", e); + } } protected void connectToCluster() throws Exception { @@ -172,6 +183,12 @@ private void connectToOne() throws Exception { private boolean connect(Set triedAddresses) throws Exception { final Collection socketAddresses = getSocketAddresses(); for (InetSocketAddress inetSocketAddress : socketAddresses) { + if (!client.getLifecycleService().isRunning()) { + if (LOGGER.isFinestEnabled()) { + LOGGER.finest("Giving up on retrying to connect to cluster since client is shutdown"); + } + break; + } try { triedAddresses.add(inetSocketAddress); Address address = new Address(inetSocketAddress); diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/spi/impl/ClientInvocationTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/spi/impl/ClientInvocationTest.java index 703ec61dd4839..53ec2108ce687 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/spi/impl/ClientInvocationTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/spi/impl/ClientInvocationTest.java @@ -138,7 +138,7 @@ public void onResponse(Object response) { @Override public void onFailure(Throwable t) { - if (t instanceof HazelcastClientNotActiveException) { + if (t.getCause() instanceof HazelcastClientNotActiveException) { shutdownLatch.countDown(); } errorLatch.countDown();