Skip to content

Commit

Permalink
Refactor asynchronous replication to use sessions for passive members…
Browse files Browse the repository at this point in the history
…hip.
  • Loading branch information
kuujo committed Jul 23, 2015
1 parent 654997a commit 1d3157e
Show file tree
Hide file tree
Showing 27 changed files with 168 additions and 1,232 deletions.
Expand Up @@ -217,7 +217,7 @@ public RaftClient build() {
// Resolve Alleycat serializable types with the ServiceLoaderResolver.
serializer.resolve(new ServiceLoaderResolver());

return new RaftClient(new RaftClientState(transport, members, serializer).setKeepAliveInterval(keepAliveInterval));
return new RaftClient(new RaftClientState(members, transport, serializer).setKeepAliveInterval(keepAliveInterval));
}
}

Expand Down
Expand Up @@ -52,11 +52,11 @@
public class RaftClientState implements Managed<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClientState.class);
private static final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
private final int id;
private final Member member;
private final Members members;
private final Transport transport;
private final Client client;
private Member member;
private Member remote;
private Connection connection;
private final Context context;
private CompletableFuture<Void> registerFuture;
Expand All @@ -75,18 +75,24 @@ public class RaftClientState implements Managed<Void> {
private volatile long response;
private volatile long version;

public RaftClientState(Transport transport, Members members, Alleycat serializer) {
this(0, transport, members, serializer);
public RaftClientState(Members members, Transport transport, Alleycat serializer) {
this(Member.CLIENT, members, transport, serializer);
}

protected RaftClientState(int clientId, Transport transport, Members members, Alleycat serializer) {
protected RaftClientState(Member member, Members members, Transport transport, Alleycat serializer) {
if (member == null)
throw new NullPointerException("member cannot be null");
if (members == null)
throw new NullPointerException("members cannot be null");
if (transport == null)
throw new NullPointerException("transport cannot be null");
if (serializer == null)
throw new NullPointerException("serializer cannot be null");

this.id = clientId;
this.context = new SingleThreadContext("copycat-client-" + clientId, serializer.clone());
this.member = member;
this.members = members;
this.transport = transport;
this.context = new SingleThreadContext("copycat-client-" + member.id(), serializer.clone());
this.client = transport.client(UUID.randomUUID());
this.session = new ClientSession(context);
}
Expand Down Expand Up @@ -270,7 +276,7 @@ public RaftClientState setKeepAliveInterval(long keepAliveInterval) {
* @return A completable future to be completed once the connection has been connected.
*/
protected CompletableFuture<Connection> getConnection(Member member) {
if (connection != null && member.equals(this.member)) {
if (connection != null && member.equals(this.remote)) {
return CompletableFuture.completedFuture(connection);
}

Expand All @@ -283,7 +289,7 @@ protected CompletableFuture<Connection> getConnection(Member member) {

Function<Connection, Connection> connectHandler = connection -> {
this.connection = connection;
this.member = member;
this.remote = member;
session.connect(connection);
connection.closeListener(c -> this.connection = null);
connection.exceptionListener(e -> this.connection = null);
Expand Down Expand Up @@ -620,7 +626,7 @@ protected CompletableFuture<RegisterResponse> register(List<Member> members, Com
Member member = selectMember(members);

RegisterRequest request = RegisterRequest.builder()
.withMember(id)
.withMember(member)
.withConnection(client.id())
.build();

Expand Down
42 changes: 26 additions & 16 deletions protocol/src/main/java/net/kuujo/copycat/raft/Member.java
Expand Up @@ -27,6 +27,14 @@
*/
public class Member implements AlleycatSerializable {

/**
* Default client member.
*/
public static final Member CLIENT = builder()
.withId(0)
.withType(Type.CLIENT)
.build();

/**
* Member type.
*/
Expand All @@ -35,18 +43,32 @@ public static enum Type {
/**
* Represents an active voting member.
*/
ACTIVE,
ACTIVE(true),

/**
* Represents a passive non-voting member.
*/
PASSIVE,
PASSIVE(true),

/**
* Represents a pure client.
*/
CLIENT
CLIENT(false);

private final boolean replica;

private Type(boolean replica) {
this.replica = replica;
}

/**
* Returns a boolean value indicating whether the member type is a replica type.
*
* @return Indicates whether the member type is a replica type.
*/
public boolean isReplica() {
return replica;
}
}

/**
Expand All @@ -60,7 +82,6 @@ public static Builder builder() {

private int id;
private Type type = Type.ACTIVE;
private long session;
private String host;
private int port;

Expand Down Expand Up @@ -89,15 +110,6 @@ public Type type() {
return type;
}

/**
* Returns the member session ID.
*
* @return The member session ID.
*/
public long session() {
return session;
}

/**
* Returns the member host.
*
Expand All @@ -121,8 +133,7 @@ public void writeObject(BufferOutput buffer, Alleycat alleycat) {
buffer.writeInt(id)
.writeByte(type.ordinal())
.writeString(host)
.writeInt(port)
.writeLong(session);
.writeInt(port);
}

@Override
Expand All @@ -131,7 +142,6 @@ public void readObject(BufferInput buffer, Alleycat alleycat) {
type = Type.values()[buffer.readByte()];
host = buffer.readString();
port = buffer.readInt();
session = buffer.readLong();
}

/**
Expand Down

This file was deleted.

0 comments on commit 1d3157e

Please sign in to comment.