Skip to content

Commit

Permalink
Ensure group member objects remain consistent when joining a group.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 29, 2016
1 parent 957a775 commit f531f28
Showing 1 changed file with 109 additions and 54 deletions.
163 changes: 109 additions & 54 deletions coordination/src/main/java/io/atomix/coordination/DistributedGroup.java
Expand Up @@ -28,9 +28,11 @@
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -202,7 +204,7 @@ public static Config config() {
private final Listeners<GroupMember> leaveListeners = new Listeners<>();
private final Listeners<Long> termListeners = new Listeners<>();
private final Listeners<GroupMember> electionListeners = new Listeners<>();
private final Map<String, InternalLocalGroupMember> localMembers = new ConcurrentHashMap<>();
private final Set<String> joining = new CopyOnWriteArraySet<>();
private final Map<String, GroupMember> members = new ConcurrentHashMap<>();
private volatile String leader;
private volatile long term;
Expand All @@ -214,50 +216,13 @@ public DistributedGroup(CopycatClient client, Resource.Options options) {
@Override
public CompletableFuture<DistributedGroup> open() {
return super.open().thenApply(result -> {
client.<String>onEvent("join", memberId -> {
GroupMember member = members.computeIfAbsent(memberId, InternalGroupMember::new);
for (Listener<GroupMember> listener : joinListeners) {
listener.accept(member);
}
});

client.<String>onEvent("leave", memberId -> {
GroupMember member = members.remove(memberId);
if (member != null) {
for (Listener<GroupMember> listener : leaveListeners) {
listener.accept(member);
}
}
});

client.<Long>onEvent("term", term -> {
this.term = term;
termListeners.accept(term);
});

client.<String>onEvent("elect", leader -> {
this.leader = leader;
electionListeners.accept(member(leader));
InternalLocalGroupMember member = localMembers.get(leader);
if (member != null) {
member.electionListeners.accept(term);
}
});

client.<String>onEvent("resign", leader -> {
if (this.leader != null && this.leader.equals(leader)) {
this.leader = null;
}
});

client.<GroupCommands.Message>onEvent("message", message -> {
InternalLocalGroupMember localMember = localMembers.get(message.member());
if (localMember != null) {
localMember.handle(message);
}
});

client.onEvent("execute", Runnable::run);
client.onEvent("join", this::onJoinEvent);
client.onEvent("leave", this::onLeaveEvent);
client.onEvent("term", this::onTermEvent);
client.onEvent("elect", this::onElectEvent);
client.onEvent("resign", this::onResignEvent);
client.onEvent("message", this::onMessageEvent);
client.onEvent("execute", this::onExecuteEvent);

return result;
}).thenCompose(v -> sync())
Expand All @@ -275,6 +240,81 @@ private CompletableFuture<Void> sync() {
});
}

/**
* Handles a join event received from the cluster.
*/
private void onJoinEvent(String memberId) {
GroupMember member;
if (joining.contains(memberId)) {
member = new InternalLocalGroupMember(memberId);
members.put(memberId, member);
} else {
member = members.computeIfAbsent(memberId, InternalGroupMember::new);
}

for (Listener<GroupMember> listener : joinListeners) {
listener.accept(member);
}
}

/**
* Handles a leave event received from the cluster.
*/
private void onLeaveEvent(String memberId) {
GroupMember member = members.remove(memberId);
if (member != null) {
for (Listener<GroupMember> listener : leaveListeners) {
listener.accept(member);
}
}
}

/**
* Handles a term change event received from the cluster.
*/
private void onTermEvent(long term) {
this.term = term;
termListeners.accept(term);
}

/**
* Handles an elect event received from the cluster.
*/
private void onElectEvent(String leader) {
this.leader = leader;
electionListeners.accept(member(leader));
GroupMember member = members.get(leader);
if (member != null && member instanceof InternalLocalGroupMember) {
((InternalLocalGroupMember) member).electionListeners.accept(term);
}
}

/**
* Handles a resign event received from the cluster.
*/
private void onResignEvent(String leader) {
if (this.leader != null && this.leader.equals(leader)) {
this.leader = null;
}
}

/**
* Handles a message event received from the cluster.
*/
private void onMessageEvent(GroupCommands.Message message) {
GroupMember localMember = members.get(message.member());
if (localMember != null && localMember instanceof InternalLocalGroupMember) {
((InternalLocalGroupMember) localMember).handle(message);
}
}

/**
* Handles an execute event received from the cluster.
*/
private void onExecuteEvent(Runnable callback) {
callback.run();
}

/**
* Returns the current group leader.
* <p>
Expand Down Expand Up @@ -418,11 +458,7 @@ public Collection<GroupMember> members() {
* @return A completable future to be completed once the member has joined.
*/
public CompletableFuture<LocalGroupMember> join() {
return submit(new GroupCommands.Join(UUID.randomUUID().toString(), false)).thenApply(memberId -> {
InternalLocalGroupMember member = new InternalLocalGroupMember(memberId);
localMembers.put(member.id(), member);
return member;
});
return join(UUID.randomUUID().toString(), false);
}

/**
Expand Down Expand Up @@ -453,9 +489,28 @@ public CompletableFuture<LocalGroupMember> join() {
* @return A completable future to be completed once the member has joined.
*/
public CompletableFuture<LocalGroupMember> join(String memberId) {
return submit(new GroupCommands.Join(memberId, true)).thenApply(id -> {
InternalLocalGroupMember member = new InternalLocalGroupMember(id);
localMembers.put(member.id(), member);
return join(memberId, true);
}

/**
* Joins the group.
*
* @param memberId The member ID with which to join the group.
* @param persistent Indicates whether the member ID is persistent.
* @return A completable future to be completed once the member has joined the group.
*/
private CompletableFuture<LocalGroupMember> join(String memberId, boolean persistent) {
joining.add(memberId);
return submit(new GroupCommands.Join(memberId, persistent)).whenComplete((result, error) -> {
if (error != null) {
joining.remove(memberId);
}
}).thenApply(id -> {
LocalGroupMember member = (LocalGroupMember) members.get(id);
if (member == null) {
member = new InternalLocalGroupMember(id);
members.put(id, member);
}
return member;
});
}
Expand Down Expand Up @@ -555,7 +610,7 @@ public CompletableFuture<Void> resign() {
@Override
public CompletableFuture<Void> leave() {
return submit(new GroupCommands.Leave(memberId)).whenComplete((result, error) -> {
localMembers.remove(memberId);
members.remove(memberId);
});
}

Expand Down

0 comments on commit f531f28

Please sign in to comment.