Skip to content

Commit

Permalink
Simplify Atomix/AtomixCluster internals.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2018
1 parent c7624ed commit 0cf0e99
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 509 deletions.
281 changes: 100 additions & 181 deletions cluster/src/main/java/io/atomix/cluster/AtomixCluster.java
Expand Up @@ -85,36 +85,66 @@ public static Builder builder(ClusterConfig config) {


private static final Logger LOGGER = LoggerFactory.getLogger(AtomixCluster.class); private static final Logger LOGGER = LoggerFactory.getLogger(AtomixCluster.class);


private final Context context; protected final ManagedMessagingService messagingService;
protected final ManagedBroadcastService broadcastService;
protected final ManagedBootstrapMetadataService bootstrapMetadataService;
protected final ManagedPersistentMetadataService persistentMetadataService;
protected final ManagedClusterService clusterService;
protected final ManagedClusterMessagingService clusterMessagingService;
protected final ManagedClusterEventingService clusterEventingService;
protected volatile CompletableFuture openFuture; protected volatile CompletableFuture openFuture;
protected volatile CompletableFuture<Void> closeFuture; protected volatile CompletableFuture<Void> closeFuture;
private final ThreadContext threadContext = new SingleThreadContext("atomix-cluster-%d");
private final AtomicBoolean started = new AtomicBoolean();


public AtomixCluster(String configFile) { public AtomixCluster(String configFile) {
this(loadContext(new File(System.getProperty("user.dir"), configFile))); this(loadConfig(new File(System.getProperty("user.dir"), configFile)));
} }


public AtomixCluster(File configFile) { public AtomixCluster(File configFile) {
this(loadContext(configFile)); this(loadConfig(configFile));
} }


public AtomixCluster(ClusterConfig config) { public AtomixCluster(ClusterConfig config) {
this(buildContext(config)); // Apply profiles to all configurations.
} config.getProfile().configure(config);
config.getLocalNode().getProfile().configure(config.getLocalNode());
config.getNodes().forEach(node -> node.getProfile().configure(node));


protected AtomixCluster(Context context) { this.messagingService = buildMessagingService(config);
this.context = context; this.broadcastService = buildBroadcastService(config);
this.bootstrapMetadataService = buildBootstrapMetadataService(config);
this.persistentMetadataService = buildPersistentMetadataService(config, messagingService);
this.clusterService = buildClusterService(config, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService);
this.clusterMessagingService = buildClusterMessagingService(clusterService, messagingService);
this.clusterEventingService = buildClusterEventService(clusterService, messagingService);
} }


/**
* Returns the cluster service.
*
* @return the cluster service
*/
public ClusterService clusterService() { public ClusterService clusterService() {
return context.clusterService(); return clusterService;
} }


/**
* Returns the cluster messaging service.
*
* @return the cluster messaging service
*/
public ClusterMessagingService messagingService() { public ClusterMessagingService messagingService() {
return context.clusterMessagingService(); return clusterMessagingService;
} }


/**
* Returns the cluster event service.
*
* @return the cluster event service
*/
public ClusterEventingService eventingService() { public ClusterEventingService eventingService() {
return context.clusterEventingService(); return clusterEventingService;
} }


@Override @Override
Expand All @@ -129,14 +159,39 @@ public synchronized CompletableFuture<T> start() {
return openFuture; return openFuture;
} }


openFuture = context.start(); openFuture = startServices()
.thenComposeAsync(v -> joinCluster(), threadContext)
.thenComposeAsync(v -> completeStartup(), threadContext)
.thenApply(v -> this);


return openFuture; return openFuture;
} }


protected CompletableFuture<Void> startServices() {
return messagingService.start()
.thenComposeAsync(v -> broadcastService.start(), threadContext)
.thenComposeAsync(v -> bootstrapMetadataService.start(), threadContext)
.thenComposeAsync(v -> persistentMetadataService.start(), threadContext)
.thenComposeAsync(v -> clusterService.start(), threadContext)
.thenComposeAsync(v -> clusterMessagingService.start(), threadContext)
.thenComposeAsync(v -> clusterEventingService.start(), threadContext)
.thenApply(v -> null);
}

protected CompletableFuture<Void> joinCluster() {
persistentMetadataService.addNode(clusterService().getLocalNode());
return CompletableFuture.completedFuture(null);
}

