Skip to content

Commit

Permalink
Revising authentication timeout logic to prevent slow shutdown
Browse files Browse the repository at this point in the history
A scheduled task was recently introduced to enforce the authentication
timeout:
Pr
Line
127eb14#diff-6b832face5ab9f5e23687eaeb478627eR631

The client needs to wait for all scheduled tasks to finish before shutting down.
The authentication timeout task was scheduled with a delay of 5 seconds (the default connection timeout), which slowed down shutdown.

A hard timeout is introduced in the client invocation to solve the same
problem. The CleanResources task is now responsible for checking that timeout.
This new timeout mechanism is only used by authentication for now.
  • Loading branch information
sancar committed Feb 16, 2018
1 parent 0e55122 commit 68a4dfe
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 19 deletions.
Expand Up @@ -112,7 +112,7 @@ public class ClientConnectionManagerImpl implements ClientConnectionManager, Con


private final ILogger logger;
private final int connectionTimeout;
private final int connectionTimeoutMillis;

private final HazelcastClientInstanceImpl client;
private final SocketInterceptor socketInterceptor;
Expand Down Expand Up @@ -157,7 +157,7 @@ public ClientConnectionManagerImpl(HazelcastClientInstanceImpl client, AddressTr
ClientNetworkConfig networkConfig = client.getClientConfig().getNetworkConfig();

final int connTimeout = networkConfig.getConnectionTimeout();
this.connectionTimeout = connTimeout == 0 ? Integer.MAX_VALUE : connTimeout;
this.connectionTimeoutMillis = connTimeout == 0 ? Integer.MAX_VALUE : connTimeout;

this.executionService = (ClientExecutionServiceImpl) client.getClientExecutionService();
this.socketOptions = networkConfig.getSocketOptions();
Expand Down Expand Up @@ -557,7 +557,7 @@ protected ClientConnection createSocketConnection(final Address address) throws
socket.setReceiveBufferSize(bufferSize);
InetSocketAddress inetSocketAddress = address.getInetSocketAddress();
bindSocketToPort(socket);
socketChannel.socket().connect(inetSocketAddress, connectionTimeout);
socketChannel.socket().connect(inetSocketAddress, connectionTimeoutMillis);

HazelcastProperties properties = client.getProperties();
boolean directBuffer = properties.getBoolean(SOCKET_CLIENT_BUFFER_DIRECT);
Expand Down Expand Up @@ -627,8 +627,8 @@ private void authenticate(final Address target, final ClientConnection connectio
final ClientPrincipal principal = getPrincipal();
ClientMessage clientMessage = encodeAuthenticationRequest(asOwner, client.getSerializationService(), principal);
ClientInvocation clientInvocation = new ClientInvocation(client, clientMessage, null, connection);
clientInvocation.setHardTimeoutMillis(connectionTimeoutMillis);
ClientInvocationFuture invocationFuture = clientInvocation.invokeUrgent();
executionService.schedule(new TimeoutAuthenticationTask(invocationFuture), connectionTimeout, TimeUnit.MILLISECONDS);
invocationFuture.andThen(new AuthCallback(connection, asOwner, target, future));
}

Expand Down Expand Up @@ -720,7 +720,7 @@ public void run() {
return;
}
future.complete(new TimeoutException("Authentication response did not come back in "
+ connectionTimeout + " millis"));
+ connectionTimeoutMillis + " millis"));
}

}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.hazelcast.client.spi.ClientPartitionService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.listener.AbstractClientListenerService;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
Expand All @@ -47,6 +48,8 @@
import static com.hazelcast.client.spi.properties.ClientProperty.INVOCATION_TIMEOUT_SECONDS;
import static com.hazelcast.instance.OutOfMemoryErrorDispatcher.onOutOfMemory;
import static com.hazelcast.spi.impl.operationservice.impl.AsyncInboundResponseHandler.getIdleStrategy;
import static com.hazelcast.util.Clock.currentTimeMillis;
import static com.hazelcast.util.StringUtil.timeToString;

