Skip to content

Commit

Permalink
Add Atomix standalone server.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 8, 2017
1 parent a96bfe0 commit 7f9153c
Show file tree
Hide file tree
Showing 17 changed files with 472 additions and 748 deletions.
71 changes: 14 additions & 57 deletions cluster/src/main/java/io/atomix/cluster/Node.java
Expand Up @@ -16,9 +16,7 @@
package io.atomix.cluster;

import io.atomix.cluster.impl.DefaultNode;

import java.net.InetAddress;
import java.net.UnknownHostException;
import io.atomix.messaging.Endpoint;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -71,13 +69,11 @@ public enum State {
}

private final NodeId id;
private final InetAddress address;
private final int port;
private final Endpoint endpoint;

protected Node(NodeId id, InetAddress address, int port) {
protected Node(NodeId id, Endpoint endpoint) {
this.id = checkNotNull(id, "id cannot be null");
this.address = checkNotNull(address, "address cannot be null");
this.port = port;
this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
}

/**
Expand All @@ -90,21 +86,12 @@ public NodeId id() {
}

/**
* Returns the IP address of the controller instance.
*
* @return IP address
*/
public InetAddress address() {
return address;
}

/**
* Returns the TCP port on which the node listens for connections.
* Returns the node endpoint.
*
* @return TCP port
* @return the node endpoint
*/
public int tcpPort() {
return port;
public Endpoint endpoint() {
return endpoint;
}

/**
Expand All @@ -125,20 +112,16 @@ public int tcpPort() {
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("address", address)
.add("port", port)
.add("endpoint", endpoint)
.toString();
}

/**
* Node builder.
*/
public abstract static class Builder implements io.atomix.utils.Builder<Node> {
private static final int DEFAULT_PORT = 5678;

protected NodeId id;
protected InetAddress address;
protected int port = DEFAULT_PORT;
protected Endpoint endpoint;

/**
* Sets the node identifier.
Expand All @@ -152,39 +135,13 @@ public Builder withId(NodeId id) {
}

/**
* Sets the node host.
*
* @param host the node host
* @return the node builder
* @throws IllegalArgumentException if the host name cannot be resolved
*/
public Builder withHost(String host) {
try {
return withAddress(InetAddress.getByName(host));
} catch (UnknownHostException e) {
throw new IllegalArgumentException("Failed to resolve host", e);
}
}

/**
* Sets the node address.
*
* @param address the node address
* @return the node builder
*/
public Builder withAddress(InetAddress address) {
this.address = checkNotNull(address, "address cannot be null");
return this;
}

/**
* Sets the node port.
* Sets the node endpoint.
*
* @param port the node port
* @param endpoint the node endpoint
* @return the node builder
*/
public Builder withPort(int port) {
this.port = port;
public Builder withEndpoint(Endpoint endpoint) {
this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
return this;
}
}
Expand Down
Expand Up @@ -92,7 +92,13 @@ public DefaultClusterService(ClusterMetadata clusterMetadata, MessagingService m
this.clusterMetadata = checkNotNull(clusterMetadata, "clusterMetadata cannot be null");
this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
this.localNode = (DefaultNode) clusterMetadata.localNode();
clusterMetadata.bootstrapNodes().forEach(n -> nodes.put(n.id(), (DefaultNode) n));
if (clusterMetadata.bootstrapNodes().contains(localNode)) {
localNode.setType(Node.Type.CORE);
} else {
localNode.setType(Node.Type.CLIENT);
}
nodes.put(localNode.id(), localNode);
clusterMetadata.bootstrapNodes().forEach(n -> nodes.put(n.id(), ((DefaultNode) n).setType(Node.Type.CORE)));
messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, heartbeatExecutor);
}

Expand Down Expand Up @@ -120,9 +126,9 @@ private void sendHeartbeats() {
.stream()
.filter(node -> !(node.id().equals(localNode().id())))
.collect(Collectors.toSet());
byte[] hbMessagePayload = SERIALIZER.encode(localNode);
byte[] payload = SERIALIZER.encode(localNode.id());
peers.forEach((node) -> {
sendHeartbeat(hbMessagePayload, node);
sendHeartbeat(node.endpoint(), payload);
double phi = failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector()).phi();
if (phi >= phiFailureThreshold) {
if (node.state() == State.ACTIVE) {
Expand All @@ -142,11 +148,10 @@ private void sendHeartbeats() {
/**
* Sends a heartbeat to the given peer.
*/
private void sendHeartbeat(byte[] messagePayload, Node peer) {
Endpoint remoteEp = new Endpoint(peer.address(), peer.tcpPort());
messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
private void sendHeartbeat(Endpoint endpoint, byte[] payload) {
messagingService.sendAsync(endpoint, HEARTBEAT_MESSAGE, payload).whenComplete((result, error) -> {
if (error != null) {
LOGGER.trace("Sending heartbeat to {} failed", remoteEp, error);
LOGGER.trace("Sending heartbeat to {} failed", endpoint, error);
}
});
}
Expand All @@ -155,9 +160,9 @@ private void sendHeartbeat(byte[] messagePayload, Node peer) {
* Handles a heartbeat message.
*/
private void handleHeartbeat(Endpoint endpoint, byte[] message) {
DefaultNode node = SERIALIZER.decode(message);
failureDetectors.computeIfAbsent(node.id(), n -> new PhiAccrualFailureDetector()).report();
activateNode(node);
NodeId nodeId = SERIALIZER.decode(message);
failureDetectors.computeIfAbsent(nodeId, n -> new PhiAccrualFailureDetector()).report();
activateNode(new DefaultNode(nodeId, endpoint));
}

/**
Expand All @@ -169,7 +174,7 @@ private void activateNode(DefaultNode node) {
node.setState(State.ACTIVE);
nodes.put(node.id(), node);
eventListeners.forEach(l -> l.onEvent(new ClusterEvent(Type.NODE_ADDED, node)));
sendHeartbeat(SERIALIZER.encode(localNode), node);
sendHeartbeat(node.endpoint(), SERIALIZER.encode(localNode.id()));
} else if (existingNode.state() == State.INACTIVE) {
existingNode.setState(State.ACTIVE);
eventListeners.forEach(l -> l.onEvent(new ClusterEvent(Type.NODE_ACTIVATED, existingNode)));
Expand Down
13 changes: 8 additions & 5 deletions cluster/src/main/java/io/atomix/cluster/impl/DefaultNode.java
Expand Up @@ -17,6 +17,7 @@

import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.messaging.Endpoint;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand All @@ -28,8 +29,8 @@ public class DefaultNode extends Node {
private Type type = Type.CLIENT;
private State state = State.INACTIVE;

public DefaultNode(NodeId id, InetAddress address, int port) {
super(id, address, port);
public DefaultNode(NodeId id, Endpoint endpoint) {
super(id, endpoint);
}

/**
Expand Down Expand Up @@ -63,16 +64,18 @@ public State state() {
* Default cluster node builder.
*/
public static class Builder extends Node.Builder {
protected static final int DEFAULT_PORT = 5678;

@Override
public Node build() {
if (address == null) {
if (endpoint == null) {
try {
address = InetAddress.getByName("127.0.0.1");
endpoint = new Endpoint(InetAddress.getByName("127.0.0.1"), DEFAULT_PORT);
} catch (UnknownHostException e) {
throw new IllegalStateException("Failed to instantiate address", e);
}
}
return new DefaultNode(id, address, port);
return new DefaultNode(id, endpoint);
}
}
}
Expand Up @@ -163,15 +163,13 @@ public <M, R> CompletableFuture<R> sendAndReceive(M message,
private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Node node = cluster.node(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.address(), node.tcpPort());
return messagingService.sendAsync(nodeEp, subject.toString(), payload);
return messagingService.sendAsync(node.endpoint(), subject.toString(), payload);
}

private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Node node = cluster.node(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.address(), node.tcpPort());
return messagingService.sendAndReceive(nodeEp, subject.toString(), payload);
return messagingService.sendAndReceive(node.endpoint(), subject.toString(), payload);
}

@Override
Expand Down

0 comments on commit 7f9153c

Please sign in to comment.