Skip to content

Commit

Permalink
Synchronously initialize Atomix node on build.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Dec 15, 2017
1 parent c7b4817 commit 836df8a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
26 changes: 9 additions & 17 deletions README.md
Expand Up @@ -51,7 +51,7 @@ The builder should be configured with the local node configuration:
builder.withLocalNode(Node.builder()
.withId("foo")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5000))
.withEndpoint(Endpoint.from("localhost", 5000))
.build());
```

Expand All @@ -62,15 +62,15 @@ Each instance should provide the same set of bootstrap nodes:
builder.withBootstrapNodes(
Node.builder("foo")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5000)
.withEndpoint(Endpoint.from("localhost", 5000)
.build(),
Node.builder("bar")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5001)
.withEndpoint(Endpoint.from("localhost", 5001)
.build(),
Node.builder("baz")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5002)
.withEndpoint(Endpoint.from("localhost", 5002)
.build());
```

Expand All @@ -82,13 +82,7 @@ been configured, build the instance by calling `build()`:
Atomix atomix = builder.build();
```

Finally, connect the instance by calling `open()`:

```java
atomix.open().join();
```

Note that in order to form a cluster, a majority of instance must be `open()`ed simultaneously
Note that in order to form a cluster, a majority of instance must be created simultaneously
to allow Raft partitions to form a quorum.

### Connecting a client node
Expand All @@ -101,24 +95,22 @@ node builder that the node is a `CLIENT`:
Atomix atomix = Atomix.builder()
.withLocalNode(Node.builder("client")
.withType(Node.Type.CLIENT)
.withEndpoint(Endpoint.endpoint("localhost", 5003))
.withEndpoint(Endpoint.from("localhost", 5003))
.build())
.withBootstrapNodes(
Node.builder("foo")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5000)
.withEndpoint(Endpoint.from("localhost", 5000)
.build(),
Node.builder("bar")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5001)
.withEndpoint(Endpoint.from("localhost", 5001)
.build(),
Node.builder("baz")
.withType(Node.Type.DATA)
.withEndpoint(Endpoint.endpoint("localhost", 5002)
.withEndpoint(Endpoint.from("localhost", 5002)
.build())
.build();

atomix.open().join();
```

## Cluster management
Expand Down
43 changes: 39 additions & 4 deletions core/src/main/java/io/atomix/Atomix.java
Expand Up @@ -97,6 +97,8 @@ public static Builder builder() {
private final PrimitiveTypeRegistry primitiveTypes;
private final AtomicBoolean open = new AtomicBoolean();
private final ThreadContext context = new SingleThreadContext("atomix-%d");
private volatile CompletableFuture<Atomix> openFuture;
private volatile CompletableFuture<Void> closeFuture;

protected Atomix(
ManagedMessagingService messagingService,
Expand Down Expand Up @@ -213,8 +215,12 @@ public Set<String> getPrimitiveNames(PrimitiveType primitiveType) {
}

@Override
public CompletableFuture<Atomix> open() {
return messagingService.open()
public synchronized CompletableFuture<Atomix> open() {
if (openFuture != null) {
return openFuture;
}

openFuture = messagingService.open()
.thenComposeAsync(v -> metadataService.open(), context)
.thenComposeAsync(v -> clusterService.open(), context)
.thenComposeAsync(v -> clusterCommunicator.open(), context)
Expand All @@ -236,6 +242,7 @@ public CompletableFuture<Atomix> open() {
LOGGER.info("Started");
return this;
}, context);
return openFuture;
}

@Override
Expand All @@ -244,9 +251,13 @@ public boolean isOpen() {
}

@Override
public CompletableFuture<Void> close() {
public synchronized CompletableFuture<Void> close() {
if (closeFuture != null) {
return closeFuture;
}

metadataService.removeNode(clusterService.getLocalNode());
return primitives.close()
closeFuture = primitives.close()
.thenComposeAsync(v -> partitions.close(), context)
.thenComposeAsync(v -> corePartitionGroup.close(), context)
.thenComposeAsync(v -> clusterCommunicator.close(), context)
Expand All @@ -259,6 +270,7 @@ public CompletableFuture<Void> close() {
open.set(false);
LOGGER.info("Stopped");
});
return closeFuture;
}

@Override
Expand Down Expand Up @@ -460,8 +472,31 @@ public Builder addPrimitiveType(PrimitiveType primitiveType) {
return this;
}

/**
* Builds a new Atomix instance.
*
* @return a new Atomix instance
*/
@Override
public Atomix build() {
return buildInstance().open().join();
}

/**
* Asynchronously builds and starts a new Atomix instance.
*
* @return a future to be completed with a new Atomix instance
*/
public CompletableFuture<Atomix> buildAsync() {
return buildInstance().open();
}

/**
* Builds a new Atomix instance.
*
* @return a new Atomix instance
*/
private Atomix buildInstance() {
// If the local node has not be configured, create a default node.
if (localNode == null) {
try {
Expand Down

0 comments on commit 836df8a

Please sign in to comment.