public abstract class AbstractClientInvocationService implements ClientInvocationService {

Expand Down Expand Up @@ -198,34 +201,56 @@ public void run() {
Map.Entry<Long, ClientInvocation> entry = iter.next();
ClientInvocation invocation = entry.getValue();
ClientConnection connection = invocation.getSendConnection();
if (connection == null) {
continue;
if (checkConnectionDead(connection)) {
iter.remove();
invocation.notifyException(newTargetDisconnectedException(connection));
} else if (checkHardTimeout(invocation)) {
iter.remove();
invocation.notifyException(newOperationTimeoutException(invocation));
}

if (connection.isHeartBeating()) {
continue;
}

iter.remove();
}
}

notifyException(invocation, connection);
/**
 * Tells whether the given invocation has been pending for longer than its hard timeout.
 * A hard timeout of 0 is treated as "no hard timeout", so such an invocation never
 * times out through this check.
 *
 * @param invocation the invocation to inspect
 * @return {@code true} if more than the invocation's hard timeout has elapsed since its start time
 */
private boolean checkHardTimeout(ClientInvocation invocation) {
    final long limitMillis = invocation.getHardTimeoutMillis();
    if (limitMillis != 0) {
        final long startedAtMillis = invocation.startTimeMillis();
        // Strictly greater: an invocation exactly at the limit is not yet timed out.
        return System.currentTimeMillis() - startedAtMillis > limitMillis;
    }
    return false;
}

/**
 * Builds the exception that is reported to an invocation whose hard timeout has passed.
 * The message contains the invocation, its hard timeout, the current time, the start time
 * and the total elapsed time.
 *
 * @param invocation the invocation that timed out
 * @return a new {@link OperationTimeoutException} describing the timeout
 */
private Exception newOperationTimeoutException(ClientInvocation invocation) {
    // Read the clock and the start time once so that the reported "Current time",
    // "Start time" and "Total elapsed time" are consistent with each other.
    long nowMillis = currentTimeMillis();
    long startMillis = invocation.startTimeMillis();

    StringBuilder sb = new StringBuilder();
    sb.append(invocation);
    sb.append(" timed out because a hard timeout passed ");
    sb.append(invocation.getHardTimeoutMillis()).append(" ms. ");
    sb.append("Current time: ").append(timeToString(nowMillis)).append(". ");
    sb.append("Start time: ").append(timeToString(startMillis)).append(". ");
    sb.append("Total elapsed time: ").append(nowMillis - startMillis).append(" ms. ");
    return new OperationTimeoutException(sb.toString());
}

/**
 * Tells whether the given connection should be regarded as dead.
 *
 * @param connection the connection to check; may be {@code null}
 * @return {@code true} only for a non-null connection that is no longer heartbeating
 */
private boolean checkConnectionDead(ClientConnection connection) {
    if (connection == null) {
        // No connection to judge, so it is not reported as dead.
        return false;
    }
    return !connection.isHeartBeating();
}

private void notifyException(ClientInvocation invocation, ClientConnection connection) {
Exception ex;
/**
private Exception newTargetDisconnectedException(ClientConnection connection) {
/*
* Connection may be closed(e.g. remote member shutdown) in which case the isAlive is set to false or the
* heartbeat failure occurs. The order of the following check matters. We need to first check for isAlive since
* the connection.isHeartBeating also checks for isAlive as well.
*/
if (!connection.isAlive()) {
ex = new TargetDisconnectedException(connection.getCloseReason(), connection.getCloseCause());
return new TargetDisconnectedException(connection.getCloseReason(), connection.getCloseCause());
} else {
ex = new TargetDisconnectedException("Heartbeat timed out to " + connection);
return new TargetDisconnectedException("Heartbeat timed out to " + connection);
}

invocation.notifyException(ex);
}

}
Expand Down
Expand Up @@ -72,6 +72,7 @@ public class ClientInvocation implements Runnable {
private final long startTimeMillis;
private final long retryPauseMillis;
private final String objectName;
private long hardTimeoutMillis;
private volatile ClientConnection sendConnection;
private boolean bypassHeartbeatCheck;
private EventHandler handler;
Expand Down Expand Up @@ -287,6 +288,14 @@ public ClientConnection getSendConnection() {
return sendConnection;
}

/**
 * Returns the hard timeout of this invocation.
 *
 * @return the hard timeout in milliseconds
 */
public long getHardTimeoutMillis() {
    return this.hardTimeoutMillis;
}

/**
 * Sets the hard timeout of this invocation.
 * <p>
 * NOTE(review): the backing field is not volatile; presumably this is meant to be
 * called before the invocation is invoked - confirm against callers.
 *
 * @param hardTimeoutMillis the hard timeout in milliseconds
 */
public void setHardTimeoutMillis(long hardTimeoutMillis) {
this.hardTimeoutMillis = hardTimeoutMillis;
}

public static boolean isRetrySafeException(Throwable t) {
return t instanceof IOException
|| t instanceof HazelcastInstanceNotActiveException
Expand Down Expand Up @@ -328,4 +337,8 @@ public String toString() {
+ ", target = " + target
+ ", sendConnection = " + sendConnection + '}';
}

/**
 * Returns the start time recorded for this invocation.
 *
 * @return the start time in milliseconds
 */
public long startTimeMillis() {
    return startTimeMillis;
}
}

0 comments on commit 68a4dfe

Please sign in to comment.