Skip to content

Commit

Permalink
Compatibilit fix. Reverted the ReAuthenticationOperation not to retur…
Browse files Browse the repository at this point in the history
…n any response and changed the smart listener to reregister all the listeners on cluster reconnect.

The Smart listener is not using the cleanedup member list and this list is not beign populated for backward compatibility.
  • Loading branch information
ihsandemir committed Nov 4, 2016
1 parent 3c6149d commit b7e82b0
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ private void authenticated(Address target, ClientConnection connection) {
destroyConnection(oldConnection, "Replaced by newer connection", null);
}
}
assert oldConnection == null || connection.equals(oldConnection);
connectionsInProgress.remove(target);
logger.info("Authenticated with server " + connection.getRemoteEndpoint() + ", server version:" + connection
.getConnectedServerVersionString() + " Local address: " + connection.getLocalSocketAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,15 @@
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.instance.BuildInfo;
import com.hazelcast.nio.Address;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.UuidUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -107,35 +104,24 @@ private String register(ClientRegistrationKey registrationKey) {

@Override
public void onClusterConnect(final ClientConnection clientConnection) {
registrationExecutor.submit(new Runnable() {
@Override
public void run() {
Collection<Member> newMemberList = client.getClientClusterService().getMemberList();
if (registrations.isEmpty()) {
try {
registrationExecutor.submit(new Runnable() {
@Override
public void run() {
Collection<Member> newMemberList = client.getClientClusterService().getMemberList();
members.clear();
members.addAll(newMemberList);
return;
}
List<Member> removedMembers = new ArrayList<Member>();
for (Member member : members) {
if (!newMemberList.contains(member)) {
removedMembers.add(member);
}
}
List<Member> newMembers = new ArrayList<Member>();
for (Member member : newMemberList) {
if (!members.contains(member)) {
newMembers.add(member);
}
}

members.clear();
members.addAll(newMemberList);
if (registrations.isEmpty()) {
return;
}

updateRegistrations(clientConnection, removedMembers, newMembers);
ensureConnectionsToAllServers();
}
});
reRegisterAll();
}
}).get();
} catch (Exception e) {
ExceptionUtil.rethrow(e);
}
}

private void invoke(ClientRegistrationKey registrationKey, Member member) throws Exception {
Expand Down Expand Up @@ -165,7 +151,7 @@ public boolean deregisterListener(final String userRegistrationId) {
@Override
public Boolean call() throws Exception {
ClientRegistrationKey key = new ClientRegistrationKey(userRegistrationId);
return deregister(key);
return deregister(key, getMemberUuids());
}
});

Expand All @@ -175,7 +161,7 @@ public Boolean call() throws Exception {
}
}

private Boolean deregister(ClientRegistrationKey key) {
private Boolean deregister(ClientRegistrationKey key, Set<String> memberUuids) {
Map<Member, ClientEventRegistration> registrationMap = registrations.get(key);
if (registrationMap == null) {
return false;
Expand All @@ -184,10 +170,14 @@ private Boolean deregister(ClientRegistrationKey key) {
for (ClientEventRegistration registration : registrationMap.values()) {
Member subscriber = registration.getSubscriber();
try {
ListenerMessageCodec listenerMessageCodec = registration.getCodec();
String serverRegistrationId = registration.getServerRegistrationId();
ClientMessage request = listenerMessageCodec.encodeRemoveRequest(serverRegistrationId);
new ClientInvocation(client, request, subscriber.getAddress()).invoke().get();
// We need to compare uuids since the member may reconnect to the restarted member which restarted at the same
// address and member.equals return true but they are actually different instances.
if (memberUuids.contains(subscriber.getUuid())) {
ListenerMessageCodec listenerMessageCodec = registration.getCodec();
String serverRegistrationId = registration.getServerRegistrationId();
ClientMessage request = listenerMessageCodec.encodeRemoveRequest(serverRegistrationId);
new ClientInvocation(client, request, subscriber.getAddress()).invoke().get();
}
removeEventHandler(registration.getCallId());
registrationMap.remove(subscriber);
} catch (Exception e) {
Expand All @@ -202,6 +192,14 @@ private Boolean deregister(ClientRegistrationKey key) {
return successful;
}

private Set<String> getMemberUuids() {
Set<String> memberUuids = new HashSet<String>(members.size());
for (Member m : members) {
memberUuids.add(m.getUuid());
}
return memberUuids;
}

@Override
public void start() {
membershipListenerId = clusterService.addMembershipListener(this);
Expand Down Expand Up @@ -329,65 +327,15 @@ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
public void init(final InitialMembershipEvent event) {
}

private void updateRegistrations(ClientConnection clientConnection, List<Member> removedMembers, List<Member> newMembers) {
/**
* The servers prior to 3.7.3 do not send the unregistered members list, hence we always need to reregister all
* existing members to ensure that the listeners work.
*/
if (clientConnection.getConnectedServerVersion() == BuildInfo.UNKNOWN_HAZELCAST_VERSION) {
reRegisterAll();
return;
}

List<Member> clientUnregisteredMembers = clientConnection.getClientUnregisteredMembers();
// remove the registrations for the members who left the cluster
for (Member member : removedMembers) {
for (Map<Member, ClientEventRegistration> registrationMap : registrations.values()) {
removeRegistrationLocally(member, registrationMap);
}
// This member should not exist as a cleanedup member since it is already removed
clientUnregisteredMembers.remove(member);
}

for (Member member : clientUnregisteredMembers) {
reRegister(member);
newMembers.remove(member);
}

for (Member member : newMembers) {
register(member);
}
}

private void reRegister(Member member) {
register(member, true);
}

private void register(Member member) {
register(member, false);
}

private void register(Member member, boolean removeLocally) {
for (Map.Entry<ClientRegistrationKey, Map<Member, ClientEventRegistration>> entry : registrations.entrySet()) {
ClientRegistrationKey registrationKey = entry.getKey();
if (removeLocally) {
Map<Member, ClientEventRegistration> registrationMap = entry.getValue();
removeRegistrationLocally(member, registrationMap);
}

try {
invoke(registrationKey, member);
} catch (Exception e) {
logger.warning("Listener " + registrationKey + " could not be added to the new member " + member, e);
}
}
}

private void reRegisterAll() {
for (ClientRegistrationKey key : registrations.keySet()) {
deregister(key);
logger.finest("Reregistering listener " + key + " to the cluster.");

deregister(key, getMemberUuids());

register(key);

logger.finest("Reregistered listener " + key + " to the cluster.");
}
}

Expand All @@ -409,6 +357,10 @@ private void ensureConnectionsToAllServers() {
}

Address ownerConnectionAddress = clusterService.getOwnerConnectionAddress();
if (null == ownerConnectionAddress) {
return;
}

for (Member member : members) {
try {
getOrConnect(member, ownerConnectionAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
public class ClientReAuthOperation extends AbstractOperation implements UrgentSystemOperation, AllowedDuringPassiveState {

private String clientUuid;
private boolean clientDisconnectOperationRun;

public ClientReAuthOperation() {
}
Expand All @@ -50,17 +49,11 @@ public void run() throws Exception {
endpoint.authenticated(principal);
}
engine.addOwnershipMapping(clientUuid, memberUuid);
clientDisconnectOperationRun = endpoints.isEmpty();
}

@Override
public boolean returnsResponse() {
return Boolean.TRUE;
}

@Override
public Object getResponse() {
return clientDisconnectOperationRun;
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import com.hazelcast.client.ClientEndpoint;
import com.hazelcast.client.ClientTypes;
import com.hazelcast.client.impl.ClientEndpointImpl;
import com.hazelcast.client.impl.ClientEngineImpl;
import com.hazelcast.client.impl.client.ClientPrincipal;
import com.hazelcast.client.impl.operations.ClientReAuthOperation;
import com.hazelcast.client.impl.protocol.AuthenticationStatus;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.config.GroupConfig;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
Expand All @@ -37,7 +35,6 @@
import com.hazelcast.security.SecurityContext;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.util.FutureUtil;
import com.hazelcast.util.UuidUtil;

import javax.security.auth.login.LoginContext;
Expand All @@ -47,15 +44,12 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

/**
* Base authentication task
*/
public abstract class AuthenticationBaseMessageTask<P> extends AbstractCallableMessageTask<P> {
private static final int REAUTHENTICATION_TIMEOUT = ClientEngineImpl.ENDPOINT_REMOVE_DELAY_SECONDS;

protected transient ClientPrincipal principal;
protected transient Credentials credentials;
protected transient byte clientSerializationVersion;
Expand Down Expand Up @@ -195,7 +189,7 @@ private ClientMessage handleAuthenticated() {

principal = new ClientPrincipal(uuid, localMemberUUID);

boolean success = reAuthenticateWithMembers(clientUnregisteredMembers, uuid);
boolean success = reAuthenticateWithMembers(uuid);

if (!success) {
byte status = AuthenticationStatus.CREDENTIALS_FAILED.getId();
Expand Down Expand Up @@ -225,7 +219,7 @@ private ClientMessage handleAuthenticated() {
clientUnregisteredMembers);
}

private boolean reAuthenticateWithMembers(final List<Member> cleanedUpMembers, final String uuid) {
private boolean reAuthenticateWithMembers(final String uuid) {
boolean success = true;

ArrayList<OperationInfo> operationInfos = new ArrayList<OperationInfo>();
Expand All @@ -249,46 +243,10 @@ private boolean reAuthenticateWithMembers(final List<Member> cleanedUpMembers, f
}

if (success) {
if (reAuthLocal()) {
cleanedUpMembers.add(localMember);
}

if (getReAuthenticationResponses(cleanedUpMembers, operationInfos)) {
return false;
}
}

return true;
}

private boolean getReAuthenticationResponses(List<Member> cleanedUpMembers, List<OperationInfo> operationInfos) {
List<Future> futures = new ArrayList<Future>();
for (OperationInfo operationInfo : operationInfos) {
futures.add(operationInfo.getFuture());
}
try {
FutureUtil.waitWithDeadline(futures, REAUTHENTICATION_TIMEOUT, TimeUnit.SECONDS,
FutureUtil.RETHROW_ALL_EXCEPT_MEMBER_LEFT);
} catch (Exception e) {
logger.warning("Cluster reAuthentication failed.", e);
return true;
reAuthLocal();
}

for (OperationInfo operationInfo : operationInfos) {
try {
boolean clientDisconnectOperationRun = (Boolean) operationInfo.getFuture().get();
if (clientDisconnectOperationRun) {
cleanedUpMembers.add(operationInfo.getMember());
}
} catch (MemberLeftException e) {
// this is ok as expected
cleanedUpMembers.add(operationInfo.getMember());
} catch (Exception e) {
logger.warning("Failed to get response for invocation to member:" + operationInfo.getMember(), e);
return true;
}
}
return false;
return success;
}

private void setConnectionType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.spi.impl.operationservice.impl;

import com.hazelcast.client.impl.operations.ClientReAuthOperation;
import com.hazelcast.client.impl.protocol.task.MessageTask;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.core.HazelcastException;
Expand Down

0 comments on commit b7e82b0

Please sign in to comment.