Skip to content

Commit

Permalink
Make group membership protocol configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2018
1 parent cbc6321 commit b61be2d
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cluster/src/main/java/io/atomix/cluster/AtomixCluster.java
Expand Up @@ -313,7 +313,7 @@ protected static ManagedClusterMembershipService buildClusterMembershipService(
} else {
localMember = new Member(config.getLocalNode());
}
return new DefaultClusterMembershipService(localMember, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService);
return new DefaultClusterMembershipService(localMember, bootstrapMetadataService, persistentMetadataService, messagingService, broadcastService, config.getMembership());
}

/**
Expand Down
21 changes: 21 additions & 0 deletions cluster/src/main/java/io/atomix/cluster/ClusterConfig.java
Expand Up @@ -35,6 +35,7 @@ public class ClusterConfig implements Config {
private Collection<MemberConfig> nodes = new ArrayList<>();
private boolean multicastEnabled = false;
private Address multicastAddress;
private GroupMembershipConfig membership = new GroupMembershipConfig();

public ClusterConfig() {
try {
Expand Down Expand Up @@ -143,4 +144,24 @@ public ClusterConfig setMulticastAddress(Address multicastAddress) {
this.multicastAddress = multicastAddress;
return this;
}

/**
* Returns the group membership configuration.
*
* @return the group membership configuration
*/
public GroupMembershipConfig getMembership() {
return membership;
}

/**
* Sets the group membership configuration.
*
* @param membership the group membership configuration
* @return the cluster configuration
*/
public ClusterConfig setMembership(GroupMembershipConfig membership) {
this.membership = membership;
return this;
}
}
91 changes: 91 additions & 0 deletions cluster/src/main/java/io/atomix/cluster/GroupMembershipConfig.java
@@ -0,0 +1,91 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.cluster;

import io.atomix.utils.config.Config;

/**
* Group membership protocol configuration.
*/
public class GroupMembershipConfig implements Config {
private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
private static final int DEFAULT_FAILURE_TIMEOUT = 10000;
private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;

private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
private int failureTimeout = DEFAULT_FAILURE_TIMEOUT;

/**
* Returns the failure detector heartbeat interval.
*
* @return the failure detector heartbeat interval
*/
public int getHeartbeatInterval() {
return heartbeatInterval;
}

/**
* Sets the failure detector heartbeat interval.
*
* @param heartbeatInterval the failure detector heartbeat interval
* @return the group membership configuration
*/
public GroupMembershipConfig setHeartbeatInterval(int heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
return this;
}

/**
* Returns the failure detector threshold.
*
* @return the failure detector threshold
*/
public int getPhiFailureThreshold() {
return phiFailureThreshold;
}

/**
* Sets the failure detector threshold.
*
* @param phiFailureThreshold the failure detector threshold
* @return the group membership configuration
*/
public GroupMembershipConfig setPhiFailureThreshold(int phiFailureThreshold) {
this.phiFailureThreshold = phiFailureThreshold;
return this;
}

/**
* Returns the base failure timeout.
*
* @return the base failure timeout
*/
public int getFailureTimeout() {
return failureTimeout;
}

/**
* Sets the base failure timeout.
*
* @param failureTimeout the base failure timeout
* @return the group membership configuration
*/
public GroupMembershipConfig setFailureTimeout(int failureTimeout) {
this.failureTimeout = failureTimeout;
return this;
}
}
Expand Up @@ -21,9 +21,10 @@
import io.atomix.cluster.BootstrapMetadataService;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.GroupMembershipConfig;
import io.atomix.cluster.ManagedClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.Member.State;
Expand Down Expand Up @@ -67,15 +68,8 @@ public class DefaultClusterMembershipService

private static final Logger LOGGER = getLogger(DefaultClusterMembershipService.class);

private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
private static final long DEFAULT_FAILURE_TIME = 10000;
private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";

private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;

private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;

