Skip to content

Commit

Permalink
Ensure startup and shutdown hooks run properly on wrapped resources s…
Browse files Browse the repository at this point in the history
…uch as state machine and data structures.
  • Loading branch information
kuujo committed Jan 10, 2015
1 parent 5a3ff81 commit 89fdb63
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 40 deletions.
Expand Up @@ -111,15 +111,18 @@ public CompletableFuture<Void> clear() {
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<S> open() {
return stateMachine.open().thenRun(() -> {
proxy = stateMachine.createProxy(proxyClass);
}).thenApply(v -> (S) this);
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(proxyClass);
}).thenApply(v -> (S) this);
}

@Override
public CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close();
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}

}
Expand Up @@ -65,15 +65,18 @@ public CompletableFuture<Void> unlock() {

@Override
public CompletableFuture<AsyncLock> open() {
return stateMachine.open().thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncLockProxy.class);
}).thenApply(v -> this);
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncLockProxy.class);
}).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close();
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}

}
Expand Up @@ -174,15 +174,19 @@ public CompletableFuture<V> merge(K key, V value, BiFunction<? super V, ? super
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<AsyncMap<K, V>> open() {
return stateMachine.open().thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncMapProxy.class);
}).thenApply(v -> this);
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncMapProxy.class);
})
.thenApply(v -> null);
}

@Override
public CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close();
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}

}
Expand Up @@ -153,15 +153,18 @@ public CompletableFuture<Collection<V>> replace(K key, Collection<V> value) {
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<AsyncMultiMap<K, V>> open() {
return stateMachine.open().thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncMultiMapProxy.class);
}).thenApply(v -> this);
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(AsyncMultiMapProxy.class);
}).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close();
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}

}
@@ -0,0 +1,98 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.collections;

