Skip to content

Commit

Permalink
Refactor Raft implementation to replace passive members with Raft mem…
Browse files Browse the repository at this point in the history
…bership changes.
  • Loading branch information
kuujo committed Jul 26, 2015
1 parent 2f36c6e commit 0f04d7a
Show file tree
Hide file tree
Showing 42 changed files with 3,564 additions and 2,601 deletions.
Expand Up @@ -43,9 +43,9 @@ public class AsyncReferenceTest extends ConcurrentTestCase {
*/
@SuppressWarnings("unchecked")
public void testSetGet() throws Throwable {
Servers servers = createCopycats(3, 2);
List<Copycat> servers = createCopycats(3);

Copycat copycat = servers.active.get(0);
Copycat copycat = servers.get(0);

Node node = copycat.create("/test").get();
AsyncReference<String> reference = node.create(AsyncReference.class).get();
Expand All @@ -67,9 +67,9 @@ public void testSetGet() throws Throwable {
*/
@SuppressWarnings("unchecked")
public void testChange() throws Throwable {
Servers servers = createCopycats(3, 2);
List<Copycat> servers = createCopycats(3);

Copycat copycat = servers.active.get(0);
Copycat copycat = servers.get(0);

Node node = copycat.create("/test").get();
AsyncReference<String> reference = node.create(AsyncReference.class).get();
Expand Down Expand Up @@ -97,16 +97,16 @@ public void testChange() throws Throwable {
* Tests compare-and-set.
*/
public void testCompareAndSet() throws Throwable {
Servers servers = createCopycats(3, 2);
List<Copycat> servers = createCopycats(3);

Node node1 = servers.active.get(0).create("/test").get();
Node node1 = servers.get(0).create("/test").get();
AsyncReference<Integer> reference1 = node1.create(AsyncReference.class).get();

expectResume();
reference1.set(1).thenRun(this::resume);
await();

Node node2 = servers.active.get(0).create("/test").get();
Node node2 = servers.get(0).create("/test").get();
AsyncReference<Integer> reference2 = node2.create(AsyncReference.class).get();

expectResume();
Expand All @@ -127,15 +127,15 @@ public void testCompareAndSet() throws Throwable {
/**
* Creates a Copycat instance.
*/
private Servers createCopycats(int activeNodes, int passiveNodes) throws Throwable {
private List<Copycat> createCopycats(int nodes) throws Throwable {
LocalServerRegistry registry = new LocalServerRegistry();

List<Copycat> active = new ArrayList<>();

expectResumes(activeNodes);
expectResumes(nodes);

Members.Builder builder = Members.builder();
for (int i = 1; i <= activeNodes; i++) {
for (int i = 1; i <= nodes; i++) {
builder.addMember(Member.builder()
.withId(i)
.withHost("localhost")
Expand All @@ -145,7 +145,7 @@ private Servers createCopycats(int activeNodes, int passiveNodes) throws Throwab

Members members = builder.build();

for (int i = 1; i <= activeNodes; i++) {
for (int i = 1; i <= nodes; i++) {
Copycat copycat = CopycatServer.builder()
.withMemberId(i)
.withMembers(members)
Expand All @@ -164,43 +164,7 @@ private Servers createCopycats(int activeNodes, int passiveNodes) throws Throwab

await();

List<Copycat> passive = new ArrayList<>();

expectResumes(passiveNodes);

for (int i = activeNodes + 1; i <= activeNodes + passiveNodes; i++) {
Copycat copycat = CopycatServer.builder()
.withMemberId(i)
.withMemberType(Member.Type.PASSIVE)
.withHost("localhost")
.withPort(5000 + i)
.withMembers(members)
.withTransport(LocalTransport.builder()
.withRegistry(registry)
.build())
.withLog(Log.builder()
.withStorageLevel(StorageLevel.MEMORY)
.build())
.build();

copycat.open().thenRun(this::resume);

passive.add(copycat);
}

await();

return new Servers(active, passive);
}

private static class Servers {
private final List<Copycat> active;
private final List<Copycat> passive;

private Servers(List<Copycat> active, List<Copycat> passive) {
this.active = active;
this.passive = passive;
}
return active;
}

}
26 changes: 13 additions & 13 deletions client/src/main/java/net/kuujo/copycat/raft/client/RaftClient.java
Expand Up @@ -18,7 +18,7 @@
import net.kuujo.alleycat.Alleycat;
import net.kuujo.alleycat.ServiceLoaderResolver;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.client.state.RaftClientState;
import net.kuujo.copycat.raft.client.state.ClientContext;
import net.kuujo.copycat.transport.Transport;
import net.kuujo.copycat.util.concurrent.Context;

Expand All @@ -41,37 +41,37 @@ public static Builder builder() {
return new Builder();
}

private final RaftClientState client;
private final ClientContext context;
private CompletableFuture<Raft> openFuture;
private CompletableFuture<Void> closeFuture;
private volatile boolean open;

private RaftClient(RaftClientState client) {
this.client = client;
protected RaftClient(ClientContext context) {
this.context = context;
}

@Override
public Context context() {
return client.getContext();
return context.getContext();
}

@Override
public Session session() {
return client.getSession();
return context.getSession();
}

@Override
public <T> CompletableFuture<T> submit(Command<T> command) {
if (!open)
throw new IllegalStateException("protocol not open");
return client.submit(command);
return context.submit(command);
}

@Override
public <T> CompletableFuture<T> submit(Query<T> query) {
if (!open)
throw new IllegalStateException("protocol not open");
return client.submit(query);
return context.submit(query);
}

@Override
Expand All @@ -83,13 +83,13 @@ public CompletableFuture<Raft> open() {
synchronized (this) {
if (openFuture == null) {
if (closeFuture == null) {
openFuture = client.open().thenApply(c -> {
openFuture = context.open().thenApply(c -> {
openFuture = null;
open = true;
return this;
});
} else {
openFuture = closeFuture.thenCompose(v -> client.open().thenApply(c -> {
openFuture = closeFuture.thenCompose(v -> context.open().thenApply(c -> {
openFuture = null;
open = true;
return this;
Expand All @@ -115,12 +115,12 @@ public CompletableFuture<Void> close() {
synchronized (this) {
if (closeFuture == null) {
if (openFuture == null) {
closeFuture = client.close().thenRun(() -> {
closeFuture = context.close().thenRun(() -> {
closeFuture = null;
open = false;
});
} else {
closeFuture = openFuture.thenCompose(c -> client.close().thenRun(() -> {
closeFuture = openFuture.thenCompose(c -> context.close().thenRun(() -> {
closeFuture = null;
open = false;
}));
Expand Down Expand Up @@ -223,7 +223,7 @@ public RaftClient build() {
// Resolve Alleycat serializable types with the ServiceLoaderResolver.
serializer.resolve(new ServiceLoaderResolver());

return new RaftClient(new RaftClientState(members, transport, serializer).setKeepAliveInterval(keepAliveInterval));
return new RaftClient(new ClientContext(members, transport, serializer).setKeepAliveInterval(keepAliveInterval));
}
}

Expand Down

0 comments on commit 0f04d7a

Please sign in to comment.