private static final Serializer SERIALIZER = Serializer.using(
KryoNamespace.builder()
.register(KryoNamespaces.BASIC)
Expand All @@ -92,6 +86,11 @@ public class DefaultClusterMembershipService
private final BroadcastService broadcastService;
private final BootstrapMetadataService bootstrapMetadataService;
private final PersistentMetadataService persistentMetadataService;

private final int heartbeatInterval;
private final int phiFailureThreshold;
private final int failureTimeout;

private final AtomicBoolean started = new AtomicBoolean();
private final StatefulMember localNode;
private final Map<MemberId, StatefulMember> nodes = Maps.newConcurrentMap();
Expand All @@ -110,7 +109,8 @@ public DefaultClusterMembershipService(
BootstrapMetadataService bootstrapMetadataService,
PersistentMetadataService persistentMetadataService,
MessagingService messagingService,
BroadcastService broadcastService) {
BroadcastService broadcastService,
GroupMembershipConfig config) {
this.bootstrapMetadataService = checkNotNull(bootstrapMetadataService, "bootstrapMetadataService cannot be null");
this.persistentMetadataService = checkNotNull(persistentMetadataService, "coreMetadataService cannot be null");
this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
Expand All @@ -123,6 +123,9 @@ public DefaultClusterMembershipService(
localMember.rack(),
localMember.host(),
localMember.tags());
this.heartbeatInterval = config.getHeartbeatInterval();
this.phiFailureThreshold = config.getPhiFailureThreshold();
this.failureTimeout = config.getFailureTimeout();
}

@Override
Expand Down Expand Up @@ -196,7 +199,7 @@ private CompletableFuture<Void> sendHeartbeats() {
CompletableFuture<Void> future = sendHeartbeat(node.address(), payload);
PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
double phi = failureDetector.phi();
if (phi >= phiFailureThreshold || (phi == 0.0 && failureDetector.lastUpdated() > 0 && System.currentTimeMillis() - failureDetector.lastUpdated() > DEFAULT_FAILURE_TIME)) {
if (phi >= phiFailureThreshold || (phi == 0.0 && failureDetector.lastUpdated() > 0 && System.currentTimeMillis() - failureDetector.lastUpdated() > failureTimeout)) {
if (node.getState() == State.ACTIVE) {
deactivateNode(node);
}
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.GroupMembershipConfig;
import io.atomix.cluster.ManagedClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.Member.State;
Expand Down Expand Up @@ -69,23 +70,26 @@ public void testClusterService() throws Exception {
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localMember1.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join());
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig());

Member localMember2 = buildNode(2, Member.Type.PERSISTENT);
ManagedClusterMembershipService clusterService2 = new DefaultClusterMembershipService(
localMember2,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localMember2.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join());
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig());

Member localMember3 = buildNode(3, Member.Type.PERSISTENT);
ManagedClusterMembershipService clusterService3 = new DefaultClusterMembershipService(
localMember3,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localMember3.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join());
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig());

assertNull(clusterService1.getMember(MemberId.from("1")));
assertNull(clusterService1.getMember(MemberId.from("2")));
Expand Down Expand Up @@ -121,7 +125,8 @@ public void testClusterService() throws Exception {
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(ephemeralMember.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join());
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig());

assertEquals(State.INACTIVE, ephemeralClusterService.getLocalMember().getState());

Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.GroupMembershipConfig;
import io.atomix.cluster.Member;
import io.atomix.cluster.impl.DefaultBootstrapMetadataService;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
Expand Down Expand Up @@ -75,7 +76,8 @@ public void testClusterEventService() throws Exception {
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingService1,
broadcastServiceFactory.newBroadcastService().start().join())
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig())
.start()
.join();
ClusterEventingService eventService1 = new DefaultClusterEventingService(clusterMembershipService1, messagingService1).start().join();
Expand All @@ -87,7 +89,8 @@ public void testClusterEventService() throws Exception {
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingService2,
broadcastServiceFactory.newBroadcastService().start().join())
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig())
.start()
.join();
ClusterEventingService eventService2 = new DefaultClusterEventingService(clusterMembershipService2, messagingService2).start().join();
Expand All @@ -99,7 +102,8 @@ public void testClusterEventService() throws Exception {
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestPersistentMetadataService(clusterMetadata),
messagingService3,
broadcastServiceFactory.newBroadcastService().start().join())
broadcastServiceFactory.newBroadcastService().start().join(),
new GroupMembershipConfig())
.start()
.join();
ClusterEventingService eventService3 = new DefaultClusterEventingService(clusterMembershipService3, messagingService3).start().join();
Expand Down

0 comments on commit b61be2d

Please sign in to comment.