Add support for partition member groups.
kuujo committed Mar 31, 2018
1 parent a88204e commit 02054d0
Showing 47 changed files with 2,160 additions and 455 deletions.
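
The diff below threads zone, rack, and host topology attributes through Node, the cluster heartbeat, and the cluster services; the partition member group classes named in the commit title presumably live in files not shown here. As a hedged illustration of how the new attributes could drive member grouping (this is only a sketch against the Node accessors shown below, not the MemberGroup API this commit adds elsewhere), nodes might be bucketed by rack like so:

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.atomix.cluster.Node;

public class RackGroupingSketch {
  /**
   * Buckets nodes by the rack attribute added in this commit; nodes without a
   * declared rack (rack() may be null) fall under the "unknown" key.
   */
  public static Map<String, List<Node>> groupByRack(Collection<Node> nodes) {
    return nodes.stream()
        .collect(Collectors.groupingBy(node -> node.rack() != null ? node.rack() : "unknown"));
  }
}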
77 changes: 75 additions & 2 deletions cluster/src/main/java/io/atomix/cluster/Node.java
@@ -141,11 +141,17 @@ public enum State {
private final NodeId id;
private final Type type;
private final Endpoint endpoint;
private final String zone;
private final String rack;
private final String host;

protected Node(NodeId id, Type type, Endpoint endpoint) {
protected Node(NodeId id, Type type, Endpoint endpoint, String zone, String rack, String host) {
this.id = checkNotNull(id, "id cannot be null");
this.type = checkNotNull(type, "type cannot be null");
this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
this.zone = zone;
this.rack = rack;
this.host = host;
}

/**
@@ -184,6 +190,33 @@ public State getState() {
throw new UnsupportedOperationException();
}

/**
* Returns the zone to which the node belongs.
*
* @return the zone to which the node belongs
*/
public String zone() {
return zone;
}

/**
* Returns the rack to which the node belongs.
*
* @return the rack to which the node belongs
*/
public String rack() {
return rack;
}

/**
* Returns the host to which the node belongs.
*
* @return the host to which the node belongs
*/
public String host() {
return host;
}

@Override
public int hashCode() {
return Objects.hash(id);
@@ -200,6 +233,10 @@ public String toString() {
.add("id", id)
.add("type", type)
.add("endpoint", endpoint)
.add("zone", zone)
.add("rack", rack)
.add("host", host)
.omitNullValues()
.toString();
}

@@ -210,6 +247,9 @@ public static class Builder implements io.atomix.utils.Builder<Node> {
protected NodeId id;
protected Type type;
protected Endpoint endpoint;
protected String zone;
protected String rack;
protected String host;

protected Builder(NodeId id) {
this.id = id;
@@ -249,12 +289,45 @@ public Builder withEndpoint(Endpoint endpoint) {
return this;
}

/**
* Sets the zone to which the node belongs.
*
* @param zone the zone to which the node belongs
* @return the node builder
*/
public Builder withZone(String zone) {
this.zone = zone;
return this;
}

/**
* Sets the rack to which the node belongs.
*
* @param rack the rack to which the node belongs
* @return the node builder
*/
public Builder withRack(String rack) {
this.rack = rack;
return this;
}

/**
* Sets the host to which the node belongs.
*
* @param host the host to which the node belongs
* @return the node builder
*/
public Builder withHost(String host) {
this.host = host;
return this;
}

@Override
public Node build() {
if (id == null) {
id = NodeId.from(endpoint.host().getHostName());
}
return new Node(id, type, endpoint);
return new Node(id, type, endpoint, zone, rack, host);
}
}
}
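
A minimal usage sketch of the builder methods added above, assuming a static Node.builder(NodeId) factory and an Endpoint.from(host, port) helper (neither appears in this hunk, so both are assumptions), might look like this:

import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.messaging.Endpoint; // package assumed; this import is not part of the diff

public class NodeBuilderSketch {
  public static void main(String[] args) {
    // Node.builder(NodeId) and Endpoint.from(host, port) are assumed here; the
    // diff only shows the protected Builder constructor. NodeId.from(String)
    // does appear in the hunk above.
    Node node = Node.builder(NodeId.from("node-1"))
        .withEndpoint(Endpoint.from("10.0.0.1", 5679))
        .withZone("us-east-1") // zone the node belongs to
        .withRack("rack-7")    // rack within the zone
        .withHost("host-42")   // physical host within the rack
        .build();

    // toString() now reports zone, rack, and host, omitting null values.
    System.out.println(node);
  }
}

Because zone, rack, and host are not null-checked in the Node constructor, all three remain optional: omitting the with* calls simply leaves them null.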
cluster/src/main/java/io/atomix/cluster/impl/ClusterHeartbeat.java
@@ -26,10 +26,16 @@
final class ClusterHeartbeat {
private final NodeId nodeId;
private final Node.Type type;
private final String zone;
private final String rack;
private final String host;

ClusterHeartbeat(NodeId nodeId, Node.Type type) {
ClusterHeartbeat(NodeId nodeId, Node.Type type, String zone, String rack, String host) {
this.nodeId = nodeId;
this.type = type;
this.zone = zone;
this.rack = rack;
this.host = host;
}

/**
@@ -50,6 +56,33 @@ public Node.Type nodeType() {
return type;
}

/**
* Returns the zone.
*
* @return the zone
*/
public String zone() {
return zone;
}

/**
* Returns the rack.
*
* @return the rack
*/
public String rack() {
return rack;
}

/**
* Returns the host.
*
* @return the host
*/
public String host() {
return host;
}

@Override
public String toString() {
return toStringHelper(this)
cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterMetadataService.java
@@ -97,8 +97,15 @@ public class DefaultClusterMetadataService
private ScheduledFuture<?> metadataFuture;

public DefaultClusterMetadataService(ClusterMetadata metadata, MessagingService messagingService) {
metadata.bootstrapNodes().forEach(node -> nodes.put(node.id(),
new ReplicatedNode(node.id(), node.type(), node.endpoint(), new LogicalTimestamp(0), false)));
metadata.bootstrapNodes().forEach(node -> nodes.put(node.id(), new ReplicatedNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host(),
new LogicalTimestamp(0),
false)));
this.messagingService = messagingService;
}

@@ -116,7 +123,15 @@ public void addNode(Node node) {
ReplicatedNode replicatedNode = nodes.get(node.id());
if (replicatedNode == null) {
LogicalTimestamp timestamp = clock.increment();
replicatedNode = new ReplicatedNode(node.id(), node.type(), node.endpoint(), timestamp, false);
replicatedNode = new ReplicatedNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host(),
timestamp,
false);
nodes.put(replicatedNode.id(), replicatedNode);
broadcastUpdate(new NodeUpdate(replicatedNode, timestamp));
post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, getMetadata()));
@@ -129,7 +144,15 @@ public void removeNode(Node node) {
ReplicatedNode replicatedNode = nodes.get(node.id());
if (replicatedNode != null) {
LogicalTimestamp timestamp = clock.increment();
replicatedNode = new ReplicatedNode(node.id(), node.type(), node.endpoint(), timestamp, true);
replicatedNode = new ReplicatedNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host(),
timestamp,
true);
nodes.put(replicatedNode.id(), replicatedNode);
broadcastUpdate(new NodeUpdate(replicatedNode, timestamp));
post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, getMetadata()));
@@ -262,7 +285,7 @@ private Optional<Endpoint> pickRandomPeer() {
List<Endpoint> nodes = this.nodes.values()
.stream()
.filter(replicatedNode -> !replicatedNode.tombstone() &&
!replicatedNode.endpoint().equals(messagingService.endpoint()))
!replicatedNode.endpoint().equals(messagingService.endpoint()))
.map(Node::endpoint)
.collect(Collectors.toList());
Collections.shuffle(nodes);
@@ -282,7 +305,15 @@ private byte[] handleAdvertisement(Endpoint endpoint, byte[] payload) {
} else if (digest.isNewerThan(node.timestamp())) {
if (digest.tombstone()) {
if (!node.tombstone()) {
nodes.put(node.id(), new ReplicatedNode(node.id(), node.type(), node.endpoint(), digest.timestamp(), true));
nodes.put(node.id(), new ReplicatedNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host(),
digest.timestamp(),
true));
post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, getMetadata()));
}
} else {
cluster/src/main/java/io/atomix/cluster/impl/DefaultClusterService.java
@@ -99,7 +99,13 @@ public class DefaultClusterService
public DefaultClusterService(Node localNode, ClusterMetadataService metadataService, MessagingService messagingService) {
this.metadataService = checkNotNull(metadataService, "metadataService cannot be null");
this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
this.localNode = new StatefulNode(localNode.id(), localNode.type(), localNode.endpoint());
this.localNode = new StatefulNode(
localNode.id(),
localNode.type(),
localNode.endpoint(),
localNode.zone(),
localNode.rack(),
localNode.host());
}

@Override
@@ -130,7 +136,12 @@ private void sendHeartbeats() {
.stream()
.filter(node -> !node.id().equals(getLocalNode().id()))
.collect(Collectors.toSet());
byte[] payload = SERIALIZER.encode(new ClusterHeartbeat(localNode.id(), localNode.type()));
byte[] payload = SERIALIZER.encode(new ClusterHeartbeat(
localNode.id(),
localNode.type(),
localNode.zone(),
localNode.rack(),
localNode.host()));
peers.forEach((node) -> {
sendHeartbeat(node.endpoint(), payload);
PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector());
@@ -180,7 +191,13 @@ private void sendHeartbeat(Endpoint endpoint, byte[] payload) {
private byte[] handleHeartbeat(Endpoint endpoint, byte[] message) {
ClusterHeartbeat heartbeat = SERIALIZER.decode(message);
failureDetectors.computeIfAbsent(heartbeat.nodeId(), n -> new PhiAccrualFailureDetector()).report();
activateNode(new StatefulNode(heartbeat.nodeId(), heartbeat.nodeType(), endpoint));
activateNode(new StatefulNode(
heartbeat.nodeId(),
heartbeat.nodeType(),
endpoint,
heartbeat.zone(),
heartbeat.rack(),
heartbeat.host()));
return SERIALIZER.encode(nodes.values().stream()
.filter(node -> node.type() == Node.Type.CLIENT)
.collect(Collectors.toList()));
@@ -196,7 +213,12 @@ private void activateNode(StatefulNode node) {
nodes.put(node.id(), node);
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
sendHeartbeat(node.endpoint(), SERIALIZER.encode(new ClusterHeartbeat(localNode.id(), localNode.type())));
sendHeartbeat(node.endpoint(), SERIALIZER.encode(new ClusterHeartbeat(
localNode.id(),
localNode.type(),
localNode.zone(),
localNode.rack(),
localNode.host())));
} else if (existingNode.getState() == State.INACTIVE) {
existingNode.setState(State.ACTIVE);
post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, existingNode));
@@ -235,7 +257,13 @@ private void handleMetadataEvent(ClusterMetadataEvent event) {
.map(node -> {
StatefulNode existingNode = nodes.get(node.id());
if (existingNode == null) {
StatefulNode newNode = new StatefulNode(node.id(), node.type(), node.endpoint());
StatefulNode newNode = new StatefulNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host());
nodes.put(newNode.id(), newNode);
post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, newNode));
}
@@ -267,7 +295,13 @@ public CompletableFuture<ClusterService> start() {
localNode.setState(State.ACTIVE);
nodes.put(localNode.id(), localNode);
metadataService.getMetadata().bootstrapNodes()
.forEach(node -> nodes.putIfAbsent(node.id(), new StatefulNode(node.id(), node.type(), node.endpoint())));
.forEach(node -> nodes.putIfAbsent(node.id(), new StatefulNode(
node.id(),
node.type(),
node.endpoint(),
node.zone(),
node.rack(),
node.host())));
messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, heartbeatExecutor);
heartbeatFuture = heartbeatScheduler.scheduleWithFixedDelay(this::sendHeartbeats, 0, heartbeatInterval, TimeUnit.MILLISECONDS);
LOGGER.info("Started");
12 changes: 10 additions & 2 deletions cluster/src/main/java/io/atomix/cluster/impl/ReplicatedNode.java
@@ -30,8 +30,16 @@ public class ReplicatedNode extends Node {
private final LogicalTimestamp timestamp;
private final boolean tombstone;

public ReplicatedNode(NodeId id, Type type, Endpoint endpoint, LogicalTimestamp timestamp, boolean tombstone) {
super(id, type, endpoint);
public ReplicatedNode(
NodeId id,
Type type,
Endpoint endpoint,
String zone,
String rack,
String host,
LogicalTimestamp timestamp,
boolean tombstone) {
super(id, type, endpoint, zone, rack, host);
this.timestamp = checkNotNull(timestamp, "timestamp cannot be null");
this.tombstone = tombstone;
}
10 changes: 8 additions & 2 deletions cluster/src/main/java/io/atomix/cluster/impl/StatefulNode.java
@@ -25,8 +25,14 @@
public class StatefulNode extends Node {
private State state = State.INACTIVE;

public StatefulNode(NodeId id, Type type, Endpoint endpoint) {
super(id, type, endpoint);
public StatefulNode(
NodeId id,
Type type,
Endpoint endpoint,
String zone,
String rack,
String host) {
super(id, type, endpoint, zone, rack, host);
}

/**
