Skip to content

Commit

Permalink
Update various APIs and configurations to use new membership APIs.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2018
1 parent e516aec commit d619f1b
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 103 deletions.
86 changes: 43 additions & 43 deletions agent/src/main/java/io/atomix/agent/AtomixAgent.java
Expand Up @@ -48,9 +48,9 @@ public class AtomixAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(AtomixAgent.class); private static final Logger LOGGER = LoggerFactory.getLogger(AtomixAgent.class);


public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Function<Member.Type, ArgumentType<MemberConfig>> nodeArgumentType = (type) -> (ArgumentParser argumentParser, Argument argument, String value) -> { Function<Member.Type, ArgumentType<MemberConfig>> memberArgumentType = (type) -> (ArgumentParser argumentParser, Argument argument, String value) -> {
return new MemberConfig() return new MemberConfig()
.setId(parseNodeId(value)) .setId(parseMemberId(value))
.setType(type) .setType(type)
.setAddress(parseAddress(value)); .setAddress(parseAddress(value));
}; };
Expand All @@ -62,38 +62,38 @@ public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers.newArgumentParser("AtomixServer") ArgumentParser parser = ArgumentParsers.newArgumentParser("AtomixServer")
.defaultHelp(true) .defaultHelp(true)
.description("Atomix server"); .description("Atomix server");
parser.addArgument("node") parser.addArgument("member")
.type(nodeArgumentType.apply(Member.Type.PERSISTENT)) .type(memberArgumentType.apply(Member.Type.PERSISTENT))
.nargs("?") .nargs("?")
.metavar("NAME@HOST:PORT") .metavar("NAME@HOST:PORT")
.required(false) .required(false)
.help("The node info for the local node. This should be in the format [NAME@]HOST[:PORT]. " + .help("The member info for the local member. This should be in the format [NAME@]HOST[:PORT]. " +
"If no name is provided, the node name will default to the host. " + "If no name is provided, the member name will default to the host. " +
"If no port is provided, the port will default to 5679."); "If no port is provided, the port will default to 5679.");
parser.addArgument("--type", "-t") parser.addArgument("--type", "-t")
.type(typeArgumentType) .type(typeArgumentType)
.metavar("TYPE") .metavar("TYPE")
.choices(Member.Type.PERSISTENT, Member.Type.EPHEMERAL) .choices(Member.Type.PERSISTENT, Member.Type.EPHEMERAL)
.setDefault(Member.Type.PERSISTENT) .setDefault(Member.Type.PERSISTENT)
.help("Indicates the local node type."); .help("Indicates the local member type.");
parser.addArgument("--config", "-c") parser.addArgument("--config", "-c")
.metavar("FILE|JSON|YAML") .metavar("FILE|JSON|YAML")
.required(false) .required(false)
.help("The Atomix configuration. Can be specified as a file path or JSON/YAML string."); .help("The Atomix configuration. Can be specified as a file path or JSON/YAML string.");
parser.addArgument("--core-nodes", "-n") parser.addArgument("--persistent-members", "-n")
.nargs("*") .nargs("*")
.type(nodeArgumentType.apply(Member.Type.PERSISTENT)) .type(memberArgumentType.apply(Member.Type.PERSISTENT))
.metavar("NAME@HOST:PORT") .metavar("NAME@HOST:PORT")
.required(false) .required(false)
.help("The set of core nodes, if any. When bootstrapping a new cluster, if the local node is a core node " + .help("The set of core members, if any. When bootstrapping a new cluster, if the local member is a core member " +
"then it should be present in the core configuration as well."); "then it should be present in the core configuration as well.");
parser.addArgument("--bootstrap-nodes", "-b") parser.addArgument("--ephemeral-members", "-b")
.nargs("*") .nargs("*")
.type(nodeArgumentType.apply(Member.Type.EPHEMERAL)) .type(memberArgumentType.apply(Member.Type.EPHEMERAL))
.metavar("NAME@HOST:PORT") .metavar("NAME@HOST:PORT")
.required(false) .required(false)
.help("The set of bootstrap nodes. If core nodes are provided, the cluster will be bootstrapped from the " + .help("The set of bootstrap members. If core members are provided, the cluster will be bootstrapped from the " +
"core nodes. For clusters without core nodes, at least one bootstrap node must be provided unless " + "core members. For clusters without core members, at least one bootstrap member must be provided unless " +
"using multicast discovery or bootstrapping a new cluster."); "using multicast discovery or bootstrapping a new cluster.");
parser.addArgument("--multicast", "-m") parser.addArgument("--multicast", "-m")
.action(new StoreTrueArgumentAction()) .action(new StoreTrueArgumentAction())
Expand All @@ -119,30 +119,30 @@ public static void main(String[] args) throws Exception {
} }


