Skip to content

Commit

Permalink
Ensure clients are replicated to other clients in ClusterService.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 8, 2018
1 parent e621b76 commit 03e470d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 13 deletions.
Expand Up @@ -345,7 +345,7 @@ public CompletableFuture<Void> stop() {
/**
* Endpoint serializer.
*/
private static class EndpointSerializer extends com.esotericsoftware.kryo.Serializer<Endpoint> {
static class EndpointSerializer extends com.esotericsoftware.kryo.Serializer<Endpoint> {
@Override
public void write(Kryo kryo, Output output, Endpoint endpoint) {
output.writeString(endpoint.host().getHostAddress());
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.atomix.utils.serializer.Serializer;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -62,6 +63,7 @@ public class DefaultClusterService

private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
private static final long DEFAULT_FAILURE_TIME = 1000;
private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";

private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
Expand All @@ -74,7 +76,10 @@ public class DefaultClusterService
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(NodeId.class)
.register(Node.Type.class)
.register(Node.State.class)
.register(ClusterHeartbeat.class)
.register(StatefulNode.class)
.register(new DefaultClusterMetadataService.EndpointSerializer(), Endpoint.class)
.build("ClusterService"));

private final MessagingService messagingService;
Expand Down Expand Up @@ -104,12 +109,16 @@ public Node getLocalNode() {

@Override
public Set<Node> getNodes() {
return ImmutableSet.copyOf(nodes.values());
return ImmutableSet.copyOf(nodes.values()
.stream()
.filter(node -> node.type() == Node.Type.DATA || node.getState() == State.ACTIVE)
.collect(Collectors.toList()));
}

@Override
public Node getNode(NodeId nodeId) {
return nodes.get(nodeId);
Node node = nodes.get(nodeId);
return node != null ? node.type() == Node.Type.DATA || node.getState() == State.ACTIVE ? node : null : null;
}

/**
Expand All @@ -124,8 +133,9 @@ private void sendHeartbeats() {
byte[] payload = SERIALIZER.encode(new ClusterHeartbeat(localNode.id(), localNode.type()));
peers.forEach((node) -> {
sendHeartbeat(node.endpoint(), payload);
double phi = failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector()).phi();
if (phi >= phiFailureThreshold) {
PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
double phi = failureDetector.phi();
if (phi >= phiFailureThreshold || System.currentTimeMillis() - failureDetector.lastUpdated() > DEFAULT_FAILURE_TIME) {
if (node.getState() == State.ACTIVE) {
deactivateNode(node);
}
Expand All @@ -144,8 +154,21 @@ private void sendHeartbeats() {
* Sends a heartbeat to the given peer.
*/
private void sendHeartbeat(Endpoint endpoint, byte[] payload) {
messagingService.sendAsync(endpoint, HEARTBEAT_MESSAGE, payload).whenComplete((result, error) -> {
if (error != null) {
messagingService.sendAndReceive(endpoint, HEARTBEAT_MESSAGE, payload).whenComplete((response, error) -> {
if (error == null) {
Collection<StatefulNode> nodes = SERIALIZER.decode(response);
boolean sendHeartbeats = false;
for (StatefulNode node : nodes) {
if (this.nodes.putIfAbsent(node.id(), node) == null) {
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
sendHeartbeats = true;
}
}
if (sendHeartbeats) {
sendHeartbeats();
}
} else {
LOGGER.trace("Sending heartbeat to {} failed", endpoint, error);
}
});
Expand All @@ -154,10 +177,13 @@ private void sendHeartbeat(Endpoint endpoint, byte[] payload) {
/**
* Handles a heartbeat message.
*/
private void handleHeartbeat(Endpoint endpoint, byte[] message) {
private byte[] handleHeartbeat(Endpoint endpoint, byte[] message) {
ClusterHeartbeat heartbeat = SERIALIZER.decode(message);
failureDetectors.computeIfAbsent(heartbeat.nodeId(), n -> new PhiAccrualFailureDetector()).report();
activateNode(new StatefulNode(heartbeat.nodeId(), heartbeat.nodeType(), endpoint));
return SERIALIZER.encode(nodes.values().stream()
.filter(node -> node.type() == Node.Type.CLIENT)
.collect(Collectors.toList()));
}

/**
Expand All @@ -169,6 +195,7 @@ private void activateNode(StatefulNode node) {
node.setState(State.ACTIVE);
nodes.put(node.id(), node);
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
sendHeartbeat(node.endpoint(), SERIALIZER.encode(new ClusterHeartbeat(localNode.id(), localNode.type())));
} else if (existingNode.getState() == State.INACTIVE) {
existingNode.setState(State.ACTIVE);
Expand All @@ -188,7 +215,7 @@ private void deactivateNode(StatefulNode node) {
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
break;
case CLIENT:
nodes.remove(node.id());
post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existingNode));
post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existingNode));
break;
default:
Expand Down
Expand Up @@ -55,7 +55,7 @@ public PhiAccrualFailureDetector() {
* Creates a new failure detector.
*
* @param minSamples the minimum number of samples required to compute phi
* @param phiFactor the phi factor
* @param phiFactor the phi factor
*/
public PhiAccrualFailureDetector(int minSamples, double phiFactor) {
this(minSamples, phiFactor, DEFAULT_WINDOW_SIZE);
Expand All @@ -65,7 +65,7 @@ public PhiAccrualFailureDetector(int minSamples, double phiFactor) {
* Creates a new failure detector.
*
* @param minSamples the minimum number of samples required to compute phi
* @param phiFactor the phi factor
* @param phiFactor the phi factor
* @param windowSize the phi accrual window size
*/
public PhiAccrualFailureDetector(int minSamples, double phiFactor, int windowSize) {
Expand All @@ -74,6 +74,15 @@ public PhiAccrualFailureDetector(int minSamples, double phiFactor, int windowSiz
this.history = new History(windowSize);
}

/**
* Returns the last time a heartbeat was reported.
*
* @return the last time a heartbeat was reported
*/
public long lastUpdated() {
return history.latestHeartbeatTime();
}

/**
* Report a new heart beat for the specified node id.
*/
Expand Down Expand Up @@ -112,9 +121,9 @@ public double phi() {
/**
* Computes the phi value from the given samples.
*
* @param samples the samples from which to compute phi
* @param samples the samples from which to compute phi
* @param lastHeartbeat the last heartbeat
* @param currentTime the current time
* @param currentTime the current time
* @return phi
*/
private double computePhi(DescriptiveStatistics samples, long lastHeartbeat, long currentTime) {
Expand Down
84 changes: 84 additions & 0 deletions core/src/test/java/io/atomix/core/AtomixTest.java
Expand Up @@ -15,6 +15,9 @@
*/
package io.atomix.core;

import com.google.common.base.Throwables;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.Node;
import io.atomix.utils.concurrent.Futures;
import org.junit.After;
Expand All @@ -23,9 +26,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;

/**
* Atomix test.
*/
Expand Down Expand Up @@ -82,4 +89,81 @@ public void testScaleDown() throws Exception {
instances.get(1).stop().join();
instances.get(2).stop().join();
}

/**
* Tests a client joining and leaving the cluster.
*/
@Test
public void testClientJoinLeave() throws Exception {
List<CompletableFuture<Atomix>> futures = new ArrayList<>();
futures.add(startAtomix(Node.Type.DATA, 1, 1, 2, 3));
futures.add(startAtomix(Node.Type.DATA, 2, 1, 2, 3));
futures.add(startAtomix(Node.Type.DATA, 3, 1, 2, 3));
Futures.allOf(futures).join();

TestClusterEventListener dataListener = new TestClusterEventListener();
instances.get(0).clusterService().addListener(dataListener);

Atomix client1 = startAtomix(Node.Type.CLIENT, 4, 1, 2, 3).join();

// client1 added to data node
ClusterEvent event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event1.type());
event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event1.type());

Thread.sleep(1000);

TestClusterEventListener clientListener = new TestClusterEventListener();
client1.clusterService().addListener(clientListener);

Atomix client2 = startAtomix(Node.Type.CLIENT, 5, 1, 2, 3).join();

// client2 added to data node
ClusterEvent event2 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event2.type());
event2 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event2.type());

// client2 added to client node
event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_ADDED, event1.type());
event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_ACTIVATED, event1.type());

client2.stop().join();

// client2 removed from data node
event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_DEACTIVATED, event1.type());
event1 = dataListener.event();
assertEquals(ClusterEvent.Type.NODE_REMOVED, event1.type());

// client2 removed from client node
event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_DEACTIVATED, event1.type());
event1 = clientListener.event();
assertEquals(ClusterEvent.Type.NODE_REMOVED, event1.type());
}

private static class TestClusterEventListener implements ClusterEventListener {

private final BlockingQueue<ClusterEvent> queue = new ArrayBlockingQueue<>(1);

@Override
public void onEvent(ClusterEvent event) {
try {
queue.put(event);
} catch (InterruptedException e) {
}
}

public boolean eventReceived() {
return !queue.isEmpty();
}

public ClusterEvent event() throws InterruptedException {
return queue.take();
}
}
}

0 comments on commit 03e470d

Please sign in to comment.