diff --git a/README.md b/README.md index ce9a662bb1..7f69128e47 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ participate in replication of logs via Raft, passive members The following image demonstrates the relationship between active and passive members in the Copycat cluster: -![Copycat cluster](http://s8.postimg.org/5dm2xzbz9/Copycat_Cluster_New_Page.png) +![Copycat cluster](http://s23.postimg.org/i8xyzg2ez/Copycat_Cluster_New_Page_2.png) Active members participate in synchronous log replication via the Raft consensus protocol and ultimately gossip committed log entries to passive members, while passive members gossip among each other. @@ -145,8 +145,6 @@ different partitions to different members in the cluster. The following image depicts the partitioning of resources across the Copycat cluster: -![Copycat resources](http://s15.postimg.org/56oyaa7cr/Copycat_Resources_New_Page.png) - Each [resource](#resources) in the cluster has its own related logical `Cluster` through which it communicates with other members of the [resource's cluster](#resource-clusters). Just as each resource performs replication for its associated [log](#logs), so too does each resource cluster perform leader elections independently of other resources @@ -232,9 +230,14 @@ For more about active members see the section on [cluster members](#active-membe ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new VertxTcpProtocol()) + .withLocalMember("tcp://123.456.789.0:1234") .withMembers("tcp://123.456.789.0:1234", "tcp://123.456.789.1:1234", "tcp://123.456.789.2:1234"); ``` +Note that the cluster's `local-member` attribute *must be defined*, but it does not have to be represented in the list +of configured active members. If the local member is not in the active members list, the node will join the cluster as +a *passive* member. + Alternatively, users can configure the cluster by overriding the default cluster configuration with a `cluster.conf` [Typesafe configuration](https://github.com/typesafehub/config) file: @@ -265,9 +268,9 @@ tolerate one failure, a cluster of five active members can tolerate two failures ### Creating a Copycat instance To create a `Copycat` instance, call one of the overloaded `Copycat.create()` methods: -* `Copycat.create(String uri)` - creates a Copycat instance with the file-based cluster configuration -* `Copycat.create(String uri, ClusterConfig cluster)` - creates a Copycat instance with a custom cluster configuration -* `Copycat.create(String uri, CopycatConfig config)` - create a Copycat instance with a custom configuration +* `Copycat.create()` - creates a Copycat instance with the file-based cluster configuration +* `Copycat.create(ClusterConfig cluster)` - creates a Copycat instance with a custom cluster configuration +* `Copycat.create(CopycatConfig config)` - create a Copycat instance with a custom configuration Note that the first argument to any `Copycat.create()` method is a `uri`. This is the protocol specific URI of the *local* member, and it may or may not be a member defined in the provided `ClusterConfig`. This is because Copycat @@ -275,7 +278,7 @@ actually supports eventually consistent replication for clusters much larger tha of *active* members defined in the cluster configuration. ```java -Copycat copycat = Copycat.create("tcp://123.456.789.3", cluster); +Copycat copycat = Copycat.create(cluster); ``` When a `Copycat` instance is constructed, a central replicated state machine is created for the entire Copycat cluster. @@ -391,6 +394,7 @@ only three replicas, writes will only need to be persisted on two nodes in order CopycatConfig config = new CopycatConfig() .withClusterConfig(new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp://123.456.789.1:1234") .withMembers( "tcp://123.456.789.0:1234", "tcp://123.456.789.1:1234", @@ -399,7 +403,7 @@ CopycatConfig config = new CopycatConfig() "tcp://123.456.789.4:1234" )); -Copycat copycat = Copycat.create("tcp://123.456.789.1:1234", config).open().get(); +Copycat copycat = Copycat.create(config).open().get(); EventLogConfig eventLogConfig = new EventLogConfig() .withSerializer(KryoSerializer.class) @@ -410,6 +414,8 @@ EventLogConfig eventLogConfig = new EventLogConfig() EventLog eventLog = copycat.eventLog("event-log", eventLogConfig); ``` +If replicas are defined for the resource and the local cluster member is not specified in the list of resource members, +the local resource will join the clustered resource as a *passive* member of the resource. #### Resource replicas @@ -591,7 +597,7 @@ of the resource-specific methods on the `Copycat` instance: * `AsyncLock lock(String name)` ```java -Copycat copycat = Copycat.create("tcp://123.456.789.0", config); +Copycat copycat = Copycat.create(config); copycat.open().thenRun(() -> { StateMachine stateMachine = copycat.stateMachine("test"); @@ -610,7 +616,7 @@ Java 8's `CompletableFuture` framework is extremely powerful. For instance, the as so: ```java -Copycat copycat = Copycat.create("tcp://123.456.789.0", config); +Copycat copycat = Copycat.create(config); copycat.open() .thenCompose(copycat.stateMachine("test").open()) @@ -698,7 +704,7 @@ CopycatConfig config = new CopycatConfig() .withStateType(Map.class) .withInitialState(DefaultMapState.class)); -Copycat copycat = Copycat.create("tcp://123.456.789.0", config).open().get(); +Copycat copycat = Copycat.create(config).open().get(); StateMachine> stateMachine = copycat.stateMachine("map").open().get(); ``` @@ -731,7 +737,7 @@ StateMachineConfig config = new StateMachineConfig() .withStateType(Map.class) .withInitialState(DefaultMapState.class)' -StateMachine> stateMachine = StateMachine.create("tcp://123.456.789.0", cluster, config).open().get(); +StateMachine> stateMachine = StateMachine.create("my-state-machine", cluster, config).open().get(); stateMachine.submit("get", "foo").get(); ``` @@ -996,13 +1002,14 @@ To create an event log, call the `eventLog` method on the `Copycat` instance. ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp://123.456.789.0:5000") .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addEventLogConfig("event-log", new EventLogConfig()); -Copycat copycat = Copycat.create("tcp://123.456.789.0:5000", config); +Copycat copycat = Copycat.create(config); copycat.open().thenRun(() -> { copycat.eventLog("event-log").open().thenAccept(eventLog -> { @@ -1027,12 +1034,13 @@ The `EventLog` interface exposes a static method for creating a standalone event ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp:/123.456.789.0:5000") .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); EventLogConfig config = new EventLogConfig() .withLog(new FileLog()); -EventLog eventLog = EventLog.create("tcp:/123.456.789.0", cluster, config); +EventLog eventLog = EventLog.create("my-event-log", cluster, config); ``` When a standalone event log is created via the `EventLog.create` static factory method, a `ClusterCoordinator` is @@ -1141,14 +1149,15 @@ To create a state log, call the `stateLog` method on the `Copycat` instance. ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addStateLogConfig("state-log", new StateLogConfig() .withLog(new FileLog()); -Copycat copycat = Copycat.create("tcp://123.456.789.0", config); +Copycat copycat = Copycat.create(config); copycat.open().thenRun(() -> { copycat.stateLog("state-log").open().thenAccept(stateLog -> { @@ -1162,13 +1171,14 @@ The `StateLog` interface exposes a static method for creating a standalone state ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp:/123.456.789.0:5000") .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); StateLogConfig config = new StateLogConfig() .withLog(new FileLog() .withFlushOnWrite(true)); -StateLog stateLog = StateLog.create("tcp:/123.456.789.0", cluster, config); +StateLog stateLog = StateLog.create("my-state-log", cluster, config); ``` When a standalone state log is created via the `StateLog.create` static factory method, a `ClusterCoordinator` is @@ -1314,7 +1324,7 @@ Once the leader election has been defined as a cluster resource, create a new `C election. To register a handler to be notified once a node has become leader, use the `addListener` method. ```java -Copycat copycat = Copycat.create("tcp://123.456.789.0", config); +Copycat copycat = Copycat.create(config); copycat.open() .thenCompose(c -> c.leaderElection("election").open()) @@ -1338,6 +1348,7 @@ The `LeaderElection` interface exposes a static factory method for creating stan ```java ClusterConfig cluster = new ClusterConfig() + .withLocalMember("tcp://123.456.789.0:5000") .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); LeaderElection.create("election", cluster).open().thenAccept(election -> { @@ -1398,14 +1409,15 @@ To create a map via the `Copycat` API, use the `map` method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addMapConfig("test-map", new AsyncMapConfig() .withConsistency(Consistency.STRONG)); -Copycat.copycat("tcp://123.456.789.0", config).open() +Copycat.copycat(config).open() .thenApply(copycat -> copycat.map("test-map")) .thenCompose(map -> map.open()) .thenAccept(map -> { @@ -1420,12 +1432,13 @@ To create a map directly, use the `AsyncMap.create` factory method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); AsyncMapConfig config = new AsyncMapConfig() .withConsistency(Consistency.STRONG); -AsyncMap.create("tcp://123.456.789.0", cluster, config).open().thenAccept(map -> { +AsyncMap.create("my-map", cluster, config).open().thenAccept(map -> { map.put("foo", "Hello world!").thenRun(() -> { map.get("foo").thenAccept(result -> System.out.println(result)); }); @@ -1442,14 +1455,15 @@ To create a list via the `Copycat` API, use the `list` method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addListConfig("test-list", new AsyncListConfig() .withConsistency(Consistency.STRONG)); -Copycat.copycat("tcp://123.456.789.0", config).open() +Copycat.copycat(config).open() .thenApply(copycat -> copycat.list("test-list")) .thenCompose(list -> list.open()) .thenAccept(list -> { @@ -1464,12 +1478,13 @@ To create a list directly, use the `AsyncList.create` factory method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); AsyncListConfig config = new AsyncListConfig() .withConsistency(Consistency.STRONG); -AsyncList.create("tcp://123.456.789.0", cluster, config).open().thenAccept(list -> { +AsyncList.create("my-list", cluster, config).open().thenAccept(list -> { list.add("Hello world!").thenRun(() -> { list.get(0).thenAccept(result -> System.out.println(result)); }); @@ -1486,14 +1501,15 @@ To create a set via the `Copycat` API, use the `set` method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addSetConfig("test-set", new AsyncSetConfig() .withConsistency(Consistency.STRONG)); -Copycat.copycat("tcp://123.456.789.0", config).open() +Copycat.copycat(config).open() .thenApply(copycat -> copycat.set("test-set")) .thenCompose(set -> set.open()) .thenAccept(set -> { @@ -1508,12 +1524,13 @@ To create a set directly, use the `AsyncSet.create` factory method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); AsyncSetConfig config = new AsyncSetConfig() .withConsistency(Consistency.STRONG); -AsyncSet.create("tcp://123.456.789.0", cluster, config).open().thenAccept(set -> { +AsyncSet.create("my-set", cluster, config).open().thenAccept(set -> { set.add("Hello world!").thenRun(() -> { set.get(0).thenAccept(result -> System.out.println(result)); }); @@ -1530,14 +1547,15 @@ To create a multimap via the `Copycat` API, use the `multiMap` method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addMultiMapConfig("test-multimap", new AsyncMultiMapConfig() .withConsistency(Consistency.STRONG)); -Copycat.copycat("tcp://123.456.789.0", config).open() +Copycat.copycat(config).open() .thenApply(copycat -> copycat.multiMap("test-multimap")) .thenCompose(multiMap -> multiMap.open()) .thenAccept(multiMap -> { @@ -1552,12 +1570,13 @@ To create a multimap directly, use the `AsyncMultiMap.create` factory method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); AsyncMultiMapConfig config = new AsyncMultiMapConfig() .withConsistency(Consistency.STRONG); -AsyncMultiMap.create("tcp://123.456.789.0", cluster, config).open().thenAccept(multiMap -> { +AsyncMultiMap.create("my-map", cluster, config).open().thenAccept(multiMap -> { multiMap.put("foo", "Hello world!").thenRun(() -> { multiMap.get("foo").thenAccept(result -> System.out.println(result)); }); @@ -1574,14 +1593,15 @@ To create a lock via the `Copycat` API, use the `lock` method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addLockConfig("test-lock", new AsyncLockConfig() .withConsistency(Consistency.STRONG)); -Copycat.copycat("tcp://123.456.789.0", config).open() +Copycat.copycat(config).open() .thenApply(copycat -> copycat.lock("test-lock")) .thenCompose(lock -> lock.open()) .thenAccept(lock -> { @@ -1597,12 +1617,13 @@ To create a lock directly, use the `AsyncLock.create` factory method: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) - .withMembers("tcp://123.456.789.0", "tcp://123.456.789.1", "tcp://123.456.789.2"); + .withLocalMember("tcp://123.456.789.0:5000") + .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); AsyncLockConfig config = new AsyncLockConfig() .withConsistency(Consistency.STRONG); -AsyncLock.create("tcp://123.456.789.0", cluster, config).open().thenAccept(lock -> { +AsyncLock.create("my-lock", cluster, config).open().thenAccept(lock -> { lock.lock().thenRun(() -> { System.out.println("Lock locked"); lock.unlock().thenRun(() -> System.out.println("Lock unlocked"); @@ -1635,16 +1656,17 @@ members of the Copycat cluster. Active members are defined for each Copycat inst ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp://123.456.789.0:5000") .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addResourceConfig(...); -Copycat copycat = Copycat.create("tcp://123.456.789.0:5000", config); +Copycat copycat = Copycat.create(config); ``` -By creating a Copycat instance with a URI that is defined as an active member in the cluster configuration, Copycat +By creating a Copycat instance with a local URI that is defined as an active member in the cluster configuration, Copycat knows that this is an active member of the cluster: ```java @@ -1679,17 +1701,18 @@ that the member is passive and should thus receive replicated logs passively: ```java ClusterConfig cluster = new ClusterConfig() .withProtocol(new NettyTcpProtocol()) + .withLocalMember("tcp://123.456.789.4:5000") .withMembers("tcp://123.456.789.0:5000", "tcp://123.456.789.1:5000", "tcp://123.456.789.2:5000"); CopycatConfig config = new CopycatConfig() .withClusterConfig(cluster) .addResourceConfig(...); -Copycat copycat = Copycat.create("tcp://123.456.789.4:5000", config); +Copycat copycat = Copycat.create(config); ``` -By simply passing a URI that is not defined as an active member of the Copycat cluster, the Copycat instance becomes -a passive member of the cluster: +By simply defining a local URI that is not defined as an active member of the Copycat cluster, the Copycat instance +becomes a passive member of the cluster: ```java assert copycat.cluster().member().type() == Member.Type.PASSIVE; diff --git a/api/src/main/java/net/kuujo/copycat/Copycat.java b/api/src/main/java/net/kuujo/copycat/Copycat.java index 12909fa28d..34a4f199da 100644 --- a/api/src/main/java/net/kuujo/copycat/Copycat.java +++ b/api/src/main/java/net/kuujo/copycat/Copycat.java @@ -39,33 +39,30 @@ public interface Copycat extends Managed { /** * Creates a new Copycat instance with the default Copycat and cluster configuration. * - * @param uri The local member URI. * @return The Copycat instance. */ - static Copycat create(String uri) { - return create(uri, new CopycatConfig().withClusterConfig(new ClusterConfig())); + static Copycat create() { + return create(new CopycatConfig().withClusterConfig(new ClusterConfig())); } /** * Creates a new Copycat instance, overriding the default cluster configuration. * - * @param uri The local member URI. * @param cluster The global cluster configuration. * @return The Copycat instance. */ - static Copycat create(String uri, ClusterConfig cluster) { - return create(uri, new CopycatConfig().withClusterConfig(cluster)); + static Copycat create(ClusterConfig cluster) { + return create(new CopycatConfig().withClusterConfig(cluster)); } /** * Creates a new Copycat instance. * - * @param uri The local member URI. * @param config The global Copycat configuration. * @return The Copycat instance. */ - static Copycat create(String uri, CopycatConfig config) { - return new DefaultCopycat(uri, config); + static Copycat create(CopycatConfig config) { + return new DefaultCopycat(config); } /** diff --git a/api/src/main/java/net/kuujo/copycat/CopycatConfig.java b/api/src/main/java/net/kuujo/copycat/CopycatConfig.java index ea2a0bdcb9..02f8cd8d11 100644 --- a/api/src/main/java/net/kuujo/copycat/CopycatConfig.java +++ b/api/src/main/java/net/kuujo/copycat/CopycatConfig.java @@ -37,7 +37,6 @@ public class CopycatConfig extends AbstractConfigurable { public static final String COPYCAT_NAME = "name"; public static final String COPYCAT_DEFAULT_SERIALIZER = "serializer"; - public static final String COPYCAT_DEFAULT_EXECUTOR = "executor"; public static final String COPYCAT_CLUSTER = "cluster"; private static final String DEFAULT_CONFIGURATION = "copycat-default"; @@ -253,4 +252,19 @@ public CoordinatorConfig resolve() { .withClusterConfig(getClusterConfig()); } + @Override + public String toString() { + return String.format("%s[%s]", getClass().getSimpleName(), config.root().unwrapped()); + } + + @Override + public boolean equals(Object object) { + return object instanceof CopycatConfig && ((CopycatConfig) object).config.equals(config); + } + + @Override + public int hashCode() { + return 17 * config.root().unwrapped().hashCode(); + } + } diff --git a/api/src/main/java/net/kuujo/copycat/internal/DefaultCopycat.java b/api/src/main/java/net/kuujo/copycat/internal/DefaultCopycat.java index 01c310bd3e..0dd99b9724 100644 --- a/api/src/main/java/net/kuujo/copycat/internal/DefaultCopycat.java +++ b/api/src/main/java/net/kuujo/copycat/internal/DefaultCopycat.java @@ -41,8 +41,8 @@ public class DefaultCopycat implements Copycat { private final ClusterCoordinator coordinator; private final CopycatConfig config; - public DefaultCopycat(String uri, CopycatConfig config) { - this.coordinator = new DefaultClusterCoordinator(uri, config.resolve()); + public DefaultCopycat(CopycatConfig config) { + this.coordinator = new DefaultClusterCoordinator(config.resolve()); this.config = config; } diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBoolean.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBoolean.java index f4c3da4e64..5595e030aa 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBoolean.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBoolean.java @@ -42,11 +42,10 @@ public interface AsyncAtomicBoolean extends AsyncAtomicBooleanProxy, Resource coordinator.open().thenApply(v -> null)); - ((Resource) reference).addShutdownTask(coordinator::close); - return reference; + static AsyncAtomicBoolean create(String name, ClusterConfig cluster, AsyncAtomicBooleanConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + return coordinator.getResource(name, config.resolve(cluster)) + .addStartupTask(() -> coordinator.open().thenApply(v -> null)) + .addShutdownTask(coordinator::close); } } diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBooleanConfig.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBooleanConfig.java index 7ffb360ce2..25a4b20137 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBooleanConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicBooleanConfig.java @@ -20,6 +20,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.collections.AsyncCollectionConfig; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -55,6 +56,7 @@ public AsyncAtomicBooleanConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncAtomicBoolean.class); diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLong.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLong.java index 6272d9064c..2b6ec87b31 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLong.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLong.java @@ -42,11 +42,10 @@ public interface AsyncAtomicLong extends AsyncAtomicLongProxy, Resource coordinator.open().thenApply(v -> null)); - ((Resource) reference).addShutdownTask(coordinator::close); - return reference; + static AsyncAtomicLong create(String name, ClusterConfig cluster, AsyncAtomicLongConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + return coordinator.getResource(name, config.resolve(cluster)) + .addStartupTask(() -> coordinator.open().thenApply(v -> null)) + .addShutdownTask(coordinator::close); } } diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLongConfig.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLongConfig.java index 7513c421f0..84f9e52cdf 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLongConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicLongConfig.java @@ -20,6 +20,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.collections.AsyncCollectionConfig; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -55,6 +56,7 @@ public AsyncAtomicLongConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncAtomicLong.class); diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReference.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReference.java index 9238354595..ec68e87470 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReference.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReference.java @@ -42,12 +42,11 @@ public interface AsyncAtomicReference extends AsyncAtomicReferenceProxy, R * configurations will be loaded according to namespaces as well; for example, `references.conf`. * * @param name The asynchronous atomic reference name. - * @param uri The asynchronous atomic reference member URI. * @param The atomic reference data type. * @return The asynchronous atomic reference. */ - static AsyncAtomicReference create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncAtomicReferenceConfig()); + static AsyncAtomicReference create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncAtomicReferenceConfig(name)); } /** @@ -60,32 +59,29 @@ static AsyncAtomicReference create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `references.conf`. * * @param name The asynchronous atomic reference name. - * @param uri The asynchronous atomic reference member URI. * @param cluster The cluster configuration. * @param The atomic reference data type. * @return The asynchronous atomic reference. */ - static AsyncAtomicReference create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncAtomicReferenceConfig()); + static AsyncAtomicReference create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncAtomicReferenceConfig(name)); } /** * Creates a new asynchronous atomic reference. * * @param name The asynchronous atomic reference name. - * @param uri The asynchronous atomic reference member URI. * @param cluster The cluster configuration. * @param config The atomic reference configuration. * @param The atomic reference data type. * @return The asynchronous atomic reference. */ @SuppressWarnings({"unchecked", "rawtypes"}) - static AsyncAtomicReference create(String name, String uri, ClusterConfig cluster, AsyncAtomicReferenceConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); - AsyncAtomicReference reference = coordinator.getResource(name, config.resolve(cluster)); - ((Resource) reference).addStartupTask(() -> coordinator.open().thenApply(v -> null)); - ((Resource) reference).addShutdownTask(coordinator::close); - return reference; + static AsyncAtomicReference create(String name, ClusterConfig cluster, AsyncAtomicReferenceConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + return coordinator.>getResource(name, config.resolve(cluster)) + .addStartupTask(() -> coordinator.open().thenApply(v -> null)) + .addShutdownTask(coordinator::close); } } diff --git a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReferenceConfig.java b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReferenceConfig.java index b2586d673e..cabdce1240 100644 --- a/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReferenceConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/atomic/AsyncAtomicReferenceConfig.java @@ -20,6 +20,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.collections.AsyncCollectionConfig; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -55,6 +56,7 @@ public AsyncAtomicReferenceConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncAtomicReference.class); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncList.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncList.java index de203318d1..8eb5b3cbb5 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncList.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncList.java @@ -19,7 +19,6 @@ import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig; import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; -import net.kuujo.copycat.resource.internal.AbstractResource; /** * Asynchronous list. @@ -44,12 +43,11 @@ public interface AsyncList extends AsyncCollection, T>, AsyncLis * configurations will be loaded according to namespaces as well; for example, `lists.conf`. * * @param name The asynchronous list name. - * @param uri The asynchronous list member URI. * @param The list data type. * @return The asynchronous list. */ - static AsyncList create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncListConfig()); + static AsyncList create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncListConfig(name)); } /** @@ -62,32 +60,29 @@ static AsyncList create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `lists.conf`. * * @param name The asynchronous list name. - * @param uri The asynchronous list member URI. * @param cluster The cluster configuration. * @param The list data type. * @return The asynchronous list. */ - static AsyncList create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncListConfig()); + static AsyncList create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncListConfig(name)); } /** * Creates a new asynchronous list. * * @param name The asynchronous list name. - * @param uri The asynchronous list member URI. * @param cluster The cluster configuration. * @param config The list configuration. * @param The list data type. * @return The asynchronous list. */ @SuppressWarnings({"unchecked", "rawtypes"}) - static AsyncList create(String name, String uri, ClusterConfig cluster, AsyncListConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); - AsyncList list = coordinator.getResource(name, config.resolve(cluster)); - ((AbstractResource) list).addStartupTask(() -> coordinator.open().thenApply(v -> null)); - ((AbstractResource) list).addShutdownTask(coordinator::close); - return list; + static AsyncList create(String name, ClusterConfig cluster, AsyncListConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + return coordinator.>getResource(name, config.resolve(cluster)) + .addStartupTask(() -> coordinator.open().thenApply(v -> null)) + .addShutdownTask(coordinator::close); } } diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncListConfig.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncListConfig.java index e94c1ae229..b0ca396d6e 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncListConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncListConfig.java @@ -19,6 +19,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.collections.internal.collection.DefaultAsyncList; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -54,6 +55,7 @@ public AsyncListConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncList.class); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncLock.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncLock.java index fd6553759e..a46aea4db2 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncLock.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncLock.java @@ -43,11 +43,10 @@ public interface AsyncLock extends Resource { * configurations will be loaded according to namespaces as well; for example, `locks.conf`. * * @param name The asynchronous lock name. - * @param uri The asynchronous lock member URI. * @return The asynchronous lock. */ - static AsyncLock create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncLockConfig()); + static AsyncLock create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncLockConfig(name)); } /** @@ -60,25 +59,23 @@ static AsyncLock create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `locks.conf`. * * @param name The asynchronous lock name. - * @param uri The asynchronous lock member URI. * @param cluster The cluster configuration. * @return The asynchronous lock. */ - static AsyncLock create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncLockConfig()); + static AsyncLock create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncLockConfig(name)); } /** * Creates a new asynchronous lock. * * @param name The asynchronous lock name. - * @param uri The asynchronous lock member URI. * @param cluster The cluster configuration. * @param config The lock configuration. * @return The asynchronous lock. */ - static AsyncLock create(String name, String uri, ClusterConfig cluster, AsyncLockConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static AsyncLock create(String name, ClusterConfig cluster, AsyncLockConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncLockConfig.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncLockConfig.java index f4b71bd5fe..ca62529396 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncLockConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncLockConfig.java @@ -20,6 +20,7 @@ import net.kuujo.copycat.collections.internal.lock.DefaultAsyncLock; import net.kuujo.copycat.resource.ResourceConfig; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -55,6 +56,7 @@ public AsyncLockConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncLock.class); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMap.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMap.java index 10b2e570c1..c852f84056 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMap.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMap.java @@ -45,13 +45,12 @@ public interface AsyncMap extends AsyncMapProxy, Resource The map key type. * @param The map value type. * @return The asynchronous map. */ - static AsyncMap create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncMapConfig()); + static AsyncMap create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncMapConfig(name)); } /** @@ -64,29 +63,27 @@ static AsyncMap create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `maps.conf`. * * @param name The asynchronous map name. - * @param uri The asynchronous map member URI. * @param cluster The cluster configuration. * @param The map key type. * @param The map value type. * @return The asynchronous map. */ - static AsyncMap create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncMapConfig()); + static AsyncMap create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncMapConfig(name)); } /** * Creates a new asynchronous map. * * @param name The asynchronous map name. - * @param uri The asynchronous map member URI. * @param cluster The cluster configuration. * @param config The map configuration. * @param The map key type. * @param The map value type. * @return The asynchronous map. */ - static AsyncMap create(String name, String uri, ClusterConfig cluster, AsyncMapConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static AsyncMap create(String name, ClusterConfig cluster, AsyncMapConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMapConfig.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMapConfig.java index e7d06aa702..a01066b094 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMapConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMapConfig.java @@ -113,6 +113,7 @@ public AsyncMapConfig withConsistency(Consistency consistency) { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncMap.class); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMap.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMap.java index 92bde3fe0e..0d9f56a1e9 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMap.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMap.java @@ -45,13 +45,12 @@ public interface AsyncMultiMap extends AsyncMultiMapProxy, Resource< * configurations will be loaded according to namespaces as well; for example, `multimaps.conf`. * * @param name The asynchronous multimap name. - * @param uri The asynchronous multimap member URI. * @param The multimap key type. * @param The multimap value type. * @return The asynchronous multimap. */ - static AsyncMultiMap create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncMultiMapConfig()); + static AsyncMultiMap create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncMultiMapConfig(name)); } /** @@ -64,29 +63,27 @@ static AsyncMultiMap create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `multimaps.conf`. * * @param name The asynchronous multimap name. - * @param uri The asynchronous multimap member URI. * @param cluster The cluster configuration. * @param The multimap key type. * @param The multimap value type. * @return The asynchronous multimap. */ - static AsyncMultiMap create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncMultiMapConfig()); + static AsyncMultiMap create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncMultiMapConfig(name)); } /** * Creates a new asynchronous multimap. * * @param name The asynchronous multimap name. - * @param uri The asynchronous multimap member URI. * @param cluster The cluster configuration. * @param config The multimap configuration. * @param The multimap key type. * @param The multimap value type. * @return The asynchronous multimap. */ - static AsyncMultiMap create(String name, String uri, ClusterConfig cluster, AsyncMultiMapConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static AsyncMultiMap create(String name, ClusterConfig cluster, AsyncMultiMapConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMapConfig.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMapConfig.java index 3bb279667a..ea3bebcec8 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMapConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncMultiMapConfig.java @@ -113,6 +113,7 @@ public AsyncMultiMapConfig withConsistency(Consistency consistency) { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncMultiMap.class); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncSet.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncSet.java index 5f0aa7c7b5..44df634802 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncSet.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncSet.java @@ -43,12 +43,11 @@ public interface AsyncSet extends AsyncCollection, T>, AsyncSetPr * configurations will be loaded according to namespaces as well; for example, `sets.conf`. * * @param name The asynchronous set name. - * @param uri The asynchronous set member URI. * @param The set data type. * @return The asynchronous set. */ - static AsyncSet create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new AsyncSetConfig()); + static AsyncSet create(String name) { + return create(name, new ClusterConfig(String.format("%s-cluster", name)), new AsyncSetConfig(name)); } /** @@ -61,27 +60,25 @@ static AsyncSet create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `sets.conf`. * * @param name The asynchronous set name. - * @param uri The asynchronous set member URI. * @param cluster The cluster configuration. * @param The set data type. * @return The asynchronous set. */ - static AsyncSet create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new AsyncSetConfig()); + static AsyncSet create(String name, ClusterConfig cluster) { + return create(name, cluster, new AsyncSetConfig(name)); } /** * Creates a new asynchronous set. * * @param name The asynchronous set name. - * @param uri The asynchronous set member URI. * @param cluster The cluster configuration. * @param config The set configuration. * @param The set data type. * @return The asynchronous set. */ - static AsyncSet create(String name, String uri, ClusterConfig cluster, AsyncSetConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static AsyncSet create(String name, ClusterConfig cluster, AsyncSetConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/collections/src/main/java/net/kuujo/copycat/collections/AsyncSetConfig.java b/collections/src/main/java/net/kuujo/copycat/collections/AsyncSetConfig.java index 6c087b2fb2..ad4bbb26bc 100644 --- a/collections/src/main/java/net/kuujo/copycat/collections/AsyncSetConfig.java +++ b/collections/src/main/java/net/kuujo/copycat/collections/AsyncSetConfig.java @@ -19,6 +19,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.collections.internal.collection.DefaultAsyncSet; import net.kuujo.copycat.state.StateLogConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -54,6 +55,7 @@ public AsyncSetConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultAsyncSet.class); diff --git a/collections/src/test/java/net/kuujo/copycat/collections/AsyncMapTest.java b/collections/src/test/java/net/kuujo/copycat/collections/AsyncMapTest.java index 0a75760e93..3758077ecd 100644 --- a/collections/src/test/java/net/kuujo/copycat/collections/AsyncMapTest.java +++ b/collections/src/test/java/net/kuujo/copycat/collections/AsyncMapTest.java @@ -43,7 +43,7 @@ public void testAsyncMapPutGet() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog()))) + .withResourceFactory(config -> AsyncMap.create("test", config, new AsyncMapConfig().withLog(new BufferedLog()))) .build(); expectResume(); cluster.open().thenRun(this::resume); @@ -69,7 +69,7 @@ public void testAsyncMapPutRemove() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withLog(new BufferedLog()))) + .withResourceFactory(config -> AsyncMap.create("test", config, new AsyncMapConfig().withLog(new BufferedLog()))) .build(); expectResume(); cluster.open().thenRun(this::resume); @@ -100,7 +100,7 @@ public void testAsyncMapGetFromPassiveMember() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withConsistency(Consistency.WEAK).withLog(new BufferedLog()))) + .withResourceFactory(config -> AsyncMap.create("test", config, new AsyncMapConfig().withConsistency(Consistency.WEAK).withLog(new BufferedLog()))) .build(); expectResume(); @@ -132,7 +132,7 @@ public void testAsyncMapPutMany() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> AsyncMap.create("test", uri, config, new AsyncMapConfig().withConsistency(Consistency.WEAK).withLog(new BufferedLog().withSegmentInterval(1024).withFlushOnWrite(true)))) + .withResourceFactory(config -> AsyncMap.create("test", config, new AsyncMapConfig().withConsistency(Consistency.WEAK).withLog(new BufferedLog().withSegmentInterval(1024).withFlushOnWrite(true)))) .build(); expectResume(); diff --git a/collections/src/test/java/net/kuujo/copycat/collections/AsyncSetTest.java b/collections/src/test/java/net/kuujo/copycat/collections/AsyncSetTest.java index 7333f443b7..7a7fe68e6b 100644 --- a/collections/src/test/java/net/kuujo/copycat/collections/AsyncSetTest.java +++ b/collections/src/test/java/net/kuujo/copycat/collections/AsyncSetTest.java @@ -39,7 +39,7 @@ public void testSetAddRemove() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> AsyncSet.create("test", uri, config, new AsyncSetConfig().withLog(new BufferedLog()))) + .withResourceFactory(config -> AsyncSet.create("test", config, new AsyncSetConfig().withLog(new BufferedLog()))) .build(); expectResume(); diff --git a/core/src/main/java/net/kuujo/copycat/cluster/ClusterConfig.java b/core/src/main/java/net/kuujo/copycat/cluster/ClusterConfig.java index 359cb72b7f..11e0b15ab0 100644 --- a/core/src/main/java/net/kuujo/copycat/cluster/ClusterConfig.java +++ b/core/src/main/java/net/kuujo/copycat/cluster/ClusterConfig.java @@ -15,7 +15,6 @@ package net.kuujo.copycat.cluster; import com.typesafe.config.ConfigValueFactory; - import net.kuujo.copycat.protocol.Protocol; import net.kuujo.copycat.util.AbstractConfigurable; import net.kuujo.copycat.util.Configurable; @@ -45,6 +44,7 @@ * {@code * ClusterConfig cluster = new ClusterConfig() * .withProtocol(new VertxEventBusProtocol(vertx)) + * .withLocalMember("eventbus://foo") * .withMembers("eventbus://foo", "eventbus://bar", "eventbus://baz"); * } * @@ -55,6 +55,7 @@ public class ClusterConfig extends AbstractConfigurable { private static final String CLUSTER_PROTOCOL = "protocol"; private static final String CLUSTER_ELECTION_TIMEOUT = "election.timeout"; private static final String CLUSTER_HEARTBEAT_INTERVAL = "heartbeat.interval"; + private static final String CLUSTER_LOCAL_MEMBER = "local-member"; private static final String CLUSTER_MEMBERS = "members"; private static final String CONFIGURATION = "cluster"; @@ -222,6 +223,39 @@ public ClusterConfig withHeartbeatInterval(long heartbeatInterval, TimeUnit unit return this; } + /** + * Sets the local cluster member. + * + * @param uri The local cluster member. + */ + public void setLocalMember(String uri) { + if (uri != null) { + this.config = config.withValue(CLUSTER_LOCAL_MEMBER, ConfigValueFactory.fromAnyRef(uri)); + } else { + this.config = config.withoutPath(CLUSTER_LOCAL_MEMBER); + } + } + + /** + * Returns the local cluster member. + * + * @return The local cluster member or {@code null} if no local member is specified. + */ + public String getLocalMember() { + return config.hasPath(CLUSTER_LOCAL_MEMBER) ? config.getString(CLUSTER_LOCAL_MEMBER) : null; + } + + /** + * Sets the local member, returning the configuration for method chaining. + * + * @param uri The local cluster member URI. + * @return The cluster configuration. + */ + public ClusterConfig withLocalMember(String uri) { + setLocalMember(uri); + return this; + } + /** * Sets all cluster member URIs. * diff --git a/core/src/main/java/net/kuujo/copycat/cluster/internal/coordinator/DefaultClusterCoordinator.java b/core/src/main/java/net/kuujo/copycat/cluster/internal/coordinator/DefaultClusterCoordinator.java index 2409af3195..2967cc6f9f 100644 --- a/core/src/main/java/net/kuujo/copycat/cluster/internal/coordinator/DefaultClusterCoordinator.java +++ b/core/src/main/java/net/kuujo/copycat/cluster/internal/coordinator/DefaultClusterCoordinator.java @@ -46,7 +46,6 @@ * @author Jordan Halterman */ public class DefaultClusterCoordinator implements ClusterCoordinator { - private final String uri; private final ThreadFactory threadFactory = new NamedThreadFactory("copycat-coordinator-%d"); private final ScheduledExecutorService executor; private final CoordinatorConfig config; @@ -57,14 +56,13 @@ public class DefaultClusterCoordinator implements ClusterCoordinator { private final Map resources = new ConcurrentHashMap<>(); private volatile boolean open; - public DefaultClusterCoordinator(String uri, CoordinatorConfig config) { - this.uri = uri; + public DefaultClusterCoordinator(CoordinatorConfig config) { this.config = config.copy(); this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory); // Set up permanent cluster members based on the given cluster configuration. - this.localMember = new DefaultLocalMemberCoordinator(new MemberInfo(uri, config.getClusterConfig().getMembers().contains(uri) ? Member.Type.ACTIVE : Member.Type.PASSIVE, Member.State.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadExecutor(threadFactory)); - this.members.put(uri, localMember); + this.localMember = new DefaultLocalMemberCoordinator(new MemberInfo(config.getClusterConfig().getLocalMember(), config.getClusterConfig().getMembers().contains(config.getClusterConfig().getLocalMember()) ? Member.Type.ACTIVE : Member.Type.PASSIVE, Member.State.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadExecutor(threadFactory)); + this.members.put(config.getClusterConfig().getLocalMember(), localMember); for (String member : config.getClusterConfig().getMembers()) { if (!this.members.containsKey(member)) { this.members.put(member, new DefaultRemoteMemberCoordinator(new MemberInfo(member, Member.Type.ACTIVE, Member.State.ALIVE), config.getClusterConfig().getProtocol(), Executors.newSingleThreadScheduledExecutor(threadFactory))); @@ -78,7 +76,7 @@ public DefaultClusterCoordinator(String uri, CoordinatorConfig config) { .withReplicas(config.getClusterConfig().getMembers()) .withLog(new BufferedLog()); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("copycat-coordinator")); - this.context = new CopycatStateContext(config.getName(), uri, resourceConfig, executor); + this.context = new CopycatStateContext(config.getName(), config.getClusterConfig().getLocalMember(), resourceConfig, executor); this.cluster = new CoordinatorCluster(0, this, context, new ResourceRouter(executor), new KryoSerializer(), executor, config.getExecutor()); } @@ -129,7 +127,7 @@ public > T getResource(String name) { public > T getResource(String name, CoordinatedResourceConfig config) { ResourceHolder resource = resources.computeIfAbsent(name, n -> { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("copycat-" + name + "-%d")); - CopycatStateContext state = new CopycatStateContext(name, uri, config, executor); + CopycatStateContext state = new CopycatStateContext(name, member().uri(), config, executor); ClusterManager cluster = new CoordinatedCluster(name.hashCode(), this, state, new ResourceRouter(executor), config.getSerializer(), executor, config.getExecutor()); ResourceContext context = new DefaultResourceContext(name, config, cluster, state, this); try { @@ -233,7 +231,8 @@ public synchronized CompletableFuture close() { return closeResources() .thenComposeAsync(v -> context.close(), executor) .thenComposeAsync(v -> cluster.close(), executor) - .thenComposeAsync(v -> CompletableFuture.allOf(futures)); + .thenComposeAsync(v -> CompletableFuture.allOf(futures)) + .thenRun(executor::shutdown); } @Override diff --git a/core/src/main/java/net/kuujo/copycat/util/internal/Assert.java b/core/src/main/java/net/kuujo/copycat/util/internal/Assert.java index 6c107dc4a1..c4f67373eb 100644 --- a/core/src/main/java/net/kuujo/copycat/util/internal/Assert.java +++ b/core/src/main/java/net/kuujo/copycat/util/internal/Assert.java @@ -14,6 +14,8 @@ */ package net.kuujo.copycat.util.internal; +import net.kuujo.copycat.util.ConfigurationException; + /** * Argument assertions. * @@ -64,6 +66,22 @@ public static void state(boolean state, String message, Object... args) { } } + /** + * Validates that a configuration condition applies. + * + * @param value The resulting value to passthrough + * @param condition The condition to assert. + * @param message The failure exception message. + * @param args A list of message string formatting arguments. + * @throws IllegalArgumentException if {@code condition} is not true + */ + public static T config(T value, boolean condition, String message, Object... args) { + if (!condition) { + throw new ConfigurationException(String.format(message, args)); + } + return value; + } + /** * Validates that a condition applies. * diff --git a/core/src/main/resources/cluster-defaults.conf b/core/src/main/resources/cluster-defaults.conf index 2832c2610f..6ee3271c6f 100644 --- a/core/src/main/resources/cluster-defaults.conf +++ b/core/src/main/resources/cluster-defaults.conf @@ -26,6 +26,9 @@ election.timeout = 300 # Configures the global Raft heartbeat interval heartbeat.interval = 150 +# Configures the local member URI +# local-member: "tcp://123.456.789.0:1234" + # Configures the set of members in the cluster members: [ # "tcp://123.456.789.0:1234" diff --git a/event-log/src/main/java/net/kuujo/copycat/event/EventLog.java b/event-log/src/main/java/net/kuujo/copycat/event/EventLog.java index 7184b48dba..e9fac4de43 100644 --- a/event-log/src/main/java/net/kuujo/copycat/event/EventLog.java +++ b/event-log/src/main/java/net/kuujo/copycat/event/EventLog.java @@ -44,11 +44,10 @@ public interface EventLog extends Resource> { * configurations will be loaded according to namespaces as well; for example, `event-logs.conf`. * * @param name The log resource name. - * @param uri The local log member URI. * @return A new event log instance. */ - static EventLog create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new EventLogConfig()); + static EventLog create(String name) { + return create(name, new ClusterConfig(), new EventLogConfig()); } /** @@ -61,25 +60,23 @@ static EventLog create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `event-logs.conf`. * * @param name The log resource name. - * @param uri The local log member URI. * @param cluster The event log cluster. * @return A new event log instance. */ - static EventLog create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new EventLogConfig()); + static EventLog create(String name, ClusterConfig cluster) { + return create(name, cluster, new EventLogConfig()); } /** * Creates a new event log with the given cluster and event log configurations. * * @param name The log name. - * @param uri The local log member URI. * @param cluster The event log cluster. * @param config The event log configuration. * @return A new event log instance. */ - static EventLog create(String name, String uri, ClusterConfig cluster, EventLogConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static EventLog create(String name, ClusterConfig cluster, EventLogConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/event-log/src/main/java/net/kuujo/copycat/event/EventLogConfig.java b/event-log/src/main/java/net/kuujo/copycat/event/EventLogConfig.java index 72cfca0c2e..15625e636a 100644 --- a/event-log/src/main/java/net/kuujo/copycat/event/EventLogConfig.java +++ b/event-log/src/main/java/net/kuujo/copycat/event/EventLogConfig.java @@ -124,6 +124,7 @@ public EventLogConfig withRetentionPolicy(RetentionPolicy retentionPolicy) { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new CoordinatedResourceConfig(super.toMap()) .withElectionTimeout(getElectionTimeout()) .withHeartbeatInterval(getHeartbeatInterval()) diff --git a/event-log/src/test/java/net/kuujo/copycat/event/EventLogTest.java b/event-log/src/test/java/net/kuujo/copycat/event/EventLogTest.java index c0fd574e8e..d66d04d031 100644 --- a/event-log/src/test/java/net/kuujo/copycat/event/EventLogTest.java +++ b/event-log/src/test/java/net/kuujo/copycat/event/EventLogTest.java @@ -39,7 +39,7 @@ public void testPassiveEvents() throws Throwable { .withPassiveMembers(2) .withUriFactory(id -> String.format("local://test%d", id)) .withClusterFactory(members -> new ClusterConfig().withProtocol(new LocalProtocol()).withMembers(members)) - .withResourceFactory((uri, config) -> EventLog.create("test", uri, config, new EventLogConfig().withLog(new BufferedLog()))) + .withResourceFactory(config -> EventLog.create("test", config, new EventLogConfig().withLog(new BufferedLog()))) .build(); expectResume(); cluster.open().thenRun(this::resume); diff --git a/examples/vertx-election/src/main/java/net/kuujo/copycat/example/election/LeaderElectingVerticle.java b/examples/vertx-election/src/main/java/net/kuujo/copycat/example/election/LeaderElectingVerticle.java index 7faa068a6d..a5ebe6ec0e 100644 --- a/examples/vertx-election/src/main/java/net/kuujo/copycat/example/election/LeaderElectingVerticle.java +++ b/examples/vertx-election/src/main/java/net/kuujo/copycat/example/election/LeaderElectingVerticle.java @@ -15,21 +15,20 @@ */ package net.kuujo.copycat.example.election; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.election.LeaderElection; import net.kuujo.copycat.election.LeaderElectionConfig; import net.kuujo.copycat.vertx.VertxEventBusProtocol; import net.kuujo.copycat.vertx.VertxEventLoopExecutor; - import org.vertx.java.core.Future; import org.vertx.java.core.json.JsonArray; import org.vertx.java.platform.Verticle; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + /** * Example verticle that performs an in-memory leader election. * @@ -49,6 +48,7 @@ public void start(final Future startResult) { // Because Copycat is a CP framework, we have to explicitly list all of the nodes in the cluster. ClusterConfig cluster = new ClusterConfig() .withProtocol(new VertxEventBusProtocol(vertx)) + .withLocalMember(String.format("eventbus://%s", address)) .withMembers(((List) members.toList()).stream() .map(member -> String.format("eventbus://%s", member)).collect(Collectors.toList())); @@ -57,7 +57,7 @@ public void start(final Future startResult) { .withExecutor(new VertxEventLoopExecutor(vertx)); // Create and open the leader election using the constructed cluster configuration. - LeaderElection.create("election", String.format("eventbus://%s", address), cluster, config).open().whenComplete((election, error) -> { + LeaderElection.create("election", cluster, config).open().whenComplete((election, error) -> { // Since we configured the election with a Vert.x event loop executor, CompletableFuture callbacks are executed // on the Vert.x event loop, so we don't have to use runOnContext. if (error != null) { diff --git a/examples/vertx-event-log/src/main/java/net/kuujo/copycat/example/eventlog/EventLogVerticle.java b/examples/vertx-event-log/src/main/java/net/kuujo/copycat/example/eventlog/EventLogVerticle.java index d6402a56f9..e7675f71fe 100644 --- a/examples/vertx-event-log/src/main/java/net/kuujo/copycat/example/eventlog/EventLogVerticle.java +++ b/examples/vertx-event-log/src/main/java/net/kuujo/copycat/example/eventlog/EventLogVerticle.java @@ -118,6 +118,7 @@ private void startEventLog(Handler> doneHandler) { // Because Copycat is a CP framework, we have to explicitly list all of the nodes in the cluster. ClusterConfig cluster = new ClusterConfig() .withProtocol(new VertxEventBusProtocol(vertx)) + .withLocalMember(String.format("eventbus://%s", id)) .withMembers(((List) members.toList()).stream().map(member -> String.format("eventbus://%s", member)).collect(Collectors.toList())); // Configure Copycat with the event bus cluster and Vert.x event loop executor. @@ -137,7 +138,7 @@ private void startEventLog(Handler> doneHandler) { // Create and open a new Copycat instance. The Copycat instance controls the cluster of verticles and manages // resources within the cluster - in this case just a single event log. - copycat = Copycat.create(String.format("eventbus://%s", id), config); + copycat = Copycat.create(config); // Once we create the Copycat instance, it needs to be opened. When the instance is opened, Copycat will begin // communicating with other nodes in the cluster and elect a leader that will control resources within the cluster. diff --git a/examples/vertx-lock-service/src/main/java/net/kuujo/copycat/example/lockservice/LockServiceVerticle.java b/examples/vertx-lock-service/src/main/java/net/kuujo/copycat/example/lockservice/LockServiceVerticle.java index b46d2d2ee2..a419b4adfe 100644 --- a/examples/vertx-lock-service/src/main/java/net/kuujo/copycat/example/lockservice/LockServiceVerticle.java +++ b/examples/vertx-lock-service/src/main/java/net/kuujo/copycat/example/lockservice/LockServiceVerticle.java @@ -68,6 +68,7 @@ public void start(final Future startResult) { // Because Copycat is a CP framework, we have to explicitly list all of the nodes in the cluster. ClusterConfig cluster = new ClusterConfig() .withProtocol(new VertxEventBusProtocol(vertx)) + .withLocalMember(String.format("eventbus://%s", id)) .withMembers(((List) members.toList()).stream() .map(member -> String.format("eventbus://%s", member)).collect(Collectors.toList())); @@ -89,7 +90,7 @@ public void start(final Future startResult) { // Create and open a new Copycat instance. The Copycat instance controls the cluster of verticles and manages // resources within the cluster - in this case just a single state machine. - copycat = Copycat.create(String.format("eventbus://%s", id), config); + copycat = Copycat.create(config); // Once we create the Copycat instance, it needs to be opened. When the instance is opened, Copycat will begin // communicating with other nodes in the cluster and elect a leader that will control resources within the cluster. diff --git a/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElection.java b/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElection.java index 1426fdb19a..0c52a9c529 100644 --- a/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElection.java +++ b/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElection.java @@ -16,12 +16,12 @@ package net.kuujo.copycat.election; import net.kuujo.copycat.EventListener; -import net.kuujo.copycat.resource.Resource; import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.Member; import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig; import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; +import net.kuujo.copycat.resource.Resource; /** * Leader election. @@ -44,11 +44,10 @@ public interface LeaderElection extends Resource { * configurations will be loaded according to namespaces as well; for example, `elections.conf`. * * @param name The election name. - * @param uri The election member URI. * @return The leader election. */ - static LeaderElection create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new LeaderElectionConfig(name)); + static LeaderElection create(String name) { + return create(name, new ClusterConfig(), new LeaderElectionConfig(name)); } /** @@ -61,25 +60,23 @@ static LeaderElection create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `elections.conf`. * * @param name The election name. - * @param uri The election member URI. * @param cluster The Copycat cluster. * @return The leader election. */ - static LeaderElection create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new LeaderElectionConfig(name)); + static LeaderElection create(String name, ClusterConfig cluster) { + return create(name, cluster, new LeaderElectionConfig(name)); } /** * Creates a new leader election with the given cluster and election configurations. * * @param name The election name. - * @param uri The election member URI. * @param cluster The Copycat cluster. * @param config The leader election configuration. * @return The leader election. */ - static LeaderElection create(String name, String uri, ClusterConfig cluster, LeaderElectionConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static LeaderElection create(String name, ClusterConfig cluster, LeaderElectionConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElectionConfig.java b/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElectionConfig.java index 9e32b6b623..af467e092c 100644 --- a/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElectionConfig.java +++ b/leader-election/src/main/java/net/kuujo/copycat/election/LeaderElectionConfig.java @@ -19,6 +19,7 @@ import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; import net.kuujo.copycat.election.internal.DefaultLeaderElection; import net.kuujo.copycat.resource.ResourceConfig; +import net.kuujo.copycat.util.internal.Assert; import java.util.Map; @@ -54,6 +55,7 @@ public LeaderElectionConfig copy() { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new CoordinatedResourceConfig(super.toMap()) .withElectionTimeout(getElectionTimeout()) .withHeartbeatInterval(getHeartbeatInterval()) diff --git a/leader-election/src/test/java/net/kuujo/copycat/election/LeaderElectionTest.java b/leader-election/src/test/java/net/kuujo/copycat/election/LeaderElectionTest.java index 02d4921891..feeee85113 100644 --- a/leader-election/src/test/java/net/kuujo/copycat/election/LeaderElectionTest.java +++ b/leader-election/src/test/java/net/kuujo/copycat/election/LeaderElectionTest.java @@ -38,9 +38,9 @@ public void testLeaderElectionAsSeedMember() throws Exception { .withProtocol(new LocalProtocol()) .withMembers("local://foo", "local://bar", "local://baz"); - LeaderElection election1 = LeaderElection.create("test", "local://foo", cluster); - LeaderElection election2 = LeaderElection.create("test", "local://bar", cluster); - LeaderElection election3 = LeaderElection.create("test", "local://baz", cluster); + LeaderElection election1 = LeaderElection.create("test", cluster.copy().withLocalMember("local://foo")); + LeaderElection election2 = LeaderElection.create("test", cluster.copy().withLocalMember("local://bar")); + LeaderElection election3 = LeaderElection.create("test", cluster.copy().withLocalMember("local://baz")); CountDownLatch latch = new CountDownLatch(3); diff --git a/state-log/src/main/java/net/kuujo/copycat/state/StateLog.java b/state-log/src/main/java/net/kuujo/copycat/state/StateLog.java index 8c71c82684..f78917fb78 100644 --- a/state-log/src/main/java/net/kuujo/copycat/state/StateLog.java +++ b/state-log/src/main/java/net/kuujo/copycat/state/StateLog.java @@ -48,12 +48,11 @@ public interface StateLog extends Resource> { * configurations will be loaded according to namespaces as well; for example, `state-logs.conf`. * * @param name The state log resource name. - * @param uri The local member URI. * @param The state log entry type. * @return A new state log instance. */ - static StateLog create(String name, String uri) { - return create(name, uri, new ClusterConfig(), new StateLogConfig()); + static StateLog create(String name) { + return create(name, new ClusterConfig(), new StateLogConfig()); } /** @@ -66,27 +65,25 @@ static StateLog create(String name, String uri) { * configurations will be loaded according to namespaces as well; for example, `state-logs.conf`. * * @param name The state log resource name. - * @param uri The local member URI. * @param cluster The state log cluster configuration. * @param The state log entry type. * @return A new state log instance. */ - static StateLog create(String name, String uri, ClusterConfig cluster) { - return create(name, uri, cluster, new StateLogConfig()); + static StateLog create(String name, ClusterConfig cluster) { + return create(name, cluster, new StateLogConfig()); } /** * Creates a new state log. * * @param name The state log resource name. - * @param uri The local member URI. * @param cluster The state log cluster configuration. * @param config The state log configuration. * @param The state log entry type. * @return A new state log instance. */ - static StateLog create(String name, String uri, ClusterConfig cluster, StateLogConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static StateLog create(String name, ClusterConfig cluster, StateLogConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/state-log/src/main/java/net/kuujo/copycat/state/StateLogConfig.java b/state-log/src/main/java/net/kuujo/copycat/state/StateLogConfig.java index 902be771e7..596ca3a081 100644 --- a/state-log/src/main/java/net/kuujo/copycat/state/StateLogConfig.java +++ b/state-log/src/main/java/net/kuujo/copycat/state/StateLogConfig.java @@ -122,6 +122,7 @@ public StateLogConfig withDefaultConsistency(Consistency consistency) { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new CoordinatedResourceConfig(super.toMap()) .withElectionTimeout(getElectionTimeout()) .withHeartbeatInterval(getHeartbeatInterval()) diff --git a/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java b/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java index 68d676b0ce..3644429f35 100644 --- a/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java +++ b/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java @@ -42,13 +42,12 @@ public interface StateMachine extends Resource> { * configurations will be loaded according to namespaces as well; for example, `state-machines.conf`. * * @param name The state machine resource name. - * @param uri The state machine member URI. * @param stateType The state machine state type. * @param initialState The state machine state. * @return The state machine. */ - static StateMachine create(String name, String uri, Class stateType, Class initialState) { - return create(name, uri, new ClusterConfig(), new StateMachineConfig().withStateType(stateType).withInitialState(initialState)); + static StateMachine create(String name, Class stateType, Class initialState) { + return create(name, new ClusterConfig(), new StateMachineConfig().withStateType(stateType).withInitialState(initialState)); } /** @@ -61,27 +60,25 @@ static StateMachine create(String name, String uri, Class stateType, C * configurations will be loaded according to namespaces as well; for example, `state-machines.conf`. * * @param name The state machine resource name. - * @param uri The state machine member URI. * @param stateType The state machine state type. * @param initialState The state machine state. * @param cluster The state machine cluster configuration. * @return The state machine. */ - static StateMachine create(String name, String uri, Class stateType, Class initialState, ClusterConfig cluster) { - return create(name, uri, cluster, new StateMachineConfig().withStateType(stateType).withInitialState(initialState)); + static StateMachine create(String name, Class stateType, Class initialState, ClusterConfig cluster) { + return create(name, cluster, new StateMachineConfig().withStateType(stateType).withInitialState(initialState)); } /** * Creates a new state machine. * * @param name The state machine resource name. - * @param uri The state machine member URI. * @param cluster The state machine cluster configuration. * @param config The state machine configuration. * @return The state machine. */ - static StateMachine create(String name, String uri, ClusterConfig cluster, StateMachineConfig config) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName(name).withClusterConfig(cluster)); + static StateMachine create(String name, ClusterConfig cluster, StateMachineConfig config) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); return coordinator.>getResource(name, config.resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java b/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java index 8f2277bc5b..35cc5e5fab 100644 --- a/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java +++ b/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java @@ -303,6 +303,7 @@ public StateMachineConfig withDefaultConsistency(Consistency consistency) { @Override public CoordinatedResourceConfig resolve(ClusterConfig cluster) { + Assert.config(getReplicas(), getReplicas().isEmpty() || cluster.getMembers().containsAll(getReplicas()), "Resource replica set must contain only active cluster members"); return new StateLogConfig(toMap()) .resolve(cluster) .withResourceType(DefaultStateMachine.class) diff --git a/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java b/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java index 15c992a269..c3864a1b75 100644 --- a/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java +++ b/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java @@ -55,8 +55,8 @@ public abstract class ProtocolTest extends ConcurrentTestCase { /** * Creates a new test resource. */ - private TestResource createTestResource(String uri, ClusterConfig cluster) { - ClusterCoordinator coordinator = new DefaultClusterCoordinator(uri, new CoordinatorConfig().withName("test").withClusterConfig(cluster)); + private TestResource createTestResource(ClusterConfig cluster) { + ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName("test").withClusterConfig(cluster)); return coordinator.getResource("test", new TestResource.Config().withLog(new BufferedLog()).resolve(cluster)) .addStartupTask(() -> coordinator.open().thenApply(v -> null)) .addShutdownTask(coordinator::close); diff --git a/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java b/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java index 19112e8d70..3809c0b89c 100644 --- a/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java +++ b/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java @@ -21,7 +21,6 @@ import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; import java.util.function.Function; /** @@ -53,7 +52,7 @@ public static class Builder> { private int passiveMembers = 2; private Function uriFactory; private Function, ClusterConfig> clusterFactory; - private BiFunction resourceFactory; + private Function resourceFactory; /** * Sets the number of active members for the cluster. @@ -90,7 +89,7 @@ public Builder withClusterFactory(Function, ClusterConfig> /** * Sets the resource factory. */ - public Builder withResourceFactory(BiFunction resourceFactory) { + public Builder withResourceFactory(Function resourceFactory) { this.resourceFactory = resourceFactory; return this; } @@ -111,15 +110,15 @@ public TestCluster build() { } for (String member : members) { - ClusterConfig cluster = clusterFactory.apply(members); - activeResources.add(resourceFactory.apply(member, cluster)); + ClusterConfig cluster = clusterFactory.apply(members).withLocalMember(member); + activeResources.add(resourceFactory.apply(cluster)); } List passiveResources = new ArrayList<>(passiveMembers); while (i <= passiveMembers + activeMembers) { - String uri = uriFactory.apply(++i); - ClusterConfig cluster = clusterFactory.apply(members); - passiveResources.add(resourceFactory.apply(uri, cluster)); + String member = uriFactory.apply(++i); + ClusterConfig cluster = clusterFactory.apply(members).withLocalMember(member); + passiveResources.add(resourceFactory.apply(cluster)); } return new TestCluster(activeResources, passiveResources); }