Skip to content

Commit

Permalink
Add CONSENSUS node type and separate primary-backup partitions from c…
Browse files Browse the repository at this point in the history
…onsensus nodes.
  • Loading branch information
kuujo committed Mar 28, 2018
1 parent f9e41bd commit 994b4d3
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 134 deletions.
File renamed without changes.
67 changes: 29 additions & 38 deletions agent/src/main/java/io/atomix/agent/AtomixAgent.java
Expand Up @@ -23,7 +23,6 @@
import io.atomix.rest.ManagedRestService; import io.atomix.rest.ManagedRestService;
import io.atomix.rest.RestService; import io.atomix.rest.RestService;
import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.action.StoreTrueArgumentAction;
import net.sourceforge.argparse4j.inf.Argument; import net.sourceforge.argparse4j.inf.Argument;
import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.ArgumentParserException;
Expand All @@ -45,45 +44,38 @@ public class AtomixAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(AtomixAgent.class); private static final Logger LOGGER = LoggerFactory.getLogger(AtomixAgent.class);


public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ArgumentType<Node> nodeType = new ArgumentType<Node>() { ArgumentType<Node> nodeArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> {
@Override String[] address = parseAddress(value);
public Node convert(ArgumentParser argumentParser, Argument argument, String value) throws ArgumentParserException { return Node.builder(parseNodeId(address))
String[] address = parseAddress(value); .withType(Node.Type.CORE)
return Node.builder(parseNodeId(address)) .withEndpoint(parseEndpoint(address))
.withType(Node.Type.DATA) .build();
.withEndpoint(parseEndpoint(address))
.build();
}
}; };


ArgumentType<File> fileType = new ArgumentType<File>() { ArgumentType<Node.Type> typeArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> Node.Type.valueOf(value.toUpperCase());
@Override ArgumentType<File> fileArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> new File(value);
public File convert(ArgumentParser argumentParser, Argument argument, String value) throws ArgumentParserException {
return new File(value);
}
};


ArgumentParser parser = ArgumentParsers.newArgumentParser("AtomixServer") ArgumentParser parser = ArgumentParsers.newArgumentParser("AtomixServer")
.defaultHelp(true) .defaultHelp(true)
.description("Atomix server"); .description("Atomix server");
parser.addArgument("node") parser.addArgument("node")
.type(nodeType) .type(nodeArgumentType)
.nargs("?") .nargs("?")
.metavar("NAME:HOST:PORT") .metavar("NAME:HOST:PORT")
.setDefault(Node.builder("local") .setDefault(Node.builder("local")
.withType(Node.Type.DATA) .withType(Node.Type.CORE)
.withEndpoint(new Endpoint(InetAddress.getByName("127.0.0.1"), NettyMessagingService.DEFAULT_PORT)) .withEndpoint(new Endpoint(InetAddress.getByName("127.0.0.1"), NettyMessagingService.DEFAULT_PORT))
.build()) .build())
.help("The local node info"); .help("The local node info");
parser.addArgument("--client", "-c") parser.addArgument("--type", "-t")
.action(new StoreTrueArgumentAction()) .type(typeArgumentType)
.help("Indicates this is a client node"); .metavar("TYPE")
parser.addArgument("--server", "-s") .choices("core", "data", "client")
.action(new StoreTrueArgumentAction()) .setDefault(Node.Type.CORE)
.help("Indicates that this is a server node"); .help("Indicates the local node type");
parser.addArgument("--bootstrap", "-b") parser.addArgument("--bootstrap", "-b")
.nargs("*") .nargs("*")
.type(nodeType) .type(nodeArgumentType)
.metavar("NAME:HOST:PORT") .metavar("NAME:HOST:PORT")
.required(false) .required(false)
.help("Bootstraps a new cluster"); .help("Bootstraps a new cluster");
Expand All @@ -93,8 +85,8 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val
.required(false) .required(false)
.setDefault(5678) .setDefault(5678)
.help("An optional HTTP server port"); .help("An optional HTTP server port");
parser.addArgument("--data-dir", "-d") parser.addArgument("--data-dir", "-dd")
.type(fileType) .type(fileArgumentType)
.metavar("FILE") .metavar("FILE")
.required(false) .required(false)
.setDefault(new File(System.getProperty("user.dir"), "data")) .setDefault(new File(System.getProperty("user.dir"), "data"))
Expand All @@ -121,12 +113,11 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val
} }


Node localNode = namespace.get("node"); Node localNode = namespace.get("node");
if (namespace.getBoolean("client")) { Node.Type type = namespace.get("type");
localNode = Node.builder(localNode.id()) localNode = Node.builder(localNode.id())
.withType(Node.Type.CLIENT) .withType(type)
.withEndpoint(localNode.endpoint()) .withEndpoint(localNode.endpoint())
.build(); .build();
}


