Skip to content

Commit

Permalink
Refactor module structure to separate transport layer into separate p…
Browse files Browse the repository at this point in the history
…roject.
  • Loading branch information
kuujo committed Jul 16, 2015
1 parent f3bb531 commit 58d6eca
Show file tree
Hide file tree
Showing 142 changed files with 407 additions and 272 deletions.
5 changes: 0 additions & 5 deletions all/pom.xml
Expand Up @@ -32,11 +32,6 @@
<artifactId>copycat-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat</artifactId>
Expand Down
Expand Up @@ -23,8 +23,8 @@
import net.kuujo.copycat.AbstractResource;
import net.kuujo.copycat.Mode;
import net.kuujo.copycat.Stateful;
import net.kuujo.copycat.log.Compaction;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
Expand Down
8 changes: 4 additions & 4 deletions raft/server/pom.xml → client/pom.xml
Expand Up @@ -19,17 +19,17 @@

<parent>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-parent</artifactId>
<artifactId>copycat-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>

<artifactId>copycat-raft-server</artifactId>
<name>Copycat Raft Server</name>
<artifactId>copycat-client</artifactId>
<name>Copycat Client</name>

<dependencies>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-client</artifactId>
<artifactId>copycat-protocol</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
Expand Up @@ -16,7 +16,7 @@
package net.kuujo.copycat.raft;

import net.kuujo.alleycat.Alleycat;
import net.kuujo.copycat.raft.protocol.Protocol;
import net.kuujo.copycat.transport.Transport;
import net.kuujo.copycat.raft.state.RaftClientState;

