Skip to content

Commit

Permalink
Separate ClusterCommunicationService from RaftClient with protocol API.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 13, 2017
1 parent 789c599 commit 4102097
Show file tree
Hide file tree
Showing 28 changed files with 620 additions and 294 deletions.
30 changes: 11 additions & 19 deletions core/src/main/java/io/atomix/protocols/raft/client/RaftClient.java
Expand Up @@ -15,9 +15,9 @@
*/ */
package io.atomix.protocols.raft.client; package io.atomix.protocols.raft.client;


import io.atomix.cluster.ClusterCommunicationService;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.client.impl.DefaultRaftClient; import io.atomix.protocols.raft.client.impl.DefaultRaftClient;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;


import java.util.Arrays; import java.util.Arrays;
Expand Down Expand Up @@ -81,9 +81,9 @@ static Builder builder(Collection<NodeId> cluster) {
} }


/** /**
* Returns the client identifier. * Returns the globally unique client identifier.
* *
* @return The client identifier. * @return the globally unique client identifier
*/ */
String id(); String id();


Expand Down Expand Up @@ -166,8 +166,7 @@ default CompletableFuture<RaftClient> connect(NodeId... members) {
final class Builder implements io.atomix.util.Builder<RaftClient> { final class Builder implements io.atomix.util.Builder<RaftClient> {
private final Collection<NodeId> cluster; private final Collection<NodeId> cluster;
private String clientId = UUID.randomUUID().toString(); private String clientId = UUID.randomUUID().toString();
private String clusterName = "raft"; private RaftClientProtocol protocol;
private ClusterCommunicationService clusterCommunicator;
private int threadPoolSize = Runtime.getRuntime().availableProcessors(); private int threadPoolSize = Runtime.getRuntime().availableProcessors();


private Builder(Collection<NodeId> cluster) { private Builder(Collection<NodeId> cluster) {
Expand All @@ -185,19 +184,19 @@ private Builder(Collection<NodeId> cluster) {
* @throws NullPointerException if {@code clientId} is null * @throws NullPointerException if {@code clientId} is null
*/ */
public Builder withClientId(String clientId) { public Builder withClientId(String clientId) {
this.clientId = checkNotNull(clientId, "clientId"); this.clientId = checkNotNull(clientId, "clientId cannot be null");
return this; return this;
} }


/** /**
* Sets the cluster name. * Sets the client protocol.
* *
* @param clusterName the cluster name * @param protocol the client protocol
* @return the client builder * @return the client builder
* @throws NullPointerException if the cluster name is null * @throws NullPointerException if the protocol is null
*/ */
public Builder withClusterName(String clusterName) { public Builder withProtocol(RaftClientProtocol protocol) {
this.clusterName = checkNotNull(clusterName, "clusterName cannot be null"); this.protocol = checkNotNull(protocol, "protocol cannot be null");
return this; return this;
} }


Expand All @@ -216,15 +215,8 @@ public Builder withThreadPoolSize(int threadPoolSize) {


@Override @Override
public RaftClient build() { public RaftClient build() {
checkNotNull(clusterName, "clusterName cannot be null");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(threadPoolSize); ScheduledExecutorService executor = Executors.newScheduledThreadPool(threadPoolSize);
return new DefaultRaftClient( return new DefaultRaftClient(clientId, cluster, protocol, executor);
clientId,
clusterName,
cluster,
clusterCommunicator,
executor);
} }
} }

} }
Expand Up @@ -15,13 +15,13 @@
*/ */
package io.atomix.protocols.raft.client.impl; package io.atomix.protocols.raft.client.impl;


import io.atomix.cluster.ClusterCommunicationService;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.client.RaftClient; import io.atomix.protocols.raft.client.RaftClient;
import io.atomix.protocols.raft.client.RaftMetadataClient; import io.atomix.protocols.raft.client.RaftMetadataClient;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.session.impl.RaftSessionManager;
import io.atomix.protocols.raft.session.impl.NodeSelectorManager; import io.atomix.protocols.raft.session.impl.NodeSelectorManager;
import io.atomix.protocols.raft.session.impl.RaftSessionManager;


import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand All @@ -36,33 +36,28 @@
* @author <a href="http://github.com/kuujo>Jordan Halterman</a> * @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/ */
public class DefaultRaftClient implements RaftClient { public class DefaultRaftClient implements RaftClient {
private static final String DEFAULT_HOST = "0.0.0.0"; private final String clientId;
private static final int DEFAULT_PORT = 8700;
private final String id;
private final Collection<NodeId> cluster; private final Collection<NodeId> cluster;
private final ClusterCommunicationService communicationService;
private final ScheduledExecutorService threadPoolExecutor; private final ScheduledExecutorService threadPoolExecutor;
private final RaftMetadataClient metadata; private final RaftMetadataClient metadata;
private final NodeSelectorManager selectorManager = new NodeSelectorManager(); private final NodeSelectorManager selectorManager = new NodeSelectorManager();
private final RaftSessionManager sessionManager; private final RaftSessionManager sessionManager;


public DefaultRaftClient( public DefaultRaftClient(
String clientId, String clientId,
String clusterName,
Collection<NodeId> cluster, Collection<NodeId> cluster,
ClusterCommunicationService communicationService, RaftClientProtocol protocol,
ScheduledExecutorService threadPoolExecutor) { ScheduledExecutorService threadPoolExecutor) {
this.id = checkNotNull(clientId, "clientId cannot be null"); this.clientId = checkNotNull(clientId, "clientId cannot be null");
this.cluster = checkNotNull(cluster, "cluster cannot be null"); this.cluster = checkNotNull(cluster, "cluster cannot be null");
this.communicationService = checkNotNull(communicationService, "communicationService cannot be null");
this.threadPoolExecutor = checkNotNull(threadPoolExecutor, "threadPoolExecutor cannot be null"); this.threadPoolExecutor = checkNotNull(threadPoolExecutor, "threadPoolExecutor cannot be null");
this.metadata = new DefaultRaftMetadataClient(clientId, clusterName, communicationService, selectorManager); this.metadata = new DefaultRaftMetadataClient(clientId, protocol, selectorManager);
this.sessionManager = new RaftSessionManager(clientId, clusterName, communicationService, selectorManager, threadPoolExecutor); this.sessionManager = new RaftSessionManager(clientId, protocol, selectorManager, threadPoolExecutor);
} }


@Override @Override
public String id() { public String id() {
return id; return clientId;
} }


@Override @Override
Expand Down Expand Up @@ -108,20 +103,10 @@ public synchronized CompletableFuture<Void> close() {
return sessionManager.close(); return sessionManager.close();
} }


@Override
public int hashCode() {
return 23 + 37 * id.hashCode();
}

@Override
public boolean equals(Object object) {
return object instanceof DefaultRaftClient && ((DefaultRaftClient) object).id.equals(id);
}

@Override @Override
public String toString() { public String toString() {
return toStringHelper(this) return toStringHelper(this)
.add("id", id) .add("id", clientId)
.toString(); .toString();
} }


Expand All @@ -134,5 +119,4 @@ public RaftSession build() {
return sessionManager.openSession(name, type, communicationStrategy, serializer, timeout).join(); return sessionManager.openSession(name, type, communicationStrategy, serializer, timeout).join();
} }
} }

} }
Expand Up @@ -15,16 +15,16 @@
*/ */
package io.atomix.protocols.raft.client.impl; package io.atomix.protocols.raft.client.impl;


