Skip to content

Commit

Permalink
Refactor Raft protocol methods for coding standards.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 23, 2017
1 parent 16cc215 commit 18809b5
Show file tree
Hide file tree
Showing 66 changed files with 443 additions and 444 deletions.
Expand Up @@ -95,7 +95,7 @@ public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataR

@Override
public void reset(ResetRequest request) {
clusterCommunicator.broadcast(request, context.resetSubject(request.session()), serializer::encode);
clusterCommunicator.broadcast(request, context.resetSubject(request.getSession()), serializer::encode);
}

@Override
Expand Down
Expand Up @@ -152,7 +152,7 @@ public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest

@Override
public void publish(MemberId memberId, PublishRequest request) {
clusterCommunicator.unicast(request, context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.value()));
clusterCommunicator.unicast(request, context.publishSubject(request.getSession()), serializer::encode, NodeId.nodeId(memberId.value()));
}

@Override
Expand Down
Expand Up @@ -44,8 +44,8 @@ public interface RaftClient {
* @return The client builder.
*/
@SuppressWarnings("unchecked")
static Builder builder() {
return builder(Collections.EMPTY_LIST);
static Builder newBuilder() {
return newBuilder(Collections.EMPTY_LIST);
}

/**
Expand All @@ -58,8 +58,8 @@ static Builder builder() {
* @param cluster The cluster to which to connect.
* @return The client builder.
*/
static Builder builder(MemberId... cluster) {
return builder(Arrays.asList(cluster));
static Builder newBuilder(MemberId... cluster) {
return newBuilder(Arrays.asList(cluster));
}

/**
Expand All @@ -72,7 +72,7 @@ static Builder builder(MemberId... cluster) {
* @param cluster The cluster to which to connect.
* @return The client builder.
*/
static Builder builder(Collection<MemberId> cluster) {
static Builder newBuilder(Collection<MemberId> cluster) {
return new DefaultRaftClient.Builder(cluster);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ default CompletableFuture<RaftClient> connect(MemberId... members) {
/**
* Builds a new Copycat client.
* <p>
* New client builders should be constructed using the static {@link #builder()} factory method.
* New client builders should be constructed using the static {@link #newBuilder()} factory method.
* <pre>
* {@code
* CopycatClient client = CopycatClient.builder(new Address("123.456.789.0", 5000), new Address("123.456.789.1", 5000)
Expand Down
Expand Up @@ -165,8 +165,8 @@ public interface RaftServer {
*
* @return The server builder.
*/
static Builder builder() {
return builder(null);
static Builder newBuilder() {
return newBuilder(null);
}

/**
Expand All @@ -177,7 +177,7 @@ static Builder builder() {
* @param localMemberId The local node identifier.
* @return The server builder.
*/
static Builder builder(MemberId localMemberId) {
static Builder newBuilder(MemberId localMemberId) {
return new DefaultRaftServer.Builder(localMemberId);
}

Expand Down Expand Up @@ -258,7 +258,7 @@ enum Role {
* <p>
* The {@link RaftCluster} is representative of the server's current view of the cluster configuration. The first time
* the server is {@link #bootstrap() started}, the cluster configuration will be initialized using the {@link MemberId}
* list provided to the server {@link #builder(MemberId) builder}. For {@link StorageLevel#DISK persistent}
* list provided to the server {@link #newBuilder(MemberId) builder}. For {@link StorageLevel#DISK persistent}
* servers, subsequent starts will result in the last known cluster configuration being loaded from disk.
* <p>
* The returned {@link RaftCluster} can be used to modify the state of the cluster to which this server belongs. Note,
Expand Down Expand Up @@ -464,7 +464,7 @@ enum Role {
* This builder should be used to programmatically configure and construct a new {@link RaftServer} instance.
* The builder provides methods for configuring all aspects of a Copycat server. The {@code CopycatServer.Builder}
* class cannot be instantiated directly. To create a new builder, use one of the
* {@link RaftServer#builder(MemberId) server builder factory} methods.
* {@link RaftServer#newBuilder(MemberId) server builder factory} methods.
* <pre>
* {@code
* CopycatServer.Builder builder = CopycatServer.builder(address);
Expand Down
Expand Up @@ -195,22 +195,22 @@ private void configure(RaftMember.Type type, CompletableFuture<Void> future) {
// Attempt to leave the cluster by submitting a LeaveRequest directly to the server state.
// Non-leader states should forward the request to the leader if there is one. Leader states
// will log, replicate, and commit the reconfiguration.
cluster.getContext().getRaftRole().onReconfigure(ReconfigureRequest.builder()
cluster.getContext().getRaftRole().onReconfigure(ReconfigureRequest.newBuilder()
.withIndex(cluster.getConfiguration().getIndex())
.withTerm(cluster.getConfiguration().getTerm())
.withMember(new DefaultRaftMember(id, type, status, updated))
.build()).whenComplete((response, error) -> {
if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
if (response.getStatus() == RaftResponse.Status.OK) {
cancelConfigureTimer();
cluster.configure(new Configuration(response.index(), response.term(), response.timestamp(), response.members()));
cluster.configure(new Configuration(response.getIndex(), response.getTerm(), response.getTimestamp(), response.getMembers()));
future.complete(null);
} else if (response.error() == null || response.error() == RaftError.Type.NO_LEADER_ERROR) {
} else if (response.getError() == null || response.getError() == RaftError.Type.NO_LEADER_ERROR) {
cancelConfigureTimer();
configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout().multipliedBy(2), () -> configure(type, future));
} else {
cancelConfigureTimer();
future.completeExceptionally(response.error().createException());
future.completeExceptionally(response.getError().createException());
}
}
});
Expand Down
Expand Up @@ -385,18 +385,18 @@ private void join(Iterator<RaftMemberContext> iterator) {
RaftMemberContext member = iterator.next();
LOGGER.debug("{} - Attempting to join via {}", getMember().getMemberId(), member.getMember().getMemberId());

JoinRequest request = JoinRequest.builder()
JoinRequest request = JoinRequest.newBuilder()
.withMember(new DefaultRaftMember(getMember().getMemberId(), getMember().getType(), getMember().getStatus(), getMember().getLastUpdated()))
.build();
context.getProtocol().join(member.getMember().getMemberId(), request).whenComplete((response, error) -> {
// Cancel the join timer.
cancelJoinTimer();

if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
if (response.getStatus() == RaftResponse.Status.OK) {
LOGGER.info("{} - Successfully joined via {}", getMember().getMemberId(), member.getMember().getMemberId());

Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());
Configuration configuration = new Configuration(response.getIndex(), response.getTerm(), response.getTimestamp(), response.getMembers());

// Configure the cluster with the join response.
// Commit the configuration as we know it was committed via the successful join response.
Expand All @@ -408,7 +408,7 @@ private void join(Iterator<RaftMemberContext> iterator) {
} else if (joinFuture != null) {
joinFuture.complete(null);
}
} else if (response.error() == null || response.error() == RaftError.Type.CONFIGURATION_ERROR) {
} else if (response.getError() == null || response.getError() == RaftError.Type.CONFIGURATION_ERROR) {
// If the response error is null, that indicates that no error occurred but the leader was
// in a state that was incapable of handling the join request. Attempt to join the leader
// again after an election timeout.
Expand Down Expand Up @@ -496,14 +496,14 @@ private void leave(CompletableFuture<Void> future) {
// Attempt to leave the cluster by submitting a LeaveRequest directly to the server state.
// Non-leader states should forward the request to the leader if there is one. Leader states
// will log, replicate, and commit the reconfiguration.
context.getRaftRole().onLeave(LeaveRequest.builder()
context.getRaftRole().onLeave(LeaveRequest.newBuilder()
.withMember(getMember())
.build()).whenComplete((response, error) -> {
// Cancel the leave timer.
cancelLeaveTimer();

if (error == null && response.status() == RaftResponse.Status.OK) {
Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());
if (error == null && response.getStatus() == RaftResponse.Status.OK) {
Configuration configuration = new Configuration(response.getIndex(), response.getTerm(), response.getTimestamp(), response.getMembers());

// Configure the cluster and commit the configuration as we know the successful response
// indicates commitment.
Expand Down
Expand Up @@ -67,12 +67,12 @@ public Collection<MemberId> getServers() {
*/
private CompletableFuture<MetadataResponse> getMetadata() {
CompletableFuture<MetadataResponse> future = new CompletableFuture<>();
connection.metadata(MetadataRequest.builder().build()).whenComplete((response, error) -> {
connection.metadata(MetadataRequest.newBuilder().build()).whenComplete((response, error) -> {
if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
if (response.getStatus() == RaftResponse.Status.OK) {
future.complete(response);
} else {
future.completeExceptionally(response.error().createException());
future.completeExceptionally(response.getError().createException());
}
} else {
future.completeExceptionally(error);
Expand All @@ -83,12 +83,12 @@ private CompletableFuture<MetadataResponse> getMetadata() {

@Override
public CompletableFuture<Set<RaftSessionMetadata>> getSessions() {
return getMetadata().thenApply(MetadataResponse::sessions);
return getMetadata().thenApply(MetadataResponse::getSessions);
}

@Override
public CompletableFuture<Set<RaftSessionMetadata>> getSessions(String type) {
return getMetadata().thenApply(response -> response.sessions().stream().filter(s -> s.getTypeName().equals(type)).collect(Collectors.toSet()));
return getMetadata().thenApply(response -> response.getSessions().stream().filter(s -> s.getTypeName().equals(type)).collect(Collectors.toSet()));
}

}
Expand Up @@ -35,12 +35,12 @@ protected AbstractRaftResponse(Status status, RaftError error) {
}

@Override
public Status status() {
public Status getStatus() {
return status;
}

@Override
public RaftError error() {
public RaftError getError() {
return error;
}

Expand Down
Expand Up @@ -41,7 +41,7 @@ public class AppendRequest extends AbstractRaftRequest {
*
* @return A new append request builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand All @@ -66,7 +66,7 @@ public AppendRequest(long term, MemberId leader, long logIndex, long logTerm, Li
*
* @return The requesting node's current term.
*/
public long term() {
public long getTerm() {
return term;
}

Expand All @@ -75,7 +75,7 @@ public long term() {
*
* @return The leader's address.
*/
public MemberId leader() {
public MemberId getLeader() {
return leader;
}

Expand All @@ -84,7 +84,7 @@ public MemberId leader() {
*
* @return The index of the log entry preceding the new entry.
*/
public long logIndex() {
public long getLogIndex() {
return logIndex;
}

Expand All @@ -93,7 +93,7 @@ public long logIndex() {
*
* @return The index of the term preceding the new entry.
*/
public long logTerm() {
public long getLogTerm() {
return logTerm;
}

Expand All @@ -102,7 +102,7 @@ public long logTerm() {
*
* @return A list of log entries.
*/
public List<Indexed<RaftLogEntry>> entries() {
public List<Indexed<RaftLogEntry>> getEntries() {
return entries;
}

Expand All @@ -111,7 +111,7 @@ public List<Indexed<RaftLogEntry>> entries() {
*
* @return The leader commit index.
*/
public long commitIndex() {
public long getCommitIndex() {
return commitIndex;
}

Expand Down
Expand Up @@ -32,7 +32,7 @@ public class AppendResponse extends AbstractRaftResponse {
*
* @return A new append response builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand All @@ -52,7 +52,7 @@ public AppendResponse(Status status, RaftError error, long term, boolean succeed
*
* @return The requesting node's current term.
*/
public long term() {
public long getTerm() {
return term;
}

Expand All @@ -70,7 +70,7 @@ public boolean succeeded() {
*
* @return The last index of the responding replica's log.
*/
public long logIndex() {
public long getLogIndex() {
return logIndex;
}

Expand Down
Expand Up @@ -25,7 +25,7 @@ public class CloseSessionRequest extends SessionRequest {
*
* @return A new unregister request builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand Down
Expand Up @@ -29,7 +29,7 @@ public class CloseSessionResponse extends SessionResponse {
*
* @return A new keep alive response builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand Down
Expand Up @@ -27,7 +27,7 @@
* <p>
* Command requests are submitted by clients to the Copycat cluster to commit commands to
* the replicated state machine. Each command request must be associated with a registered
* {@link #session()} and have a unique {@link #sequence()} number within that session. Commands will
* {@link #getSession()} and have a unique {@link #getSequence()} number within that session. Commands will
* be applied in the cluster in the order defined by the provided sequence number. Thus, sequence numbers
* should never be skipped. In the event of a failure of a command request, the request should be resent
* with the same sequence number. Commands are guaranteed to be applied in sequence order.
Expand All @@ -43,7 +43,7 @@ public class CommandRequest extends OperationRequest {
*
* @return A new submit request builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand Down
Expand Up @@ -22,7 +22,7 @@
* <p>
* Command responses are sent by servers to clients upon the completion of a
* {@link CommandRequest}. Command responses are sent with the
* {@link #index()} (or index) of the state machine at the point at which the command was evaluated.
* {@link #getIndex()} (or index) of the state machine at the point at which the command was evaluated.
* This can be used by the client to ensure it sees state progress monotonically. Note, however, that
* command responses may not be sent or received in sequential order. If a command response has to await
* the completion of an event, or if the response is proxied through another server, responses may be
Expand All @@ -35,7 +35,7 @@ public class CommandResponse extends OperationResponse {
*
* @return A new submit response builder.
*/
public static Builder builder() {
public static Builder newBuilder() {
return new Builder();
}

Expand Down
Expand Up @@ -41,7 +41,7 @@ protected ConfigurationRequest(RaftMember member) {
*
* @return The member to configure.
*/
public RaftMember member() {
public RaftMember getMember() {
return member;
}

Expand Down

0 comments on commit 18809b5

Please sign in to comment.