List<Node> bootstrap = namespace.getList("bootstrap"); List<Node> bootstrap = namespace.getList("bootstrap");
if (bootstrap == null) { if (bootstrap == null) {
Expand All @@ -138,15 +129,15 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val
Integer corePartitions = namespace.getInt("core_partitions"); Integer corePartitions = namespace.getInt("core_partitions");
Integer dataPartitions = namespace.getInt("data_partitions"); Integer dataPartitions = namespace.getInt("data_partitions");


LOGGER.info("Node: {}", localNode); LOGGER.info("node: {}", localNode);
LOGGER.info("Bootstrap: {}", bootstrap); LOGGER.info("bootstrap: {}", bootstrap);
LOGGER.info("Data: {}", dataDir); LOGGER.info("data-dir: {}", dataDir);


Atomix atomix = Atomix.builder() Atomix atomix = Atomix.builder()
.withLocalNode(localNode) .withLocalNode(localNode)
.withBootstrapNodes(bootstrap) .withBootstrapNodes(bootstrap)
.withDataDirectory(dataDir) .withDataDirectory(dataDir)
.withCoordinationPartitions(corePartitions) .withCorePartitions(corePartitions)
.withDataPartitions(dataPartitions) .withDataPartitions(dataPartitions)
.build(); .build();


Expand All @@ -165,7 +156,7 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val


rest.start().join(); rest.start().join();


LOGGER.info("Server listening at {}:{}", localNode.endpoint().host().getHostAddress(), httpPort); LOGGER.info("HTTP server listening at {}:{}", localNode.endpoint().host().getHostAddress(), httpPort);


synchronized (Atomix.class) { synchronized (Atomix.class) {
while (atomix.isRunning()) { while (atomix.isRunning()) {
Expand Down
Expand Up @@ -42,7 +42,7 @@ public static Builder builder() {


public ClusterMetadata(Collection<Node> bootstrapNodes) { public ClusterMetadata(Collection<Node> bootstrapNodes) {
this.bootstrapNodes = bootstrapNodes.stream() this.bootstrapNodes = bootstrapNodes.stream()
.filter(node -> node.type() == Type.DATA) .filter(node -> node.type() == Type.CORE)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
} }


Expand Down
19 changes: 19 additions & 0 deletions cluster/src/main/java/io/atomix/cluster/Node.java
Expand Up @@ -58,6 +58,20 @@ public static Builder builder(NodeId nodeId) {
return new Builder(checkNotNull(nodeId, "nodeId cannot be null")); return new Builder(checkNotNull(nodeId, "nodeId cannot be null"));
} }


/**
* Returns a new core node.
*
* @param nodeId the core node ID
* @param endpoint the core node endpoint
* @return a new core node
*/
public static Node core(NodeId nodeId, Endpoint endpoint) {
return builder(nodeId)
.withType(Type.CORE)
.withEndpoint(endpoint)
.build();
}

/** /**
* Returns a new data node. * Returns a new data node.
* *
Expand Down Expand Up @@ -91,6 +105,11 @@ public static Node client(NodeId nodeId, Endpoint endpoint) {
*/ */
public enum Type { public enum Type {


/**
* Represents a core node.
*/
CORE,

/** /**
* Represents a data node. * Represents a data node.
*/ */
Expand Down
Expand Up @@ -111,14 +111,14 @@ public Node getLocalNode() {
public Set<Node> getNodes() { public Set<Node> getNodes() {
return ImmutableSet.copyOf(nodes.values() return ImmutableSet.copyOf(nodes.values()
.stream() .stream()
.filter(node -> node.type() == Node.Type.DATA || node.getState() == State.ACTIVE) .filter(node -> node.type() == Node.Type.CORE || node.getState() == State.ACTIVE)
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }


@Override @Override
public Node getNode(NodeId nodeId) { public Node getNode(NodeId nodeId) {
Node node = nodes.get(nodeId); Node node = nodes.get(nodeId);
return node != null && (node.type() == Node.Type.DATA || node.getState() == State.ACTIVE) ? node : null; return node != null && (node.type() == Node.Type.CORE || node.getState() == State.ACTIVE) ? node : null;
} }


/** /**
Expand Down Expand Up @@ -211,9 +211,10 @@ private void deactivateNode(StatefulNode node) {
if (existingNode != null && existingNode.getState() == State.ACTIVE) { if (existingNode != null && existingNode.getState() == State.ACTIVE) {
existingNode.setState(State.INACTIVE); existingNode.setState(State.INACTIVE);
switch (existingNode.type()) { switch (existingNode.type()) {
case DATA: case CORE:
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
break; break;
case DATA:
case CLIENT: case CLIENT:
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode)); post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
Expand Down Expand Up @@ -243,7 +244,7 @@ private void handleMetadataEvent(ClusterMetadataEvent event) {


// Filter the set of data node IDs from the local node information. // Filter the set of data node IDs from the local node information.
Set<NodeId> dataNodes = nodes.entrySet().stream() Set<NodeId> dataNodes = nodes.entrySet().stream()
.filter(entry -> entry.getValue().type() == Node.Type.DATA) .filter(entry -> entry.getValue().type() == Node.Type.CORE)
.map(entry -> entry.getKey()) .map(entry -> entry.getKey())
.collect(Collectors.toSet()); .collect(Collectors.toSet());


Expand Down
Expand Up @@ -57,15 +57,15 @@ public void testSingleNodeBootstrap() throws Exception {


ClusterMetadata clusterMetadata = buildClusterMetadata(1); ClusterMetadata clusterMetadata = buildClusterMetadata(1);


Node localNode1 = buildNode(1, Node.Type.DATA); Node localNode1 = buildNode(1, Node.Type.CORE);
ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join());


metadataService1.start().join(); metadataService1.start().join();


assertEquals(1, metadataService1.getMetadata().bootstrapNodes().size()); assertEquals(1, metadataService1.getMetadata().bootstrapNodes().size());


Node localNode2 = buildNode(2, Node.Type.DATA); Node localNode2 = buildNode(2, Node.Type.CORE);
ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join());
metadataService2.start().join(); metadataService2.start().join();
Expand All @@ -80,15 +80,15 @@ public void testClusterMetadataService() throws Exception {


ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3); ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3);


Node localNode1 = buildNode(1, Node.Type.DATA); Node localNode1 = buildNode(1, Node.Type.CORE);
ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join());


Node localNode2 = buildNode(2, Node.Type.DATA); Node localNode2 = buildNode(2, Node.Type.CORE);
ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join());


Node localNode3 = buildNode(3, Node.Type.DATA); Node localNode3 = buildNode(3, Node.Type.CORE);
ManagedClusterMetadataService metadataService3 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService3 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode3.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode3.endpoint()).start().join());


Expand All @@ -102,7 +102,7 @@ public void testClusterMetadataService() throws Exception {
assertEquals(3, metadataService2.getMetadata().bootstrapNodes().size()); assertEquals(3, metadataService2.getMetadata().bootstrapNodes().size());
assertEquals(3, metadataService3.getMetadata().bootstrapNodes().size()); assertEquals(3, metadataService3.getMetadata().bootstrapNodes().size());


Node localNode4 = buildNode(4, Node.Type.DATA); Node localNode4 = buildNode(4, Node.Type.CORE);
ManagedClusterMetadataService metadataService4 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService4 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode4.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode4.endpoint()).start().join());
metadataService4.start().join(); metadataService4.start().join();
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testClusterMetadataService() throws Exception {
assertEquals(4, remoteEventListener3.event().subject().bootstrapNodes().size()); assertEquals(4, remoteEventListener3.event().subject().bootstrapNodes().size());
assertEquals(4, metadataService3.getMetadata().bootstrapNodes().size()); assertEquals(4, metadataService3.getMetadata().bootstrapNodes().size());


Node localNode5 = buildNode(5, Node.Type.DATA); Node localNode5 = buildNode(5, Node.Type.CORE);
ManagedClusterMetadataService metadataService5 = new DefaultClusterMetadataService( ManagedClusterMetadataService metadataService5 = new DefaultClusterMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode5.endpoint()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode5.endpoint()).start().join());
metadataService5.start().join(); metadataService5.start().join();
Expand All @@ -150,7 +150,7 @@ private ClusterMetadata buildClusterMetadata(Integer... bootstrapNodes) {
List<Node> bootstrap = new ArrayList<>(); List<Node> bootstrap = new ArrayList<>();
for (int bootstrapNode : bootstrapNodes) { for (int bootstrapNode : bootstrapNodes) {
bootstrap.add(Node.builder(String.valueOf(bootstrapNode)) bootstrap.add(Node.builder(String.valueOf(bootstrapNode))
.withType(Node.Type.DATA) .withType(Node.Type.CORE)
.withEndpoint(new Endpoint(localhost, bootstrapNode)) .withEndpoint(new Endpoint(localhost, bootstrapNode))
.build()); .build());
} }
Expand Down

0 comments on commit 994b4d3

Please sign in to comment.