Skip to content

Commit

Permalink
Add passive members to collections test cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 21, 2015
1 parent 221b383 commit 1486476
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
Expand Up @@ -34,7 +34,7 @@ public void testAsyncMapPutGet() throws Throwable {
TestCluster<AsyncMap<String, String>> cluster = TestCluster.of((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog()))); TestCluster<AsyncMap<String, String>> cluster = TestCluster.of((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog())));
cluster.open().thenRun(this::resume); cluster.open().thenRun(this::resume);
await(5000); await(5000);
AsyncMap<String, String> map = cluster.resources().get(0); AsyncMap<String, String> map = cluster.activeResources().iterator().next();
map.put("foo", "Hello world!").thenRun(() -> { map.put("foo", "Hello world!").thenRun(() -> {
map.get("foo").thenAccept(result -> { map.get("foo").thenAccept(result -> {
threadAssertEquals(result, "Hello world!"); threadAssertEquals(result, "Hello world!");
Expand All @@ -51,7 +51,7 @@ public void testAsyncMapPutRemove() throws Throwable {
TestCluster<AsyncMap<String, String>> cluster = TestCluster.of((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog()))); TestCluster<AsyncMap<String, String>> cluster = TestCluster.of((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog())));
cluster.open().thenRun(this::resume); cluster.open().thenRun(this::resume);
await(5000); await(5000);
AsyncMap<String, String> map = cluster.resources().get(0); AsyncMap<String, String> map = cluster.activeResources().iterator().next();
map.put("foo", "Hello world!").thenRun(() -> { map.put("foo", "Hello world!").thenRun(() -> {
map.get("foo").thenAccept(r1 -> { map.get("foo").thenAccept(r1 -> {
threadAssertEquals(r1, "Hello world!"); threadAssertEquals(r1, "Hello world!");
Expand Down
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.protocol.LocalProtocol; import net.kuujo.copycat.protocol.LocalProtocol;


import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction; import java.util.function.BiFunction;
Expand All @@ -30,40 +31,59 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class TestCluster<T extends Resource<T>> { public class TestCluster<T extends Resource<T>> {
private final List<T> resources; private final List<T> activeResources;
private final List<T> passiveResources;


private TestCluster(List<T> resources) { private TestCluster(List<T> activeResources, List<T> passiveResources) {
this.resources = resources; this.activeResources = activeResources;
this.passiveResources = passiveResources;
} }


/** /**
* Creates a test cluster for the given resource factory. * Creates a test cluster for the given resource factory.
*/ */
@SuppressWarnings("all")
public static <T extends Resource<T>> TestCluster<T> of(BiFunction<String, ClusterConfig, T> factory) { public static <T extends Resource<T>> TestCluster<T> of(BiFunction<String, ClusterConfig, T> factory) {
ClusterConfig cluster = new ClusterConfig() ClusterConfig cluster = new ClusterConfig()
.withProtocol(new LocalProtocol()); .withProtocol(new LocalProtocol());
for (int i = 1; i <= 3; i++) { for (int i = 1; i <= 3; i++) {
cluster.addMember(String.format("local://test%d", i)); cluster.addMember(String.format("local://test%d", i));
} }
return new TestCluster<T>(cluster.getMembers().stream().collect(Collectors.mapping(uri -> factory.apply(uri, cluster), Collectors.toList()))); List<T> activeResources = cluster.getMembers().stream().collect(Collectors.mapping(uri -> factory.apply(uri, cluster), Collectors.toList()));
List<T> passiveResources = Arrays.asList("local://test4", "local://test5").stream().collect(Collectors.mapping(uri -> factory.apply(uri, cluster), Collectors.toList()));
return new TestCluster<T>(activeResources, passiveResources);
} }


/** /**
* Returns a list of cluster resources. * Returns a list of active cluster resources.
*
* @return A list of active cluster resources.
*/ */
public List<T> resources() { public List<T> activeResources() {
return resources; return activeResources;
}

/**
* Returns a list of passive cluster resources.
*
* @return A list of passive cluster resources.
*/
public List<T> passiveResources() {
return passiveResources;
} }


/** /**
* Opens all resources. * Opens all resources.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Void> open() { public CompletableFuture<Void> open() {
CompletableFuture<Void>[] futures = new CompletableFuture[resources.size()]; CompletableFuture<Void>[] futures = new CompletableFuture[activeResources.size() + passiveResources.size()];
for (int i = 0; i < resources.size(); i++) { int i = 0;
T resource = resources.get(i); for (T resource : activeResources) {
futures[i] = resources.get(i).open().thenRun(() -> System.out.println(resource.cluster().member().uri() + " started successfully!")).thenApply(v -> null); futures[i++] = resource.open().thenRun(() -> System.out.println(resource.cluster().member().uri() + " started successfully!")).thenApply(v -> null);
}
for (T resource : passiveResources) {
futures[i++] = resource.open().thenRun(() -> System.out.println(resource.cluster().member().uri() + " started successfully!")).thenApply(v -> null);
} }
return CompletableFuture.allOf(futures); return CompletableFuture.allOf(futures);
} }
Expand All @@ -73,9 +93,13 @@ public CompletableFuture<Void> open() {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
CompletableFuture<Void>[] futures = new CompletableFuture[resources.size()]; CompletableFuture<Void>[] futures = new CompletableFuture[activeResources.size() + passiveResources.size()];
for (int i = 0; i < resources.size(); i++) { int i = 0;
futures[i] = resources.get(i).close(); for (T resource : passiveResources) {
futures[i++] = resource.close();
}
for (T resource : activeResources) {
futures[i++] = resource.close();
} }
return CompletableFuture.allOf(futures); return CompletableFuture.allOf(futures);
} }
Expand Down

0 comments on commit 1486476

Please sign in to comment.