Skip to content

Commit

Permalink
Hazelcast listeners are lost during cluster disconnects (hazelcast#8848)
Browse files Browse the repository at this point in the history
* Fixes the client listener registration if the client owner connection is lost and reconnected (e.g. due to heartbeat or network problems).

Authentication response returns a list of members at which the client is unregistered so that the listener service acts upon this list. The smart listener service re-registers the listeners to these unregistered members. The non-smart listener service re-registers the listeners only if the reconnected member is not the same as the previous member or the previous member client resources were cleaned up.

The ClientDisconnectionOperation also acts smart so that it only cleans client resources if the dead member uuid is the current owner member uuid of the client and there is no alive connection endpoint to the client.

* Forward port of fixes at hazelcast#9183
  • Loading branch information
ihsandemir authored and emrahkocaman committed Nov 25, 2016
1 parent e2b8b3e commit 40ddd79
Show file tree
Hide file tree
Showing 55 changed files with 3,704 additions and 3,010 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.core.LifecycleService;
import com.hazelcast.core.Member;
import com.hazelcast.instance.BuildInfo;
import com.hazelcast.internal.metrics.DiscardableMetricsProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
Expand All @@ -43,6 +44,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -82,6 +84,13 @@ public class ClientConnection implements SocketConnection, DiscardableMetricsPro
private int connectedServerVersion = BuildInfo.UNKNOWN_HAZELCAST_VERSION;
private String connectedServerVersionString;

/**
* The list of members from which the client resources are deleted at the time of connection authentication. The client may
* use this list to re-register the client resources such the listeners to these members.
*/
private List<Member> clientUnregisteredMembers;


public ClientConnection(HazelcastClientInstanceImpl client,
IOThreadingModel ioThreadingModel,
int connectionId,
Expand Down Expand Up @@ -388,4 +397,12 @@ public int getConnectedServerVersion() {
public String getConnectedServerVersionString() {
return connectedServerVersionString;
}

public List<Member> getClientUnregisteredMembers() {
return clientUnregisteredMembers;
}

public void setClientUnregisteredMembers(List<Member> clientUnregisteredMembers) {
this.clientUnregisteredMembers = clientUnregisteredMembers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ public void run() {
}
final long now = Clock.currentTimeMillis();
for (final ClientConnection connection : connections.values()) {
if (!connection.isAlive()) {
continue;
}

if (now - connection.lastReadTimeMillis() > heartbeatTimeout) {
if (connection.isHeartBeating()) {
logger.warning("Heartbeat failed to connection : " + connection);
Expand All @@ -437,18 +441,22 @@ public void run() {
}
if (now - connection.lastReadTimeMillis() > heartbeatInterval) {
ClientMessage request = ClientPingCodec.encodeRequest();
ClientInvocation clientInvocation = new ClientInvocation(client, request, connection);
final ClientInvocation clientInvocation = new ClientInvocation(client, request, connection);
clientInvocation.setBypassHeartbeatCheck(true);
connection.onHeartbeatRequested();
clientInvocation.invokeUrgent().andThen(new ExecutionCallback<ClientMessage>() {
@Override
public void onResponse(ClientMessage response) {
connection.onHeartbeatReceived();
if (connection.isAlive()) {
connection.onHeartbeatReceived();
}
}

@Override
public void onFailure(Throwable t) {
logger.warning("Error receiving heartbeat for connection: " + connection, t);
if (connection.isAlive()) {
logger.warning("Error receiving heartbeat for connection: " + connection, t);
}
}
});
} else {
Expand Down Expand Up @@ -500,17 +508,7 @@ private void authenticate(final Address target, final ClientConnection connectio
ownerUuid = principal.getOwnerUuid();
}

ClientMessage clientMessage;
if (credentials.getClass().equals(UsernamePasswordCredentials.class)) {
UsernamePasswordCredentials cr = (UsernamePasswordCredentials) credentials;
clientMessage = ClientAuthenticationCodec
.encodeRequest(cr.getUsername(), cr.getPassword(), uuid, ownerUuid, asOwner, ClientTypes.JAVA,
serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
} else {
Data data = ss.toData(credentials);
clientMessage = ClientAuthenticationCustomCodec.encodeRequest(data, uuid, ownerUuid,
asOwner, ClientTypes.JAVA, serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
}
ClientMessage clientMessage = encodeAuthenticationRequest(asOwner, ss, serializationVersion, uuid, ownerUuid);
ClientInvocation clientInvocation = new ClientInvocation(client, clientMessage, connection);
ClientInvocationFuture future = clientInvocation.invokeUrgent();
future.andThen(new ExecutionCallback<ClientMessage>() {
Expand All @@ -526,6 +524,7 @@ public void onResponse(ClientMessage response) {
clusterService.setPrincipal(new ClientPrincipal(result.uuid, result.ownerUuid));
}
connection.setConnectedServerVersion(result.serverHazelcastVersion);
connection.setClientUnregisteredMembers(result.clientUnregisteredMembers);
authenticated(target, connection);
callback.onSuccess(connection, asOwner);
break;
Expand All @@ -551,6 +550,22 @@ public void onFailure(Throwable t) {
}, executionService.getInternalExecutor());
}

private ClientMessage encodeAuthenticationRequest(boolean asOwner, SerializationService ss, byte serializationVersion,
String uuid, String ownerUuid) {
ClientMessage clientMessage;
if (credentials.getClass().equals(UsernamePasswordCredentials.class)) {
UsernamePasswordCredentials cr = (UsernamePasswordCredentials) credentials;
clientMessage = ClientAuthenticationCodec
.encodeRequest(cr.getUsername(), cr.getPassword(), uuid, ownerUuid, asOwner, ClientTypes.JAVA,
serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
} else {
Data data = ss.toData(credentials);
clientMessage = ClientAuthenticationCustomCodec.encodeRequest(data, uuid, ownerUuid,
asOwner, ClientTypes.JAVA, serializationVersion, BuildInfoProvider.getBuildInfo().getVersion());
}
return clientMessage;
}

private class InitConnectionTask implements Runnable {

private final Address target;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.client.spi;

import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;

/**
Expand All @@ -29,4 +30,10 @@ public interface ClientListenerService {
String registerListener(ListenerMessageCodec listenerMessageCodec, EventHandler handler);

boolean deregisterListener(String registrationId);

/**
* Do the appropriate action based on connection to the cluster
* @param clientConnection The connection that the client is using for cluster connection.
*/
void onClusterConnect(ClientConnection clientConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,25 @@ private boolean connect(Set<InetSocketAddress> triedAddresses) throws Exception
}
break;
}
Connection connection = null;
try {
triedAddresses.add(inetSocketAddress);
Address address = new Address(inetSocketAddress);
if (logger.isFinestEnabled()) {
logger.finest("Trying to connect to " + address);
}
Connection connection = connectionManager.getOrConnect(address, true);
connection = connectionManager.getOrConnect(address, true);
ownerConnectionAddress = connection.getEndPoint();
clientMembershipListener.listenMembershipEvents(ownerConnectionAddress);
client.getListenerService().onClusterConnect((ClientConnection) connection);
fireConnectionEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
return true;
} catch (Exception e) {
Level level = e instanceof AuthenticationException ? Level.WARNING : Level.FINEST;
logger.log(level, "Exception during initial connection to " + inetSocketAddress, e);
if (null != connection) {
connection.close("Could not connect to " + inetSocketAddress + " as owner", e);
}
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.hazelcast.client.spi.impl.listener;

import com.hazelcast.client.spi.impl.ListenerMessageCodec;
import com.hazelcast.nio.Address;
import com.hazelcast.core.Member;

import static com.hazelcast.util.Preconditions.isNotNull;

Expand All @@ -26,13 +26,13 @@
*/
public class ClientEventRegistration {

private Address subscriber;
private Member subscriber;
private final String serverRegistrationId;
private final long callId;
private final ListenerMessageCodec codec;

public ClientEventRegistration(String serverRegistrationId,
long callId, Address subscriber, ListenerMessageCodec codec) {
long callId, Member subscriber, ListenerMessageCodec codec) {
isNotNull(serverRegistrationId, "serverRegistrationId");
this.serverRegistrationId = serverRegistrationId;
this.callId = callId;
Expand All @@ -59,7 +59,7 @@ public String getServerRegistrationId() {
*
* @return subscriber
*/
public Address getSubscriber() {
public Member getSubscriber() {
return subscriber;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ public void shutdown() {
ClientExecutionServiceImpl.shutdownExecutor("registrationExecutor", registrationExecutor, logger);
}

public StripedExecutor getEventExecutor() {
return eventExecutor;
}

public void start() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,31 @@

package com.hazelcast.client.spi.impl.listener;

import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.ClientClusterService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.Member;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.UuidUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

public class ClientNonSmartListenerService extends ClientListenerServiceImpl implements ConnectionListener {
public class ClientNonSmartListenerService extends ClientListenerServiceImpl {

private final Map<ClientRegistrationKey, ClientEventRegistration> registrations
= new ConcurrentHashMap<ClientRegistrationKey, ClientEventRegistration>();
Expand Down Expand Up @@ -81,8 +84,9 @@ private ClientEventRegistration invoke(ClientRegistrationKey registrationKey) th
String registrationId = registrationKey.getCodec().decodeAddResponse(future.get());
handler.onListenerRegister();
Address address = future.getInvocation().getSendConnection().getRemoteEndpoint();
Member member = client.getClientClusterService().getMember(address);
return new ClientEventRegistration(registrationId,
request.getCorrelationId(), address, registrationKey.getCodec());
request.getCorrelationId(), member, registrationKey.getCodec());

}

Expand Down Expand Up @@ -119,37 +123,13 @@ public Boolean call() throws Exception {

@Override
public void start() {
client.getConnectionManager().addConnectionListener(this);
List<Member> allClusterMembers = new ArrayList<Member>(client.getCluster().getMembers());
new ReconnectionHandler(allClusterMembers).run();
}

@Override
public void connectionAdded(Connection connection) {
registrationExecutor.submit(new Runnable() {
@Override
public void run() {
for (ClientRegistrationKey registrationKey : registrations.keySet()) {
try {
ClientEventRegistration registration = invoke(registrationKey);
registrations.put(registrationKey, registration);
} catch (Exception e) {
logger.warning("Listener " + registrationKey + " could not be added ");
}
}
}
});

}

@Override
public void connectionRemoved(Connection connection) {
registrationExecutor.submit(new Runnable() {
@Override
public void run() {
for (Map.Entry<ClientRegistrationKey, ClientEventRegistration> entry : registrations.entrySet()) {
removeEventHandler(entry.getValue().getCallId());
}
}
});
public void onClusterConnect(final ClientConnection clientConnection) {
registrationExecutor.submit(new ReconnectionHandler(clientConnection.getClientUnregisteredMembers()));
}

//For Testing
Expand All @@ -175,4 +155,65 @@ public Collection<ClientEventRegistration> call() throws Exception {
throw ExceptionUtil.rethrow(e);
}
}

private class ReconnectionHandler
implements Runnable {
private List<Member> clientUnregisteredMembers;

public ReconnectionHandler(List<Member> clientUnregisteredMembers) {
this.clientUnregisteredMembers = clientUnregisteredMembers;
}

@Override
public void run() {
if (registrations.isEmpty()) {
return;
}

if (checkReconnectionToTheSameMember()) {
return;
}

reRegisterListeners();
}

private void reRegisterListeners() {
for (Map.Entry<ClientRegistrationKey, ClientEventRegistration> existingRegistrationEntry : registrations.entrySet()) {
ClientRegistrationKey key = null;
try {
ClientEventRegistration existingRegistration = existingRegistrationEntry.getValue();
removeEventHandler(existingRegistration.getCallId());
key = existingRegistrationEntry.getKey();
ClientEventRegistration registration = invoke(key);
registrations.put(key, registration);
} catch (Exception e) {
logger.warning("Listener " + key + " could not be added ");
}
}
}

private boolean checkReconnectionToTheSameMember() {
ClientClusterService clientClusterService = client.getClientClusterService();
Address newOwnerAddress = clientClusterService.getOwnerConnectionAddress();
Member newOwnerMember = clientClusterService.getMember(newOwnerAddress);
ClientEventRegistration firstRegistration = registrations.values().iterator().next();
// Since this is non-smart client, all registrations are made against the same member
Member oldMember = firstRegistration.getSubscriber();
String newOwnerMemberUuid = newOwnerMember.getUuid();
if (newOwnerMemberUuid.equals(oldMember.getUuid())) {
// connected to the same member as the owner
boolean ownerCleanedup = false;
for (Member member : clientUnregisteredMembers) {
if (newOwnerMemberUuid.equals(member.getUuid())) {
ownerCleanedup = true;
break;
}
}
if (!ownerCleanedup) {
return true;
}
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public EventHandler getHandler() {
return handler;
}

public String getUserRegistrationId() {
return userRegistrationId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit 40ddd79

Please sign in to comment.