From 836df8a167490fea9ca341528a53c29c342965e1 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Fri, 15 Dec 2017 15:00:06 -0800 Subject: [PATCH] Synchronously initialize Atomix node on build. --- README.md | 26 +++++--------- core/src/main/java/io/atomix/Atomix.java | 43 +++++++++++++++++++++--- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index d1aab21482..6c790bd821 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ The builder should be configured with the local node configuration: builder.withLocalNode(Node.builder() .withId("foo") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5000)) + .withEndpoint(Endpoint.from("localhost", 5000)) .build()); ``` @@ -62,15 +62,15 @@ Each instance should provide the same set of bootstrap nodes: builder.withBootstrapNodes( Node.builder("foo") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5000) + .withEndpoint(Endpoint.from("localhost", 5000) .build(), Node.builder("bar") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5001) + .withEndpoint(Endpoint.from("localhost", 5001) .build(), Node.builder("baz") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5002) + .withEndpoint(Endpoint.from("localhost", 5002) .build()); ``` @@ -82,13 +82,7 @@ been configured, build the instance by calling `build()`: Atomix atomix = builder.build(); ``` -Finally, connect the instance by calling `open()`: - -```java -atomix.open().join(); -``` - -Note that in order to form a cluster, a majority of instance must be `open()`ed simultaneously +Note that in order to form a cluster, a majority of instance must be created simultaneously to allow Raft partitions to form a quorum. ### Connecting a client node @@ -101,24 +95,22 @@ node builder that the node is a `CLIENT`: Atomix atomix = Atomix.builder() .withLocalNode(Node.builder("client") .withType(Node.Type.CLIENT) - .withEndpoint(Endpoint.endpoint("localhost", 5003)) + .withEndpoint(Endpoint.from("localhost", 5003)) .build()) .withBootstrapNodes( Node.builder("foo") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5000) + .withEndpoint(Endpoint.from("localhost", 5000) .build(), Node.builder("bar") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5001) + .withEndpoint(Endpoint.from("localhost", 5001) .build(), Node.builder("baz") .withType(Node.Type.DATA) - .withEndpoint(Endpoint.endpoint("localhost", 5002) + .withEndpoint(Endpoint.from("localhost", 5002) .build()) .build(); - -atomix.open().join(); ``` ## Cluster management diff --git a/core/src/main/java/io/atomix/Atomix.java b/core/src/main/java/io/atomix/Atomix.java index 80103d4a57..05adb3c953 100644 --- a/core/src/main/java/io/atomix/Atomix.java +++ b/core/src/main/java/io/atomix/Atomix.java @@ -97,6 +97,8 @@ public static Builder builder() { private final PrimitiveTypeRegistry primitiveTypes; private final AtomicBoolean open = new AtomicBoolean(); private final ThreadContext context = new SingleThreadContext("atomix-%d"); + private volatile CompletableFuture openFuture; + private volatile CompletableFuture closeFuture; protected Atomix( ManagedMessagingService messagingService, @@ -213,8 +215,12 @@ public Set getPrimitiveNames(PrimitiveType primitiveType) { } @Override - public CompletableFuture open() { - return messagingService.open() + public synchronized CompletableFuture open() { + if (openFuture != null) { + return openFuture; + } + + openFuture = messagingService.open() .thenComposeAsync(v -> metadataService.open(), context) .thenComposeAsync(v -> clusterService.open(), context) .thenComposeAsync(v -> clusterCommunicator.open(), context) @@ -236,6 +242,7 @@ public CompletableFuture open() { LOGGER.info("Started"); return this; }, context); + return openFuture; } @Override @@ -244,9 +251,13 @@ public boolean isOpen() { } @Override - public CompletableFuture close() { + public synchronized CompletableFuture close() { + if (closeFuture != null) { + return closeFuture; + } + metadataService.removeNode(clusterService.getLocalNode()); - return primitives.close() + closeFuture = primitives.close() .thenComposeAsync(v -> partitions.close(), context) .thenComposeAsync(v -> corePartitionGroup.close(), context) .thenComposeAsync(v -> clusterCommunicator.close(), context) @@ -259,6 +270,7 @@ public CompletableFuture close() { open.set(false); LOGGER.info("Stopped"); }); + return closeFuture; } @Override @@ -460,8 +472,31 @@ public Builder addPrimitiveType(PrimitiveType primitiveType) { return this; } + /** + * Builds a new Atomix instance. + * + * @return a new Atomix instance + */ @Override public Atomix build() { + return buildInstance().open().join(); + } + + /** + * Asynchronously builds and starts a new Atomix instance. + * + * @return a future to be completed with a new Atomix instance + */ + public CompletableFuture buildAsync() { + return buildInstance().open(); + } + + /** + * Builds a new Atomix instance. + * + * @return a new Atomix instance + */ + private Atomix buildInstance() { // If the local node has not be configured, create a default node. if (localNode == null) { try {