Skip to content

Commit

Permalink
Make sessions remote only.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed May 27, 2015
1 parent c029154 commit 146239e
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 18 deletions.
Expand Up @@ -157,6 +157,9 @@ public CompletableFuture<Void> configure(MemberInfo... membersInfo) {
* Registers a member. * Registers a member.
*/ */
public CompletableFuture<Member> register(Session session) { public CompletableFuture<Member> register(Session session) {
if (session.member().id() == localMember.id())
return CompletableFuture.completedFuture(localMember);

ManagedRemoteMember member = remoteMembers.containsKey(session.member().id()) ? remoteMembers.get(session.member().id()) : createMember(session.member()); ManagedRemoteMember member = remoteMembers.containsKey(session.member().id()) ? remoteMembers.get(session.member().id()) : createMember(session.member());
return member.connect().thenApply(v -> { return member.connect().thenApply(v -> {
member.status = Member.Status.ALIVE; member.status = Member.Status.ALIVE;
Expand Down Expand Up @@ -254,7 +257,7 @@ public ManagedLocalMember member() {
} }


@Override @Override
public ManagedMember member(int id) { public Member member(int id) {
if (localMember.id() == id) if (localMember.id() == id)
return localMember; return localMember;
ManagedMember member = remoteMembers.get(id); ManagedMember member = remoteMembers.get(id);
Expand Down
Expand Up @@ -28,7 +28,6 @@ public abstract class ManagedMember implements Member {
protected final MemberInfo info; protected final MemberInfo info;
protected Type type; protected Type type;
protected Status status = Status.DEAD; protected Status status = Status.DEAD;
protected Session session;
protected final ExecutionContext context; protected final ExecutionContext context;


protected ManagedMember(MemberInfo info, Type type, ExecutionContext context) { protected ManagedMember(MemberInfo info, Type type, ExecutionContext context) {
Expand Down Expand Up @@ -65,11 +64,6 @@ public MemberInfo info() {
return info; return info;
} }


@Override
public Session session() {
return session;
}

@Override @Override
public boolean equals(Object object) { public boolean equals(Object object) {
return object instanceof Member && ((Member) object).id() == info.id(); return object instanceof Member && ((Member) object).id() == info.id();
Expand Down
Expand Up @@ -25,11 +25,17 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public abstract class ManagedRemoteMember extends ManagedMember implements RemoteMember { public abstract class ManagedRemoteMember extends ManagedMember implements RemoteMember {
protected Session session;


protected ManagedRemoteMember(MemberInfo info, Type type, ExecutionContext context) { protected ManagedRemoteMember(MemberInfo info, Type type, ExecutionContext context) {
super(info, type, context); super(info, type, context);
} }


@Override
public Session session() {
return session;
}

/** /**
* Connects the remote client. * Connects the remote client.
* *
Expand Down
7 changes: 0 additions & 7 deletions cluster/src/main/java/net/kuujo/copycat/cluster/Member.java
Expand Up @@ -93,13 +93,6 @@ static enum Status {
*/ */
MemberInfo info(); MemberInfo info();


/**
* Returns the member session.
*
* @return The member session.
*/
Session session();

/** /**
* Sends a message to the member.<p> * Sends a message to the member.<p>
* *
Expand Down
Expand Up @@ -21,4 +21,12 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public interface RemoteMember extends Member { public interface RemoteMember extends Member {

/**
* Returns the member session.
*
* @return The member session.
*/
Session session();

} }
21 changes: 21 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/raft/state/RaftContext.java
Expand Up @@ -56,6 +56,7 @@ public class RaftContext implements Managed<RaftContext> {
private CompletableFuture<RaftContext> openFuture; private CompletableFuture<RaftContext> openFuture;
private long electionTimeout = 500; private long electionTimeout = 500;
private long heartbeatInterval = 250; private long heartbeatInterval = 250;
private long session;
private int leader; private int leader;
private long term; private long term;
private int lastVotedFor; private int lastVotedFor;
Expand Down Expand Up @@ -194,6 +195,26 @@ public RaftContext removeElectionListener(EventListener<Member> listener) {
return this; return this;
} }


/**
* Sets the state session.
*
* @param session The state session.
* @return The Raft context.
*/
RaftContext setSession(long session) {
this.session = session;
return this;
}

/**
* Returns the state session.
*
* @return The state session.
*/
public long getSession() {
return session;
}

/** /**
* Sets the state leader. * Sets the state leader.
* *
Expand Down
Expand Up @@ -86,6 +86,7 @@ private CompletableFuture<Long> register(List<Member> members, CompletableFuture
if (error == null) { if (error == null) {
context.setTerm(response.term()); context.setTerm(response.term());
context.setLeader(response.leader()); context.setLeader(response.leader());
context.setSession(response.session());
context.getCluster().configure(response.members().toArray(new TypedMemberInfo[response.members().size()])).whenComplete((configureResult, configureError) -> { context.getCluster().configure(response.members().toArray(new TypedMemberInfo[response.members().size()])).whenComplete((configureResult, configureError) -> {
if (configureError == null) { if (configureError == null) {
future.complete(response.session()); future.complete(response.session());
Expand Down Expand Up @@ -139,7 +140,7 @@ private CompletableFuture<Void> keepAlive(List<Member> members, CompletableFutur
} }


KeepAliveRequest request = KeepAliveRequest.builder() KeepAliveRequest request = KeepAliveRequest.builder()
.withSession(context.getCluster().member().session().id()) .withSession(context.getSession())
.build(); .build();
member.<KeepAliveRequest, KeepAliveResponse>send(context.getTopic(), request).whenComplete((response, error) -> { member.<KeepAliveRequest, KeepAliveResponse>send(context.getTopic(), request).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
Expand Down
Expand Up @@ -128,7 +128,7 @@ public TestCluster build() {
info = new TestMember.Info(memberId, address); info = new TestMember.Info(memberId, address);
} }


TestLocalMember localMember = new TestLocalMember(info, type, serializer != null ? serializer : new Serializer(), new ExecutionContext(String.format("copycat-cluster-%d", memberId))); TestLocalMember localMember = new TestLocalMember(info, type, member != null, serializer != null ? serializer : new Serializer(), new ExecutionContext(String.format("copycat-cluster-%d", memberId)));
return new TestCluster(localMember, members.values().stream().map(m -> (TestRemoteMember) m).collect(Collectors.toList()), registry, serializer); return new TestCluster(localMember, members.values().stream().map(m -> (TestRemoteMember) m).collect(Collectors.toList()), registry, serializer);
} }
} }
Expand Down
Expand Up @@ -35,8 +35,8 @@ public class TestLocalMember extends ManagedLocalMember implements TestMember {
private final Map<String, HandlerHolder> handlers = new HashMap<>(); private final Map<String, HandlerHolder> handlers = new HashMap<>();
private TestMemberRegistry registry; private TestMemberRegistry registry;


TestLocalMember(TestMember.Info info, Member.Type type, Serializer serializer, ExecutionContext context) { TestLocalMember(TestMember.Info info, Member.Type type, boolean seed, Serializer serializer, ExecutionContext context) {
super(info, type, context); super(info, type, seed, context);
this.serializer = serializer; this.serializer = serializer;
this.info = info; this.info = info;
} }
Expand Down

0 comments on commit 146239e

Please sign in to comment.