Skip to content

Commit

Permalink
Group test fixes (#208)
Browse files Browse the repository at this point in the history
* Rewrite group recovery test to ensure final membership checks account for sequential consistency in membership change events.

* Fix group test log level.

* Fix group recovery test name.

* Ensure leave events are triggered for local group members.

* Ensure local membership is consistent prior to new join events being received when recovering group members.

* Reset group membership test log levels.
  • Loading branch information
kuujo committed Dec 27, 2016
1 parent fb004a7 commit ca13fc7
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 45 deletions.
61 changes: 35 additions & 26 deletions groups/src/main/java/io/atomix/group/internal/MembershipGroup.java
Expand Up @@ -31,10 +31,7 @@
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceType; import io.atomix.resource.ResourceType;


import java.util.Collection; import java.util.*;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
Expand All @@ -60,7 +57,7 @@ public class MembershipGroup extends AbstractResource<DistributedGroup> implemen
private final Map<String, AbstractGroupMember> members = new ConcurrentHashMap<>(); private final Map<String, AbstractGroupMember> members = new ConcurrentHashMap<>();
private final MessageProducerService producerService; private final MessageProducerService producerService;
private final MessageConsumerService consumerService; private final MessageConsumerService consumerService;
private final Map<String, GroupCommands.Join> joinCommands = new ConcurrentHashMap<>(); private final Map<String, GroupCommands.Join> localJoins = new ConcurrentHashMap<>();


public MembershipGroup(CopycatClient client, Properties options) { public MembershipGroup(CopycatClient client, Properties options) {
super(client, new ResourceType(DistributedGroup.class), options); super(client, new ResourceType(DistributedGroup.class), options);
Expand Down Expand Up @@ -135,7 +132,7 @@ private CompletableFuture<LocalMember> join(String memberId, boolean persistent,
AbstractGroupMember member = members.get(info.memberId()); AbstractGroupMember member = members.get(info.memberId());
if (member == null || !(member instanceof LocalGroupMember)) { if (member == null || !(member instanceof LocalGroupMember)) {
member = new LocalGroupMember(info, this, producerService, consumerService); member = new LocalGroupMember(info, this, producerService, consumerService);
joinCommands.put(memberId, cmd); localJoins.put(memberId, cmd);
members.put(info.memberId(), member); members.put(info.memberId(), member);
} }
return (LocalGroupMember) member; return (LocalGroupMember) member;
Expand All @@ -150,8 +147,11 @@ public Listener<GroupMember> onJoin(Consumer<GroupMember> listener) {
@Override @Override
public CompletableFuture<Void> remove(String memberId) { public CompletableFuture<Void> remove(String memberId) {
return client.submit(new GroupCommands.Leave(memberId)).thenRun(() -> { return client.submit(new GroupCommands.Leave(memberId)).thenRun(() -> {
joinCommands.remove(memberId); localJoins.remove(memberId);
members.remove(memberId); GroupMember member = members.remove(memberId);
if (member != null) {
leaveListeners.accept(member);
}
}); });
} }


Expand All @@ -176,31 +176,40 @@ public CompletableFuture<DistributedGroup> open() {


@Override @Override
protected CompletableFuture<Void> recover(Integer attempt) { protected CompletableFuture<Void> recover(Integer attempt) {
final int persistentCount = // When recovering the membership group, we need to ensure that all local non-persistent members are
(int) joinCommands.values().stream().filter(v -> !v.persist()).count(); // removed from the group prior to fetching the group membership from the cluster again, and prior to
final CompletableFuture[] closeFutures = new CompletableFuture[persistentCount]; // adding recovered members that the membership list is updated to ensure the list is consistent
final CompletableFuture[] joinFutures = new CompletableFuture[joinCommands.size()]; // when join event handlers are called.
int i = 0; Map<String, GroupCommands.Join> joins = new HashMap<>(localJoins);
for (GroupCommands.Join cmd : joinCommands.values()) {
if (cmd.persist()) {
joinFutures[i] = join(cmd.member(), cmd.persist(), cmd.metadata());
} else {
closeFutures[i] = remove(cmd.member());
joinFutures[i] = join(cmd.metadata());
}
i++;
}
members.clear();
return sync() return sync()
.thenCompose(v -> CompletableFuture.allOf(closeFutures)) .thenCompose(v -> {
.thenCompose(v -> CompletableFuture.allOf(joinFutures)); List<CompletableFuture> futures = new ArrayList<>(joins.size());
for (GroupCommands.Join join : joins.values()) {
if (!join.persist()) {
futures.add(remove(join.member()));
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
})
.thenCompose(v -> sync())
.thenCompose(v -> {
List<CompletableFuture> futures = new ArrayList<>(joins.size());
for (GroupCommands.Join join : joins.values()) {
if (join.persist()) {
futures.add(join(join.member(), join.persist(), join.metadata()));
} else {
futures.add(join(join.metadata()));
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
});
} }


/** /**
* Synchronizes the membership group. * Synchronizes the membership group.
*/ */
private CompletableFuture<Void> sync() { private CompletableFuture<Void> sync() {
return client.submit(new GroupCommands.Listen()).thenAccept(status-> { return client.submit(new GroupCommands.Listen()).thenAccept(status -> {
for (GroupMemberInfo info : status.members()) { for (GroupMemberInfo info : status.members()) {
AbstractGroupMember member = this.members.get(info.memberId()); AbstractGroupMember member = this.members.get(info.memberId());
if (member == null) { if (member == null) {
Expand Down
108 changes: 90 additions & 18 deletions groups/src/test/java/io/atomix/group/DistributedGroupTest.java
Expand Up @@ -23,7 +23,6 @@
import io.atomix.testing.AbstractCopycatTest; import io.atomix.testing.AbstractCopycatTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -599,56 +598,129 @@ public void testGroupMessageFailOnLeave() throws Throwable {
await(10000, 2); await(10000, 2);
} }


/**
* Tests that a member already exists when a join event is received.
*/
public void testMemberExistsOnJoinEvent() throws Throwable {
createServers(3);

final CopycatClient client1 = createCopycatClient();
final CopycatClient client2 = createCopycatClient();

final DistributedGroup group1 = createResource(client1, new DistributedGroup.Options());
final DistributedGroup group2 = createResource(client2, new DistributedGroup.Options());

group1.onJoin(m -> {
threadAssertEquals(1, group1.members().size());
resume();
});
group2.onJoin(m -> {
threadAssertEquals(1, group2.members().size());
resume();
});

group1.join().thenRun(this::resume);

await(5000, 3);
}

/**
* Tests that the local onLeave handler is called when a member leaves the group.
*/
public void testLocalOnLeave() throws Throwable {
createServers(3);

final CopycatClient client1 = createCopycatClient();
final CopycatClient client2 = createCopycatClient();

final DistributedGroup group1 = createResource(client1, new DistributedGroup.Options());
final DistributedGroup group2 = createResource(client2, new DistributedGroup.Options());

// Group join handlers
group1.onJoin(m -> resume());
group2.onJoin(m -> resume());

// Join both members to the group
final LocalMember member1 = group1.join().get(5, TimeUnit.SECONDS);
final LocalMember member2 = group2.join().get(5, TimeUnit.SECONDS);

await(5000, 4);

group1.onLeave(m -> {
resume();
});
group2.onLeave(m -> {
resume();
});

member1.leave().thenRun(this::resume);

await(5000, 3);
}

/**
* Tests the recovery of a group resource/member in a group.
*/
public void testRecovery() throws Throwable { public void testRecovery() throws Throwable {
createServers(3); createServers(3);

final CopycatClient client1 = createCopycatClient(); final CopycatClient client1 = createCopycatClient();
final CopycatClient client2 = createCopycatClient(); final CopycatClient client2 = createCopycatClient();

final DistributedGroup group1 = createResource(client1, new DistributedGroup.Options()); final DistributedGroup group1 = createResource(client1, new DistributedGroup.Options());
final DistributedGroup group2 = createResource(client2, new DistributedGroup.Options()); final DistributedGroup group2 = createResource(client2, new DistributedGroup.Options());


// Group1 on join handler
group1.onJoin(m -> { group1.onJoin(m -> {
System.out.println(group1.members().size());
if (group1.members().size() == 2) { if (group1.members().size() == 2) {
resume(); resume();
} }
}); });

// Group2 on join handler
group2.onJoin(m -> { group2.onJoin(m -> {
System.out.println(group2.members().size());
if (group2.members().size() == 2) { if (group2.members().size() == 2) {
resume(); resume();
} }
}); });


final CountDownLatch leave = new CountDownLatch(1); // Join both members to the group
group1.onLeave(m -> leave.countDown()); final LocalMember member1 = group1.join().get(5, TimeUnit.SECONDS);
final LocalMember member2 = group2.join().get(5, TimeUnit.SECONDS);


group1.join().get(10, TimeUnit.SECONDS); // Wait for both group instances to receive join events for both members
final LocalMember member2 = group2.join().get(10, TimeUnit.SECONDS);
await(5000, 2); await(5000, 2);


// Set a recovery handler for group1
group1.onRecovery(attempt -> { group1.onRecovery(attempt -> {
threadAssertEquals(1, attempt); threadAssertEquals(1, attempt);
resume(); resume();
}); });


final long startExpire = System.currentTimeMillis(); // Expire client 1's session, which should cause group1 to expire
((ClientSession) client1.session()).expire().whenComplete((v, e) -> { ((ClientSession) client1.session()).expire().whenComplete((v, e) -> {
threadAssertNull(e); threadAssertNull(e);
resume(); resume();
}); });


await(5000, 2); // Wait for the client's session to expire and be recovered
await(5000, 4);


// wait for the old session to expire in the state machine // Ensure one member remains once a node is removed
final long remainingExpire = 5000 - (System.currentTimeMillis() - startExpire); group1.onLeave(m -> {
Thread.sleep(Math.max(0, remainingExpire)); threadAssertEquals(1, group1.members().size());
resume();
});
group2.onLeave(m -> {
threadAssertEquals(1, group2.members().size());
resume();
});


// Remove member2 from the group
member2.leave().thenRun(this::resume); member2.leave().thenRun(this::resume);
await(5000, 1); await(5000, 3);

// recovered client should receive events via new session
threadAssertTrue(leave.await(5, TimeUnit.SECONDS));
threadAssertEquals(1, group1.members().size());

// recovered client should rejoin the cluster
threadAssertEquals(1, group2.members().size());
} }

} }
2 changes: 1 addition & 1 deletion groups/src/test/resources/logback.xml
Expand Up @@ -24,4 +24,4 @@
<root level="INFO"> <root level="INFO">
<appender-ref ref="STDOUT" /> <appender-ref ref="STDOUT" />
</root> </root>
</configuration> </configuration>

0 comments on commit ca13fc7

Please sign in to comment.