Skip to content

Commit

Permalink
Make protocol context accessors public in order to allow resources to…
Browse files Browse the repository at this point in the history
… provide named execution contexts.
  • Loading branch information
kuujo committed Apr 26, 2015
1 parent 5b99cb6 commit a657e70
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 27 deletions.
20 changes: 20 additions & 0 deletions core/src/main/java/net/kuujo/copycat/protocol/Protocol.java
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.EventListener;
import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.util.ExecutionContext;
import net.kuujo.copycat.util.Managed;

import java.util.Set;
Expand All @@ -34,6 +35,7 @@ public abstract class Protocol implements Managed<Protocol> {
protected final Set<EventListener<Event>> listeners = new CopyOnWriteArraySet<>();
protected String topic;
protected Cluster cluster;
protected ExecutionContext context;

/**
* Sets the protocol cluster.
Expand Down Expand Up @@ -71,6 +73,24 @@ public String getTopic() {
return topic;
}

/**
* Sets the protocol execution context.
*
* @param context The protocol execution context.
*/
public void setContext(ExecutionContext context) {
this.context = context;
}

/**
* Returns the protocol execution context.
*
* @return The protocol execution context.
*/
public ExecutionContext getContext() {
return context;
}

/**
* Adds an event listener to the protocol.
*
Expand Down
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.ExecutionContext;

import java.util.concurrent.CompletableFuture;

Expand All @@ -41,6 +42,7 @@ protected DiscreteResource(DiscreteResourceConfig config) {
this.partitionedCluster = new PartitionedCluster(config.getCluster(), config.getReplicationStrategy(), config.getPartitions());
protocol.setTopic(config.getName());
protocol.setCluster(partitionedCluster);
protocol.setContext(new ExecutionContext("copycat-protocol-" + config.getName()));
}

/**
Expand Down
Expand Up @@ -17,6 +17,7 @@

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Partitioned resource.
Expand Down Expand Up @@ -46,6 +47,29 @@ protected U partition(Object key) {
return partitions.get(partitioner.partition(key, partitions.size()));
}

@Override
@SuppressWarnings("unchecked")
public CompletableFuture<V> open() {
return super.open().thenCompose(v -> {
CompletableFuture[] futures = new CompletableFuture[partitions.size()];
for (int i = 0; i < partitions.size(); i++) {
futures[i] = partitions.get(i).open();
}
return CompletableFuture.allOf(futures);
}).thenApply(v -> (V) this);
}

@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Void> close() {
CompletableFuture[] futures = new CompletableFuture[partitions.size()];
for (int i = 0; i < partitions.size(); i++) {
futures[i] = partitions.get(i).open();
}
return CompletableFuture.allOf(futures)
.thenCompose(v -> super.close());
}

/**
* Partitioned resource builder.
*
Expand Down
Expand Up @@ -51,8 +51,7 @@ public static Builder builder() {
}

private final Logger LOGGER = LoggerFactory.getLogger(RaftProtocol.class);
private final ExecutionContext context;
private final ThreadChecker threadChecker;
private ThreadChecker threadChecker;
private final RaftConfig config;
private RaftStorage storage;
private RaftState state;
Expand All @@ -70,10 +69,14 @@ public static Builder builder() {
private long lastApplied = 0;
private volatile boolean open;

protected RaftProtocol(RaftStorage storage, RaftConfig config, ExecutionContext context) {
protected RaftProtocol(RaftStorage storage, RaftConfig config) {
this.storage = storage;
this.config = config;
this.context = context;
}

@Override
public void setContext(ExecutionContext context) {
super.setContext(context);
threadChecker = new ThreadChecker(context);
}

Expand Down Expand Up @@ -350,15 +353,6 @@ long getHeartbeatInterval() {
return config.getHeartbeatInterval();
}

/**
* Returns the context executor.
*
* @return The context executor.
*/
ExecutionContext getContext() {
return context;
}

@Override
public Protocol commitHandler(CommitHandler handler) {
this.commitHandler = handler;
Expand Down Expand Up @@ -547,7 +541,6 @@ public String toString() {
public static class Builder extends Protocol.Builder {
private RaftStorage storage;
private RaftConfig config = new RaftConfig();
private ExecutionContext context;

/**
* Sets the Raft storage.
Expand Down Expand Up @@ -610,20 +603,9 @@ public Builder withHeartbeatInterval(long heartbeatInterval, TimeUnit unit) {
return this;
}

/**
* Sets the Raft execution context.
*
* @param context The Raft execution context.
* @return The Raft protocol builder.
*/
public Builder withContext(ExecutionContext context) {
this.context = context;
return this;
}

@Override
public Protocol build() {
return new RaftProtocol(storage, config, context);
return new RaftProtocol(storage, config);
}
}

Expand Down
Expand Up @@ -1349,7 +1349,6 @@ private TestCluster buildCluster(int id, Member.Type type, int nodes, TestMember
*/
private RaftProtocol buildProtocol(int id, ManagedCluster cluster) throws Exception {
RaftProtocol protocol = (RaftProtocol) RaftProtocol.builder()
.withContext(new ExecutionContext("test-" + id))
.withStorage(BufferedStorage.builder()
.withName(String.format("test-%d", id))
.withDirectory(String.format("%s/test-%d", testDirectory, id))
Expand All @@ -1358,6 +1357,7 @@ private RaftProtocol buildProtocol(int id, ManagedCluster cluster) throws Except

protocol.setCluster(cluster.open().get());
protocol.setTopic("test");
protocol.setContext(new ExecutionContext("test-" + id));
return protocol;
}

Expand Down

0 comments on commit a657e70

Please sign in to comment.