Skip to content

Commit

Permalink
Refactor cluster to remove gossip protocol for membership detection.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed May 23, 2015
1 parent 0bc54d3 commit fcddde6
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 671 deletions.
Expand Up @@ -15,9 +15,7 @@
*/
package net.kuujo.copycat.cluster;

import net.kuujo.copycat.EventListener;
import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.util.ExecutionContext;

import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand All @@ -35,10 +33,9 @@ public abstract class AbstractCluster implements ManagedCluster {
protected final AbstractLocalMember localMember;
protected final Map<Integer, AbstractRemoteMember> remoteMembers = new ConcurrentHashMap<>();
protected final Map<Integer, AbstractMember> members = new ConcurrentHashMap<>();
protected final Set<EventListener<MembershipChangeEvent>> membershipListeners = new CopyOnWriteArraySet<>();
protected final Set<MembershipListener> membershipListeners = new CopyOnWriteArraySet<>();
protected final Serializer serializer;
private final AtomicInteger permits = new AtomicInteger();
private MembershipDetector membershipDetector;
private CompletableFuture<Cluster> openFuture;
private CompletableFuture<Void> closeFuture;
private AtomicBoolean open = new AtomicBoolean();
Expand All @@ -54,7 +51,7 @@ protected AbstractCluster(AbstractLocalMember localMember, Collection<? extends
/**
* Creates a new remote member.
*/
protected abstract AbstractRemoteMember createRemoteMember(AbstractMember.Info info);
protected abstract AbstractRemoteMember createRemoteMember(MemberInfo info);

@Override
public LocalMember member() {
Expand Down Expand Up @@ -92,22 +89,57 @@ public <T> Cluster broadcast(String topic, T message) {
return this;
}

/**
* Handles a membership change event.
*/
private void handleMembershipChange(MembershipChangeEvent event) {
if (event.type() == MembershipChangeEvent.Type.JOIN) {
AbstractRemoteMember member = createRemoteMember(event.info());
member.connect().whenComplete((result, error) -> {
if (error == null) {
members.put(member.id(), member);
remoteMembers.put(member.id(), member);
membershipListeners.forEach(l -> l.memberJoined(member));
}
});
} else if (event.type() == MembershipChangeEvent.Type.LEAVE) {
AbstractRemoteMember member = remoteMembers.remove(event.info().id());
if (member != null) {
member.close().whenComplete((result, error) -> {
membershipListeners.forEach(l -> l.memberLeft(member.id()));
});
}
}
}

@Override
public Cluster addMembershipListener(EventListener<MembershipChangeEvent> listener) {
public Cluster addListener(MembershipListener listener) {
if (listener == null)
throw new NullPointerException("listener cannot be null");
membershipListeners.add(listener);
return this;
}

@Override
public Cluster removeMembershipListener(EventListener<MembershipChangeEvent> listener) {
public Cluster removeListener(MembershipListener listener) {
if (listener == null)
throw new NullPointerException("listener cannot be null");
membershipListeners.remove(listener);
return this;
}

@Override
public Cluster addProvider(MembershipProvider provider) {
provider.addListener(this::handleMembershipChange);
return this;
}

@Override
public Cluster removeProvider(MembershipProvider provider) {
provider.removeListener(this::handleMembershipChange);
return this;
}

/**
* Opens the cluster.
*
Expand All @@ -126,7 +158,6 @@ public CompletableFuture<Cluster> open() {
}
return CompletableFuture.allOf(futures);
}).thenApply(v -> {
membershipDetector = new MembershipDetector(this, new ExecutionContext(String.format("copycat-membership-detector-%d", localMember.id())));
openFuture = null;
if (permits.get() > 0) {
open.set(true);
Expand Down Expand Up @@ -167,13 +198,9 @@ public CompletableFuture<Void> close() {
closeFuture = CompletableFuture.allOf(futures)
.thenCompose(v -> localMember.close())
.thenRun(() -> {
if (membershipDetector != null) {
membershipDetector.close();
membershipDetector = null;
closeFuture = null;
if (permits.get() == 0) {
open.set(false);
}
closeFuture = null;
if (permits.get() == 0) {
open.set(false);
}
});
}
Expand All @@ -194,50 +221,42 @@ public boolean isClosed() {
/**
* Cluster builder.
*/
public static abstract class Builder<BUILDER extends Builder<BUILDER, MEMBER>, MEMBER extends ManagedMember> implements Cluster.Builder<BUILDER, MEMBER> {
public static abstract class Builder<T extends Builder<T, U>, U extends ManagedMember> implements Cluster.Builder<T, U> {
protected int memberId;
protected Member.Type memberType;
protected final Map<Integer, MEMBER> members = new HashMap<>();
protected final Map<Integer, U> members = new HashMap<>();
protected Serializer serializer;

@Override
@SuppressWarnings("unchecked")
public BUILDER withMemberId(int id) {
public T withMemberId(int id) {
if (id < 0)
throw new IllegalArgumentException("member ID cannot be negative");
this.memberId = id;
return (BUILDER) this;
}

@Override
@SuppressWarnings("unchecked")
public BUILDER withMemberType(Member.Type type) {
this.memberType = type;
return (BUILDER) this;
return (T) this;
}

@Override
@SuppressWarnings("unchecked")
public BUILDER withSeeds(Collection<MEMBER> members) {
public T withSeeds(Collection<U> members) {
this.members.clear();
members.forEach(m -> this.members.put(m.id(), m));
return (BUILDER) this;
return (T) this;
}

@Override
@SuppressWarnings("unchecked")
public BUILDER addSeed(MEMBER member) {
public T addSeed(U member) {
if (member == null)
throw new NullPointerException("member cannot be null");
members.put(member.id(), member);
return (BUILDER) this;
return (T) this;
}

@Override
@SuppressWarnings("unchecked")
public BUILDER withSerializer(Serializer serializer) {
public T withSerializer(Serializer serializer) {
this.serializer = serializer;
return (BUILDER) this;
return (T) this;
}
}

Expand Down
Expand Up @@ -24,8 +24,8 @@
*/
public abstract class AbstractLocalMember extends AbstractMember implements ManagedLocalMember {

protected AbstractLocalMember(Info info, ExecutionContext context) {
super(info, context);
protected AbstractLocalMember(MemberInfo info, Type type, ExecutionContext context) {
super(info, type, context);
}

}

0 comments on commit fcddde6

Please sign in to comment.