diff --git a/collections/src/main/java/io/atomix/collections/DistributedMap.java b/collections/src/main/java/io/atomix/collections/DistributedMap.java index 4de9cdf461..811945cac5 100644 --- a/collections/src/main/java/io/atomix/collections/DistributedMap.java +++ b/collections/src/main/java/io/atomix/collections/DistributedMap.java @@ -47,7 +47,25 @@ * @author Jordan Halterman */ @ResourceTypeInfo(id=-11, stateMachine=MapState.class, typeResolver=MapCommands.TypeResolver.class) -public class DistributedMap extends Resource, Resource.Options> { +public class DistributedMap extends Resource> { + + /** + * Returns new map options. + * + * @return New map options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new map configuration. + * + * @return A new map configuration. + */ + public static Config config() { + return new Config(); + } public DistributedMap(CopycatClient client, Resource.Options options) { super(client, options); diff --git a/collections/src/main/java/io/atomix/collections/DistributedMultiMap.java b/collections/src/main/java/io/atomix/collections/DistributedMultiMap.java index aee860923c..49c7acc7b6 100644 --- a/collections/src/main/java/io/atomix/collections/DistributedMultiMap.java +++ b/collections/src/main/java/io/atomix/collections/DistributedMultiMap.java @@ -31,7 +31,25 @@ * @author Jordan Halterman */ @ResourceTypeInfo(id=-14, stateMachine=QueueState.class, typeResolver=QueueCommands.TypeResolver.class) -public class DistributedQueue extends Resource, Resource.Options> { +public class DistributedQueue extends Resource> { + + /** + * Returns new queue options. + * + * @return New queue options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new queue configuration. + * + * @return A new queue configuration. + */ + public static Config config() { + return new Config(); + } public DistributedQueue(CopycatClient client, Resource.Options options) { super(client, options); diff --git a/collections/src/main/java/io/atomix/collections/DistributedSet.java b/collections/src/main/java/io/atomix/collections/DistributedSet.java index 07cebda61f..a901797bd4 100644 --- a/collections/src/main/java/io/atomix/collections/DistributedSet.java +++ b/collections/src/main/java/io/atomix/collections/DistributedSet.java @@ -31,7 +31,25 @@ * @author Jordan Halterman */ @ResourceTypeInfo(id=-13, stateMachine=SetState.class, typeResolver=SetCommands.TypeResolver.class) -public class DistributedSet extends Resource, Resource.Options> { +public class DistributedSet extends Resource> { + + /** + * Returns new set options. + * + * @return New set options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new set configuration. + * + * @return A new set configuration. + */ + public static Config config() { + return new Config(); + } public DistributedSet(CopycatClient client, Resource.Options options) { super(client, options); diff --git a/coordination/src/main/java/io/atomix/coordination/DistributedGroup.java b/coordination/src/main/java/io/atomix/coordination/DistributedGroup.java index deabe88eb6..58bba668fa 100644 --- a/coordination/src/main/java/io/atomix/coordination/DistributedGroup.java +++ b/coordination/src/main/java/io/atomix/coordination/DistributedGroup.java @@ -92,7 +92,26 @@ * @author Jordan Halterman */ @ResourceTypeInfo(id=-22, stateMachine=LockState.class, typeResolver=LockCommands.TypeResolver.class) -public class DistributedLock extends Resource { +public class DistributedLock extends Resource { + + /** + * Returns new lock options. + * + * @return New lock options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new lock configuration. + * + * @return A new lock configuration. + */ + public static Config config() { + return new Config(); + } + private final Queue> queue = new ConcurrentLinkedQueue<>(); public DistributedLock(CopycatClient client, Resource.Options options) { diff --git a/core/src/main/java/io/atomix/Atomix.java b/core/src/main/java/io/atomix/Atomix.java index daa8f4f152..c966e185c5 100644 --- a/core/src/main/java/io/atomix/Atomix.java +++ b/core/src/main/java/io/atomix/Atomix.java @@ -22,8 +22,8 @@ import io.atomix.collections.DistributedMultiMap; import io.atomix.collections.DistributedQueue; import io.atomix.collections.DistributedSet; -import io.atomix.coordination.DistributedLock; import io.atomix.coordination.DistributedGroup; +import io.atomix.coordination.DistributedLock; import io.atomix.manager.ResourceClient; import io.atomix.manager.ResourceManager; import io.atomix.messaging.DistributedMessageBus; @@ -58,8 +58,16 @@ protected Atomix(ResourceClient client) { this.client = Assert.notNull(client, "client"); } + @Override + public ThreadContext context() { + return client.context(); + } + /** * Returns the Atomix serializer. + *

+ * Serializable types registered on the returned serializer are reflected throughout the system. Types + * registered on a client must also be registered on a server to be transported across the wire. * * @return The Atomix serializer. */ @@ -79,6 +87,46 @@ public CompletableFuture> getMap(String key) { return get(key, DistributedMap.class); } + /** + * Gets or creates a distributed map. + * + * @param key The resource key. + * @param options The map options. + * @param The map key type. + * @param The map value type. + * @return A completable future to be completed once the map has been created. + */ + public CompletableFuture> getMap(String key, DistributedMap.Options options) { + return get(key, DistributedMap.class, options); + } + + /** + * Gets or creates a distributed map. + * + * @param key The resource key. + * @param config The map configuration. + * @param The map key type. + * @param The map value type. + * @return A completable future to be completed once the map has been created. + */ + public CompletableFuture> getMap(String key, DistributedMap.Config config) { + return get(key, DistributedMap.class, config); + } + + /** + * Gets or creates a distributed map. + * + * @param key The resource key. + * @param config The map configuration. + * @param options The map options. + * @param The map key type. + * @param The map value type. + * @return A completable future to be completed once the map has been created. + */ + public CompletableFuture> getMap(String key, DistributedMap.Config config, DistributedMap.Options options) { + return get(key, DistributedMap.class, config, options); + } + /** * Gets or creates a distributed multi map. * @@ -91,6 +139,42 @@ public CompletableFuture> getMultiMap(String ke return get(key, DistributedMultiMap.class); } + /** + * Gets or creates a distributed multi map. + * + * @param key The resource key. + * @param The multi map key type. + * @param The multi map value type. + * @return A completable future to be completed once the multi map has been created. + */ + public CompletableFuture> getMultiMap(String key, DistributedMultiMap.Config config) { + return get(key, DistributedMultiMap.class, config); + } + + /** + * Gets or creates a distributed multi map. + * + * @param key The resource key. + * @param The multi map key type. + * @param The multi map value type. + * @return A completable future to be completed once the multi map has been created. + */ + public CompletableFuture> getMultiMap(String key, DistributedMultiMap.Options options) { + return get(key, DistributedMultiMap.class, options); + } + + /** + * Gets or creates a distributed multi map. + * + * @param key The resource key. + * @param The multi map key type. + * @param The multi map value type. + * @return A completable future to be completed once the multi map has been created. + */ + public CompletableFuture> getMultiMap(String key, DistributedMultiMap.Config config, DistributedMultiMap.Options options) { + return get(key, DistributedMultiMap.class, config, options); + } + /** * Gets or creates a distributed set. * @@ -102,6 +186,39 @@ public CompletableFuture> getSet(String key) { return get(key, DistributedSet.class); } + /** + * Gets or creates a distributed set. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the set has been created. + */ + public CompletableFuture> getSet(String key, DistributedSet.Config config) { + return get(key, DistributedSet.class, config); + } + + /** + * Gets or creates a distributed set. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the set has been created. + */ + public CompletableFuture> getSet(String key, DistributedSet.Options options) { + return get(key, DistributedSet.class, options); + } + + /** + * Gets or creates a distributed set. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the set has been created. + */ + public CompletableFuture> getSet(String key, DistributedSet.Config config, DistributedSet.Options options) { + return get(key, DistributedSet.class, config, options); + } + /** * Gets or creates a distributed queue. * @@ -113,6 +230,39 @@ public CompletableFuture> getQueue(String key) { return get(key, DistributedQueue.class); } + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getQueue(String key, DistributedQueue.Config config) { + return get(key, DistributedQueue.class, config); + } + + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getQueue(String key, DistributedQueue.Options options) { + return get(key, DistributedQueue.class, options); + } + + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getQueue(String key, DistributedQueue.Config config, DistributedQueue.Options options) { + return get(key, DistributedQueue.class, config, options); + } + /** * Gets or creates a distributed value. * @@ -124,6 +274,39 @@ public CompletableFuture> getValue(String key) { return get(key, DistributedValue.class); } + /** + * Gets or creates a distributed value. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the value has been created. + */ + public CompletableFuture> getValue(String key, DistributedValue.Config config) { + return get(key, DistributedValue.class, config); + } + + /** + * Gets or creates a distributed value. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the value has been created. + */ + public CompletableFuture> getValue(String key, DistributedValue.Options options) { + return get(key, DistributedValue.class, options); + } + + /** + * Gets or creates a distributed value. + * + * @param key The resource key. + * @param The value type. + * @return A completable future to be completed once the value has been created. + */ + public CompletableFuture> getValue(String key, DistributedValue.Config config, DistributedValue.Options options) { + return get(key, DistributedValue.class, config, options); + } + /** * Gets or creates a distributed long. * @@ -134,6 +317,36 @@ public CompletableFuture getLong(String key) { return get(key, DistributedLong.class); } + /** + * Gets or creates a distributed long. + * + * @param key The resource key. + * @return A completable future to be completed once the long has been created. + */ + public CompletableFuture getLong(String key, DistributedLong.Config config) { + return get(key, DistributedLong.class, config); + } + + /** + * Gets or creates a distributed long. + * + * @param key The resource key. + * @return A completable future to be completed once the long has been created. + */ + public CompletableFuture getLong(String key, DistributedLong.Options options) { + return get(key, DistributedLong.class, options); + } + + /** + * Gets or creates a distributed long. + * + * @param key The resource key. + * @return A completable future to be completed once the long has been created. + */ + public CompletableFuture getLong(String key, DistributedLong.Config config, DistributedLong.Options options) { + return get(key, DistributedLong.class, config, options); + } + /** * Gets or creates a distributed lock. * @@ -144,6 +357,36 @@ public CompletableFuture getLock(String key) { return get(key, DistributedLock.class); } + /** + * Gets or creates a distributed lock. + * + * @param key The resource key. + * @return A completable future to be completed once the lock has been created. + */ + public CompletableFuture getLock(String key, DistributedLock.Config config) { + return get(key, DistributedLock.class, config); + } + + /** + * Gets or creates a distributed lock. + * + * @param key The resource key. + * @return A completable future to be completed once the lock has been created. + */ + public CompletableFuture getLock(String key, DistributedLock.Options options) { + return get(key, DistributedLock.class, options); + } + + /** + * Gets or creates a distributed lock. + * + * @param key The resource key. + * @return A completable future to be completed once the lock has been created. + */ + public CompletableFuture getLock(String key, DistributedLock.Config config, DistributedLock.Options options) { + return get(key, DistributedLock.class, config, options); + } + /** * Gets or creates a distributed group. * @@ -154,6 +397,36 @@ public CompletableFuture getGroup(String key) { return get(key, DistributedGroup.class); } + /** + * Gets or creates a distributed group. + * + * @param key The resource key. + * @return A completable future to be completed once the group has been created. + */ + public CompletableFuture getGroup(String key, DistributedGroup.Config config) { + return get(key, DistributedGroup.class, config); + } + + /** + * Gets or creates a distributed group. + * + * @param key The resource key. + * @return A completable future to be completed once the group has been created. + */ + public CompletableFuture getGroup(String key, DistributedGroup.Options options) { + return get(key, DistributedGroup.class, options); + } + + /** + * Gets or creates a distributed group. + * + * @param key The resource key. + * @return A completable future to be completed once the group has been created. + */ + public CompletableFuture getGroup(String key, DistributedGroup.Config config, DistributedGroup.Options options) { + return get(key, DistributedGroup.class, config, options); + } + /** * Gets or creates a distributed topic. * @@ -165,6 +438,39 @@ public CompletableFuture> getTopic(String key) { return get(key, DistributedTopic.class); } + /** + * Gets or creates a distributed topic. + * + * @param key The resource key. + * @param The topic message type. + * @return A completable future to be completed once the topic has been created. + */ + public CompletableFuture> getTopic(String key, DistributedTopic.Config config) { + return get(key, DistributedTopic.class, config); + } + + /** + * Gets or creates a distributed topic. + * + * @param key The resource key. + * @param The topic message type. + * @return A completable future to be completed once the topic has been created. + */ + public CompletableFuture> getTopic(String key, DistributedTopic.Options options) { + return get(key, DistributedTopic.class, options); + } + + /** + * Gets or creates a distributed topic. + * + * @param key The resource key. + * @param The topic message type. + * @return A completable future to be completed once the topic has been created. + */ + public CompletableFuture> getTopic(String key, DistributedTopic.Config config, DistributedTopic.Options options) { + return get(key, DistributedTopic.class, config, options); + } + /** * Gets or creates a distributed queue. * @@ -176,6 +482,39 @@ public CompletableFuture> getTaskQueue(String key) { return get(key, DistributedTaskQueue.class); } + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The queue message type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getTaskQueue(String key, DistributedTaskQueue.Config config) { + return get(key, DistributedTaskQueue.class, config); + } + + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The queue message type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getTaskQueue(String key, DistributedTaskQueue.Options options) { + return get(key, DistributedTaskQueue.class, options); + } + + /** + * Gets or creates a distributed queue. + * + * @param key The resource key. + * @param The queue message type. + * @return A completable future to be completed once the queue has been created. + */ + public CompletableFuture> getTaskQueue(String key, DistributedTaskQueue.Config config, DistributedTaskQueue.Options options) { + return get(key, DistributedTaskQueue.class, config, options); + } + /** * Gets or creates a distributed message bus. * @@ -186,13 +525,38 @@ public CompletableFuture getMessageBus(String key) { return get(key, DistributedMessageBus.class); } - @Override - public ThreadContext context() { - return client.context(); + /** + * Gets or creates a distributed message bus. + * + * @param key The resource key. + * @return A completable future to be completed once the message bus has been created. + */ + public CompletableFuture getMessageBus(String key, DistributedMessageBus.Config config) { + return get(key, DistributedMessageBus.class, config); + } + + /** + * Gets or creates a distributed message bus. + * + * @param key The resource key. + * @return A completable future to be completed once the message bus has been created. + */ + public CompletableFuture getMessageBus(String key, DistributedMessageBus.Options options) { + return get(key, DistributedMessageBus.class, options); + } + + /** + * Gets or creates a distributed message bus. + * + * @param key The resource key. + * @return A completable future to be completed once the message bus has been created. + */ + public CompletableFuture getMessageBus(String key, DistributedMessageBus.Config config, DistributedMessageBus.Options options) { + return get(key, DistributedMessageBus.class, config, options); } @Override - public ResourceType type(Class> type) { + public ResourceType type(Class> type) { return client.type(type); } @@ -227,25 +591,49 @@ private Set cleanKeys(Set keys) { @Override public CompletableFuture get(String key, Class type) { Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); - return client.get(key, type); + return client.get(key, type, null, null); + } + + @Override + public CompletableFuture get(String key, Class type, Resource.Config config) { + Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); + return client.get(key, type, config, null); } @Override - public , U extends Resource.Options> CompletableFuture get(String key, Class type, U options) { + public CompletableFuture get(String key, Class type, Resource.Options options) { Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); - return client.get(key, type, options); + return client.get(key, type, null, options); + } + + @Override + public CompletableFuture get(String key, Class type, Resource.Config config, Resource.Options options) { + Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); + return client.get(key, type, config, options); } @Override public CompletableFuture get(String key, ResourceType type) { Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); - return client.get(key, type); + return client.get(key, type, null, null); + } + + @Override + public CompletableFuture get(String key, ResourceType type, Resource.Config config) { + Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); + return client.get(key, type, config, null); + } + + @Override + public CompletableFuture get(String key, ResourceType type, Resource.Options options) { + Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); + return client.get(key, type, null, options); } @Override - public , U extends Resource.Options> CompletableFuture get(String key, ResourceType type, U options) { + public CompletableFuture get(String key, ResourceType type, Resource.Config config, Resource.Options options) { Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length"); - return client.get(key, type, options); + return client.get(key, type, config, options); } @Override diff --git a/core/src/main/java/io/atomix/AtomixProperties.java b/core/src/main/java/io/atomix/AtomixProperties.java index 4ebaef718f..c7aa37a907 100644 --- a/core/src/main/java/io/atomix/AtomixProperties.java +++ b/core/src/main/java/io/atomix/AtomixProperties.java @@ -27,7 +27,7 @@ * * @author { + public static class TestResource extends Resource { public TestResource(CopycatClient client, Resource.Options options) { super(client, options); @@ -311,7 +311,7 @@ public String value() { * Value resource. */ @ResourceTypeInfo(id=2, stateMachine=ValueStateMachine.class, typeResolver=ValueResource.TypeResolver.class) - public static class ValueResource extends Resource { + public static class ValueResource extends Resource { public ValueResource(CopycatClient client, Resource.Options options) { super(client, options); diff --git a/core/src/test/java/io/atomix/AtomixReplicaTest.java b/core/src/test/java/io/atomix/AtomixReplicaTest.java index 03f6ff350b..2ebd335c63 100644 --- a/core/src/test/java/io/atomix/AtomixReplicaTest.java +++ b/core/src/test/java/io/atomix/AtomixReplicaTest.java @@ -193,7 +193,7 @@ public void testOperateMany() throws Throwable { * Test resource. */ @ResourceTypeInfo(id=3, stateMachine=TestStateMachine.class, typeResolver=TestResource.TypeResolver.class) - public static class TestResource extends Resource { + public static class TestResource extends Resource { public TestResource(CopycatClient client, Resource.Options options) { super(client, options); } @@ -271,7 +271,7 @@ public String value() { * Value resource. */ @ResourceTypeInfo(id=4, stateMachine=ValueStateMachine.class, typeResolver=ValueResource.TypeResolver.class) - public static class ValueResource extends Resource { + public static class ValueResource extends Resource { public ValueResource(CopycatClient client, Resource.Options options) { super(client, options); } diff --git a/manager/src/main/java/io/atomix/manager/ResourceClient.java b/manager/src/main/java/io/atomix/manager/ResourceClient.java index 23eed23cb6..2ffdb81a09 100644 --- a/manager/src/main/java/io/atomix/manager/ResourceClient.java +++ b/manager/src/main/java/io/atomix/manager/ResourceClient.java @@ -109,8 +109,8 @@ public static Builder builder(Collection

members) { final CopycatClient client; private final ResourceRegistry registry; - private final Map>, ResourceType> types = new ConcurrentHashMap<>(); - private final Map> instances = new HashMap<>(); + private final Map>, ResourceType> types = new ConcurrentHashMap<>(); + private final Map> instances = new HashMap<>(); private final Map futures = new HashMap<>(); /** @@ -136,7 +136,12 @@ public ThreadContext context() { } @Override - public final ResourceType type(Class> type) { + public Serializer serializer() { + return client.serializer(); + } + + @Override + public final ResourceType type(Class> type) { return types.computeIfAbsent(type, t -> { ResourceType resourceType = new ResourceType(type); if (registry.lookup(resourceType.id()) == null) @@ -158,7 +163,7 @@ public CompletableFuture> keys() { @Override @SuppressWarnings("unchecked") public CompletableFuture> keys(Class type) { - return keys(type((Class>) type)); + return keys(type((Class>) type)); } @Override @@ -169,31 +174,57 @@ public CompletableFuture> keys(ResourceType type) { @Override @SuppressWarnings("unchecked") public CompletableFuture get(String key, Class type) { - return get(key, type((Class>) type)); + return get(key, type((Class>) type), null, null); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture get(String key, Class type, Resource.Config config) { + return this.get(key, type((Class>) type), config, null); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture get(String key, Class type, Resource.Options options) { + return this.get(key, type((Class>) type), null, options); } @Override @SuppressWarnings("unchecked") - public , U extends Resource.Options> CompletableFuture get(String key, Class type, U options) { - return this.get(key, type((Class>) type), options); + public CompletableFuture get(String key, Class type, Resource.Config config, Resource.Options options) { + return this.get(key, type((Class>) type), config, options); } @Override @SuppressWarnings("unchecked") public CompletableFuture get(String key, ResourceType type) { - return get(key, type, null); + return get(key, type, null, null); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture get(String key, ResourceType type, Resource.Config config) { + return this.get(key, type, config, null); } @Override @SuppressWarnings("unchecked") - public synchronized , U extends Resource.Options> CompletableFuture get(String key, ResourceType type, U options) { + public CompletableFuture get(String key, ResourceType type, Resource.Options options) { + return this.get(key, type, null, options); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized CompletableFuture get(String key, ResourceType type, Resource.Config config, Resource.Options options) { Assert.notNull(key, "key"); Assert.notNull(type, "type"); T resource; // Determine whether a singleton instance of the given resource key already exists. - Resource check = instances.get(key); + Resource check = instances.get(key); if (check == null) { + if (options == null) + options = new Resource.Options(); Instance instance = new Instance(key, type, Instance.Method.GET, this::close); InstanceClient client = new InstanceClient(instance, this.client); check = type.factory().create(client, options); @@ -213,6 +244,9 @@ public synchronized , U extends Resource.Options> Compl CompletableFuture future = futures.get(key); if (future == null) { future = resource.open(); + if (config != null) { + future = future.thenCompose(v -> resource.configure(config)); + } futures.put(key, future); } return future; @@ -246,7 +280,7 @@ public boolean isOpen() { public CompletableFuture close() { CompletableFuture[] futures = new CompletableFuture[instances.size()]; int i = 0; - for (Resource instance : instances.values()) { + for (Resource instance : instances.values()) { futures[i++] = instance.close(); } return CompletableFuture.allOf(futures).thenCompose(v -> client.close()); @@ -265,8 +299,8 @@ public String toString() { /** * Builds a {@link ResourceClient}. *

- * The client builder configures a {@link ResourceClient} to connect to a cluster of {@link ResourceServer}s - * or {@link ResourceReplica}. To create a client builder, use the {@link #builder(Address...)} method. + * The client builder configures a {@link ResourceClient} to connect to a cluster of {@link ResourceServer}s. + * To create a client builder, use the {@link #builder(Address...)} method. *

    *   {@code
    *   ResourceClient client = ResourceClient.builder(servers)
diff --git a/manager/src/main/java/io/atomix/manager/ResourceManager.java b/manager/src/main/java/io/atomix/manager/ResourceManager.java
index a3af444411..ba09f28a26 100644
--- a/manager/src/main/java/io/atomix/manager/ResourceManager.java
+++ b/manager/src/main/java/io/atomix/manager/ResourceManager.java
@@ -44,6 +44,17 @@ public interface ResourceManager> extends Managed
+   * The serializer applies to all resources controlled by this instance. Serializable types registered
+   * on the serializer will be reflected at the {@link io.atomix.catalyst.transport.Transport} layer.
+   * Types registered on the client must also be registered on the server for deserialization.
+   *
+   * @return The resource manager serializer.
+   */
+  Serializer serializer();
+
   /**
    * Returns the resource type for the given resource class.
    *
@@ -51,7 +62,7 @@ public interface ResourceManager> extends Managed> type);
+  ResourceType type(Class> type);
 
   /**
    * Checks whether a resource exists with the given key.
@@ -242,12 +253,92 @@ public interface ResourceManager> extends Managed The resource type.
    * @return A completable future to be completed once the resource has been loaded.
    * @throws NullPointerException if {@code key} or {@code type} are null
    * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type
    */
-  , U extends Resource.Options> CompletableFuture get(String key, Class type, U options);
+   CompletableFuture get(String key, Class type, Resource.Config config);
+
+  /**
+   * Gets or creates the given resource and acquires a singleton reference to it.
+   * 

+ * If a resource at the given key already exists, the resource will be validated to verify that its type + * matches the given type. If no resource yet exists, a new resource will be created in the cluster. Once + * the session for the resource has been opened, a resource instance will be returned. + *

+ * The returned {@link Resource} instance will be a singleton reference to an global instance for this node. + * That is, multiple calls to this method for the same resource will result in the same {@link Resource} + * instance being returned. + *

+ * This method returns a {@link CompletableFuture} which can be used to block until the operation completes + * or to be notified in a separate thread once the operation completes. To block until the operation completes, + * use the {@link CompletableFuture#get()} method: + *

+   *   {@code
+   *   DistributedLock lock = atomix.get("lock", DistributedLock.class).get();
+   *   }
+   * 
+ * Alternatively, to execute the operation asynchronous and be notified once the result is received in a different + * thread, use one of the many completable future callbacks: + *
+   *   {@code
+   *   atomix.get("lock", DistributedLock.class).thenAccept(lock -> {
+   *     ...
+   *   });
+   *   }
+   * 
+ * + * @param key The key at which to get the resource. + * @param type The expected resource type. + * @param options The local resource options. + * @param The resource type. + * @return A completable future to be completed once the resource has been loaded. + * @throws NullPointerException if {@code key} or {@code type} are null + * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type + */ + CompletableFuture get(String key, Class type, Resource.Options options); + + /** + * Gets or creates the given resource and acquires a singleton reference to it. + *

+ * If a resource at the given key already exists, the resource will be validated to verify that its type + * matches the given type. If no resource yet exists, a new resource will be created in the cluster. Once + * the session for the resource has been opened, a resource instance will be returned. + *

+ * The returned {@link Resource} instance will be a singleton reference to an global instance for this node. + * That is, multiple calls to this method for the same resource will result in the same {@link Resource} + * instance being returned. + *

+ * This method returns a {@link CompletableFuture} which can be used to block until the operation completes + * or to be notified in a separate thread once the operation completes. To block until the operation completes, + * use the {@link CompletableFuture#get()} method: + *

+   *   {@code
+   *   DistributedLock lock = atomix.get("lock", DistributedLock.class).get();
+   *   }
+   * 
+ * Alternatively, to execute the operation asynchronous and be notified once the result is received in a different + * thread, use one of the many completable future callbacks: + *
+   *   {@code
+   *   atomix.get("lock", DistributedLock.class).thenAccept(lock -> {
+   *     ...
+   *   });
+   *   }
+   * 
+ * + * @param key The key at which to get the resource. + * @param type The expected resource type. + * @param config The cluster-wide resource configuration. + * @param options The local resource options. + * @param The resource type. + * @return A completable future to be completed once the resource has been loaded. + * @throws NullPointerException if {@code key} or {@code type} are null + * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type + */ + CompletableFuture get(String key, Class type, Resource.Config config, Resource.Options options); /** * Gets or creates the given resource and acquires a singleton reference to it. @@ -318,11 +409,91 @@ public interface ResourceManager> extends Managed The resource type. + * @return A completable future to be completed once the resource has been loaded. + * @throws NullPointerException if {@code key} or {@code type} are null + * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type + */ + CompletableFuture get(String key, ResourceType type, Resource.Config config); + + /** + * Gets or creates the given resource and acquires a singleton reference to it. + *

+ * If a resource at the given key already exists, the resource will be validated to verify that its type + * matches the given type. If no resource yet exists, a new resource will be created in the cluster. Once + * the session for the resource has been opened, a resource instance will be returned. + *

+ * The returned {@link Resource} instance will be a singleton reference to an global instance for this node. + * That is, multiple calls to this method for the same resource will result in the same {@link Resource} + * instance being returned. + *

+ * This method returns a {@link CompletableFuture} which can be used to block until the operation completes + * or to be notified in a separate thread once the operation completes. To block until the operation completes, + * use the {@link CompletableFuture#get()} method: + *

+   *   {@code
+   *   DistributedLock lock = atomix.get("lock", DistributedLock.class).get();
+   *   }
+   * 
+ * Alternatively, to execute the operation asynchronous and be notified once the result is received in a different + * thread, use one of the many completable future callbacks: + *
+   *   {@code
+   *   atomix.get("lock", DistributedLock.class).thenAccept(lock -> {
+   *     ...
+   *   });
+   *   }
+   * 
+ * + * @param key The key at which to get the resource. + * @param type The expected resource type. + * @param options The local resource options. + * @param The resource type. + * @return A completable future to be completed once the resource has been loaded. + * @throws NullPointerException if {@code key} or {@code type} are null + * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type + */ + CompletableFuture get(String key, ResourceType type, Resource.Options options); + + /** + * Gets or creates the given resource and acquires a singleton reference to it. + *

+ * If a resource at the given key already exists, the resource will be validated to verify that its type + * matches the given type. If no resource yet exists, a new resource will be created in the cluster. Once + * the session for the resource has been opened, a resource instance will be returned. + *

+ * The returned {@link Resource} instance will be a singleton reference to an global instance for this node. + * That is, multiple calls to this method for the same resource will result in the same {@link Resource} + * instance being returned. + *

+ * This method returns a {@link CompletableFuture} which can be used to block until the operation completes + * or to be notified in a separate thread once the operation completes. To block until the operation completes, + * use the {@link CompletableFuture#get()} method: + *

+   *   {@code
+   *   DistributedLock lock = atomix.get("lock", DistributedLock.class).get();
+   *   }
+   * 
+ * Alternatively, to execute the operation asynchronous and be notified once the result is received in a different + * thread, use one of the many completable future callbacks: + *
+   *   {@code
+   *   atomix.get("lock", DistributedLock.class).thenAccept(lock -> {
+   *     ...
+   *   });
+   *   }
+   * 
+ * + * @param key The key at which to get the resource. + * @param type The expected resource type. + * @param config The cluster-wide resource configuration. + * @param options The local resource options. * @param The resource type. * @return A completable future to be completed once the resource has been loaded. * @throws NullPointerException if {@code key} or {@code type} are null * @throws IllegalArgumentException if {@code type} is inconsistent with a previously created type */ - , U extends Resource.Options> CompletableFuture get(String key, ResourceType type, U options); + CompletableFuture get(String key, ResourceType type, Resource.Config config, Resource.Options options); } diff --git a/manager/src/main/java/io/atomix/manager/ResourceServer.java b/manager/src/main/java/io/atomix/manager/ResourceServer.java index 590ccfe78a..1f22c4621e 100644 --- a/manager/src/main/java/io/atomix/manager/ResourceServer.java +++ b/manager/src/main/java/io/atomix/manager/ResourceServer.java @@ -21,6 +21,7 @@ import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.ConfigurationException; import io.atomix.catalyst.util.Managed; +import io.atomix.catalyst.util.concurrent.ThreadContext; import io.atomix.copycat.server.CopycatServer; import io.atomix.copycat.server.storage.Storage; import io.atomix.manager.state.ResourceManagerState; @@ -143,6 +144,28 @@ public ResourceServer(CopycatServer server) { this.server = Assert.notNull(server, "server"); } + /** + * Returns the server thread context. + * + * @return The server thread context. + */ + public ThreadContext context() { + return server.context(); + } + + /** + * Returns the server serializer. + *

+ * The server serializer handles serialization for all operations within the resource server. Serializable + * types registered on the server serializer will be reflected in the {@link Storage} and {@link Transport} + * layers. + * + * @return The server serializer. + */ + public Serializer serializer() { + return server.serializer(); + } + /** * Returns the underlying Copycat server. * diff --git a/messaging/src/main/java/io/atomix/messaging/DistributedMessageBus.java b/messaging/src/main/java/io/atomix/messaging/DistributedMessageBus.java index 51e1010445..82cea9334c 100644 --- a/messaging/src/main/java/io/atomix/messaging/DistributedMessageBus.java +++ b/messaging/src/main/java/io/atomix/messaging/DistributedMessageBus.java @@ -19,6 +19,7 @@ import io.atomix.catalyst.transport.Client; import io.atomix.catalyst.transport.Connection; import io.atomix.catalyst.transport.Server; +import io.atomix.catalyst.util.ConfigurationException; import io.atomix.catalyst.util.concurrent.Futures; import io.atomix.copycat.client.CopycatClient; import io.atomix.messaging.state.MessageBusCommands; @@ -46,11 +47,6 @@ * }); * } *

- * Once a message bus instance has been created, it's not immediately opened. The message bus instance must be explicitly - * opened by calling {@link #open(Address)}, providing an {@link Address} to which to bind the message bus server. Because - * each message bus instance runs on a separate server, it's recommended that nodes use a singleton instance of this - * resource by using {@code get(...)} rather than {@code create(...)} to get a reference to the resource. - *

* Messages are produced and consumed by {@link MessageProducer producers} and {@link MessageConsumer consumers} respectively. * Each producer and consumer is associated with a string message bus topic. *

@@ -69,9 +65,29 @@
  * @author 
-   * Alternatively, to execute the operation asynchronous and be notified once the lock is acquired in a different
-   * thread, use one of the many completable future callbacks:
-   * 
-   *   {@code
-   *   bus.open(new Address("123.456.789.0", 5000)).thenRun(() -> System.out.println("Message bus open!"));
-   *   }
-   * 
- * - * @param address The address on which to listen. - * @return A completable future to be completed once the message bus is started. + * Starts listening on the server. */ - public synchronized CompletableFuture open(Address address) { + public synchronized CompletableFuture listen(Address address) { if (openFuture != null) return openFuture; @@ -455,4 +457,50 @@ private Address next() { } } + /** + * Message bus options. + */ + public static class Options extends Resource.Options { + private static final String ADDRESS = "address"; + + public Options() { + } + + public Options(Properties defaults) { + super(defaults); + } + + /** + * Returns the message bus address. + * + * @return The message bus address. + */ + public Address getAddress() { + String addressString = getProperty(ADDRESS); + if (addressString == null) + throw new ConfigurationException("missing required message bus property: " + ADDRESS); + + String[] split = addressString.split(":"); + if (split.length != 2) + throw new ConfigurationException("malformed address string: " + addressString); + + try { + return new Address(split[0], Integer.valueOf(split[1])); + } catch (NumberFormatException e) { + throw new ConfigurationException("malformed port: " + split[1]); + } + } + + /** + * Sets the local message bus server address. + * + * @param address The local message bus server address. + * @return The message bus options. + */ + public Options withAddress(Address address) { + setProperty(ADDRESS, String.format("%s:%s", address.host(), address.port())); + return this; + } + } + } diff --git a/messaging/src/main/java/io/atomix/messaging/DistributedTaskQueue.java b/messaging/src/main/java/io/atomix/messaging/DistributedTaskQueue.java index 6eb5bce070..cf63f74330 100644 --- a/messaging/src/main/java/io/atomix/messaging/DistributedTaskQueue.java +++ b/messaging/src/main/java/io/atomix/messaging/DistributedTaskQueue.java @@ -71,7 +71,26 @@ * @author
Jordan Halterman */ @ResourceTypeInfo(id=-31, stateMachine=TopicState.class, typeResolver=TopicCommands.TypeResolver.class) -public class DistributedTopic extends Resource, Resource.Options> { +public class DistributedTopic extends Resource> { + + /** + * Returns new topic options. + * + * @return New topic options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new topic configuration. + * + * @return A new topic configuration. + */ + public static Config config() { + return new Config(); + } + private final Set> listeners = new HashSet<>(); @SuppressWarnings("unchecked") diff --git a/messaging/src/test/java/io/atomix/messaging/DistributedMessageBusTest.java b/messaging/src/test/java/io/atomix/messaging/DistributedMessageBusTest.java index 94f0725385..d01d9d731d 100644 --- a/messaging/src/test/java/io/atomix/messaging/DistributedMessageBusTest.java +++ b/messaging/src/test/java/io/atomix/messaging/DistributedMessageBusTest.java @@ -16,7 +16,6 @@ package io.atomix.messaging; import io.atomix.catalyst.transport.Address; -import io.atomix.resource.ResourceType; import io.atomix.testing.AbstractCopycatTest; import org.testng.annotations.Test; @@ -39,11 +38,8 @@ protected Class type() { public void testSend() throws Throwable { createServers(3); - DistributedMessageBus bus1 = createResource(); - DistributedMessageBus bus2 = createResource(); - - bus1.open(new Address("localhost", 6000)).join(); - bus2.open(new Address("localhost", 6001)).join(); + DistributedMessageBus bus1 = createResource(DistributedMessageBus.options().withAddress(new Address("localhost", 6000))); + DistributedMessageBus bus2 = createResource(DistributedMessageBus.options().withAddress(new Address("localhost", 6001))); bus1.consumer("test", message -> { threadAssertEquals(message, "Hello world!"); diff --git a/resource/src/main/java/io/atomix/resource/Configurable.java b/resource/src/main/java/io/atomix/resource/Configurable.java deleted file mode 100644 index c332975d7c..0000000000 --- a/resource/src/main/java/io/atomix/resource/Configurable.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ -package io.atomix.resource; - -import java.util.concurrent.CompletableFuture; - -/** - * Interface for configurable resources. - * - * @author configure(V config) { - return client.submit(new ResourceStateMachine.ConfigureCommand(config)).thenApply(v -> (T) this); - } - -} diff --git a/resource/src/main/java/io/atomix/resource/Resource.java b/resource/src/main/java/io/atomix/resource/Resource.java index c61dcded73..d9226936e0 100644 --- a/resource/src/main/java/io/atomix/resource/Resource.java +++ b/resource/src/main/java/io/atomix/resource/Resource.java @@ -25,6 +25,7 @@ import io.atomix.copycat.client.session.Session; import java.io.Serializable; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; @@ -51,21 +52,32 @@ * and the semantics of the specific resource implementation. * * @param resource type - * @param option type * @author Jordan Halterman */ -public abstract class Resource, U extends Resource.Options> implements Managed { +public abstract class Resource> implements Managed { /** * Resource configuration. */ - public interface Config extends Serializable { + public static class Config extends Properties implements Serializable { + public Config() { + } + + public Config(Properties defaults) { + super(defaults); + } } /** * Resource options. */ - public interface Options { + public static class Options extends Properties { + public Options() { + } + + public Options(Properties defaults) { + super(defaults); + } } /** @@ -141,12 +153,12 @@ public enum State { private final ResourceType type; protected final CopycatClient client; - protected final U options; + protected final Options options; private State state; private final Set changeListeners = new CopyOnWriteArraySet<>(); private Consistency consistency = Consistency.ATOMIC; - protected Resource(CopycatClient client, U options) { + protected Resource(CopycatClient client, Options options) { this.type = new ResourceType(getClass()); this.client = Assert.notNull(client, "client"); @@ -215,6 +227,17 @@ public ThreadContext context() { return client.context(); } + /** + * Configures the resource. + * + * @param config The resource configuration. + * @return A completable future to be completed once the resource has been configured. + */ + @SuppressWarnings("unchecked") + public CompletableFuture configure(Config config) { + return client.submit(new ResourceStateMachine.ConfigureCommand(config)).thenApply(v -> (T) this); + } + /** * Returns the configured resource consistency level. *

diff --git a/resource/src/main/java/io/atomix/resource/ResourceFactory.java b/resource/src/main/java/io/atomix/resource/ResourceFactory.java index 18cf312b87..8b95e1a56d 100644 --- a/resource/src/main/java/io/atomix/resource/ResourceFactory.java +++ b/resource/src/main/java/io/atomix/resource/ResourceFactory.java @@ -23,7 +23,7 @@ * @author ) type()); - T resource = (T) type.factory().create(client, null); + T resource = (T) type.factory().create(client, options); resource.open().thenRun(this::resume); resources.add(resource); await(10000); diff --git a/variables/src/main/java/io/atomix/variables/AbstractDistributedValue.java b/variables/src/main/java/io/atomix/variables/AbstractDistributedValue.java index c0f92c4f51..9f6b0e85db 100644 --- a/variables/src/main/java/io/atomix/variables/AbstractDistributedValue.java +++ b/variables/src/main/java/io/atomix/variables/AbstractDistributedValue.java @@ -28,9 +28,9 @@ * @author , U extends Resource.Options, V> extends Resource { +public abstract class AbstractDistributedValue, U> extends Resource { - protected AbstractDistributedValue(CopycatClient client, U options) { + protected AbstractDistributedValue(CopycatClient client, Options options) { super(client, options); } @@ -39,7 +39,7 @@ protected AbstractDistributedValue(CopycatClient client, U options) { * * @return A completable future to be completed with the current value. */ - public CompletableFuture get() { + public CompletableFuture get() { return submit(new ValueCommands.Get<>()); } @@ -49,7 +49,7 @@ public CompletableFuture get() { * @param value The current value. * @return A completable future to be completed once the value has been set. */ - public CompletableFuture set(V value) { + public CompletableFuture set(U value) { return submit(new ValueCommands.Set(value)); } @@ -60,7 +60,7 @@ public CompletableFuture set(V value) { * @param ttl The time after which to expire the value. * @return A completable future to be completed once the value has been set. */ - public CompletableFuture set(V value, Duration ttl) { + public CompletableFuture set(U value, Duration ttl) { return submit(new ValueCommands.Set(value, ttl.toMillis())); } @@ -70,7 +70,7 @@ public CompletableFuture set(V value, Duration ttl) { * @param value The updated value. * @return A completable future to be completed with the previous value. */ - public CompletableFuture getAndSet(V value) { + public CompletableFuture getAndSet(U value) { return submit(new ValueCommands.GetAndSet<>(value)); } @@ -81,7 +81,7 @@ public CompletableFuture getAndSet(V value) { * @param ttl The time after which to expire the value. * @return A completable future to be completed with the previous value. */ - public CompletableFuture getAndSet(V value, Duration ttl) { + public CompletableFuture getAndSet(U value, Duration ttl) { return submit(new ValueCommands.GetAndSet<>(value, ttl.toMillis())); } @@ -92,7 +92,7 @@ public CompletableFuture getAndSet(V value, Duration ttl) { * @param update The updated value. * @return A completable future to be completed with a boolean value indicating whether the value was updated. */ - public CompletableFuture compareAndSet(V expect, V update) { + public CompletableFuture compareAndSet(U expect, U update) { return submit(new ValueCommands.CompareAndSet(expect, update)); } @@ -104,7 +104,7 @@ public CompletableFuture compareAndSet(V expect, V update) { * @param ttl The time after which to expire the value. * @return A completable future to be completed with a boolean value indicating whether the value was updated. */ - public CompletableFuture compareAndSet(V expect, V update, Duration ttl) { + public CompletableFuture compareAndSet(U expect, U update, Duration ttl) { return submit(new ValueCommands.CompareAndSet(expect, update, ttl.toMillis())); } diff --git a/variables/src/main/java/io/atomix/variables/DistributedLong.java b/variables/src/main/java/io/atomix/variables/DistributedLong.java index c5d0f6055b..cd3fbd3844 100644 --- a/variables/src/main/java/io/atomix/variables/DistributedLong.java +++ b/variables/src/main/java/io/atomix/variables/DistributedLong.java @@ -30,7 +30,25 @@ * @author Jordan Halterman */ @ResourceTypeInfo(id=-1, stateMachine=ValueState.class, typeResolver=ValueCommands.TypeResolver.class) -public class DistributedValue extends AbstractDistributedValue, Resource.Options, T> { +public class DistributedValue extends AbstractDistributedValue, T> { + + /** + * Returns new value options. + * + * @return New value options. + */ + public static Options options() { + return new Options(); + } + + /** + * Returns a new value configuration. + * + * @return A new value configuration. + */ + public static Config config() { + return new Config(); + } public DistributedValue(CopycatClient client, Resource.Options options) { super(client, options);