protected CompletableFuture<Void> completeStartup() {
started.set(true);
LOGGER.info("Started");
return CompletableFuture.completedFuture(null);
}

@Override @Override
public boolean isRunning() { public boolean isRunning() {
return context.isRunning(); return started.get();
} }


@Override @Override
Expand All @@ -145,155 +200,45 @@ public synchronized CompletableFuture<Void> stop() {
return closeFuture; return closeFuture;
} }


closeFuture = context.stop(); closeFuture = leaveCluster()
.thenComposeAsync(v -> stopServices(), threadContext)
.thenComposeAsync(v -> completeShutdown(), threadContext);
return closeFuture; return closeFuture;
} }


@Override protected CompletableFuture<Void> leaveCluster() {
public String toString() { persistentMetadataService.removeNode(clusterService().getLocalNode());
return toStringHelper(this) return CompletableFuture.completedFuture(null);
.toString();
} }


/** protected CompletableFuture<Void> stopServices() {
* Atomix cluster context. return clusterMessagingService.stop()
*/ .exceptionally(e -> null)
protected static class Context implements Managed<Void> { .thenComposeAsync(v -> clusterEventingService.stop(), threadContext)
private final ManagedMessagingService messagingService; .exceptionally(e -> null)
private final ManagedBroadcastService broadcastService; .thenComposeAsync(v -> clusterService.stop(), threadContext)
private final ManagedBootstrapMetadataService bootstrapMetadataService; .exceptionally(e -> null)
private final ManagedPersistentMetadataService persistentMetadataService; .thenComposeAsync(v -> persistentMetadataService.stop(), threadContext)
private final ManagedClusterService clusterService; .exceptionally(e -> null)
private final ManagedClusterMessagingService clusterMessagingService; .thenComposeAsync(v -> bootstrapMetadataService.stop(), threadContext)
private final ManagedClusterEventingService clusterEventingService; .exceptionally(e -> null)
private final ThreadContext threadContext = new SingleThreadContext("atomix-%d"); .thenComposeAsync(v -> broadcastService.stop(), threadContext)
private final AtomicBoolean started = new AtomicBoolean(); .exceptionally(e -> null)

.thenComposeAsync(v -> messagingService.stop(), threadContext)
public Context( .exceptionally(e -> null);
ManagedMessagingService messagingService, }
ManagedBroadcastService broadcastService,
ManagedBootstrapMetadataService bootstrapMetadataService,
ManagedPersistentMetadataService persistentMetadataService,
ManagedClusterService clusterService,
ManagedClusterMessagingService clusterMessagingService,
ManagedClusterEventingService clusterEventingService) {
this.messagingService = messagingService;
this.broadcastService = broadcastService;
this.bootstrapMetadataService = bootstrapMetadataService;
this.persistentMetadataService = persistentMetadataService;
this.clusterService = clusterService;
this.clusterMessagingService = clusterMessagingService;
this.clusterEventingService = clusterEventingService;
}

public ManagedMessagingService messagingService() {
return messagingService;
}

public ManagedBroadcastService broadcastService() {
return broadcastService;
}

public ManagedBootstrapMetadataService bootstrapMetadataService() {
return bootstrapMetadataService;
}

public ManagedPersistentMetadataService persistentMetadataService() {
return persistentMetadataService;
}

public ManagedClusterService clusterService() {
return clusterService;
}

public ManagedClusterMessagingService clusterMessagingService() {
return clusterMessagingService;
}

public ManagedClusterEventingService clusterEventingService() {
return clusterEventingService;
}

public ThreadContext threadContext() {
return threadContext;
}

@Override
public CompletableFuture<Void> start() {
return startServices()
.thenComposeAsync(v -> joinCluster(), threadContext())
.thenComposeAsync(v -> completeStartup(), threadContext());
}

protected CompletableFuture<Void> startServices() {
return messagingService().start()
.thenComposeAsync(v -> broadcastService().start(), threadContext())
.thenComposeAsync(v -> bootstrapMetadataService().start(), threadContext())
.thenComposeAsync(v -> persistentMetadataService().start(), threadContext())
.thenComposeAsync(v -> clusterService().start(), threadContext())
.thenComposeAsync(v -> clusterMessagingService().start(), threadContext())
.thenComposeAsync(v -> clusterEventingService().start(), threadContext())
.thenApply(v -> null);
}

