Skip to content

Commit

Permalink
Improve Raft client protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 14, 2017
1 parent a03b84f commit 3175e3d
Show file tree
Hide file tree
Showing 45 changed files with 1,902 additions and 1,901 deletions.
Expand Up @@ -166,6 +166,7 @@ default CompletableFuture<RaftClient> connect(NodeId... members) {
final class Builder implements io.atomix.util.Builder<RaftClient> {
private final Collection<NodeId> cluster;
private String clientId = UUID.randomUUID().toString();
private NodeId nodeId;
private RaftClientProtocol protocol;
private int threadPoolSize = Runtime.getRuntime().availableProcessors();

Expand All @@ -188,6 +189,18 @@ public Builder withClientId(String clientId) {
return this;
}

/**
* Sets the local node identifier.
*
* @param nodeId The local node identifier.
* @return The client builder.
* @throws NullPointerException if {@code nodeId} is null
*/
public Builder withNodeId(NodeId nodeId) {
this.nodeId = checkNotNull(nodeId, "nodeId cannot be null");
return this;
}

/**
* Sets the client protocol.
*
Expand Down Expand Up @@ -215,8 +228,9 @@ public Builder withThreadPoolSize(int threadPoolSize) {

@Override
public RaftClient build() {
checkNotNull(nodeId, "nodeId cannot be null");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(threadPoolSize);
return new DefaultRaftClient(clientId, cluster, protocol, executor);
return new DefaultRaftClient(clientId, nodeId, cluster, protocol, executor);
}
}
}
Expand Up @@ -45,14 +45,15 @@ public class DefaultRaftClient implements RaftClient {

public DefaultRaftClient(
String clientId,
NodeId nodeId,
Collection<NodeId> cluster,
RaftClientProtocol protocol,
ScheduledExecutorService threadPoolExecutor) {
this.clientId = checkNotNull(clientId, "clientId cannot be null");
this.cluster = checkNotNull(cluster, "cluster cannot be null");
this.threadPoolExecutor = checkNotNull(threadPoolExecutor, "threadPoolExecutor cannot be null");
this.metadata = new DefaultRaftMetadataClient(clientId, protocol, selectorManager);
this.sessionManager = new RaftSessionManager(clientId, protocol, selectorManager, threadPoolExecutor);
this.sessionManager = new RaftSessionManager(clientId, nodeId, protocol, selectorManager, threadPoolExecutor);
}

@Override
Expand Down
Expand Up @@ -18,13 +18,13 @@
import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.client.CommunicationStrategies;
import io.atomix.protocols.raft.client.RaftMetadataClient;
import io.atomix.protocols.raft.metadata.RaftSessionMetadata;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.metadata.RaftSessionMetadata;
import io.atomix.protocols.raft.session.impl.NodeSelectorManager;
import io.atomix.protocols.raft.session.impl.RaftClientConnection;
import io.atomix.protocols.raft.session.impl.RaftConnection;

import java.util.Collection;
import java.util.Set;
Expand All @@ -38,11 +38,11 @@
*/
public class DefaultRaftMetadataClient implements RaftMetadataClient {
private final NodeSelectorManager selectorManager;
private final RaftClientConnection connection;
private final RaftConnection connection;

public DefaultRaftMetadataClient(String clientId, RaftClientProtocol protocol, NodeSelectorManager selectorManager) {
this.selectorManager = checkNotNull(selectorManager, "selectorManager cannot be null");
this.connection = new RaftClientConnection(clientId, protocol.dispatcher(), selectorManager.createSelector(CommunicationStrategies.LEADER));
this.connection = new RaftConnection(clientId, protocol.dispatcher(), selectorManager.createSelector(CommunicationStrategies.LEADER));
}

@Override
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.protocols.raft.server.cluster;
package io.atomix.protocols.raft.cluster;

import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.server.RaftServer;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.protocols.raft.server.cluster;
package io.atomix.protocols.raft.cluster;

import io.atomix.cluster.NodeId;
import io.atomix.util.temp.Listener;
Expand Down
Expand Up @@ -19,4 +19,4 @@
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
package io.atomix.protocols.raft.server.cluster;
package io.atomix.protocols.raft.cluster;
Expand Up @@ -15,7 +15,7 @@
*/
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Objects;

Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.error.RaftError;
import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Collection;
import java.util.Objects;
Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.protocol;

import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Collection;
import java.util.Objects;
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

/**
* Server join configuration change request.
Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.error.RaftError;
import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Collection;

Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

/**
* Server leave configuration request.
Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.error.RaftError;
import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Collection;

Expand Down
Expand Up @@ -15,6 +15,8 @@
*/
package io.atomix.protocols.raft.protocol;

import io.atomix.cluster.NodeId;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -35,25 +37,25 @@ public static Builder builder() {
return new Builder();
}

private final String client;
private final NodeId node;
private final String name;
private final String stateMachine;
private final long timeout;

public OpenSessionRequest(String client, String name, String stateMachine, long timeout) {
this.client = client;
public OpenSessionRequest(NodeId node, String name, String stateMachine, long timeout) {
this.node = node;
this.name = name;
this.stateMachine = stateMachine;
this.timeout = timeout;
}

/**
* Returns the client identifier.
* Returns the client node identifier.
*
* @return The client identifier.
* @return The client node identifier.
*/
public String client() {
return client;
public NodeId node() {
return node;
}

/**
Expand Down Expand Up @@ -92,7 +94,8 @@ public int hashCode() {
public boolean equals(Object object) {
if (object instanceof OpenSessionRequest) {
OpenSessionRequest request = (OpenSessionRequest) object;
return request.name.equals(name)
return request.node.equals(node)
&& request.name.equals(name)
&& request.stateMachine.equals(stateMachine)
&& request.timeout == timeout;
}
Expand All @@ -102,7 +105,7 @@ public boolean equals(Object object) {
@Override
public String toString() {
return toStringHelper(this)
.add("client", client)
.add("node", node)
.add("name", name)
.add("stateMachine", stateMachine)
.add("timeout", timeout)
Expand All @@ -113,20 +116,20 @@ public String toString() {
* Open session request builder.
*/
public static class Builder extends AbstractRaftRequest.Builder<Builder, OpenSessionRequest> {
private String client;
private NodeId node;
private String name;
private String stateMachine;
private long timeout;

/**
* Sets the client identifier.
* Sets the client node identifier.
*
* @param client The client identifier.
* @param node The client node identifier.
* @return The open session request builder.
* @throws NullPointerException if {@code client} is {@code null}
* @throws NullPointerException if {@code node} is {@code null}
*/
public Builder withClient(String client) {
this.client = checkNotNull(client, "client");
public Builder withNode(NodeId node) {
this.node = checkNotNull(node, "node cannot be null");
return this;
}

Expand All @@ -138,7 +141,7 @@ public Builder withClient(String client) {
* @throws NullPointerException if {@code name} is {@code null}
*/
public Builder withName(String name) {
this.name = checkNotNull(name, "name");
this.name = checkNotNull(name, "name cannot be null");
return this;
}

Expand All @@ -150,7 +153,7 @@ public Builder withName(String name) {
* @throws NullPointerException if {@code type} is {@code null}
*/
public Builder withStateMachine(String stateMachine) {
this.stateMachine = checkNotNull(stateMachine, "stateMachine");
this.stateMachine = checkNotNull(stateMachine, "stateMachine cannot be null");
return this;
}

Expand All @@ -170,9 +173,9 @@ public Builder withTimeout(long timeout) {
@Override
protected void validate() {
super.validate();
checkNotNull(client, "client");
checkNotNull(name, "name");
checkNotNull(stateMachine, "stateMachine");
checkNotNull(node, "client cannot be null");
checkNotNull(name, "name cannot be null");
checkNotNull(stateMachine, "stateMachine cannot be null");
checkArgument(timeout >= 0, "timeout must be positive");
}

Expand All @@ -182,7 +185,7 @@ protected void validate() {
@Override
public OpenSessionRequest build() {
validate();
return new OpenSessionRequest(client, name, stateMachine, timeout);
return new OpenSessionRequest(node, name, stateMachine, timeout);
}
}
}
Expand Up @@ -82,6 +82,6 @@ public CompletableFuture<MetadataResponse> metadata(NodeId nodeId, MetadataReque

@Override
public void reset(ResetRequest request) {
clusterCommunicator.broadcast(request, context.resetSubject, serializer::encode);
clusterCommunicator.broadcast(request, context.resetSubject(request.session), serializer::encode);
}
}
Expand Up @@ -28,7 +28,6 @@ class RaftMessageContext {
final MessageSubject querySubject;
final MessageSubject commandSubject;
final MessageSubject metadataSubject;
final MessageSubject resetSubject;
final MessageSubject joinSubject;
final MessageSubject leaveSubject;
final MessageSubject configureSubject;
Expand All @@ -46,7 +45,6 @@ class RaftMessageContext {
this.querySubject = getSubject(prefix, "query");
this.commandSubject = getSubject(prefix, "command");
this.metadataSubject = getSubject(prefix, "metadata");
this.resetSubject = getSubject(prefix, "reset");
this.joinSubject = getSubject(prefix, "join");
this.leaveSubject = getSubject(prefix, "leave");
this.configureSubject = getSubject(prefix, "configure");
Expand Down Expand Up @@ -78,4 +76,18 @@ MessageSubject publishSubject(long sessionId) {
return new MessageSubject(String.format("%s-publish-%d", prefix, sessionId));
}
}

/**
* Returns the reset subject for the given session.
*
* @param sessionId the session for which to return the reset subject
* @return the reset subject for the given session
*/
MessageSubject resetSubject(long sessionId) {
if (prefix == null) {
return new MessageSubject(String.format("reset-%d", sessionId));
} else {
return new MessageSubject(String.format("%s-reset-%d", prefix, sessionId));
}
}
}
Expand Up @@ -180,12 +180,12 @@ public void unregisterAppendHandler() {
}

@Override
public void registerResetListener(Consumer<ResetRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.resetSubject, serializer::decode, listener, executor);
public void registerResetListener(long sessionId, Consumer<ResetRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.resetSubject(sessionId), serializer::decode, listener, executor);
}

@Override
public void unregisterResetListener() {
clusterCommunicator.removeSubscriber(context.openSessionSubject);
public void unregisterResetListener(long sessionId) {
clusterCommunicator.removeSubscriber(context.resetSubject(sessionId));
}
}
Expand Up @@ -196,14 +196,17 @@ public interface RaftServerProtocolListener {
/**
* Registers a reset request listener.
*
* @param sessionId the session ID for which to register the listener
* @param listener the reset request listener to add
* @param executor the executor with which to execute the listener
*/
void registerResetListener(Consumer<ResetRequest> listener, Executor executor);
void registerResetListener(long sessionId, Consumer<ResetRequest> listener, Executor executor);

/**
* Unregisters the given reset request listener.
*
* @param sessionId the session ID for which to unregister the listener
*/
void unregisterResetListener();
void unregisterResetListener(long sessionId);

}
Expand Up @@ -15,7 +15,7 @@
*/
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Objects;

Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.protocol;

import io.atomix.protocols.raft.error.RaftError;
import io.atomix.protocols.raft.server.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.RaftMember;

import java.util.Collection;

Expand Down

0 comments on commit 3175e3d

Please sign in to comment.