import io.atomix.cluster.ClusterCommunicationService;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.client.CommunicationStrategies; import io.atomix.protocols.raft.client.CommunicationStrategies;
import io.atomix.protocols.raft.client.RaftMetadataClient; import io.atomix.protocols.raft.client.RaftMetadataClient;
import io.atomix.protocols.raft.session.impl.RaftClientConnection;
import io.atomix.protocols.raft.session.impl.NodeSelectorManager;
import io.atomix.protocols.raft.metadata.RaftSessionMetadata;
import io.atomix.protocols.raft.protocol.MetadataRequest; import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse; import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftResponse; import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.metadata.RaftSessionMetadata;
import io.atomix.protocols.raft.session.impl.NodeSelectorManager;
import io.atomix.protocols.raft.session.impl.RaftClientConnection;


import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
Expand All @@ -40,9 +40,9 @@ public class DefaultRaftMetadataClient implements RaftMetadataClient {
private final NodeSelectorManager selectorManager; private final NodeSelectorManager selectorManager;
private final RaftClientConnection connection; private final RaftClientConnection connection;


public DefaultRaftMetadataClient(String clientId, String clusterName, ClusterCommunicationService communicationService, NodeSelectorManager selectorManager) { public DefaultRaftMetadataClient(String clientId, RaftClientProtocol protocol, NodeSelectorManager selectorManager) {
this.selectorManager = checkNotNull(selectorManager, "selectorManager cannot be null"); this.selectorManager = checkNotNull(selectorManager, "selectorManager cannot be null");
this.connection = new RaftClientConnection(clientId, clusterName, communicationService, selectorManager.createSelector(CommunicationStrategies.LEADER)); this.connection = new RaftClientConnection(clientId, protocol.dispatcher(), selectorManager.createSelector(CommunicationStrategies.LEADER));
} }


@Override @Override
Expand All @@ -62,7 +62,7 @@ public Collection<NodeId> servers() {
*/ */
private CompletableFuture<MetadataResponse> getMetadata() { private CompletableFuture<MetadataResponse> getMetadata() {
CompletableFuture<MetadataResponse> future = new CompletableFuture<>(); CompletableFuture<MetadataResponse> future = new CompletableFuture<>();
connection.<MetadataRequest, MetadataResponse>sendAndReceive(MetadataRequest.builder().build()).whenComplete((response, error) -> { connection.metadata(MetadataRequest.builder().build()).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
if (response.status() == RaftResponse.Status.OK) { if (response.status() == RaftResponse.Status.OK) {
future.complete(response); future.complete(response);
Expand Down
Expand Up @@ -28,83 +28,92 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public abstract class AbstractRaftResponse implements RaftResponse { public abstract class AbstractRaftResponse implements RaftResponse {
protected final Status status; protected final Status status;
protected final RaftError error; protected final RaftError error;


protected AbstractRaftResponse(Status status, RaftError error) { protected AbstractRaftResponse(Status status, RaftError error) {
this.status = status; this.status = status;
this.error = error; this.error = error;
} }

@Override
public Status status() {
return status;
}

@Override
public RaftError error() {
return error;
}


@Override @Override
public int hashCode() { public Status status() {
return Objects.hash(getClass(), status); return status;
} }


@Override @Override
public String toString() { public RaftError error() {
if (status == Status.OK) { return error;
return toStringHelper(this)
.add("status", status)
.toString();
} else {
return toStringHelper(this)
.add("status", status)
.add("error", error)
.toString();
} }
}


/** @Override
* Abstract response builder. public int hashCode() {
* return Objects.hash(getClass(), status);
* @param <T> The builder type. }
* @param <U> The response type.
*/
protected static abstract class Builder<T extends Builder<T, U>, U extends AbstractRaftResponse> implements RaftResponse.Builder<T, U> {
protected Status status;
protected RaftError error;


@Override @Override
@SuppressWarnings("unchecked") public boolean equals(Object object) {
public T withStatus(Status status) { if (object.getClass() == getClass()) {
this.status = checkNotNull(status, "status cannot be null"); AbstractRaftResponse response = (AbstractRaftResponse) object;
return (T) this; return response.status == status && Objects.equals(response.error, error);
}
return false;
} }


@Override @Override
@SuppressWarnings("unchecked") public String toString() {
public T withError(RaftError error) { if (status == Status.OK) {
this.error = checkNotNull(error, "error cannot be null"); return toStringHelper(this)
return (T) this; .add("status", status)
.toString();
} else {
return toStringHelper(this)
.add("status", status)
.add("error", error)
.toString();
}
} }


/** /**
* Validates the builder. * Abstract response builder.
*
* @param <T> The builder type.
* @param <U> The response type.
*/ */
protected void validate() { protected static abstract class Builder<T extends Builder<T, U>, U extends AbstractRaftResponse> implements RaftResponse.Builder<T, U> {
checkNotNull(status, "status cannot be null"); protected Status status;
if (status == Status.ERROR) { protected RaftError error;
checkNotNull(error, "error cannot be null");
}
}


@Override @Override
public String toString() { @SuppressWarnings("unchecked")
return toStringHelper(this) public T withStatus(Status status) {
.add("status", status) this.status = checkNotNull(status, "status cannot be null");
.add("error", error) return (T) this;
.toString(); }

@Override
@SuppressWarnings("unchecked")
public T withError(RaftError error) {
this.error = checkNotNull(error, "error cannot be null");
return (T) this;
}

/**
* Validates the builder.
*/
protected void validate() {
checkNotNull(status, "status cannot be null");
if (status == Status.ERROR) {
checkNotNull(error, "error cannot be null");
}
}

@Override
public String toString() {
return toStringHelper(this)
.add("status", status)
.add("error", error)
.toString();
}
} }
}
} }
Expand Up @@ -35,11 +35,6 @@ public CloseSessionRequest(long session) {
super(session); super(session);
} }


@Override
public Type type() {
return Type.CLOSE_SESSION;
}

/** /**
* Unregister request builder. * Unregister request builder.
*/ */
Expand Down
Expand Up @@ -53,11 +53,6 @@ public CommandRequest(long session, long sequence, byte[] bytes) {
super(session, sequence, bytes); super(session, sequence, bytes);
} }


@Override
public Type type() {
return Type.COMMAND;
}

@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(getClass(), session, sequence, bytes); return Objects.hash(getClass(), session, sequence, bytes);
Expand Down
Expand Up @@ -49,11 +49,6 @@ public ConnectRequest(long session, long connection) {
this.connection = connection; this.connection = connection;
} }


@Override
public Type type() {
return Type.CONNECT;
}

/** /**
* Returns the connecting session ID. * Returns the connecting session ID.
* *
Expand Down

0 comments on commit 4102097

Please sign in to comment.