From 994b4d3f642701fc4d3aa28a440daca9e24c1800 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Thu, 22 Mar 2018 15:49:07 -0700 Subject: [PATCH] Add CONSENSUS node type and separate primary-backup partitions from consensus nodes. --- .../logs/atomix.log | 0 .../java/io/atomix/agent/AtomixAgent.java | 67 +++++++-------- .../io/atomix/cluster/ClusterMetadata.java | 2 +- .../src/main/java/io/atomix/cluster/Node.java | 19 +++++ .../cluster/impl/DefaultClusterService.java | 9 +- .../DefaultClusterMetadataServiceTest.java | 16 ++-- .../impl/DefaultClusterServiceTest.java | 82 ++++++++++++++----- .../DefaultClusterEventingServiceTest.java | 8 +- core/src/main/java/io/atomix/core/Atomix.java | 64 ++++++++------- .../io/atomix/core/AbstractAtomixTest.java | 31 +++++-- .../io/atomix/core/AbstractPrimitiveTest.java | 6 +- .../test/java/io/atomix/core/AtomixTest.java | 20 ++--- .../impl/PrimaryBackupPartitionServer.java | 3 +- .../io/atomix/cluster/TestClusterService.java | 4 +- .../raft/partition/RaftPartitionGroup.java | 4 +- .../protocols/raft/RaftPerformanceTest.java | 2 +- 16 files changed, 203 insertions(+), 134 deletions(-) rename core/src/test/java/io/atomix/core/BootstrapElection.java => agent/logs/atomix.log (100%) diff --git a/core/src/test/java/io/atomix/core/BootstrapElection.java b/agent/logs/atomix.log similarity index 100% rename from core/src/test/java/io/atomix/core/BootstrapElection.java rename to agent/logs/atomix.log diff --git a/agent/src/main/java/io/atomix/agent/AtomixAgent.java b/agent/src/main/java/io/atomix/agent/AtomixAgent.java index 4df7f67943..2d424f83d0 100644 --- a/agent/src/main/java/io/atomix/agent/AtomixAgent.java +++ b/agent/src/main/java/io/atomix/agent/AtomixAgent.java @@ -23,7 +23,6 @@ import io.atomix.rest.ManagedRestService; import io.atomix.rest.RestService; import net.sourceforge.argparse4j.ArgumentParsers; -import net.sourceforge.argparse4j.impl.action.StoreTrueArgumentAction; import net.sourceforge.argparse4j.inf.Argument; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; @@ -45,45 +44,38 @@ public class AtomixAgent { private static final Logger LOGGER = LoggerFactory.getLogger(AtomixAgent.class); public static void main(String[] args) throws Exception { - ArgumentType nodeType = new ArgumentType() { - @Override - public Node convert(ArgumentParser argumentParser, Argument argument, String value) throws ArgumentParserException { - String[] address = parseAddress(value); - return Node.builder(parseNodeId(address)) - .withType(Node.Type.DATA) - .withEndpoint(parseEndpoint(address)) - .build(); - } + ArgumentType nodeArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> { + String[] address = parseAddress(value); + return Node.builder(parseNodeId(address)) + .withType(Node.Type.CORE) + .withEndpoint(parseEndpoint(address)) + .build(); }; - ArgumentType fileType = new ArgumentType() { - @Override - public File convert(ArgumentParser argumentParser, Argument argument, String value) throws ArgumentParserException { - return new File(value); - } - }; + ArgumentType typeArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> Node.Type.valueOf(value.toUpperCase()); + ArgumentType fileArgumentType = (ArgumentParser argumentParser, Argument argument, String value) -> new File(value); ArgumentParser parser = ArgumentParsers.newArgumentParser("AtomixServer") .defaultHelp(true) .description("Atomix server"); parser.addArgument("node") - .type(nodeType) + .type(nodeArgumentType) .nargs("?") .metavar("NAME:HOST:PORT") .setDefault(Node.builder("local") - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(new Endpoint(InetAddress.getByName("127.0.0.1"), NettyMessagingService.DEFAULT_PORT)) .build()) .help("The local node info"); - parser.addArgument("--client", "-c") - .action(new StoreTrueArgumentAction()) - .help("Indicates this is a client node"); - parser.addArgument("--server", "-s") - .action(new StoreTrueArgumentAction()) - .help("Indicates that this is a server node"); + parser.addArgument("--type", "-t") + .type(typeArgumentType) + .metavar("TYPE") + .choices("core", "data", "client") + .setDefault(Node.Type.CORE) + .help("Indicates the local node type"); parser.addArgument("--bootstrap", "-b") .nargs("*") - .type(nodeType) + .type(nodeArgumentType) .metavar("NAME:HOST:PORT") .required(false) .help("Bootstraps a new cluster"); @@ -93,8 +85,8 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val .required(false) .setDefault(5678) .help("An optional HTTP server port"); - parser.addArgument("--data-dir", "-d") - .type(fileType) + parser.addArgument("--data-dir", "-dd") + .type(fileArgumentType) .metavar("FILE") .required(false) .setDefault(new File(System.getProperty("user.dir"), "data")) @@ -121,12 +113,11 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val } Node localNode = namespace.get("node"); - if (namespace.getBoolean("client")) { - localNode = Node.builder(localNode.id()) - .withType(Node.Type.CLIENT) - .withEndpoint(localNode.endpoint()) - .build(); - } + Node.Type type = namespace.get("type"); + localNode = Node.builder(localNode.id()) + .withType(type) + .withEndpoint(localNode.endpoint()) + .build(); List bootstrap = namespace.getList("bootstrap"); if (bootstrap == null) { @@ -138,15 +129,15 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val Integer corePartitions = namespace.getInt("core_partitions"); Integer dataPartitions = namespace.getInt("data_partitions"); - LOGGER.info("Node: {}", localNode); - LOGGER.info("Bootstrap: {}", bootstrap); - LOGGER.info("Data: {}", dataDir); + LOGGER.info("node: {}", localNode); + LOGGER.info("bootstrap: {}", bootstrap); + LOGGER.info("data-dir: {}", dataDir); Atomix atomix = Atomix.builder() .withLocalNode(localNode) .withBootstrapNodes(bootstrap) .withDataDirectory(dataDir) - .withCoordinationPartitions(corePartitions) + .withCorePartitions(corePartitions) .withDataPartitions(dataPartitions) .build(); @@ -165,7 +156,7 @@ public File convert(ArgumentParser argumentParser, Argument argument, String val rest.start().join(); - LOGGER.info("Server listening at {}:{}", localNode.endpoint().host().getHostAddress(), httpPort); + LOGGER.info("HTTP server listening at {}:{}", localNode.endpoint().host().getHostAddress(), httpPort); synchronized (Atomix.class) { while (atomix.isRunning()) { diff --git a/cluster/src/main/java/io/atomix/cluster/ClusterMetadata.java b/cluster/src/main/java/io/atomix/cluster/ClusterMetadata.java index 3269bc1416..f01f224c7b 100644 --- a/cluster/src/main/java/io/atomix/cluster/ClusterMetadata.java +++ b/cluster/src/main/java/io/atomix/cluster/ClusterMetadata.java @@ -42,7 +42,7 @@ public static Builder builder() { public ClusterMetadata(Collection bootstrapNodes) { this.bootstrapNodes = bootstrapNodes.stream() - .filter(node -> node.type() == Type.DATA) + .filter(node -> node.type() == Type.CORE) .collect(Collectors.toSet()); } diff --git a/cluster/src/main/java/io/atomix/cluster/Node.java b/cluster/src/main/java/io/atomix/cluster/Node.java index 7953e8ac7e..03925704fe 100644 --- a/cluster/src/main/java/io/atomix/cluster/Node.java +++ b/cluster/src/main/java/io/atomix/cluster/Node.java @@ -58,6 +58,20 @@ public static Builder builder(NodeId nodeId) { return new Builder(checkNotNull(nodeId, "nodeId cannot be null")); } + /** + * Returns a new core node. + * + * @param nodeId the core node ID + * @param endpoint the core node endpoint + * @return a new core node + */ + public static Node core(NodeId nodeId, Endpoint endpoint) { + return builder(nodeId) + .withType(Type.CORE) + .withEndpoint(endpoint) + .build(); + } + /** * Returns a new data node. * @@ -91,6 +105,11 @@ public static Node client(NodeId nodeId, Endpoint endpoint) { */ public enum Type { + /** + * Represents a core node. + */ + CORE, + /** * Represents a data node. */ diff --git a/cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterService.java b/cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterService.java index a0a8f0dc98..6a921dda20 100644 --- a/cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterService.java +++ b/cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterService.java @@ -111,14 +111,14 @@ public Node getLocalNode() { public Set getNodes() { return ImmutableSet.copyOf(nodes.values() .stream() - .filter(node -> node.type() == Node.Type.DATA || node.getState() == State.ACTIVE) + .filter(node -> node.type() == Node.Type.CORE || node.getState() == State.ACTIVE) .collect(Collectors.toList())); } @Override public Node getNode(NodeId nodeId) { Node node = nodes.get(nodeId); - return node != null && (node.type() == Node.Type.DATA || node.getState() == State.ACTIVE) ? node : null; + return node != null && (node.type() == Node.Type.CORE || node.getState() == State.ACTIVE) ? node : null; } /** @@ -211,9 +211,10 @@ private void deactivateNode(StatefulNode node) { if (existingNode != null && existingNode.getState() == State.ACTIVE) { existingNode.setState(State.INACTIVE); switch (existingNode.type()) { - case DATA: + case CORE: post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); break; + case DATA: case CLIENT: post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode)); post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode)); @@ -243,7 +244,7 @@ private void handleMetadataEvent(ClusterMetadataEvent event) { // Filter the set of data node IDs from the local node information. Set dataNodes = nodes.entrySet().stream() - .filter(entry -> entry.getValue().type() == Node.Type.DATA) + .filter(entry -> entry.getValue().type() == Node.Type.CORE) .map(entry -> entry.getKey()) .collect(Collectors.toSet()); diff --git a/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterMetadataServiceTest.java b/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterMetadataServiceTest.java index 4cc52f9d90..14f032d5a2 100644 --- a/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterMetadataServiceTest.java +++ b/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterMetadataServiceTest.java @@ -57,7 +57,7 @@ public void testSingleNodeBootstrap() throws Exception { ClusterMetadata clusterMetadata = buildClusterMetadata(1); - Node localNode1 = buildNode(1, Node.Type.DATA); + Node localNode1 = buildNode(1, Node.Type.CORE); ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join()); @@ -65,7 +65,7 @@ public void testSingleNodeBootstrap() throws Exception { assertEquals(1, metadataService1.getMetadata().bootstrapNodes().size()); - Node localNode2 = buildNode(2, Node.Type.DATA); + Node localNode2 = buildNode(2, Node.Type.CORE); ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join()); metadataService2.start().join(); @@ -80,15 +80,15 @@ public void testClusterMetadataService() throws Exception { ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3); - Node localNode1 = buildNode(1, Node.Type.DATA); + Node localNode1 = buildNode(1, Node.Type.CORE); ManagedClusterMetadataService metadataService1 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join()); - Node localNode2 = buildNode(2, Node.Type.DATA); + Node localNode2 = buildNode(2, Node.Type.CORE); ManagedClusterMetadataService metadataService2 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join()); - Node localNode3 = buildNode(3, Node.Type.DATA); + Node localNode3 = buildNode(3, Node.Type.CORE); ManagedClusterMetadataService metadataService3 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode3.endpoint()).start().join()); @@ -102,7 +102,7 @@ public void testClusterMetadataService() throws Exception { assertEquals(3, metadataService2.getMetadata().bootstrapNodes().size()); assertEquals(3, metadataService3.getMetadata().bootstrapNodes().size()); - Node localNode4 = buildNode(4, Node.Type.DATA); + Node localNode4 = buildNode(4, Node.Type.CORE); ManagedClusterMetadataService metadataService4 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode4.endpoint()).start().join()); metadataService4.start().join(); @@ -132,7 +132,7 @@ public void testClusterMetadataService() throws Exception { assertEquals(4, remoteEventListener3.event().subject().bootstrapNodes().size()); assertEquals(4, metadataService3.getMetadata().bootstrapNodes().size()); - Node localNode5 = buildNode(5, Node.Type.DATA); + Node localNode5 = buildNode(5, Node.Type.CORE); ManagedClusterMetadataService metadataService5 = new DefaultClusterMetadataService( clusterMetadata, messagingServiceFactory.newMessagingService(localNode5.endpoint()).start().join()); metadataService5.start().join(); @@ -150,7 +150,7 @@ private ClusterMetadata buildClusterMetadata(Integer... bootstrapNodes) { List bootstrap = new ArrayList<>(); for (int bootstrapNode : bootstrapNodes) { bootstrap.add(Node.builder(String.valueOf(bootstrapNode)) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(new Endpoint(localhost, bootstrapNode)) .build()); } diff --git a/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterServiceTest.java b/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterServiceTest.java index 8e3ba60367..ec5974d116 100644 --- a/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterServiceTest.java +++ b/cluster/src/test/java/io/atomix/cluster/impl/DefaultClusterServiceTest.java @@ -59,7 +59,7 @@ private ClusterMetadata buildClusterMetadata(Integer... bootstrapNodes) { List bootstrap = new ArrayList<>(); for (int bootstrapNode : bootstrapNodes) { bootstrap.add(Node.builder(String.valueOf(bootstrapNode)) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(new Endpoint(localhost, bootstrapNode)) .build()); } @@ -72,19 +72,19 @@ public void testClusterService() throws Exception { ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3); - Node localNode1 = buildNode(1, Node.Type.DATA); + Node localNode1 = buildNode(1, Node.Type.CORE); ManagedClusterService clusterService1 = new DefaultClusterService( localNode1, new TestClusterMetadataService(clusterMetadata), messagingServiceFactory.newMessagingService(localNode1.endpoint()).start().join()); - Node localNode2 = buildNode(2, Node.Type.DATA); + Node localNode2 = buildNode(2, Node.Type.CORE); ManagedClusterService clusterService2 = new DefaultClusterService( localNode2, new TestClusterMetadataService(clusterMetadata), messagingServiceFactory.newMessagingService(localNode2.endpoint()).start().join()); - Node localNode3 = buildNode(3, Node.Type.DATA); + Node localNode3 = buildNode(3, Node.Type.CORE); ManagedClusterService clusterService3 = new DefaultClusterService( localNode3, new TestClusterMetadataService(clusterMetadata), @@ -107,17 +107,41 @@ public void testClusterService() throws Exception { assertEquals(3, clusterService2.getNodes().size()); assertEquals(3, clusterService3.getNodes().size()); - assertEquals(Node.Type.DATA, clusterService1.getLocalNode().type()); - assertEquals(Node.Type.DATA, clusterService1.getNode(NodeId.from("1")).type()); - assertEquals(Node.Type.DATA, clusterService1.getNode(NodeId.from("2")).type()); - assertEquals(Node.Type.DATA, clusterService1.getNode(NodeId.from("3")).type()); + assertEquals(Node.Type.CORE, clusterService1.getLocalNode().type()); + assertEquals(Node.Type.CORE, clusterService1.getNode(NodeId.from("1")).type()); + assertEquals(Node.Type.CORE, clusterService1.getNode(NodeId.from("2")).type()); + assertEquals(Node.Type.CORE, clusterService1.getNode(NodeId.from("3")).type()); assertEquals(State.ACTIVE, clusterService1.getLocalNode().getState()); assertEquals(State.ACTIVE, clusterService1.getNode(NodeId.from("1")).getState()); assertEquals(State.ACTIVE, clusterService1.getNode(NodeId.from("2")).getState()); assertEquals(State.ACTIVE, clusterService1.getNode(NodeId.from("3")).getState()); - Node clientNode = buildNode(4, Node.Type.CLIENT); + Node dataNode = buildNode(4, Node.Type.DATA); + + ManagedClusterService dataClusterService = new DefaultClusterService( + dataNode, + new TestClusterMetadataService(clusterMetadata), + messagingServiceFactory.newMessagingService(dataNode.endpoint()).start().join()); + + assertEquals(State.INACTIVE, dataClusterService.getLocalNode().getState()); + + assertNull(dataClusterService.getNode(NodeId.from("1"))); + assertNull(dataClusterService.getNode(NodeId.from("2"))); + assertNull(dataClusterService.getNode(NodeId.from("3"))); + assertNull(dataClusterService.getNode(NodeId.from("4"))); + assertNull(dataClusterService.getNode(NodeId.from("5"))); + + dataClusterService.start().join(); + + Thread.sleep(1000); + + assertEquals(4, clusterService1.getNodes().size()); + assertEquals(4, clusterService2.getNodes().size()); + assertEquals(4, clusterService3.getNodes().size()); + assertEquals(4, dataClusterService.getNodes().size()); + + Node clientNode = buildNode(5, Node.Type.CLIENT); ManagedClusterService clientClusterService = new DefaultClusterService( clientNode, @@ -130,22 +154,25 @@ public void testClusterService() throws Exception { assertNull(clientClusterService.getNode(NodeId.from("2"))); assertNull(clientClusterService.getNode(NodeId.from("3"))); assertNull(clientClusterService.getNode(NodeId.from("4"))); + assertNull(clientClusterService.getNode(NodeId.from("5"))); clientClusterService.start().join(); - Thread.sleep(100); + Thread.sleep(1000); - assertEquals(4, clusterService1.getNodes().size()); - assertEquals(4, clusterService2.getNodes().size()); - assertEquals(4, clusterService3.getNodes().size()); - assertEquals(4, clientClusterService.getNodes().size()); + assertEquals(5, clusterService1.getNodes().size()); + assertEquals(5, clusterService2.getNodes().size()); + assertEquals(5, clusterService3.getNodes().size()); + assertEquals(5, dataClusterService.getNodes().size()); + assertEquals(5, clientClusterService.getNodes().size()); assertEquals(Node.Type.CLIENT, clientClusterService.getLocalNode().type()); - assertEquals(Node.Type.DATA, clientClusterService.getNode(NodeId.from("1")).type()); - assertEquals(Node.Type.DATA, clientClusterService.getNode(NodeId.from("2")).type()); - assertEquals(Node.Type.DATA, clientClusterService.getNode(NodeId.from("3")).type()); - assertEquals(Node.Type.CLIENT, clientClusterService.getNode(NodeId.from("4")).type()); + assertEquals(Node.Type.CORE, clientClusterService.getNode(NodeId.from("1")).type()); + assertEquals(Node.Type.CORE, clientClusterService.getNode(NodeId.from("2")).type()); + assertEquals(Node.Type.CORE, clientClusterService.getNode(NodeId.from("3")).type()); + assertEquals(Node.Type.DATA, clientClusterService.getNode(NodeId.from("4")).type()); + assertEquals(Node.Type.CLIENT, clientClusterService.getNode(NodeId.from("5")).type()); assertEquals(State.ACTIVE, clientClusterService.getLocalNode().getState()); @@ -153,6 +180,7 @@ public void testClusterService() throws Exception { assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("2")).getState()); assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("3")).getState()); assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("4")).getState()); + assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("5")).getState()); Thread.sleep(2500); @@ -160,18 +188,31 @@ public void testClusterService() throws Exception { Thread.sleep(2500); - assertEquals(4, clusterService2.getNodes().size()); - assertEquals(Node.Type.DATA, clusterService2.getNode(NodeId.from("1")).type()); + assertEquals(5, clusterService2.getNodes().size()); + assertEquals(Node.Type.CORE, clusterService2.getNode(NodeId.from("1")).type()); assertEquals(State.INACTIVE, clusterService2.getNode(NodeId.from("1")).getState()); assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("2")).getState()); assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("3")).getState()); assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("4")).getState()); + assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("5")).getState()); assertEquals(State.INACTIVE, clientClusterService.getNode(NodeId.from("1")).getState()); assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("2")).getState()); assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("3")).getState()); assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("4")).getState()); + assertEquals(State.ACTIVE, clientClusterService.getNode(NodeId.from("5")).getState()); + + dataClusterService.stop().join(); + + Thread.sleep(2500); + + assertEquals(4, clusterService2.getNodes().size()); + assertEquals(State.INACTIVE, clusterService2.getNode(NodeId.from("1")).getState()); + assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("2")).getState()); + assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("3")).getState()); + assertNull(clusterService2.getNode(NodeId.from("4"))); + assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("5")).getState()); clientClusterService.stop().join(); @@ -183,5 +224,6 @@ public void testClusterService() throws Exception { assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("2")).getState()); assertEquals(State.ACTIVE, clusterService2.getNode(NodeId.from("3")).getState()); assertNull(clusterService2.getNode(NodeId.from("4"))); + assertNull(clusterService2.getNode(NodeId.from("5"))); } } diff --git a/cluster/src/test/java/io/atomix/cluster/messaging/impl/DefaultClusterEventingServiceTest.java b/cluster/src/test/java/io/atomix/cluster/messaging/impl/DefaultClusterEventingServiceTest.java index 3a401bad4a..575a03aa5d 100644 --- a/cluster/src/test/java/io/atomix/cluster/messaging/impl/DefaultClusterEventingServiceTest.java +++ b/cluster/src/test/java/io/atomix/cluster/messaging/impl/DefaultClusterEventingServiceTest.java @@ -64,7 +64,7 @@ private ClusterMetadata buildClusterMetadata(Integer... bootstrapNodes) { List bootstrap = new ArrayList<>(); for (int bootstrapNode : bootstrapNodes) { bootstrap.add(Node.builder(String.valueOf(bootstrapNode)) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(new Endpoint(localhost, bootstrapNode)) .build()); } @@ -77,17 +77,17 @@ public void testClusterEventService() throws Exception { ClusterMetadata clusterMetadata = buildClusterMetadata(1, 1, 2, 3); - Node localNode1 = buildNode(1, Node.Type.DATA); + Node localNode1 = buildNode(1, Node.Type.CORE); MessagingService messagingService1 = factory.newMessagingService(localNode1.endpoint()).start().join(); ClusterService clusterService1 = new DefaultClusterService(localNode1, new TestClusterMetadataService(clusterMetadata), messagingService1).start().join(); ClusterEventingService eventService1 = new DefaultClusterEventingService(clusterService1, messagingService1).start().join(); - Node localNode2 = buildNode(2, Node.Type.DATA); + Node localNode2 = buildNode(2, Node.Type.CORE); MessagingService messagingService2 = factory.newMessagingService(localNode2.endpoint()).start().join(); ClusterService clusterService2 = new DefaultClusterService(localNode2, new TestClusterMetadataService(clusterMetadata), messagingService2).start().join(); ClusterEventingService eventService2 = new DefaultClusterEventingService(clusterService2, messagingService2).start().join(); - Node localNode3 = buildNode(3, Node.Type.DATA); + Node localNode3 = buildNode(3, Node.Type.CORE); MessagingService messagingService3 = factory.newMessagingService(localNode3.endpoint()).start().join(); ClusterService clusterService3 = new DefaultClusterService(localNode3, new TestClusterMetadataService(clusterMetadata), messagingService3).start().join(); ClusterEventingService eventService3 = new DefaultClusterEventingService(clusterService3, messagingService3).start().join(); diff --git a/core/src/main/java/io/atomix/core/Atomix.java b/core/src/main/java/io/atomix/core/Atomix.java index 8c2db11693..f1cba2c3ad 100644 --- a/core/src/main/java/io/atomix/core/Atomix.java +++ b/core/src/main/java/io/atomix/core/Atomix.java @@ -93,7 +93,7 @@ public static Builder builder() { private final ManagedClusterService clusterService; private final ManagedClusterMessagingService clusterMessagingService; private final ManagedClusterEventingService clusterEventingService; - private final ManagedPartitionGroup corePartitionGroup; + private final ManagedPartitionGroup systemPartitionGroup; private final ManagedPartitionService partitions; private final ManagedPrimitivesService primitives; private final PrimitiveTypeRegistry primitiveTypes; @@ -108,7 +108,7 @@ protected Atomix( ManagedClusterService cluster, ManagedClusterMessagingService clusterMessagingService, ManagedClusterEventingService clusterEventingService, - ManagedPartitionGroup corePartitionGroup, + ManagedPartitionGroup systemPartitionGroup, ManagedPartitionService partitions, PrimitiveTypeRegistry primitiveTypes) { PrimitiveTypes.register(primitiveTypes); @@ -117,7 +117,7 @@ protected Atomix( this.clusterService = checkNotNull(cluster, "cluster cannot be null"); this.clusterMessagingService = checkNotNull(clusterMessagingService, "clusterCommunicator cannot be null"); this.clusterEventingService = checkNotNull(clusterEventingService, "clusterEventService cannot be null"); - this.corePartitionGroup = checkNotNull(corePartitionGroup, "corePartitionGroup cannot be null"); + this.systemPartitionGroup = checkNotNull(systemPartitionGroup, "systemPartitionGroup cannot be null"); this.partitions = checkNotNull(partitions, "partitions cannot be null"); this.primitiveTypes = checkNotNull(primitiveTypes, "primitiveTypes cannot be null"); this.primitives = new CorePrimitivesService(cluster, clusterMessagingService, clusterEventingService, partitions); @@ -222,11 +222,11 @@ public synchronized CompletableFuture start() { .thenComposeAsync(v -> clusterService.start(), context) .thenComposeAsync(v -> clusterMessagingService.start(), context) .thenComposeAsync(v -> clusterEventingService.start(), context) - .thenComposeAsync(v -> corePartitionGroup.open( + .thenComposeAsync(v -> systemPartitionGroup.open( new DefaultPartitionManagementService(metadataService, clusterService, clusterMessagingService, primitiveTypes, null, null)), context) .thenComposeAsync(v -> { - ManagedPrimaryElectionService electionService = new LeaderElectorPrimaryElectionService(corePartitionGroup); - ManagedSessionIdService sessionIdService = new IdGeneratorSessionIdService(corePartitionGroup); + ManagedPrimaryElectionService electionService = new LeaderElectorPrimaryElectionService(systemPartitionGroup); + ManagedSessionIdService sessionIdService = new IdGeneratorSessionIdService(systemPartitionGroup); return electionService.start() .thenComposeAsync(v2 -> sessionIdService.start(), context) .thenApply(v2 -> new DefaultPartitionManagementService(metadataService, clusterService, clusterMessagingService, primitiveTypes, electionService, sessionIdService)); @@ -256,7 +256,7 @@ public synchronized CompletableFuture stop() { metadataService.removeNode(clusterService.getLocalNode()); closeFuture = primitives.stop() .thenComposeAsync(v -> partitions.close(), context) - .thenComposeAsync(v -> corePartitionGroup.close(), context) + .thenComposeAsync(v -> systemPartitionGroup.close(), context) .thenComposeAsync(v -> clusterMessagingService.stop(), context) .thenComposeAsync(v -> clusterEventingService.stop(), context) .thenComposeAsync(v -> clusterService.stop(), context) @@ -283,20 +283,22 @@ public String toString() { public static class Builder implements io.atomix.utils.Builder { protected static final String DEFAULT_CLUSTER_NAME = "atomix"; // Default to 7 Raft partitions to allow a leader per node in 7 node clusters - protected static final int DEFAULT_COORDINATION_PARTITIONS = 7; + protected static final int DEFAULT_CORE_PARTITIONS = 7; // Default to 3-node partitions for the best latency/throughput per Raft partition - protected static final int DEFAULT_COORDINATION_PARTITION_SIZE = 3; + protected static final int DEFAULT_CORE_PARTITION_SIZE = 3; // Default to 71 primary-backup partitions - a prime number that creates about 10 partitions per node in a 7-node cluster protected static final int DEFAULT_DATA_PARTITIONS = 71; - protected static final String COORDINATION_GROUP_NAME = "coordination"; + + protected static final String SYSTEM_GROUP_NAME = "system"; + protected static final String CORE_GROUP_NAME = "core"; protected static final String DATA_GROUP_NAME = "data"; protected String name = DEFAULT_CLUSTER_NAME; protected Node localNode; protected Collection bootstrapNodes; protected File dataDirectory = new File(System.getProperty("user.dir"), "data"); - protected int numCoordinationPartitions = DEFAULT_COORDINATION_PARTITIONS; - protected int coordinationPartitionSize = DEFAULT_COORDINATION_PARTITION_SIZE; + protected int numCorePartitions = DEFAULT_CORE_PARTITIONS; + protected int corePartitionSize = DEFAULT_CORE_PARTITION_SIZE; protected int numDataPartitions = DEFAULT_DATA_PARTITIONS; protected Collection partitionGroups = new ArrayList<>(); protected PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry(); @@ -360,28 +362,28 @@ public Builder withDataDirectory(File dataDirectory) { } /** - * Sets the number of coordination (Raft) partitions. + * Sets the number of core (Raft) partitions. * - * @param corePartitions the number of coordination partitions + * @param corePartitions the number of core partitions * @return the Atomix builder * @throws IllegalArgumentException if the number of partitions is not positive */ - public Builder withCoordinationPartitions(int corePartitions) { + public Builder withCorePartitions(int corePartitions) { checkArgument(corePartitions > 0, "corePartitions must be positive"); - this.numCoordinationPartitions = corePartitions; + this.numCorePartitions = corePartitions; return this; } /** - * Sets the coordination (Raft) partition size. + * Sets the core (Raft) partition size. * - * @param partitionSize the coordination partition size + * @param partitionSize the core partition size * @return the Atomix builder * @throws IllegalArgumentException if the partition size is not positive */ - public Builder withCoordinationPartitionSize(int partitionSize) { + public Builder withCorePartitionSize(int partitionSize) { checkArgument(partitionSize > 0, "partitionSize must be positive"); - this.coordinationPartitionSize = partitionSize; + this.corePartitionSize = partitionSize; return this; } @@ -480,7 +482,7 @@ public Atomix build() { try { InetAddress address = getLocalAddress(); localNode = Node.builder(address.getHostName()) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(new Endpoint(address, NettyMessagingService.DEFAULT_PORT)) .build(); } catch (UnknownHostException e) { @@ -490,7 +492,7 @@ public Atomix build() { // If the bootstrap nodes have not been configured, default to the local node if possible. if (bootstrapNodes == null) { - if (localNode.type() == Node.Type.DATA) { + if (localNode.type() == Node.Type.CORE) { bootstrapNodes = Collections.singleton(localNode); } else { throw new ConfigurationException("No bootstrap nodes configured"); @@ -502,7 +504,7 @@ public Atomix build() { ManagedClusterService clusterService = buildClusterService(metadataService, messagingService); ManagedClusterMessagingService clusterMessagingService = buildClusterMessagingService(clusterService, messagingService); ManagedClusterEventingService clusterEventService = buildClusterEventService(clusterService, messagingService); - ManagedPartitionGroup corePartitionGroup = buildCorePartitionGroup(); + ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup(); ManagedPartitionService partitionService = buildPartitionService(); return new Atomix( messagingService, @@ -510,7 +512,7 @@ public Atomix build() { clusterService, clusterMessagingService, clusterEventService, - corePartitionGroup, + systemPartitionGroup, partitionService, primitiveTypes); } @@ -567,10 +569,10 @@ protected ManagedClusterEventingService buildClusterEventService( /** * Builds the core partition group. */ - protected ManagedPartitionGroup buildCorePartitionGroup() { - return RaftPartitionGroup.builder("core") + protected ManagedPartitionGroup buildSystemPartitionGroup() { + return RaftPartitionGroup.builder(SYSTEM_GROUP_NAME) .withNumPartitions(1) - .withDataDirectory(new File(dataDirectory, "core")) + .withDataDirectory(new File(dataDirectory, SYSTEM_GROUP_NAME)) .build(); } @@ -579,10 +581,10 @@ protected ManagedPartitionGroup buildCorePartitionGroup() { */ protected ManagedPartitionService buildPartitionService() { if (partitionGroups.isEmpty()) { - partitionGroups.add(RaftPartitionGroup.builder(COORDINATION_GROUP_NAME) - .withDataDirectory(new File(dataDirectory, COORDINATION_GROUP_NAME)) - .withNumPartitions(numCoordinationPartitions) - .withPartitionSize(coordinationPartitionSize) + partitionGroups.add(RaftPartitionGroup.builder(CORE_GROUP_NAME) + .withDataDirectory(new File(dataDirectory, CORE_GROUP_NAME)) + .withNumPartitions(numCorePartitions) + .withPartitionSize(corePartitionSize) .build()); partitionGroups.add(PrimaryBackupPartitionGroup.builder(DATA_GROUP_NAME) .withNumPartitions(numDataPartitions) diff --git a/core/src/test/java/io/atomix/core/AbstractAtomixTest.java b/core/src/test/java/io/atomix/core/AbstractAtomixTest.java index 40dab2be7f..4e65137360 100644 --- a/core/src/test/java/io/atomix/core/AbstractAtomixTest.java +++ b/core/src/test/java/io/atomix/core/AbstractAtomixTest.java @@ -25,8 +25,10 @@ import io.atomix.primitive.PrimitiveTypeRegistry; import io.atomix.primitive.partition.ManagedPartitionGroup; import io.atomix.primitive.partition.ManagedPartitionService; +import io.atomix.primitive.partition.PartitionGroup; import io.atomix.primitive.partition.impl.DefaultPartitionService; import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup; +import io.atomix.protocols.raft.RaftProtocol; import io.atomix.protocols.raft.partition.RaftPartitionGroup; import io.atomix.storage.StorageLevel; import org.junit.AfterClass; @@ -68,7 +70,7 @@ protected static Atomix createAtomix(Node.Type type, int id, Integer... ids) { Collection bootstrapNodes = Stream.of(ids) .map(nodeId -> Node.builder(String.valueOf(nodeId)) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(Endpoint.from("localhost", BASE_PORT + nodeId)) .build()) .collect(Collectors.toList()); @@ -78,7 +80,7 @@ protected static Atomix createAtomix(Node.Type type, int id, Integer... ids) { .withDataDirectory(new File("target/test-logs/" + id)) .withLocalNode(localNode) .withBootstrapNodes(bootstrapNodes) - .withCoordinationPartitions(3) + .withCorePartitions(3) .withDataPartitions(3) // Lower number of partitions for faster testing .build(); } @@ -125,10 +127,10 @@ protected ManagedMessagingService buildMessagingService() { } @Override - protected ManagedPartitionGroup buildCorePartitionGroup() { - return RaftPartitionGroup.builder("core") + protected ManagedPartitionGroup buildSystemPartitionGroup() { + return RaftPartitionGroup.builder(SYSTEM_GROUP_NAME) .withStorageLevel(StorageLevel.MEMORY) - .withDataDirectory(new File(dataDirectory, "core")) + .withDataDirectory(new File(dataDirectory, SYSTEM_GROUP_NAME)) .withNumPartitions(1) .build(); } @@ -136,15 +138,26 @@ protected ManagedPartitionGroup buildCorePartitionGroup() { @Override protected ManagedPartitionService buildPartitionService() { if (partitionGroups.isEmpty()) { - partitionGroups.add(RaftPartitionGroup.builder(COORDINATION_GROUP_NAME) + partitionGroups.add(RaftPartitionGroup.builder(CORE_GROUP_NAME) .withStorageLevel(StorageLevel.MEMORY) - .withDataDirectory(new File(dataDirectory, "coordination")) - .withNumPartitions(numCoordinationPartitions > 0 ? numCoordinationPartitions : bootstrapNodes.size()) - .withPartitionSize(coordinationPartitionSize) + .withDataDirectory(new File(dataDirectory, CORE_GROUP_NAME)) + .withNumPartitions(numCorePartitions > 0 ? numCorePartitions : bootstrapNodes.size()) + .withPartitionSize(corePartitionSize) .build()); partitionGroups.add(PrimaryBackupPartitionGroup.builder(DATA_GROUP_NAME) .withNumPartitions(numDataPartitions) .build()); + } else { + boolean hasConsensus = partitionGroups.stream() + .anyMatch(group -> group.type() == RaftProtocol.TYPE); + if (!hasConsensus) { + partitionGroups.add(RaftPartitionGroup.builder(CORE_GROUP_NAME) + .withStorageLevel(StorageLevel.MEMORY) + .withDataDirectory(new File(dataDirectory, CORE_GROUP_NAME)) + .withNumPartitions(numCorePartitions > 0 ? numCorePartitions : bootstrapNodes.size()) + .withPartitionSize(corePartitionSize) + .build()); + } } return new DefaultPartitionService(partitionGroups); } diff --git a/core/src/test/java/io/atomix/core/AbstractPrimitiveTest.java b/core/src/test/java/io/atomix/core/AbstractPrimitiveTest.java index 50048d7ad5..7fe443284b 100644 --- a/core/src/test/java/io/atomix/core/AbstractPrimitiveTest.java +++ b/core/src/test/java/io/atomix/core/AbstractPrimitiveTest.java @@ -47,9 +47,9 @@ protected Atomix atomix() throws Exception { public static void setupAtomix() throws Exception { AbstractAtomixTest.setupAtomix(); instances = new ArrayList<>(); - instances.add(createAtomix(Node.Type.DATA, 1, 1, 2, 3)); - instances.add(createAtomix(Node.Type.DATA, 2, 1, 2, 3)); - instances.add(createAtomix(Node.Type.DATA, 3, 1, 2, 3)); + instances.add(createAtomix(Node.Type.CORE, 1, 1, 2, 3)); + instances.add(createAtomix(Node.Type.CORE, 2, 1, 2, 3)); + instances.add(createAtomix(Node.Type.CORE, 3, 1, 2, 3)); List> futures = instances.stream().map(Atomix::start).collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get(30, TimeUnit.SECONDS); } diff --git a/core/src/test/java/io/atomix/core/AtomixTest.java b/core/src/test/java/io/atomix/core/AtomixTest.java index 6c2151aa3a..1a841b77d4 100644 --- a/core/src/test/java/io/atomix/core/AtomixTest.java +++ b/core/src/test/java/io/atomix/core/AtomixTest.java @@ -72,14 +72,14 @@ protected CompletableFuture startAtomix(Node.Type type, int id, Integer. */ @Test public void testScaleUp() throws Exception { - Atomix atomix1 = startAtomix(Node.Type.DATA, 1, 1).join(); - Atomix atomix2 = startAtomix(Node.Type.DATA, 2, 1, 2).join(); - Atomix atomix3 = startAtomix(Node.Type.DATA, 3, 1, 2, 3).join(); + Atomix atomix1 = startAtomix(Node.Type.CORE, 1, 1).join(); + Atomix atomix2 = startAtomix(Node.Type.CORE, 2, 1, 2).join(); + Atomix atomix3 = startAtomix(Node.Type.CORE, 3, 1, 2, 3).join(); } @Test public void testStopStart() throws Exception { - Atomix atomix1 = startAtomix(Node.Type.DATA, 1, 1).join(); + Atomix atomix1 = startAtomix(Node.Type.CORE, 1, 1).join(); atomix1.stop().join(); try { atomix1.start().join(); @@ -96,9 +96,9 @@ public void testStopStart() throws Exception { @Test public void testScaleDown() throws Exception { List> futures = new ArrayList<>(); - futures.add(startAtomix(Node.Type.DATA, 1, 1, 2, 3)); - futures.add(startAtomix(Node.Type.DATA, 2, 1, 2, 3)); - futures.add(startAtomix(Node.Type.DATA, 3, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 1, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 2, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 3, 1, 2, 3)); Futures.allOf(futures).join(); instances.get(0).stop().join(); instances.get(1).stop().join(); @@ -111,9 +111,9 @@ public void testScaleDown() throws Exception { @Test public void testClientJoinLeave() throws Exception { List> futures = new ArrayList<>(); - futures.add(startAtomix(Node.Type.DATA, 1, 1, 2, 3)); - futures.add(startAtomix(Node.Type.DATA, 2, 1, 2, 3)); - futures.add(startAtomix(Node.Type.DATA, 3, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 1, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 2, 1, 2, 3)); + futures.add(startAtomix(Node.Type.CORE, 3, 1, 2, 3)); Futures.allOf(futures).join(); TestClusterEventListener dataListener = new TestClusterEventListener(); diff --git a/protocols/primary-backup/src/main/java/io/atomix/protocols/backup/partition/impl/PrimaryBackupPartitionServer.java b/protocols/primary-backup/src/main/java/io/atomix/protocols/backup/partition/impl/PrimaryBackupPartitionServer.java index 9068a002c1..7e02fbad23 100644 --- a/protocols/primary-backup/src/main/java/io/atomix/protocols/backup/partition/impl/PrimaryBackupPartitionServer.java +++ b/protocols/primary-backup/src/main/java/io/atomix/protocols/backup/partition/impl/PrimaryBackupPartitionServer.java @@ -51,7 +51,8 @@ public PrimaryBackupPartitionServer( @Override public CompletableFuture start() { - if (managementService.getClusterService().getLocalNode().type() == Node.Type.DATA) { + Node.Type localNodeType = managementService.getClusterService().getLocalNode().type(); + if (localNodeType == Node.Type.CORE || localNodeType == Node.Type.DATA) { synchronized (this) { server = buildServer(); } diff --git a/protocols/primary-backup/src/test/java/io/atomix/cluster/TestClusterService.java b/protocols/primary-backup/src/test/java/io/atomix/cluster/TestClusterService.java index a3bab719d1..1f38468261 100644 --- a/protocols/primary-backup/src/test/java/io/atomix/cluster/TestClusterService.java +++ b/protocols/primary-backup/src/test/java/io/atomix/cluster/TestClusterService.java @@ -36,7 +36,7 @@ public TestClusterService(NodeId localNode, Collection nodes) { @Override public Node getLocalNode() { return Node.builder(localNode) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(Endpoint.from("localhost", localNode.hashCode())) .build(); } @@ -45,7 +45,7 @@ public Node getLocalNode() { public Set getNodes() { return nodes.stream() .map(node -> Node.builder(node) - .withType(Node.Type.DATA) + .withType(Node.Type.CORE) .withEndpoint(Endpoint.from("localhost", node.hashCode())) .build()) .collect(Collectors.toSet()); diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/partition/RaftPartitionGroup.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/partition/RaftPartitionGroup.java index 951c6ac3f1..bcaf7415b1 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/partition/RaftPartitionGroup.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/partition/RaftPartitionGroup.java @@ -131,7 +131,7 @@ public CompletableFuture open(PartitionManagementService } private synchronized void handleClusterEvent(ClusterEvent event) { - if (event.type() == ClusterEvent.Type.NODE_ADDED && event.subject().type() == Node.Type.DATA) { + if (event.type() == ClusterEvent.Type.NODE_ADDED && event.subject().type() == Node.Type.CORE) { metadataChangeFuture = metadataChangeFuture.thenCompose(v -> { Collection partitions = buildPartitions(managementService.getClusterService()); if (!this.metadata.equals(partitions)) { @@ -153,7 +153,7 @@ private Collection buildPartitions(ClusterService clusterServ } List sorted = new ArrayList<>(clusterService.getNodes().stream() - .filter(node -> node.type() == Node.Type.DATA) + .filter(node -> node.type() == Node.Type.CORE) .map(Node::id) .collect(Collectors.toSet())); Collections.sort(sorted); diff --git a/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java b/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java index 9b52874b95..ffc708c6ae 100644 --- a/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java +++ b/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java @@ -441,7 +441,7 @@ private List createServers(int nodes) throws Exception { List servers = new ArrayList<>(); for (int i = 0; i < nodes; i++) { - members.add(nextNode(Node.Type.DATA)); + members.add(nextNode(Node.Type.CORE)); } CountDownLatch latch = new CountDownLatch(nodes);