final String configString = namespace.get("config"); final String configString = namespace.get("config");
final MemberConfig localNode = namespace.get("node"); final MemberConfig localMember = namespace.get("member");
final Member.Type localNodeType = namespace.get("type"); final Member.Type localMemberType = namespace.get("type");
final List<MemberConfig> coreNodes = namespace.getList("core_nodes"); final List<MemberConfig> persistentMembers = namespace.getList("persistent_members");
final List<MemberConfig> bootstrapNodes = namespace.getList("bootstrap_nodes"); final List<MemberConfig> ephemeralMembers = namespace.getList("ephemeral_members");
final boolean multicastEnabled = namespace.getBoolean("multicast"); final boolean multicastEnabled = namespace.getBoolean("multicast");
final Address multicastAddress = namespace.get("multicast_address"); final Address multicastAddress = namespace.get("multicast_address");
final Integer httpPort = namespace.getInt("http_port"); final Integer httpPort = namespace.getInt("http_port");


// If a configuration was provided, merge the configuration's node information with the provided command line arguments. // If a configuration was provided, merge the configuration's member information with the provided command line arguments.
final Atomix.Builder builder; final Atomix.Builder builder;
if (configString != null) { if (configString != null) {
AtomixConfig config = loadConfig(configString); AtomixConfig config = loadConfig(configString);
if (localNode != null) { if (localMember != null) {
config.getClusterConfig().getNodes().stream() config.getClusterConfig().getMembers().stream()
.filter(node -> node.getId().equals(localNode.getId())) .filter(member -> member.getId().equals(localMember.getId()))
.findAny() .findAny()
.ifPresent(localMemberConfig -> { .ifPresent(localMemberConfig -> {
if (localNodeType == null) { if (localMemberType == null) {
localNode.setType(localMemberConfig.getType()); localMember.setType(localMemberConfig.getType());
} }
localNode.setAddress(localMemberConfig.getAddress()); localMember.setAddress(localMemberConfig.getAddress());
localNode.setZone(localMemberConfig.getZone()); localMember.setZone(localMemberConfig.getZone());
localNode.setRack(localMemberConfig.getRack()); localMember.setRack(localMemberConfig.getRack());
localNode.setHost(localMemberConfig.getHost()); localMember.setHost(localMemberConfig.getHost());
}); });
} }
builder = Atomix.builder(config); builder = Atomix.builder(config);
Expand All @@ -152,25 +152,25 @@ public static void main(String[] args) throws Exception {


builder.withShutdownHook(true); builder.withShutdownHook(true);


// If a local node was provided, add the local node to the builder. // If a local member was provided, add the local member to the builder.
if (localNode != null) { if (localMember != null) {
localNode.setType(localNodeType); localMember.setType(localMemberType);
Member member = new Member(localNode); Member member = new Member(localMember);
builder.withLocalNode(member); builder.withLocalMember(member);
LOGGER.info("node: {}", member); LOGGER.info("member: {}", member);
} }


// If a cluster configuration was provided, add all the cluster nodes to the builder. // If a cluster configuration was provided, add all the cluster members to the builder.
if (coreNodes != null || bootstrapNodes != null) { if (persistentMembers != null || ephemeralMembers != null) {
List<Member> members = Stream.concat( List<Member> members = Stream.concat(
coreNodes != null ? coreNodes.stream() : Stream.empty(), persistentMembers != null ? persistentMembers.stream() : Stream.empty(),
bootstrapNodes != null ? bootstrapNodes.stream() : Stream.empty()) ephemeralMembers != null ? ephemeralMembers.stream() : Stream.empty())
.map(node -> Member.builder(node.getId()) .map(member -> Member.builder(member.getId())
.withType(node.getType()) .withType(member.getType())
.withAddress(node.getAddress()) .withAddress(member.getAddress())
.build()) .build())
.collect(Collectors.toList()); .collect(Collectors.toList());
builder.withNodes(members); builder.withMembers(members);
} }


// Enable multicast if provided. // Enable multicast if provided.
Expand Down Expand Up @@ -215,7 +215,7 @@ private static AtomixConfig loadConfig(String config) {
} }
} }


