Skip to content

Commit

Permalink
Ensure Raft states properly register cluster message handlers.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 25, 2015
1 parent 40d3a6f commit a9e69ca
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 49 deletions.
Expand Up @@ -470,12 +470,13 @@ public CompletableFuture<Buffer> delete(Buffer key, Buffer entry, Consistency co
/** /**
* Wraps a call to the state context in the context executor. * Wraps a call to the state context in the context executor.
*/ */
private <T extends Request, U extends Response> CompletableFuture<U> runOnContext(T request, ProtocolHandler<T, U> handler) { @SuppressWarnings("unchecked")
private <T extends Request, U extends Response> CompletableFuture<U> runOnContext(T request, RaftState state) {
CompletableFuture<U> future = new CompletableFuture<>(); CompletableFuture<U> future = new CompletableFuture<>();
context.execute(() -> { context.execute(() -> {
handler.apply(request).whenComplete((response, error) -> { state.handle(request).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
future.complete(response); future.complete((U) response);
} else { } else {
future.completeExceptionally(error); future.completeExceptionally(error);
} }
Expand Down
21 changes: 12 additions & 9 deletions raft/src/main/java/net/kuujo/copycat/protocol/raft/RaftState.java
Expand Up @@ -15,6 +15,7 @@
*/ */
package net.kuujo.copycat.protocol.raft; package net.kuujo.copycat.protocol.raft;


import net.kuujo.copycat.cluster.MessageHandler;
import net.kuujo.copycat.protocol.raft.rpc.*; import net.kuujo.copycat.protocol.raft.rpc.*;
import net.kuujo.copycat.util.Managed; import net.kuujo.copycat.util.Managed;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -27,7 +28,7 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
abstract class RaftState implements ProtocolHandler<Request, Response>, Managed<RaftState> { abstract class RaftState implements MessageHandler<Request, Response>, Managed<RaftState> {
protected final Logger LOGGER = LoggerFactory.getLogger(getClass()); protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
protected final RaftProtocol context; protected final RaftProtocol context;
private volatile boolean open; private volatile boolean open;
Expand Down Expand Up @@ -122,6 +123,7 @@ protected final <R extends Response> R logResponse(R response) {
@Override @Override
public CompletableFuture<RaftState> open() { public CompletableFuture<RaftState> open() {
context.checkThread(); context.checkThread();
context.getCluster().member().registerHandler(context.getTopic(), this);
open = true; open = true;
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
Expand All @@ -132,23 +134,23 @@ public boolean isOpen() {
} }


@Override @Override
public CompletableFuture<? extends Response> apply(Request request) { public CompletableFuture<Response> handle(Request request) {
context.checkThread(); context.checkThread();
switch (request.type()) { switch (request.type()) {
case APPEND: case APPEND:
return append(request.asAppendRequest()); return append(request.asAppendRequest()).thenApply(Response::asAppendResponse);
case SYNC: case SYNC:
return sync(request.asSyncRequest()); return sync(request.asSyncRequest()).thenApply(Response::asSyncResponse);
case POLL: case POLL:
return poll(request.asPollRequest()); return poll(request.asPollRequest()).thenApply(Response::asPollResponse);
case VOTE: case VOTE:
return vote(request.asVoteRequest()); return vote(request.asVoteRequest()).thenApply(Response::asVoteResponse);
case WRITE: case WRITE:
return write(request.asWriteRequest()); return write(request.asWriteRequest()).thenApply(Response::asWriteResponse);
case READ: case READ:
return read(request.asReadRequest()); return read(request.asReadRequest()).thenApply(Response::asReadResponse);
case DELETE: case DELETE:
return delete(request.asDeleteRequest()); return delete(request.asDeleteRequest()).thenApply(Response::asDeleteResponse);
} }
throw new IllegalArgumentException("invalid request type"); throw new IllegalArgumentException("invalid request type");
} }
Expand Down Expand Up @@ -191,6 +193,7 @@ public CompletableFuture<? extends Response> apply(Request request) {
@Override @Override
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
context.checkThread(); context.checkThread();
context.getCluster().member().unregisterHandler(context.getTopic());
open = false; open = false;
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
Expand Down
Expand Up @@ -71,7 +71,7 @@ protected CompletableFuture<DeleteResponse> delete(DeleteRequest request) {
} }


@Override @Override
public CompletableFuture<? extends Response> apply(Request request) { public CompletableFuture<Response> handle(Request request) {
return exceptionalFuture(new IllegalStateException("inactive state")); return exceptionalFuture(new IllegalStateException("inactive state"));
} }


Expand Down

This file was deleted.

@@ -0,0 +1,96 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.protocol.raft;

import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.cluster.*;
import net.kuujo.copycat.protocol.raft.storage.BufferedStorage;
import net.kuujo.copycat.util.ExecutionContext;
import org.testng.annotations.Test;

/**
* Raft protocol test.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
@Test
public class RaftProtocolTest extends ConcurrentTestCase {

/**
* Tests that the protocol elects a leader.
*/
public void testElectLeader() throws Throwable {
RaftTestMemberRegistry registry = new RaftTestMemberRegistry();

RaftProtocol protocol1 = buildProtocol(1, 3, registry);
RaftProtocol protocol2 = buildProtocol(2, 3, registry);
RaftProtocol protocol3 = buildProtocol(3, 3, registry);

expectResumes(3);

protocol1.open().thenRun(this::resume);
protocol2.open().thenRun(this::resume);
protocol3.open().thenRun(this::resume);

await();
}

/**
* Builds a Raft test cluster.
*/
private RaftTestCluster buildCluster(int id, int nodes, RaftTestMemberRegistry registry) {
RaftTestCluster.Builder builder = RaftTestCluster.builder()
.withRegistry(registry)
.withLocalMember(RaftTestLocalMember.builder()
.withId(id)
.withType(Member.Type.ACTIVE)
.withAddress(String.format("test-%d", id))
.build());

for (int i = 1; i <= nodes; i++) {
if (i != id) {
builder.addRemoteMember(RaftTestRemoteMember.builder()
.withId(i)
.withType(Member.Type.ACTIVE)
.withAddress(String.format("test-%d", i))
.build());
}
}

return builder.build();
}

/**
* Creates a Raft protocol for the given node.
*/
private RaftProtocol buildProtocol(int id, int nodes, RaftTestMemberRegistry registry) {
ManagedCluster cluster = buildCluster(id, nodes, registry);
cluster.open().join();

RaftProtocol protocol = (RaftProtocol) RaftProtocol.builder()
.withContext(new ExecutionContext("test-" + id))
.withStorage(BufferedStorage.builder()
.withName(String.format("test-%d", id))
.withDirectory(String.format("test-logs/test-%d", id))
.build())
.build();

protocol.setCluster(cluster);
protocol.setTopic("test");
return protocol;
}

}

0 comments on commit a9e69ca

Please sign in to comment.