Skip to content

Commit

Permalink
Fixes client shutdown problem
Browse files Browse the repository at this point in the history
Main issue we are trying to fix is to make sure there are
no invocations left after client has shutdown.

The assert(in ClientInvocationServiceSupport) that checks
all invocations are cleared was failing.

One of the reason that an invocation is left there was
clusterService thread. In ClusterService shutdown we will
wait for executor to shutdown completely to make sure, there
is no invocation left triggered by cluster thread.

ConnectionManager close will be done before clusterService
close to make sure, cluster thread can not trigger a new
authentication while trying to shutting down.

It turns out that CleanResources Task logic is indeed not suitable
for using when client is shutting down. Because it was postponing
clearing some invocation for 1 seconds if connection is closed
in last 5 seconds. Much simpler version of clearing invocations
is implemented in place of it.
  • Loading branch information
sancar committed Mar 28, 2016
1 parent e678088 commit b0ece18
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.client.AuthenticationException;
import com.hazelcast.client.ClientExtension;
import com.hazelcast.client.ClientTypes;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.ClientProperties;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -190,6 +192,12 @@ public synchronized void shutdown() {
connection.close();
}
shutdownSelectors();
Iterator<AuthenticationFuture> iterator = connectionsInProgress.values().iterator();
while (iterator.hasNext()) {
AuthenticationFuture future = iterator.next();
future.onFailure(new HazelcastClientNotActiveException("Client is shutting down"));
iterator.remove();
}
connectionListeners.clear();
heartbeatListeners.clear();
}
Expand Down Expand Up @@ -288,6 +296,10 @@ private Connection getConnection(Address target, boolean asOwner) {
}

private AuthenticationFuture triggerConnect(Address target, boolean asOwner) {
if (!alive) {
throw new HazelcastException("ConnectionManager is not active!!!");
}

AuthenticationFuture callback = new AuthenticationFuture();
AuthenticationFuture firstCallback = connectionsInProgress.putIfAbsent(target, callback);
if (firstCallback == null) {
Expand Down Expand Up @@ -537,7 +549,10 @@ private void authenticated(Address target, ClientConnection connection) {
fireConnectionAddedEvent(connection);
}
assert oldConnection == null || connection.equals(oldConnection);
connectionsInProgress.remove(target);
if (connectionsInProgress.remove(target) == null) {
destroyConnection(connection, new HazelcastClientNotActiveException("Authentication is cancelled"));
}

}

private void failed(Address target, ClientConnection connection, Throwable throwable) {
Expand Down
Expand Up @@ -582,11 +582,11 @@ public void shutdown() {

public void doShutdown() {
proxyManager.destroy();
connectionManager.shutdown();
clusterService.shutdown();
executionService.shutdown();
partitionService.stop();
transactionManager.shutdown();
connectionManager.shutdown();
invocationService.shutdown();
listenerService.shutdown();
serializationService.destroy();
Expand Down
Expand Up @@ -168,8 +168,12 @@ public boolean isShutdown() {
public void shutdown() {
isShutdown = true;
responseThread.interrupt();
CleanResourcesTask cleanResourcesTask = new CleanResourcesTask();
cleanResourcesTask.run();
Iterator<ClientInvocation> iterator = callIdMap.values().iterator();
while (iterator.hasNext()) {
ClientInvocation invocation = iterator.next();
iterator.remove();
invocation.notifyException(new HazelcastClientNotActiveException("Client is shutting down"));
}
assert callIdMap.isEmpty();
}

Expand All @@ -191,8 +195,7 @@ public void run() {
continue;
}

int pendingPacketCount = connection.getPendingPacketCount();
if (pendingPacketCount != 0) {
if (connection.getPendingPacketCount() != 0) {
long closedTime = connection.getClosedTime();
long elapsed = System.currentTimeMillis() - closedTime;
if (elapsed < WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED_THRESHOLD) {
Expand Down
Expand Up @@ -45,13 +45,15 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import static com.hazelcast.client.config.ClientProperty.SHUFFLE_MEMBER_LIST;

public abstract class ClusterListenerSupport implements ConnectionListener, ConnectionHeartbeatListener, ClientClusterService {

private static final ILogger LOGGER = Logger.getLogger(ClusterListenerSupport.class);
private static final long TERMINATE_TIMEOUT_SECONDS = 30;

protected final HazelcastClientInstanceImpl client;
private final Collection<AddressProvider> addressProviders;
Expand Down Expand Up @@ -91,6 +93,15 @@ public Address getOwnerConnectionAddress() {

public void shutdown() {
clusterExecutor.shutdown();
try {
boolean success = clusterExecutor.awaitTermination(TERMINATE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!success) {
LOGGER.warning("cluster executor awaitTermination could not completed in "
+ TERMINATE_TIMEOUT_SECONDS + " seconds");
}
} catch (InterruptedException e) {
LOGGER.warning("cluster executor await termination is interrupted", e);
}
}

protected void connectToCluster() throws Exception {
Expand Down Expand Up @@ -172,6 +183,12 @@ private void connectToOne() throws Exception {
private boolean connect(Set<InetSocketAddress> triedAddresses) throws Exception {
final Collection<InetSocketAddress> socketAddresses = getSocketAddresses();
for (InetSocketAddress inetSocketAddress : socketAddresses) {
if (!client.getLifecycleService().isRunning()) {
if (LOGGER.isFinestEnabled()) {
LOGGER.finest("Giving up on retrying to connect to cluster since client is shutdown");
}
break;
}
try {
triedAddresses.add(inetSocketAddress);
Address address = new Address(inetSocketAddress);
Expand Down
Expand Up @@ -138,7 +138,7 @@ public void onResponse(Object response) {

@Override
public void onFailure(Throwable t) {
if (t instanceof HazelcastClientNotActiveException) {
if (t.getCause() instanceof HazelcastClientNotActiveException) {
shutdownLatch.countDown();
}
errorLatch.countDown();
Expand Down

0 comments on commit b0ece18

Please sign in to comment.