import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.protocol.LocalProtocol;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Asynchronous map test.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
@Test
public class AsyncMapTest extends ConcurrentTestCase {

public static class TestCluster<T extends Resource<T>> {
private final List<T> resources;

private TestCluster(List<T> resources) {
this.resources = resources;
}

/**
* Creates a test cluster for the given resource factory.
*/
public static <T extends Resource<T>> TestCluster<T> of(BiFunction<String, ClusterConfig, T> factory) {
ClusterConfig cluster = new ClusterConfig()
.withProtocol(new LocalProtocol());
for (int i = 1; i <= 5; i++) {
cluster.addMember(String.format("local://%d", i));
}
return new TestCluster<T>(cluster.getMembers().stream().collect(Collectors.mapping(uri -> factory.apply(uri, cluster), Collectors.toList())));
}

/**
* Returns a list of cluster resources.
*/
public List<T> resources() {
return resources;
}

/**
* Opens all resources.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Void> open() {
CompletableFuture<Void>[] futures = new CompletableFuture[resources.size()];
for (int i = 0; i < resources.size(); i++) {
futures[i] = resources.get(i).open().thenApply(v -> null);
}
return CompletableFuture.allOf(futures);
}

/**
* Closes all resources.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Void> close() {
CompletableFuture<Void>[] futures = new CompletableFuture[resources.size()];
for (int i = 0; i < resources.size(); i++) {
futures[i] = resources.get(i).close();
}
return CompletableFuture.allOf(futures);
}
}

/**
* Tests putting a value in an asynchronous map.
*/
public void testAsyncMapPut() throws Throwable {
TestCluster<AsyncMap<String, String>> cluster = TestCluster.of((uri, config) -> AsyncMap.create("test", uri, config));
cluster.open().thenRun(this::resume);
await(5000);
}

}
42 changes: 20 additions & 22 deletions core/src/main/java/net/kuujo/copycat/internal/AbstractResource.java
Expand Up @@ -49,6 +49,11 @@ protected AbstractResource(ResourceContext context) {
this.executor = context.config().getExecutor() != null ? context.config().getExecutor() : Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-" + context.name() + "-%d"));
}

@Override
public String name() {
return context.name();
}

@Override
public Cluster cluster() {
return context.cluster();
Expand All @@ -61,46 +66,39 @@ public CopycatState state() {

@Override
@SuppressWarnings("unchecked")
public synchronized T withStartupTask(Task<CompletableFuture<Void>> task) {
public T withStartupTask(Task<CompletableFuture<Void>> task) {
startupTasks.add(task);
return (T) this;
}

/**
* Runs the resource's startup tasks.
*/
@SuppressWarnings("all")
protected final CompletableFuture<Void> runStartupTasks() {
return CompletableFuture.allOf(startupTasks.stream().map(t -> t.execute()).toArray(size -> new CompletableFuture[size]));
}

@Override
@SuppressWarnings("unchecked")
public synchronized T withShutdownTask(Task<CompletableFuture<Void>> task) {
public T withShutdownTask(Task<CompletableFuture<Void>> task) {
shutdownTasks.add(task);
return (T) this;
}

@Override
public String name() {
return context.name();
}

@Override
/**
* Runs the resource's startup tasks.
*/
@SuppressWarnings("all")
public synchronized CompletableFuture<T> open() {
if (!context.isOpen()) {
return CompletableFuture.allOf(startupTasks.stream().map(t -> t.execute()).toArray(size -> new CompletableFuture[size]))
.thenCompose(v -> context.open())
.thenApply(v -> (T) this);
}
return CompletableFuture.completedFuture(null);
protected final CompletableFuture<Void> runShutdownTasks() {
return CompletableFuture.allOf(shutdownTasks.stream().map(t -> t.execute()).toArray(size -> new CompletableFuture[size]));
}

@Override
public boolean isOpen() {
return context.isOpen();
}

@Override
@SuppressWarnings("all")
public synchronized CompletableFuture<Void> close() {
return context.close()
.thenCompose(v -> CompletableFuture.allOf(shutdownTasks.stream().map(t -> t.execute()).toArray(size -> new CompletableFuture[size])));
}

@Override
public boolean isClosed() {
return context.isClosed();
Expand Down
Expand Up @@ -78,4 +78,17 @@ private ByteBuffer consume(Long index, ByteBuffer entry) {
return result;
}

@Override
public CompletableFuture<EventLog<T>> open() {
return runStartupTasks()
.thenComposeAsync(v -> context.open(), executor)
.thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
return context.close()
.thenCompose(v -> runShutdownTasks());
}

}
Expand Up @@ -24,6 +24,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Default leader election implementation.
Expand Down Expand Up @@ -56,4 +57,17 @@ public synchronized LeaderElection removeListener(EventListener<Member> listener
return this;
}

@Override
public CompletableFuture<LeaderElection> open() {
return runStartupTasks()
.thenCompose(v -> context.open())
.thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
return context.close()
.thenCompose(v -> runShutdownTasks());
}

}
Expand Up @@ -134,6 +134,19 @@ public <U> CompletableFuture<U> submit(String command, T entry) {
}
}

@Override
public CompletableFuture<StateLog<T>> open() {
return runStartupTasks()
.thenComposeAsync(v -> context.open(), executor)
.thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
return context.close()
.thenComposeAsync(v -> runShutdownTasks(), executor);
}

/**
* Consumes a log entry.
*
Expand Down
Expand Up @@ -138,7 +138,9 @@ private void install(Map<String, Object> snapshot) {
public synchronized CompletableFuture<StateMachine<T>> open() {
log.snapshotWith(this::snapshot);
log.installWith(this::install);
return log.open().thenApply(v -> this);
return runStartupTasks()
.thenComposeAsync(v -> log.open(), executor)
.thenApply(v -> this);
}

@Override
Expand All @@ -151,7 +153,7 @@ public synchronized CompletableFuture<Void> close() {
return log.close().whenComplete((result, error) -> {
log.snapshotWith(null);
log.installWith(null);
});
}).thenComposeAsync(v -> runShutdownTasks(), executor);
}

@Override
Expand Down

0 comments on commit 89fdb63

Please sign in to comment.