Skip to content

Commit

Permalink
Refactor hierarchical resource interface to flat interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 20, 2015
1 parent dbe1d38 commit a50f9da
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 789 deletions.
Expand Up @@ -18,7 +18,6 @@
import net.jodah.concurrentunit.ConcurrentTestCase; import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Copycat; import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage; import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.io.transport.LocalTransport;
Expand Down Expand Up @@ -53,8 +52,7 @@ public void testSetGet() throws Throwable {


Copycat copycat = servers.get(0); Copycat copycat = servers.get(0);


Node node = copycat.create("/test").get(); DistributedAtomicValue<String> reference = copycat.create("test", DistributedAtomicValue.class).get();
DistributedAtomicValue<String> reference = node.create(DistributedAtomicValue.class).get();


expectResume(); expectResume();
reference.set("Hello world!").thenRun(this::resume); reference.set("Hello world!").thenRun(this::resume);
Expand All @@ -77,8 +75,7 @@ public void testChangeEvent() throws Throwable {


Copycat copycat = servers.get(0); Copycat copycat = servers.get(0);


Node node = copycat.create("/test").get(); DistributedAtomicValue<String> reference = copycat.create("test", DistributedAtomicValue.class).get();
DistributedAtomicValue<String> reference = node.create(DistributedAtomicValue.class).get();


expectResume(); expectResume();
reference.onChange(value -> { reference.onChange(value -> {
Expand Down Expand Up @@ -198,8 +195,7 @@ public void testMembershipChange() throws Throwable {
copycat4.open().thenRun(this::resume); copycat4.open().thenRun(this::resume);
await(); await();


Node node = copycat4.create("/test").get(); DistributedAtomicValue<String> reference = copycat4.create("test", DistributedAtomicValue.class).get();
DistributedAtomicValue<String> reference = node.create(DistributedAtomicValue.class).get();


expectResume(); expectResume();
reference.set("Hello world!").thenRun(this::resume); reference.set("Hello world!").thenRun(this::resume);
Expand All @@ -219,15 +215,13 @@ public void testMembershipChange() throws Throwable {
public void testCompareAndSet() throws Throwable { public void testCompareAndSet() throws Throwable {
List<Copycat> servers = createCopycats(3); List<Copycat> servers = createCopycats(3);


Node node1 = servers.get(0).create("/test").get(); DistributedAtomicValue<Integer> reference1 = servers.get(0).create("test", DistributedAtomicValue.class).get();
DistributedAtomicValue<Integer> reference1 = node1.create(DistributedAtomicValue.class).get();


expectResume(); expectResume();
reference1.set(1).thenRun(this::resume); reference1.set(1).thenRun(this::resume);
await(); await();


Node node2 = servers.get(0).create("/test").get(); DistributedAtomicValue<Integer> reference2 = servers.get(0).create("test", DistributedAtomicValue.class).get();
DistributedAtomicValue<Integer> reference2 = node2.create(DistributedAtomicValue.class).get();


expectResume(); expectResume();
reference2.compareAndSet(1, 2).thenAccept(result -> { reference2.compareAndSet(1, 2).thenAccept(result -> {
Expand Down
Expand Up @@ -18,7 +18,6 @@
import net.jodah.concurrentunit.ConcurrentTestCase; import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Copycat; import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage; import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.io.transport.LocalTransport;
Expand Down Expand Up @@ -53,8 +52,7 @@ public void testPutGetRemove() throws Throwable {


Copycat copycat = copycats.get(0); Copycat copycat = copycats.get(0);


Node node = copycat.create("/test").get(); DistributedMap<String, String> map = copycat.create("test", DistributedMap.class).get();
DistributedMap<String, String> map = node.create(DistributedMap.class).get();


expectResume(); expectResume();
map.put("foo", "Hello world!").thenRun(this::resume); map.put("foo", "Hello world!").thenRun(this::resume);
Expand Down Expand Up @@ -93,8 +91,7 @@ public void testMapSize() throws Throwable {


Copycat copycat = copycats.get(0); Copycat copycat = copycats.get(0);


Node node = copycat.create("/test").get(); DistributedMap<String, String> map = copycat.create("test", DistributedMap.class).get();
DistributedMap<String, String> map = node.create(DistributedMap.class).get();


expectResume(); expectResume();
map.size().thenAccept(size -> { map.size().thenAccept(size -> {
Expand Down Expand Up @@ -137,8 +134,7 @@ public void testMapTtl() throws Throwable {


Copycat copycat = copycats.get(0); Copycat copycat = copycats.get(0);


Node node = copycat.create("/test").get(); DistributedMap<String, String> map = copycat.create("test", DistributedMap.class).get();
DistributedMap<String, String> map = node.create(DistributedMap.class).get();


expectResume(); expectResume();
map.put("foo", "Hello world!", Duration.ofSeconds(1)).thenRun(this::resume); map.put("foo", "Hello world!", Duration.ofSeconds(1)).thenRun(this::resume);
Expand Down
Expand Up @@ -18,7 +18,6 @@
import net.jodah.concurrentunit.ConcurrentTestCase; import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Copycat; import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage; import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.io.transport.LocalTransport;
Expand Down Expand Up @@ -55,12 +54,10 @@ public void testSetAddRemove() throws Throwable {
Copycat copycat1 = copycats.get(0); Copycat copycat1 = copycats.get(0);
Copycat copycat2 = copycats.get(1); Copycat copycat2 = copycats.get(1);


Node node1 = copycat1.create("/test").get(); DistributedSet<String> set1 = copycat1.create("test", DistributedSet.class).get();
DistributedSet<String> set1 = node1.create(DistributedSet.class).get();
assertFalse(set1.contains("Hello world!").get()); assertFalse(set1.contains("Hello world!").get());


Node node2 = copycat2.create("/test").get(); DistributedSet<String> set2 = copycat2.create("test", DistributedSet.class).get();
DistributedSet<String> set2 = node2.create(DistributedSet.class).get();
assertFalse(set2.contains("Hello world!").get()); assertFalse(set2.contains("Hello world!").get());


set1.add("Hello world!").join(); set1.add("Hello world!").join();
Expand Down
Expand Up @@ -18,7 +18,6 @@
import net.jodah.concurrentunit.ConcurrentTestCase; import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Copycat; import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage; import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.io.transport.LocalTransport;
Expand Down Expand Up @@ -53,8 +52,7 @@ public void testElection() throws Throwable {


Copycat copycat = servers.get(0); Copycat copycat = servers.get(0);


Node node = copycat.create("/test").get(); DistributedLeaderElection election = copycat.create("test", DistributedLeaderElection.class).get();
DistributedLeaderElection election = node.create(DistributedLeaderElection.class).get();


expectResumes(2); expectResumes(2);
election.onElection(v -> resume()).thenRun(this::resume); election.onElection(v -> resume()).thenRun(this::resume);
Expand Down
Expand Up @@ -18,7 +18,6 @@
import net.jodah.concurrentunit.ConcurrentTestCase; import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Copycat; import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.CopycatServer; import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage; import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.transport.LocalServerRegistry; import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport; import net.kuujo.copycat.io.transport.LocalTransport;
Expand Down Expand Up @@ -53,8 +52,7 @@ public void testLockUnlock() throws Throwable {


Copycat copycat = servers.get(0); Copycat copycat = servers.get(0);


Node node = copycat.create("/test").get(); DistributedLock lock = copycat.create("test", DistributedLock.class).get();
DistributedLock lock = node.create(DistributedLock.class).get();


expectResume(); expectResume();
lock.lock().thenRun(this::resume); lock.lock().thenRun(this::resume);
Expand Down
32 changes: 0 additions & 32 deletions core/src/main/java/net/kuujo/copycat/Configurable.java

This file was deleted.

104 changes: 16 additions & 88 deletions core/src/main/java/net/kuujo/copycat/Copycat.java
Expand Up @@ -15,10 +15,9 @@
*/ */
package net.kuujo.copycat; package net.kuujo.copycat;


import net.kuujo.copycat.manager.CreatePath;
import net.kuujo.copycat.manager.CreateResource; import net.kuujo.copycat.manager.CreateResource;
import net.kuujo.copycat.manager.DeletePath; import net.kuujo.copycat.manager.GetResource;
import net.kuujo.copycat.manager.PathExists; import net.kuujo.copycat.manager.ResourceExists;
import net.kuujo.copycat.raft.RaftClient; import net.kuujo.copycat.raft.RaftClient;
import net.kuujo.copycat.raft.StateMachine; import net.kuujo.copycat.raft.StateMachine;
import net.kuujo.copycat.raft.protocol.Command; import net.kuujo.copycat.raft.protocol.Command;
Expand All @@ -37,9 +36,8 @@
* creation and management of {@link net.kuujo.copycat.Resource} objects via a filesystem like interface. There is a * creation and management of {@link net.kuujo.copycat.Resource} objects via a filesystem like interface. There is a
* one-to-one relationship between paths and resources, so each path can be associated with one and only one resource. * one-to-one relationship between paths and resources, so each path can be associated with one and only one resource.
* <p> * <p>
* To create a resource, create a {@link net.kuujo.copycat.Node} and then create the resource by passing the resource * To create a resource, pass the resource {@link java.lang.Class} to the {@link Copycat#create(String, Class)} method.
* {@link java.lang.Class} to the {@link Node#create(Class)} method. When a resource is created, the * When a resource is created, the {@link StateMachine} associated with the resource will be created on each Raft server
* {@link StateMachine} associated with the resource will be created on each Raft server
* and future operations submitted for that resource will be applied to the state machine. Internally, resource state * and future operations submitted for that resource will be applied to the state machine. Internally, resource state
* machines are multiplexed across a shared Raft log. * machines are multiplexed across a shared Raft log.
* <p> * <p>
Expand All @@ -50,82 +48,36 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public abstract class Copycat implements Managed<Copycat> { public abstract class Copycat implements Managed<Copycat> {
static final String PATH_SEPARATOR = "/";
protected final RaftClient client; protected final RaftClient client;
private final Map<String, Node> nodes = new ConcurrentHashMap<>();
private final Map<Long, ResourceContext> resources = new ConcurrentHashMap<>(); private final Map<Long, ResourceContext> resources = new ConcurrentHashMap<>();


protected Copycat(RaftClient client) { protected Copycat(RaftClient client) {
this.client = client; this.client = client;
} }


/**
* Returns a reference to the node at the given path.
* <p>
* The returned node represents the node at the given {@code path}. The node may or may not already exist.
* This method does not create the returned node. In order to create the node in the cluster, the user must call
* the {@link net.kuujo.copycat.Node#create()} method on the returned {@link net.kuujo.copycat.Node} or alternatively
* call {@link #create(String)} directly.
*
* @param path The path for which to return the node.
* @return A reference to the node at the given path.
*/
public Node node(String path) {
if (path == null)
throw new NullPointerException("path cannot be null");
if (!path.startsWith(PATH_SEPARATOR))
path = PATH_SEPARATOR + path;
if (path.endsWith(PATH_SEPARATOR))
path = path.substring(0, path.length() - 1);
return nodes.computeIfAbsent(path, p -> new Node(p, this));
}

/** /**
* Checks whether a path exists. * Checks whether a path exists.
* *
* @param path The path to check. * @param path The path to check.
* @return A completable future indicating whether the given path exists. * @return A completable future indicating whether the given path exists.
*/ */
public CompletableFuture<Boolean> exists(String path) { public CompletableFuture<Boolean> exists(String path) {
return client.submit(new PathExists(path)); return client.submit(new ResourceExists(path));
}

/**
* Creates a node at the given path.
* <p>
* If a node at the given path already exists, the existing node will be returned, otherwise a new {@link net.kuujo.copycat.Node}
* will be returned. Additionally, if the node's parents don't already exist they'll be created. For instance, calling
* this method with {@code /foo/bar/baz} will create {@code foo}, {@code foo/bar}, and {@code foo/bar/baz} if they
* don't already exist.
*
* @param path The path for which to create the node.
* @return A completable future to be completed once the node has been created.
*/
public CompletableFuture<Node> create(String path) {
return client.submit(CreatePath.builder()
.withPath(path)
.build())
.thenApply(result -> node(path));
} }


/** /**
* Creates a resource at the given path. * Gets the resource at the given path.
* <p>
* If a node at the given path already exists, the existing node will be returned, otherwise a new {@link net.kuujo.copycat.Node}
* will be returned. Additionally, if the node's parents don't already exist they'll be created. For instance, calling
* this method with {@code /foo/bar/baz} will create {@code foo}, {@code foo/bar}, and {@code foo/bar/baz} if they
* don't already exist.
* *
* @param path The path at which to create the resource. * @param path The path at which to get the resource.
* @param type The resource type to create. * @param type The expected resource type.
* @param <T> The resource type. * @param <T> The resource type.
* @return A completable future to be completed once the resource has been created. * @return A completable future to be completed once the resource has been loaded.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends Resource> CompletableFuture<T> create(String path, Class<? super T> type) { public <T extends Resource> CompletableFuture<T> get(String path, Class<? super T> type) {
try { try {
T resource = (T) type.newInstance(); T resource = (T) type.newInstance();
return client.submit(CreateResource.builder() return client.submit(GetResource.builder()
.withPath(path) .withPath(path)
.withType(resource.stateMachine()) .withType(resource.stateMachine())
.build()) .build())
Expand All @@ -139,57 +91,33 @@ public <T extends Resource> CompletableFuture<T> create(String path, Class<? sup
} }


/** /**
* Creates a configurable resource at the given path. * Creates a resource at the given path.
* <p> * <p>
* If a node at the given path already exists, the existing node will be returned, otherwise a new {@link net.kuujo.copycat.Node} * If a resource at the given path already exists, the existing resource will be returned, otherwise a new
* will be returned. Additionally, if the node's parents don't already exist they'll be created. For instance, calling * resource of the given {@code type} will be created.
* this method with {@code /foo/bar/baz} will create {@code foo}, {@code foo/bar}, and {@code foo/bar/baz} if they
* don't already exist.
* *
* @param path The path at which to create the resource. * @param path The path at which to create the resource.
* @param type The resource type to create. * @param type The resource type to create.
* @param options The resource options.
* @param <T> The resource type. * @param <T> The resource type.
* @param <U> The resource configuration type.
* @return A completable future to be completed once the resource has been created. * @return A completable future to be completed once the resource has been created.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends Resource & Configurable<U>, U extends Options> CompletableFuture<T> create(String path, Class<? super T> type, U options) { public <T extends Resource> CompletableFuture<T> create(String path, Class<? super T> type) {
try { try {
T resource = (T) type.newInstance(); T resource = (T) type.newInstance();
return client.submit(CreateResource.builder() return client.submit(CreateResource.builder()
.withPath(path) .withPath(path)
.withType(resource.stateMachine()) .withType(resource.stateMachine())
.build()) .build())
.thenApply(id -> { .thenApply(id -> {
resource.open(resources.computeIfAbsent(id, i -> { resource.open(resources.computeIfAbsent(id, i -> new ResourceContext(id, client)));
ResourceContext context = new ResourceContext(id, client);
resource.configure(options);
return context;
}));
return resource; return resource;
}); });
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new ResourceException("failed to instantiate resource: " + type, e); throw new ResourceException("failed to instantiate resource: " + type, e);
} }
} }


/**
* Deletes a node at the given path.
* <p>
* Both the {@link net.kuujo.copycat.Node} at the given path and any {@link net.kuujo.copycat.Resource} associated
* with the node will be permanently deleted, and state stored at the node will not be recoverable.
*
* @param path The path at which to delete the node.
* @return A completable future to be completed once the node has been deleted.
*/
public CompletableFuture<Copycat> delete(String path) {
return client.submit(DeletePath.builder()
.withPath(path)
.build())
.thenApply(result -> this);
}

@Override @Override
public CompletableFuture<Copycat> open() { public CompletableFuture<Copycat> open() {
return client.open().thenApply(v -> this); return client.open().thenApply(v -> this);
Expand Down

0 comments on commit a50f9da

Please sign in to comment.