import java.util.Random;
Expand Down Expand Up @@ -144,7 +144,7 @@ public CompletableFuture<Void> delete() {
*/
public static class Builder implements Raft.Builder<Builder, RaftClient> {
private static final Random RANDOM = new Random();
private Protocol protocol;
private Transport transport;
private Alleycat serializer;
private long keepAliveInterval = 1000;
private Members members;
Expand All @@ -153,8 +153,8 @@ private Builder() {
}

@Override
public Builder withProtocol(Protocol protocol) {
this.protocol = protocol;
public Builder withTransport(Transport transport) {
this.transport = transport;
return this;
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public Builder withMembers(Members members) {

@Override
public RaftClient build() {
return new RaftClient(new RaftClientState(nextClientId(), protocol, members, serializer).setKeepAliveInterval(keepAliveInterval));
return new RaftClient(new RaftClientState(nextClientId(), transport, members, serializer).setKeepAliveInterval(keepAliveInterval));
}

/**
Expand Down
Expand Up @@ -21,14 +21,21 @@
import net.kuujo.copycat.Listeners;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.protocol.*;
import net.kuujo.copycat.raft.protocol.ProtocolException;
import net.kuujo.copycat.transport.Client;
import net.kuujo.copycat.transport.Connection;
import net.kuujo.copycat.transport.Transport;
import net.kuujo.copycat.transport.TransportException;
import net.kuujo.copycat.util.Managed;
import net.kuujo.copycat.util.concurrent.ComposableFuture;
import net.kuujo.copycat.util.concurrent.Context;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.concurrent.SingleThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Expand All @@ -48,7 +55,7 @@ public class RaftClientState implements Managed<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClientState.class);
private static final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
private final Members members;
private final Protocol protocol;
private final Transport transport;
private final Client client;
private Connection connection;
private final Context context;
Expand All @@ -69,14 +76,14 @@ public class RaftClientState implements Managed<Void> {
private volatile long response;
private volatile long version;

public RaftClientState(int clientId, Protocol protocol, Members members, Alleycat serializer) {
public RaftClientState(int clientId, Transport transport, Members members, Alleycat serializer) {
if (members == null)
throw new NullPointerException("members cannot be null");

this.context = new SingleThreadContext("copycat-client-" + clientId, serializer.clone());
this.members = members;
this.protocol = protocol;
this.client = protocol.client(clientId);
this.transport = transport;
this.client = transport.client(clientId);
this.session = new ClientSession(client.id());
this.session.close();
}
Expand Down Expand Up @@ -268,10 +275,17 @@ private CompletableFuture<Connection> getConnection(Member member) {
return CompletableFuture.completedFuture(connection);
}

final InetSocketAddress address;
try {
address = new InetSocketAddress(InetAddress.getByName(member.host()), member.port());
} catch (UnknownHostException e) {
return Futures.exceptionalFuture(e);
}

if (connection != null) {
CompletableFuture<Connection> future = new ComposableFuture<>();
connection.close().whenComplete((result, error) -> {
client.connect(member).whenComplete((connection, connectError) -> {
client.connect(address).whenComplete((connection, connectError) -> {
if (connectError == null) {
this.connection = connection;
future.complete(connection);
Expand All @@ -283,7 +297,7 @@ private CompletableFuture<Connection> getConnection(Member member) {
return future;
}

return client.connect(member).thenApply(connection -> {
return client.connect(address).thenApply(connection -> {
this.connection = connection;
return connection;
});
Expand Down Expand Up @@ -359,7 +373,7 @@ private <T> CompletableFuture<T> submit(CommandRequest request, CompletableFutur
submit(request, future);
} else if (error instanceof NoLeaderException) {
submit(request, future);
} else if (error instanceof ProtocolException) {
} else if (error instanceof TransportException) {
LOGGER.warn("Failed to communicate with {}: {}", member, error);
submit(request, future);
} else if (error instanceof UnknownSessionException) {
Expand Down Expand Up @@ -485,7 +499,7 @@ private <T> CompletableFuture<T> submit(QueryRequest request, CompletableFuture<
submit(request, future);
} else if (error instanceof NoLeaderException) {
submit(request, future);
} else if (error instanceof ProtocolException) {
} else if (error instanceof TransportException) {
LOGGER.warn("Failed to communicate with {}: {}", member, error);
submit(request, future);
} else if (error instanceof UnknownSessionException) {
Expand Down Expand Up @@ -741,7 +755,7 @@ public CompletableFuture<Void> close() {
cancelRegisterTimer();
cancelKeepAliveTimer();
open = false;
protocol.close().whenCompleteAsync((result, error) -> {
transport.close().whenCompleteAsync((result, error) -> {
if (error == null) {
future.complete(null);
} else {
Expand Down
Expand Up @@ -23,8 +23,8 @@
import net.kuujo.copycat.AbstractResource;
import net.kuujo.copycat.Mode;
import net.kuujo.copycat.Stateful;
import net.kuujo.copycat.log.Compaction;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.HashMap;
import java.util.HashSet;
Expand Down
Expand Up @@ -23,8 +23,8 @@
import net.kuujo.copycat.AbstractResource;
import net.kuujo.copycat.Mode;
import net.kuujo.copycat.Stateful;
import net.kuujo.copycat.log.Compaction;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.HashMap;
import java.util.HashSet;
Expand Down
4 changes: 2 additions & 2 deletions core/pom.xml
Expand Up @@ -14,12 +14,12 @@
<dependencies>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-client</artifactId>
<artifactId>copycat-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-server</artifactId>
<artifactId>copycat-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/net/kuujo/copycat/CopycatClient.java
Expand Up @@ -18,6 +18,7 @@
import net.kuujo.alleycat.Alleycat;
import net.kuujo.copycat.raft.Members;
import net.kuujo.copycat.raft.RaftClient;
import net.kuujo.copycat.transport.Transport;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -50,6 +51,17 @@ public static class Builder implements Copycat.Builder<CopycatClient> {
private Builder() {
}

/**
* Sets the client transport.
*
* @param transport The client transport.
* @return The client builder.
*/
public Builder withTransport(Transport transport) {
builder.withTransport(transport);
return this;
}

/**
* Sets the client serializer.
*
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/net/kuujo/copycat/CopycatServer.java
Expand Up @@ -16,10 +16,11 @@
package net.kuujo.copycat;

import net.kuujo.alleycat.Alleycat;
import net.kuujo.copycat.log.Log;
import net.kuujo.copycat.manager.ResourceManager;
import net.kuujo.copycat.raft.Members;
import net.kuujo.copycat.raft.RaftServer;
import net.kuujo.copycat.raft.log.Log;
import net.kuujo.copycat.transport.Transport;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -52,6 +53,17 @@ public static class Builder implements Copycat.Builder<CopycatServer> {
private Builder() {
}

/**
* Sets the server transport.
*
* @param transport The client server.
* @return The client builder.
*/
public Builder withTransport(Transport transport) {
builder.withTransport(transport);
return this;
}

/**
* Sets the server member ID.
*
Expand Down
Expand Up @@ -18,8 +18,8 @@
import net.kuujo.copycat.ResourceCommand;
import net.kuujo.copycat.ResourceOperation;
import net.kuujo.copycat.ResourceQuery;
import net.kuujo.copycat.log.Compaction;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.*;

Expand Down
8 changes: 4 additions & 4 deletions raft/client/pom.xml → log/pom.xml
Expand Up @@ -19,17 +19,17 @@

<parent>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-parent</artifactId>
<artifactId>copycat-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>

<artifactId>copycat-raft-client</artifactId>
<name>Copycat Raft Client</name>
<artifactId>copycat-log</artifactId>
<name>Copycat Log</name>

<dependencies>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-raft-protocol</artifactId>
<artifactId>copycat-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import java.util.concurrent.CompletableFuture;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import net.kuujo.copycat.util.concurrent.Context;
import org.slf4j.Logger;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

/**
* Segment descriptor exception.
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import net.kuujo.alleycat.AlleycatSerializable;
import net.kuujo.alleycat.util.ReferenceCounted;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import java.util.concurrent.CompletableFuture;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import net.kuujo.copycat.util.concurrent.Context;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import java.io.File;
import java.util.concurrent.TimeUnit;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import net.kuujo.copycat.CopycatException;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;
package net.kuujo.copycat.log;

import net.kuujo.copycat.util.concurrent.Context;
import org.slf4j.Logger;
Expand Down

0 comments on commit 58d6eca

Please sign in to comment.