Skip to content

Commit

Permalink
Add nested cluster multicast/membership configurations.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 4, 2018
1 parent 8e0185a commit cfe3c9c
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 132 deletions.
24 changes: 17 additions & 7 deletions agent/src/main/java/io/atomix/agent/AtomixAgent.java
Expand Up @@ -95,10 +95,14 @@ public static void main(String[] args) throws Exception {
.action(new StoreTrueArgumentAction()) .action(new StoreTrueArgumentAction())
.setDefault(false) .setDefault(false)
.help("Enables multicast discovery. Note that the network must support multicast for this feature to work."); .help("Enables multicast discovery. Note that the network must support multicast for this feature to work.");
parser.addArgument("--multicast-address") parser.addArgument("--multicast-group")
.type(addressArgumentType) .type(String.class)
.metavar("IP:PORT") .metavar("IP")
.help("Sets the multicast discovery address. Defaults to 230.0.0.1:54321"); .help("Sets the multicast group. Defaults to 230.0.0.1");
parser.addArgument("--multicast-port")
.type(Integer.class)
.metavar("PORT")
.help("Sets the multicast port. Defaults to 54321");
parser.addArgument("--http-port", "-p") parser.addArgument("--http-port", "-p")
.type(Integer.class) .type(Integer.class)
.metavar("PORT") .metavar("PORT")
Expand All @@ -122,7 +126,8 @@ public static void main(String[] args) throws Exception {
final String zone = namespace.getString("zone"); final String zone = namespace.getString("zone");
final List<Node> bootstrap = namespace.getList("bootstrap"); final List<Node> bootstrap = namespace.getList("bootstrap");
final boolean multicastEnabled = namespace.getBoolean("multicast"); final boolean multicastEnabled = namespace.getBoolean("multicast");
final Address multicastAddress = namespace.get("multicast_address"); final String multicastGroup = namespace.get("multicast_group");
final Integer multicastPort = namespace.get("multicast_port");
final Integer httpPort = namespace.getInt("http_port"); final Integer httpPort = namespace.getInt("http_port");


// If a configuration was provided, merge the configuration's member information with the provided command line arguments. // If a configuration was provided, merge the configuration's member information with the provided command line arguments.
Expand Down Expand Up @@ -156,8 +161,13 @@ public static void main(String[] args) throws Exception {
} }


if (multicastEnabled) { if (multicastEnabled) {
config.getClusterConfig().setMulticastEnabled(true); config.getClusterConfig().getMulticastConfig().setEnabled(true);
config.getClusterConfig().setMulticastAddress(multicastAddress); if (multicastGroup != null) {
config.getClusterConfig().getMulticastConfig().setGroup(multicastGroup);
}
if (multicastPort != null) {
config.getClusterConfig().getMulticastConfig().setPort(multicastPort);
}
if (bootstrap == null || bootstrap.isEmpty()) { if (bootstrap == null || bootstrap.isEmpty()) {
config.getClusterConfig().setDiscoveryConfig(new MulticastDiscoveryProvider.Config()); config.getClusterConfig().setDiscoveryConfig(new MulticastDiscoveryProvider.Config());
} }
Expand Down
2 changes: 1 addition & 1 deletion agent/src/test/resources/test.conf
@@ -1,6 +1,6 @@
cluster { cluster {
name: test name: test
multicast-enabled: true multicast.enabled: true
} }


managementGroup { managementGroup {
Expand Down
28 changes: 16 additions & 12 deletions cluster/src/main/java/io/atomix/cluster/AtomixCluster.java
Expand Up @@ -321,8 +321,11 @@ protected static ManagedMessagingService buildMessagingService(ClusterConfig con
protected static ManagedBroadcastService buildBroadcastService(ClusterConfig config) { protected static ManagedBroadcastService buildBroadcastService(ClusterConfig config) {
return NettyBroadcastService.builder() return NettyBroadcastService.builder()
.withLocalAddress(config.getAddress()) .withLocalAddress(config.getAddress())
.withGroupAddress(config.getMulticastAddress()) .withGroupAddress(new Address(
.withEnabled(config.isMulticastEnabled()) config.getMulticastConfig().getGroup().getHostName(),
config.getMulticastConfig().getPort(),
config.getMulticastConfig().getGroup()))
.withEnabled(config.getMulticastConfig().isEnabled())
.build(); .build();
} }


Expand All @@ -331,11 +334,11 @@ protected static ManagedBroadcastService buildBroadcastService(ClusterConfig con
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected static NodeDiscoveryProvider buildLocationProvider(ClusterConfig config) { protected static NodeDiscoveryProvider buildLocationProvider(ClusterConfig config) {
NodeDiscoveryProvider.Config locationProviderConfig = config.getDiscoveryConfig(); NodeDiscoveryProvider.Config discoveryProviderConfig = config.getDiscoveryConfig();
if (locationProviderConfig != null) { if (discoveryProviderConfig != null) {
return locationProviderConfig.getType().newProvider(locationProviderConfig); return discoveryProviderConfig.getType().newProvider(discoveryProviderConfig);
} }
if (config.isMulticastEnabled()) { if (config.getMulticastConfig().isEnabled()) {
return new MulticastDiscoveryProvider(new MulticastDiscoveryProvider.Config()); return new MulticastDiscoveryProvider(new MulticastDiscoveryProvider.Config());
} else { } else {
return new BootstrapDiscoveryProvider(Collections.emptyList()); return new BootstrapDiscoveryProvider(Collections.emptyList());
Expand All @@ -362,7 +365,7 @@ protected static ManagedClusterMembershipService buildClusterMembershipService(
localMember, localMember,
new DefaultNodeDiscoveryService(bootstrapService, localMember, discoveryProvider), new DefaultNodeDiscoveryService(bootstrapService, localMember, discoveryProvider),
bootstrapService, bootstrapService,
config); config.getMembershipConfig());
} }


/** /**
Expand Down Expand Up @@ -547,7 +550,7 @@ public Builder withMulticastEnabled() {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withMulticastEnabled(boolean multicastEnabled) { public Builder withMulticastEnabled(boolean multicastEnabled) {
config.setMulticastEnabled(multicastEnabled); config.getMulticastConfig().setEnabled(multicastEnabled);
return this; return this;
} }


Expand All @@ -558,7 +561,8 @@ public Builder withMulticastEnabled(boolean multicastEnabled) {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withMulticastAddress(Address address) { public Builder withMulticastAddress(Address address) {
config.setMulticastAddress(address); config.getMulticastConfig().setGroup(address.address());
config.getMulticastConfig().setPort(address.port());
return this; return this;
} }


Expand All @@ -569,7 +573,7 @@ public Builder withMulticastAddress(Address address) {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder setBroadcastInterval(Duration interval) { public Builder setBroadcastInterval(Duration interval) {
config.setBroadcastInterval((int) interval.toMillis()); config.getMembershipConfig().setBroadcastInterval((int) interval.toMillis());
return this; return this;
} }


Expand All @@ -580,7 +584,7 @@ public Builder setBroadcastInterval(Duration interval) {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder setReachabilityThreshold(int threshold) { public Builder setReachabilityThreshold(int threshold) {
config.setReachabilityThreshold(threshold); config.getMembershipConfig().setReachabilityThreshold(threshold);
return this; return this;
} }


Expand All @@ -591,7 +595,7 @@ public Builder setReachabilityThreshold(int threshold) {
* @return the Atomix builder * @return the Atomix builder
*/ */
public Builder withReachabilityTimeout(Duration timeout) { public Builder withReachabilityTimeout(Duration timeout) {
config.setReachabilityTimeout((int) timeout.toMillis()); config.getMembershipConfig().setReachabilityTimeout((int) timeout.toMillis());
return this; return this;
} }


Expand Down
113 changes: 18 additions & 95 deletions cluster/src/main/java/io/atomix/cluster/ClusterConfig.java
Expand Up @@ -17,7 +17,6 @@


import io.atomix.utils.config.Config; import io.atomix.utils.config.Config;
import io.atomix.utils.net.Address; import io.atomix.utils.net.Address;
import io.atomix.utils.net.MalformedAddressException;


import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
Expand All @@ -29,11 +28,6 @@
*/ */
public class ClusterConfig implements Config { public class ClusterConfig implements Config {
private static final String DEFAULT_CLUSTER_NAME = "atomix"; private static final String DEFAULT_CLUSTER_NAME = "atomix";
private static final String DEFAULT_MULTICAST_IP = "230.0.0.1";
private static final int DEFAULT_MULTICAST_PORT = 54321;
private static final int DEFAULT_BROADCAST_INTERVAL = 100;
private static final int DEFAULT_REACHABILITY_TIMEOUT = 10000;
private static final int DEFAULT_REACHABILITY_THRESHOLD = 10;


private String clusterId = DEFAULT_CLUSTER_NAME; private String clusterId = DEFAULT_CLUSTER_NAME;
private MemberId memberId = MemberId.anonymous(); private MemberId memberId = MemberId.anonymous();
Expand All @@ -43,19 +37,8 @@ public class ClusterConfig implements Config {
private String host; private String host;
private Map<String, String> metadata = new HashMap<>(); private Map<String, String> metadata = new HashMap<>();
private NodeDiscoveryProvider.Config discoveryConfig; private NodeDiscoveryProvider.Config discoveryConfig;
private boolean multicastEnabled = false; private MulticastConfig multicastConfig = new MulticastConfig();
private Address multicastAddress; private MembershipConfig membershipConfig = new MembershipConfig();
private int broadcastInterval = DEFAULT_BROADCAST_INTERVAL;
private int reachabilityThreshold = DEFAULT_REACHABILITY_THRESHOLD;
private int reachabilityTimeout = DEFAULT_REACHABILITY_TIMEOUT;

public ClusterConfig() {
try {
multicastAddress = Address.from(DEFAULT_MULTICAST_IP, DEFAULT_MULTICAST_PORT);
} catch (MalformedAddressException e) {
multicastAddress = Address.from(DEFAULT_MULTICAST_PORT);
}
}


/** /**
* Returns the cluster identifier. * Returns the cluster identifier.
Expand Down Expand Up @@ -250,102 +233,42 @@ public ClusterConfig setDiscoveryConfig(NodeDiscoveryProvider.Config discoveryCo
} }


/** /**
* Returns whether multicast is enabled. * Returns the multicast configuration.
*
* @return whether multicast is enabled
*/
public boolean isMulticastEnabled() {
return multicastEnabled;
}

/**
* Sets whether multicast is enabled.
*
* @param multicastEnabled whether multicast is enabled
* @return the cluster configuration
*/
public ClusterConfig setMulticastEnabled(boolean multicastEnabled) {
this.multicastEnabled = multicastEnabled;
return this;
}

/**
* Returns the multicast address.
*
* @return the multicast address
*/
public Address getMulticastAddress() {
return multicastAddress;
}

/**
* Sets the multicast address.
*
* @param multicastAddress the multicast address
* @return the cluster configuration
*/
public ClusterConfig setMulticastAddress(Address multicastAddress) {
this.multicastAddress = multicastAddress;
return this;
}

/**
* Returns the reachability broadcast interval.
*
* @return the reachability broadcast interval
*/
public int getBroadcastInterval() {
return broadcastInterval;
}

/**
* Sets the reachability broadcast interval.
*
* @param broadcastInterval the reachability broadcast interval
* @return the cluster configuration
*/
public ClusterConfig setBroadcastInterval(int broadcastInterval) {
this.broadcastInterval = broadcastInterval;
return this;
}

/**
* Returns the reachability failure detection threshold.
* *
* @return the reachability failure detection threshold * @return the multicast configuration
*/ */
public int getReachabilityThreshold() { public MulticastConfig getMulticastConfig() {
return reachabilityThreshold; return multicastConfig;
} }


/** /**
* Sets the reachability failure detection threshold. * Sets the multicast configuration.
* *
* @param reachabilityThreshold the reachability failure detection threshold * @param multicastConfig the multicast configuration
* @return the cluster configuration * @return the cluster configuration
*/ */
public ClusterConfig setReachabilityThreshold(int reachabilityThreshold) { public ClusterConfig setMulticastConfig(MulticastConfig multicastConfig) {
this.reachabilityThreshold = reachabilityThreshold; this.multicastConfig = checkNotNull(multicastConfig);
return this; return this;
} }


/** /**
* Returns the reachability failure timeout. * Returns the cluster membership configuration.
* *
* @return the reachability failure timeout * @return the cluster membership configuration
*/ */
public int getReachabilityTimeout() { public MembershipConfig getMembershipConfig() {
return reachabilityTimeout; return membershipConfig;
} }


/** /**
* Sets the reachability failure timeout. * Sets the cluster membership configuration.
* *
* @param reachabilityTimeout the reachability failure timeout * @param membershipConfig the cluster membership configuration
* @return the cluster configuration * @return the cluster configuration
*/ */
public ClusterConfig setReachabilityTimeout(int reachabilityTimeout) { public ClusterConfig setMembershipConfig(MembershipConfig membershipConfig) {
this.reachabilityTimeout = reachabilityTimeout; this.membershipConfig = membershipConfig;
return this; return this;
} }
} }
91 changes: 91 additions & 0 deletions cluster/src/main/java/io/atomix/cluster/MembershipConfig.java
@@ -0,0 +1,91 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.cluster;

import io.atomix.utils.config.Config;

/**
* Cluster membership configuration.
*/
public class MembershipConfig implements Config {
private static final int DEFAULT_BROADCAST_INTERVAL = 100;
private static final int DEFAULT_REACHABILITY_TIMEOUT = 10000;
private static final int DEFAULT_REACHABILITY_THRESHOLD = 10;

private int broadcastInterval = DEFAULT_BROADCAST_INTERVAL;
private int reachabilityThreshold = DEFAULT_REACHABILITY_THRESHOLD;
private int reachabilityTimeout = DEFAULT_REACHABILITY_TIMEOUT;

/**
* Returns the reachability broadcast interval.
*
* @return the reachability broadcast interval
*/
public int getBroadcastInterval() {
return broadcastInterval;
}

/**
* Sets the reachability broadcast interval.
*
* @param broadcastInterval the reachability broadcast interval
* @return the membership configuration
*/
public MembershipConfig setBroadcastInterval(int broadcastInterval) {
this.broadcastInterval = broadcastInterval;
return this;
}

/**
* Returns the reachability failure detection threshold.
*
* @return the reachability failure detection threshold
*/
public int getReachabilityThreshold() {
return reachabilityThreshold;
}

/**
* Sets the reachability failure detection threshold.
*
* @param reachabilityThreshold the reachability failure detection threshold
* @return the membership configuration
*/
public MembershipConfig setReachabilityThreshold(int reachabilityThreshold) {
this.reachabilityThreshold = reachabilityThreshold;
return this;
}

/**
* Returns the reachability failure timeout.
*
* @return the reachability failure timeout
*/
public int getReachabilityTimeout() {
return reachabilityTimeout;
}

/**
* Sets the reachability failure timeout.
*
* @param reachabilityTimeout the reachability failure timeout
* @return the membership configuration
*/
public MembershipConfig setReachabilityTimeout(int reachabilityTimeout) {
this.reachabilityTimeout = reachabilityTimeout;
return this;
}
}

0 comments on commit cfe3c9c

Please sign in to comment.