Skip to content

Commit

Permalink
Fix various issues with tracking members and sessions internally.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 24, 2015
1 parent 1387c3a commit 2f36c6e
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 111 deletions.
Expand Up @@ -51,7 +51,6 @@ private static enum State {
private volatile State state = State.CLOSED; private volatile State state = State.CLOSED;
private volatile long id; private volatile long id;
private volatile UUID connectionId; private volatile UUID connectionId;
private volatile Connection connection;


public ClientSession(Context context) { public ClientSession(Context context) {
this.context = context; this.context = context;
Expand All @@ -71,7 +70,6 @@ public UUID connection() {
* Sets the session connection. * Sets the session connection.
*/ */
void connect(Connection connection) { void connect(Connection connection) {
this.connection = connection;
connection.handler(PublishRequest.class, this::handlePublish); connection.handler(PublishRequest.class, this::handlePublish);
} }


Expand All @@ -83,12 +81,11 @@ public ListenerContext<?> onReceive(Listener listener) {


@Override @Override
public CompletableFuture<Void> publish(Object message) { public CompletableFuture<Void> publish(Object message) {
context.execute(() -> { return CompletableFuture.runAsync(() -> {
for (Listener<Object> listener : receiveListeners) { for (Listener<Object> listener : receiveListeners) {
listener.accept(message); listener.accept(message);
} }
}); }, context);
return CompletableFuture.completedFuture(null);
} }


/** /**
Expand Down Expand Up @@ -149,7 +146,6 @@ private void triggerClose() {


id = 0; id = 0;
connectionId = null; connectionId = null;
connection = null;
} }


@Override @Override
Expand Down
Expand Up @@ -619,7 +619,6 @@ protected CompletableFuture<Void> register(List<Member> members) {
setSessionId(response.session()); setSessionId(response.session());
session.open(response.session(), connection.id()); session.open(response.session(), connection.id());
session.open(response.session(), client.id()); session.open(response.session(), client.id());
this.members.configure(response.members());
}); });
} }


Expand All @@ -635,7 +634,7 @@ protected CompletableFuture<RegisterResponse> register(List<Member> members, Com
Member member = selectMember(members); Member member = selectMember(members);


RegisterRequest request = RegisterRequest.builder() RegisterRequest request = RegisterRequest.builder()
.withMember(member) .withMember(this.member)
.withConnection(client.id()) .withConnection(client.id())
.build(); .build();


Expand Down Expand Up @@ -684,7 +683,6 @@ protected CompletableFuture<Void> keepAlive(List<Member> members) {
setTerm(response.term()); setTerm(response.term());
setLeader(response.leader()); setLeader(response.leader());
setVersion(response.version()); setVersion(response.version());
this.members.configure(response.members());
}); });
} }


Expand All @@ -709,8 +707,12 @@ protected CompletableFuture<KeepAliveResponse> keepAlive(List<Member> members, C
if (isOpen()) { if (isOpen()) {
connection.<KeepAliveRequest, KeepAliveResponse>send(request).whenComplete((response, error) -> { connection.<KeepAliveRequest, KeepAliveResponse>send(request).whenComplete((response, error) -> {
if (isOpen()) { if (isOpen()) {
if (error == null && response.status() == Response.Status.OK) { if (error == null) {
future.complete(response); if (response.status() == Response.Status.OK) {
future.complete(response);
} else {
future.completeExceptionally(response.error().createException());
}
} else { } else {
future.completeExceptionally(error); future.completeExceptionally(error);
} }
Expand Down
19 changes: 11 additions & 8 deletions core/src/main/java/net/kuujo/copycat/ResourceSession.java
Expand Up @@ -65,11 +65,14 @@ public ListenerContext<Session> onOpen(Listener<Session> listener) {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Void> publish(Object message) { public CompletableFuture<Void> publish(Object message) {
context.execute(() -> { ResourceMessage resourceMessage = (ResourceMessage) message;
for (Listener<Object> listener : receiveListeners) { if (resourceMessage.resource() == resource) {
listener.accept(message); return CompletableFuture.runAsync(() -> {
} for (Listener<Object> listener : receiveListeners) {
}); listener.accept(resourceMessage.message());
}
}, context);
}
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }


Expand Down Expand Up @@ -97,17 +100,17 @@ private void handleReceive(ResourceMessage<?> message) {


@Override @Override
public ListenerContext<Session> onClose(Listener<Session> listener) { public ListenerContext<Session> onClose(Listener<Session> listener) {
return null; return parent.onClose(listener);
} }


@Override @Override
public boolean isClosed() { public boolean isClosed() {
return false; return parent.isClosed();
} }


@Override @Override
public boolean isExpired() { public boolean isExpired() {
return false; return parent.isExpired();
} }


/** /**
Expand Down
Expand Up @@ -21,7 +21,6 @@
import net.kuujo.alleycat.io.BufferOutput; import net.kuujo.alleycat.io.BufferOutput;
import net.kuujo.alleycat.util.ReferenceManager; import net.kuujo.alleycat.util.ReferenceManager;
import net.kuujo.copycat.BuilderPool; import net.kuujo.copycat.BuilderPool;
import net.kuujo.copycat.raft.Members;
import net.kuujo.copycat.raft.RaftError; import net.kuujo.copycat.raft.RaftError;


import java.util.Objects; import java.util.Objects;
Expand Down Expand Up @@ -56,7 +55,6 @@ public static Builder builder(KeepAliveResponse response) {


private long term; private long term;
private int leader; private int leader;
private Members members;


public KeepAliveResponse(ReferenceManager<KeepAliveResponse> referenceManager) { public KeepAliveResponse(ReferenceManager<KeepAliveResponse> referenceManager) {
super(referenceManager); super(referenceManager);
Expand Down Expand Up @@ -85,15 +83,6 @@ public int leader() {
return leader; return leader;
} }


/**
* Returns the responding node's member set.
*
* @return The responding node's member set.
*/
public Members members() {
return members;
}

@Override @Override
public void readObject(BufferInput buffer, Alleycat alleycat) { public void readObject(BufferInput buffer, Alleycat alleycat) {
status = Status.forId(buffer.readByte()); status = Status.forId(buffer.readByte());
Expand All @@ -102,7 +91,6 @@ public void readObject(BufferInput buffer, Alleycat alleycat) {
term = buffer.readLong(); term = buffer.readLong();
leader = buffer.readInt(); leader = buffer.readInt();
version = buffer.readLong(); version = buffer.readLong();
members = alleycat.readObject(buffer);
} else { } else {
error = RaftError.forId(buffer.readByte()); error = RaftError.forId(buffer.readByte());
} }
Expand All @@ -113,7 +101,6 @@ public void writeObject(BufferOutput buffer, Alleycat alleycat) {
buffer.writeByte(status.id()); buffer.writeByte(status.id());
if (status == Status.OK) { if (status == Status.OK) {
buffer.writeLong(term).writeInt(leader).writeLong(version); buffer.writeLong(term).writeInt(leader).writeLong(version);
alleycat.writeObject(members, buffer);
} else { } else {
buffer.writeByte(error.id()); buffer.writeByte(error.id());
} }
Expand All @@ -130,15 +117,14 @@ public boolean equals(Object object) {
KeepAliveResponse response = (KeepAliveResponse) object; KeepAliveResponse response = (KeepAliveResponse) object;
return response.status == status return response.status == status
&& response.term == term && response.term == term
&& response.leader == leader && response.leader == leader;
&& response.members.equals(members);
} }
return false; return false;
} }


@Override @Override
public String toString() { public String toString() {
return String.format("%s[term=%d, leader=%d, members=%s]", getClass().getSimpleName(), term, leader, members); return String.format("%s[term=%d, leader=%d]", getClass().getSimpleName(), term, leader);
} }


/** /**
Expand All @@ -155,7 +141,6 @@ protected void reset() {
super.reset(); super.reset();
response.term = 0; response.term = 0;
response.leader = 0; response.leader = 0;
response.members = null;
} }


/** /**
Expand All @@ -182,17 +167,6 @@ public Builder withLeader(int leader) {
return this; return this;
} }


/**
* Sets the response members.
*
* @param members The response members.
* @return The keep alive response builder.;
*/
public Builder withMembers(Members members) {
response.members = members;
return this;
}

@Override @Override
public KeepAliveResponse build() { public KeepAliveResponse build() {
super.build(); super.build();
Expand Down
Expand Up @@ -21,7 +21,6 @@
import net.kuujo.alleycat.io.BufferOutput; import net.kuujo.alleycat.io.BufferOutput;
import net.kuujo.alleycat.util.ReferenceManager; import net.kuujo.alleycat.util.ReferenceManager;
import net.kuujo.copycat.BuilderPool; import net.kuujo.copycat.BuilderPool;
import net.kuujo.copycat.raft.Members;
import net.kuujo.copycat.raft.RaftError; import net.kuujo.copycat.raft.RaftError;


import java.util.Objects; import java.util.Objects;
Expand Down Expand Up @@ -57,7 +56,6 @@ public static Builder builder(RegisterResponse response) {
private long term; private long term;
private int leader; private int leader;
private long session; private long session;
private Members members;


public RegisterResponse(ReferenceManager<RegisterResponse> referenceManager) { public RegisterResponse(ReferenceManager<RegisterResponse> referenceManager) {
super(referenceManager); super(referenceManager);
Expand Down Expand Up @@ -95,15 +93,6 @@ public long session() {
return session; return session;
} }


/**
* Returns the responding node's member set.
*
* @return The responding node's member set.
*/
public Members members() {
return members;
}

@Override @Override
public void readObject(BufferInput buffer, Alleycat alleycat) { public void readObject(BufferInput buffer, Alleycat alleycat) {
status = Status.forId(buffer.readByte()); status = Status.forId(buffer.readByte());
Expand All @@ -112,7 +101,6 @@ public void readObject(BufferInput buffer, Alleycat alleycat) {
term = buffer.readLong(); term = buffer.readLong();
leader = buffer.readInt(); leader = buffer.readInt();
session = buffer.readLong(); session = buffer.readLong();
members = alleycat.readObject(buffer);
} else { } else {
error = RaftError.forId(buffer.readByte()); error = RaftError.forId(buffer.readByte());
} }
Expand All @@ -123,7 +111,6 @@ public void writeObject(BufferOutput buffer, Alleycat alleycat) {
buffer.writeByte(status.id()); buffer.writeByte(status.id());
if (status == Status.OK) { if (status == Status.OK) {
buffer.writeLong(term).writeInt(leader).writeLong(session); buffer.writeLong(term).writeInt(leader).writeLong(session);
alleycat.writeObject(members, buffer);
} else { } else {
buffer.writeByte(error.id()); buffer.writeByte(error.id());
} }
Expand All @@ -147,7 +134,7 @@ public boolean equals(Object object) {


@Override @Override
public String toString() { public String toString() {
return String.format("%s[term=%d, leader=%d, session=%d, members=%s]", getClass().getSimpleName(), term, leader, session, members); return String.format("%s[term=%d, leader=%d, session=%d]", getClass().getSimpleName(), term, leader, session);
} }


/** /**
Expand All @@ -165,7 +152,6 @@ protected void reset() {
response.term = 0; response.term = 0;
response.leader = 0; response.leader = 0;
response.session = 0; response.session = 0;
response.members = null;
} }


/** /**
Expand Down Expand Up @@ -205,19 +191,6 @@ public Builder withSession(long session) {
return this; return this;
} }


/**
* Sets the response members.
*
* @param members The response members.
* @return The response builder.
*/
public Builder withMembers(Members members) {
if (members == null)
throw new NullPointerException("members cannot be null");
response.members = members;
return this;
}

@Override @Override
public RegisterResponse build() { public RegisterResponse build() {
super.build(); super.build();
Expand Down
Expand Up @@ -125,7 +125,7 @@ else if (request.candidate() == context.getMemberId()) {
} }
// If the requesting candidate is not a known member of the cluster (to this // If the requesting candidate is not a known member of the cluster (to this
// node) then don't vote for it. Only vote for candidates that we know about. // node) then don't vote for it. Only vote for candidates that we know about.
else if (!context.getCluster().getActiveMembers().stream().<Integer>map(MemberState::getId).collect(Collectors.toSet()).contains(request.candidate())) { else if (!context.getCluster().getActiveMembers().stream().<Integer>map(m -> m.getMember().id()).collect(Collectors.toSet()).contains(request.candidate())) {
LOGGER.debug("{} - Rejected {}: candidate is not known to the local member", context.getMemberId(), request); LOGGER.debug("{} - Rejected {}: candidate is not known to the local member", context.getMemberId(), request);
return VoteResponse.builder() return VoteResponse.builder()
.withStatus(Response.Status.OK) .withStatus(Response.Status.OK)
Expand Down
Expand Up @@ -36,8 +36,8 @@ class ClusterState implements Iterable<MemberState> {
* @return The cluster state. * @return The cluster state.
*/ */
ClusterState addMember(MemberState member) { ClusterState addMember(MemberState member) {
members.put(member.getId(), member); members.put(member.getMember().id(), member);
if (member.getType() == Member.Type.ACTIVE) { if (member.getMember().type() == Member.Type.ACTIVE) {
addActiveMember(member); addActiveMember(member);
} else { } else {
addPassiveMember(member); addPassiveMember(member);
Expand Down Expand Up @@ -73,7 +73,7 @@ private void resetPassiveMembers() {
* Sorts the active members. * Sorts the active members.
*/ */
private void sortActiveMembers() { private void sortActiveMembers() {
Collections.sort(activeMembers, (m1, m2) -> m1.getId() - m2.getId()); Collections.sort(activeMembers, (m1, m2) -> m1.getMember().id() - m2.getMember().id());
for (int i = 0; i < activeMembers.size(); i++) { for (int i = 0; i < activeMembers.size(); i++) {
activeMembers.get(i).setIndex(i); activeMembers.get(i).setIndex(i);
} }
Expand All @@ -83,7 +83,7 @@ private void sortActiveMembers() {
* Sorts the passive members. * Sorts the passive members.
*/ */
private void sortPassiveMembers() { private void sortPassiveMembers() {
Collections.sort(passiveMembers, (m1, m2) -> m1.getId() - m2.getId()); Collections.sort(passiveMembers, (m1, m2) -> m1.getMember().id() - m2.getMember().id());
for (int i = 0; i < passiveMembers.size(); i++) { for (int i = 0; i < passiveMembers.size(); i++) {
passiveMembers.get(i).setIndex(i); passiveMembers.get(i).setIndex(i);
} }
Expand All @@ -96,8 +96,8 @@ private void sortPassiveMembers() {
* @return The cluster state. * @return The cluster state.
*/ */
ClusterState removeMember(MemberState member) { ClusterState removeMember(MemberState member) {
members.remove(member.getId()); members.remove(member.getMember().id());
if (member.getType() == Member.Type.ACTIVE) { if (member.getMember().type() == Member.Type.ACTIVE) {
removeActiveMember(member); removeActiveMember(member);
} else { } else {
removePassiveMember(member); removePassiveMember(member);
Expand All @@ -111,7 +111,7 @@ ClusterState removeMember(MemberState member) {
private void removeActiveMember(MemberState member) { private void removeActiveMember(MemberState member) {
Iterator<MemberState> iterator = activeMembers.iterator(); Iterator<MemberState> iterator = activeMembers.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
if (iterator.next().getId() == member.getId()) { if (iterator.next().getMember().id() == member.getMember().id()) {
iterator.remove(); iterator.remove();
} }
} }
Expand All @@ -124,7 +124,7 @@ private void removeActiveMember(MemberState member) {
private void removePassiveMember(MemberState member) { private void removePassiveMember(MemberState member) {
Iterator<MemberState> iterator = passiveMembers.iterator(); Iterator<MemberState> iterator = passiveMembers.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
if (iterator.next().getId() == member.getId()) { if (iterator.next().getMember().id() == member.getMember().id()) {
iterator.remove(); iterator.remove();
} }
} }
Expand Down
Expand Up @@ -232,7 +232,7 @@ private void replicateCommits(long sessionId) {
* Returns a boolean value indicating whether the given member is a replica of this follower. * Returns a boolean value indicating whether the given member is a replica of this follower.
*/ */
private boolean isActiveReplica(MemberState member) { private boolean isActiveReplica(MemberState member) {
if (member != null && member.getType() == Member.Type.PASSIVE && member.getSession() != 0) { if (member != null && member.getMember().type() == Member.Type.PASSIVE && member.getSession() != 0) {
MemberState thisMember = context.getCluster().getMember(context.getMemberId()); MemberState thisMember = context.getCluster().getMember(context.getMemberId());
int index = thisMember.getIndex(); int index = thisMember.getIndex();
int activeMembers = context.getCluster().getActiveMembers().size(); int activeMembers = context.getCluster().getActiveMembers().size();
Expand All @@ -257,7 +257,7 @@ private void commit(MemberState member) {
if (member.getNextIndex() == 0) if (member.getNextIndex() == 0)
member.setNextIndex(context.getLog().lastIndex()); member.setNextIndex(context.getLog().lastIndex());


if (!committing.contains(member.getId())) { if (!committing.contains(member.getMember().id())) {
long prevIndex = getPrevIndex(member); long prevIndex = getPrevIndex(member);
RaftEntry prevEntry = getPrevEntry(prevIndex); RaftEntry prevEntry = getPrevEntry(prevIndex);
List<RaftEntry> entries = getEntries(prevIndex); List<RaftEntry> entries = getEntries(prevIndex);
Expand Down Expand Up @@ -323,11 +323,11 @@ private void commit(MemberState member, long prevIndex, RaftEntry prevEntry, Lis
.withGlobalIndex(context.getGlobalIndex()) .withGlobalIndex(context.getGlobalIndex())
.build(); .build();


committing.add(member.getId()); committing.add(member.getMember().id());
LOGGER.debug("{} - Sent {} to {}", context.getMemberId(), request, member); LOGGER.debug("{} - Sent {} to {}", context.getMemberId(), request, member);
context.getConnections().getConnection(context.getMembers().member(member.getId())).thenAccept(connection -> { context.getConnections().getConnection(member.getMember()).thenAccept(connection -> {
connection.<AppendRequest, AppendResponse>send(request).whenCompleteAsync((response, error) -> { connection.<AppendRequest, AppendResponse>send(request).whenCompleteAsync((response, error) -> {
committing.remove(member.getId()); committing.remove(member.getMember().id());
context.checkThread(); context.checkThread();


if (isOpen()) { if (isOpen()) {
Expand Down

0 comments on commit 2f36c6e

Please sign in to comment.