protected CompletableFuture<Void> joinCluster() {
persistentMetadataService().addNode(clusterService().getLocalNode());
return CompletableFuture.completedFuture(null);
}

protected CompletableFuture<Void> completeStartup() {
started.set(true);
LOGGER.info("Started");
return CompletableFuture.completedFuture(null);
}

@Override
public boolean isRunning() {
return started.get();
}

@Override
public CompletableFuture<Void> stop() {
return leaveCluster()
.thenComposeAsync(v -> stopServices(), threadContext())
.thenComposeAsync(v -> completeShutdown(), threadContext());
}

protected CompletableFuture<Void> leaveCluster() {
persistentMetadataService().removeNode(clusterService().getLocalNode());
return CompletableFuture.completedFuture(null);
}

protected CompletableFuture<Void> stopServices() {
return clusterMessagingService().stop()
.exceptionally(e -> null)
.thenComposeAsync(v -> clusterEventingService().stop(), threadContext())
.exceptionally(e -> null)
.thenComposeAsync(v -> clusterService().stop(), threadContext())
.exceptionally(e -> null)
.thenComposeAsync(v -> persistentMetadataService().stop(), threadContext())
.exceptionally(e -> null)
.thenComposeAsync(v -> bootstrapMetadataService().stop(), threadContext())
.exceptionally(e -> null)
.thenComposeAsync(v -> broadcastService().stop(), threadContext())
.exceptionally(e -> null)
.thenComposeAsync(v -> messagingService().stop(), threadContext())
.exceptionally(e -> null);
}


protected CompletableFuture<Void> completeShutdown() { protected CompletableFuture<Void> completeShutdown() {
threadContext().close(); threadContext.close();
started.set(false); started.set(false);
LOGGER.info("Stopped"); LOGGER.info("Stopped");
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
}
} }


/** @Override
* Loads a context from the given configuration file. public String toString() {
*/ return toStringHelper(this)
private static Context loadContext(File config) { .toString();
return buildContext(loadConfig(config));
} }


/** /**
Expand All @@ -303,32 +248,6 @@ private static ClusterConfig loadConfig(File config) {
return Configs.load(config, ClusterConfig.class); return Configs.load(config, ClusterConfig.class);
} }


/**
* Builds a context from the given configuration.
*/
private static Context buildContext(ClusterConfig config) {
// Apply profiles to all configurations.
config.getProfile().configure(config);
config.getLocalNode().getProfile().configure(config.getLocalNode());
config.getNodes().forEach(node -> node.getProfile().configure(node));

ManagedMessagingService messagingService = buildMessagingService(config);
ManagedBroadcastService broadcastService = buildBroadcastService(config);
ManagedBootstrapMetadataService bootstrapMetadataService = buildBootstrapMetadataService(config);
ManagedPersistentMetadataService persistentMetadataService = buildPersistentMetadataService(config, messagingService);
ManagedClusterService clusterService = buildClusterService(config, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService);
ManagedClusterMessagingService clusterMessagingService = buildClusterMessagingService(clusterService, messagingService);
ManagedClusterEventingService clusterEventingService = buildClusterEventService(clusterService, messagingService);
return new Context(
messagingService,
broadcastService,
bootstrapMetadataService,
persistentMetadataService,
clusterService,
clusterMessagingService,
clusterEventingService);
}

/** /**
* Builds a default messaging service. * Builds a default messaging service.
*/ */
Expand Down Expand Up @@ -450,7 +369,7 @@ public Builder withClusterName(String clusterName) {
* @return the cluster metadata builder * @return the cluster metadata builder
*/ */
public Builder withLocalNode(Node localNode) { public Builder withLocalNode(Node localNode) {
config.setLocalNode(localNode.config); config.setLocalNode(localNode.config());
return this; return this;
} }


Expand All @@ -471,7 +390,7 @@ public Builder withNodes(Node... coreNodes) {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withNodes(Collection<Node> nodes) { public Builder withNodes(Collection<Node> nodes) {
config.setNodes(nodes.stream().map(n -> n.config).collect(Collectors.toList())); config.setNodes(nodes.stream().map(n -> n.config()).collect(Collectors.toList()));
return this; return this;
} }


Expand Down

0 comments on commit 0cf0e99

Please sign in to comment.