Skip to content

Commit

Permalink
Merge pull request #14843 from sancar/fix/hotRestartMembership/maint
Browse files Browse the repository at this point in the history
Let Client handle restarted members with same uuid different address
  • Loading branch information
sancar committed Apr 5, 2019
2 parents 9d3a27d + 76e4d24 commit 8aa219b
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,17 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.unmodifiableSet;

class ClientMembershipListener extends ClientAddMembershipListenerCodec.AbstractEventHandler
implements EventHandler<ClientMessage> {
implements EventHandler<ClientMessage> {

private static final int INITIAL_MEMBERS_TIMEOUT_SECONDS = 5;

Expand All @@ -59,7 +57,7 @@ class ClientMembershipListener extends ClientAddMembershipListenerCodec.Abstract

private volatile CountDownLatch initialListFetchedLatch;

public ClientMembershipListener(HazelcastClientInstanceImpl client) {
ClientMembershipListener(HazelcastClientInstanceImpl client) {
this.client = client;
logger = client.getLoggingService().getLogger(ClientMembershipListener.class);
connectionManager = (ClientConnectionManagerImpl) client.getConnectionManager();
Expand All @@ -84,18 +82,14 @@ public void handleMemberEventV10(Member member, int eventType) {

@Override
public void handleMemberListEventV10(Collection<Member> initialMembers) {
Map<String, Member> prevMembers = Collections.emptyMap();
Set<Member> prevMembers = Collections.emptySet();
if (!members.isEmpty()) {
prevMembers = new HashMap<String, Member>(members.size());
for (Member member : members) {
prevMembers.put(member.getUuid(), member);
}
prevMembers = new LinkedHashSet<Member>(members.size());
prevMembers.addAll(members);
members.clear();
}

for (Member initialMember : initialMembers) {
members.add(initialMember);
}
members.addAll(initialMembers);

if (prevMembers.isEmpty()) {
//this means this is the first time client connected to cluster
Expand Down Expand Up @@ -169,20 +163,19 @@ private void fireMembershipEvent(List<MembershipEvent> events) {
}
}

private List<MembershipEvent> detectMembershipEvents(Map<String, Member> prevMembers) {
private List<MembershipEvent> detectMembershipEvents(Set<Member> prevMembers) {
List<MembershipEvent> events = new LinkedList<MembershipEvent>();
Set<Member> eventMembers = unmodifiableSet(members);

List<Member> newMembers = new LinkedList<Member>();
for (Member member : members) {
Member former = prevMembers.remove(member.getUuid());
if (former == null) {
if (!prevMembers.remove(member)) {
newMembers.add(member);
}
}

// removal events should be added before added events
for (Member member : prevMembers.values()) {
for (Member member : prevMembers) {
events.add(new MembershipEvent(client.getCluster(), member, MembershipEvent.MEMBER_REMOVED, eventMembers));
Address address = member.getAddress();
if (clusterService.getMember(address) == null) {
Expand Down Expand Up @@ -231,7 +224,7 @@ public String toString() {
+ '}';
}

public void clearMembers() {
void clearMembers() {
members = new LinkedHashSet<Member>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.test.HazelcastTestSupport.assertContains;
import static com.hazelcast.test.HazelcastTestSupport.assertOpenEventually;
import static org.junit.Assert.assertEquals;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class ClientClusterRestartEventTest {

private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory();

@Before
public void setUp() {
}

@After
public void tearDown() {
hazelcastFactory.shutdownAll();
}

protected Config newConfig() {
return new Config();
}

private ClientConfig newClientConfig() {
ClientConfig clientConfig = new ClientConfig();
clientConfig.getNetworkConfig().setConnectionAttemptLimit(Integer.MAX_VALUE);
return clientConfig;
}

@Test
public void testSingleMemberRestart() {
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(newConfig());
Member oldMember = instance.getCluster().getLocalMember();
HazelcastInstance client = hazelcastFactory.newHazelcastClient();

final CountDownLatch memberAdded = new CountDownLatch(1);
final CountDownLatch memberRemoved = new CountDownLatch(1);
final AtomicReference<Member> addedMemberReference = new AtomicReference<Member>();
final AtomicReference<Member> removedMemberReference = new AtomicReference<Member>();
client.getCluster().addMembershipListener(new MembershipListener() {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
addedMemberReference.set(membershipEvent.getMember());
memberAdded.countDown();
}

@Override
public void memberRemoved(MembershipEvent membershipEvent) {
removedMemberReference.set(membershipEvent.getMember());
memberRemoved.countDown();
}

@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {

}
});

instance.shutdown();
instance = hazelcastFactory.newHazelcastInstance(newConfig());
Member newMember = instance.getCluster().getLocalMember();

assertOpenEventually(memberRemoved);
assertEquals(oldMember, removedMemberReference.get());

assertOpenEventually(memberAdded);
assertEquals(newMember, addedMemberReference.get());

Set<Member> members = client.getCluster().getMembers();
assertContains(members, newMember);
assertEquals(1, members.size());
}

@Test
public void testMultiMemberRestart() {
HazelcastInstance instance1 = hazelcastFactory.newHazelcastInstance(newConfig());
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(newConfig());
HazelcastInstance client = hazelcastFactory.newHazelcastClient(newClientConfig());
Member oldMember1 = instance1.getCluster().getLocalMember();
Member oldMember2 = instance2.getCluster().getLocalMember();

final CountDownLatch memberAdded = new CountDownLatch(2);
final Set<Member> addedMembers = Collections.newSetFromMap(new ConcurrentHashMap<Member, Boolean>());
final CountDownLatch memberRemoved = new CountDownLatch(2);
final Set<Member> removedMembers = Collections.newSetFromMap(new ConcurrentHashMap<Member, Boolean>());
client.getCluster().addMembershipListener(new MembershipListener() {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
addedMembers.add(membershipEvent.getMember());
memberAdded.countDown();
}

@Override
public void memberRemoved(MembershipEvent membershipEvent) {
removedMembers.add(membershipEvent.getMember());
memberRemoved.countDown();

}

@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {

}
});

instance1.shutdown();
instance2.shutdown();

instance1 = hazelcastFactory.newHazelcastInstance(newConfig());
instance2 = hazelcastFactory.newHazelcastInstance(newConfig());

Member newMember1 = instance1.getCluster().getLocalMember();
Member newMember2 = instance2.getCluster().getLocalMember();

assertOpenEventually(memberRemoved);
assertEquals(2, removedMembers.size());
assertContains(removedMembers, oldMember1);
assertContains(removedMembers, oldMember2);

assertOpenEventually(memberAdded);
assertEquals(2, addedMembers.size());
assertContains(addedMembers, newMember1);
assertContains(addedMembers, newMember2);

Set<Member> members = client.getCluster().getMembers();
assertContains(members, newMember1);
assertContains(members, newMember2);
assertEquals(2, members.size());

}
}

0 comments on commit 8aa219b

Please sign in to comment.