static MemberId parseNodeId(String address) { static MemberId parseMemberId(String address) {
int endIndex = address.indexOf('@'); int endIndex = address.indexOf('@');
if (endIndex > 0) { if (endIndex > 0) {
return MemberId.from(address.substring(0, endIndex)); return MemberId.from(address.substring(0, endIndex));
Expand Down
18 changes: 9 additions & 9 deletions agent/src/test/java/io/atomix/agent/AtomixAgentTest.java
Expand Up @@ -48,11 +48,11 @@ public class AtomixAgentTest {


@Test @Test
public void testParseNodeId() throws Exception { public void testParseNodeId() throws Exception {
assertEquals(MemberId.from("127.0.0.1"), AtomixAgent.parseNodeId("127.0.0.1")); assertEquals(MemberId.from("127.0.0.1"), AtomixAgent.parseMemberId("127.0.0.1"));
assertEquals(MemberId.from("foo"), AtomixAgent.parseNodeId("foo")); assertEquals(MemberId.from("foo"), AtomixAgent.parseMemberId("foo"));
assertEquals(MemberId.from("127.0.0.1"), AtomixAgent.parseNodeId("127.0.0.1:1234")); assertEquals(MemberId.from("127.0.0.1"), AtomixAgent.parseMemberId("127.0.0.1:1234"));
assertEquals(MemberId.from("foo"), AtomixAgent.parseNodeId("foo@127.0.0.1:1234")); assertEquals(MemberId.from("foo"), AtomixAgent.parseMemberId("foo@127.0.0.1:1234"));
assertEquals(MemberId.from("foo"), AtomixAgent.parseNodeId("foo@127.0.0.1")); assertEquals(MemberId.from("foo"), AtomixAgent.parseMemberId("foo@127.0.0.1"));
} }


@Test @Test
Expand Down Expand Up @@ -113,15 +113,15 @@ private void testFormCluster(String path) throws Exception {
Thread.sleep(5000); Thread.sleep(5000);


Atomix client1 = Atomix.builder(path) Atomix client1 = Atomix.builder(path)
.withLocalNode(Member.builder("client1") .withLocalMember(Member.builder("client1")
.withType(Member.Type.EPHEMERAL) .withType(Member.Type.EPHEMERAL)
.withAddress("localhost:5003") .withAddress("localhost:5003")
.build()) .build())
.build(); .build();
client1.start().join(); client1.start().join();


Atomix client2 = Atomix.builder(path) Atomix client2 = Atomix.builder(path)
.withLocalNode(Member.builder("client2") .withLocalMember(Member.builder("client2")
.withType(Member.Type.EPHEMERAL) .withType(Member.Type.EPHEMERAL)
.withAddress("localhost:5004") .withAddress("localhost:5004")
.build()) .build())
Expand Down Expand Up @@ -193,15 +193,15 @@ public void testFormDataCluster() throws Exception {
Thread.sleep(10000); Thread.sleep(10000);


Atomix client1 = Atomix.builder(Joiner.on('\n').join(config)) Atomix client1 = Atomix.builder(Joiner.on('\n').join(config))
.withLocalNode(Member.builder("client1") .withLocalMember(Member.builder("client1")
.withType(Member.Type.EPHEMERAL) .withType(Member.Type.EPHEMERAL)
.withAddress("localhost:5003") .withAddress("localhost:5003")
.build()) .build())
.build(); .build();
client1.start().join(); client1.start().join();


Atomix client2 = Atomix.builder(Joiner.on('\n').join(config)) Atomix client2 = Atomix.builder(Joiner.on('\n').join(config))
.withLocalNode(Member.builder("client2") .withLocalMember(Member.builder("client2")
.withType(Member.Type.EPHEMERAL) .withType(Member.Type.EPHEMERAL)
.withAddress("localhost:5004") .withAddress("localhost:5004")
.build()) .build())
Expand Down
10 changes: 9 additions & 1 deletion agent/src/test/resources/atomix.yaml
@@ -1,7 +1,7 @@
# Cluster configuration # Cluster configuration
cluster: cluster:
name: test name: test
nodes: members:
- id: node1 - id: node1
type: persistent type: persistent
address: localhost:5000 address: localhost:5000
Expand All @@ -15,6 +15,10 @@ system-partition-group:
name: system name: system
type: raft type: raft
data-directory: target/test-logs/system data-directory: target/test-logs/system
members:
- node1
- node2
- node3
# A list of partition groups # A list of partition groups
partition-groups: partition-groups:
- type: raft - type: raft
Expand All @@ -23,6 +27,10 @@ partition-groups:
partitions: 7 partitions: 7
partition-size: 3 partition-size: 3
data-directory: target/test-logs/core data-directory: target/test-logs/core
members:
- node1
- node2
- node3
- type: multi-primary - type: multi-primary
name: data name: data
partitions: 7 partitions: 7
Expand Down
32 changes: 16 additions & 16 deletions cluster/src/main/java/io/atomix/cluster/AtomixCluster.java
Expand Up @@ -174,7 +174,7 @@ protected CompletableFuture<Void> startServices() {
} }


protected CompletableFuture<Void> joinCluster() { protected CompletableFuture<Void> joinCluster() {
persistentMetadataService.addNode(membershipService().getLocalMember()); persistentMetadataService.addMember(membershipService().getLocalMember());
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }


Expand Down Expand Up @@ -202,7 +202,7 @@ public synchronized CompletableFuture<Void> stop() {
} }


protected CompletableFuture<Void> leaveCluster() { protected CompletableFuture<Void> leaveCluster() {
persistentMetadataService.removeNode(membershipService().getLocalMember()); persistentMetadataService.removeMember(membershipService().getLocalMember());
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }


Expand Down Expand Up @@ -249,7 +249,7 @@ private static ClusterConfig loadConfig(File config) {
protected static ManagedMessagingService buildMessagingService(ClusterConfig config) { protected static ManagedMessagingService buildMessagingService(ClusterConfig config) {
return NettyMessagingService.builder() return NettyMessagingService.builder()
.withName(config.getName()) .withName(config.getName())
.withAddress(config.getLocalNode().getAddress()) .withAddress(config.getLocalMember().getAddress())
.build(); .build();
} }


Expand All @@ -258,7 +258,7 @@ protected static ManagedMessagingService buildMessagingService(ClusterConfig con
*/ */
protected static ManagedBroadcastService buildBroadcastService(ClusterConfig config) { protected static ManagedBroadcastService buildBroadcastService(ClusterConfig config) {
return NettyBroadcastService.builder() return NettyBroadcastService.builder()
.withLocalAddress(config.getLocalNode().getAddress()) .withLocalAddress(config.getLocalMember().getAddress())
.withGroupAddress(config.getMulticastAddress()) .withGroupAddress(config.getMulticastAddress())
.withEnabled(config.isMulticastEnabled()) .withEnabled(config.isMulticastEnabled())
.build(); .build();
Expand All @@ -268,9 +268,9 @@ protected static ManagedBroadcastService buildBroadcastService(ClusterConfig con
* Builds a bootstrap metadata service. * Builds a bootstrap metadata service.
*/ */
protected static ManagedBootstrapMetadataService buildBootstrapMetadataService(ClusterConfig config) { protected static ManagedBootstrapMetadataService buildBootstrapMetadataService(ClusterConfig config) {
boolean hasCoreNodes = config.getNodes().stream().anyMatch(node -> node.getType() == Member.Type.PERSISTENT); boolean hasCoreNodes = config.getMembers().stream().anyMatch(node -> node.getType() == Member.Type.PERSISTENT);
ClusterMetadata metadata = ClusterMetadata.builder() ClusterMetadata metadata = ClusterMetadata.builder()
.withNodes(config.getNodes() .withNodes(config.getMembers()
.stream() .stream()
.filter(node -> (!hasCoreNodes && node.getType() == Member.Type.EPHEMERAL) || (hasCoreNodes && node.getType() == Member.Type.PERSISTENT)) .filter(node -> (!hasCoreNodes && node.getType() == Member.Type.EPHEMERAL) || (hasCoreNodes && node.getType() == Member.Type.PERSISTENT))
.map(Member::new) .map(Member::new)
Expand All @@ -284,7 +284,7 @@ protected static ManagedBootstrapMetadataService buildBootstrapMetadataService(C
*/ */
protected static ManagedPersistentMetadataService buildPersistentMetadataService(ClusterConfig config, MessagingService messagingService) { protected static ManagedPersistentMetadataService buildPersistentMetadataService(ClusterConfig config, MessagingService messagingService) {
ClusterMetadata metadata = ClusterMetadata.builder() ClusterMetadata metadata = ClusterMetadata.builder()
.withNodes(config.getNodes() .withNodes(config.getMembers()
.stream() .stream()
.filter(node -> node.getType() == Member.Type.PERSISTENT) .filter(node -> node.getType() == Member.Type.PERSISTENT)
.map(Member::new) .map(Member::new)
Expand All @@ -304,14 +304,14 @@ protected static ManagedClusterMembershipService buildClusterMembershipService(
BroadcastService broadcastService) { BroadcastService broadcastService) {
// If the local node has not be configured, create a default node. // If the local node has not be configured, create a default node.
Member localMember; Member localMember;
if (config.getLocalNode() == null) { if (config.getLocalMember() == null) {
Address address = Address.empty(); Address address = Address.empty();
localMember = Member.builder(address.toString()) localMember = Member.builder(address.toString())
.withType(Member.Type.PERSISTENT) .withType(Member.Type.PERSISTENT)
.withAddress(address) .withAddress(address)
.build(); .build();
} else { } else {
localMember = new Member(config.getLocalNode()); localMember = new Member(config.getLocalMember());
} }
return new DefaultClusterMembershipService(localMember, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService, config.getMembership()); return new DefaultClusterMembershipService(localMember, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService, config.getMembership());
} }
Expand Down Expand Up @@ -363,19 +363,19 @@ public Builder withClusterName(String clusterName) {
* @param localMember the local node metadata * @param localMember the local node metadata
* @return the cluster metadata builder * @return the cluster metadata builder
*/ */
public Builder withLocalNode(Member localMember) { public Builder withLocalMember(Member localMember) {
config.setLocalNode(localMember.config()); config.setLocalMember(localMember.config());
return this; return this;
} }


/** /**
* Sets the core nodes. * Sets the core nodes.
* *
* @param coreMembers the core nodes * @param members the core nodes
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withNodes(Member... coreMembers) { public Builder withMembers(Member... members) {
return withNodes(Arrays.asList(checkNotNull(coreMembers))); return withMembers(Arrays.asList(checkNotNull(members)));
} }


/** /**
Expand All @@ -384,8 +384,8 @@ public Builder withNodes(Member... coreMembers) {
* @param members the core nodes * @param members the core nodes
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withNodes(Collection<Member> members) { public Builder withMembers(Collection<Member> members) {
config.setNodes(members.stream().map(n -> n.config()).collect(Collectors.toList())); config.setMembers(members.stream().map(n -> n.config()).collect(Collectors.toList()));
return this; return this;
} }


Expand Down

0 comments on commit d619f1b

Please sign in to comment.