Skip to content

Commit

Permalink
- Only the master node should answer split-brain merge requests. O…
Browse files Browse the repository at this point in the history
…ther members should silently ignore the request and optionally log it.

- Non-master members should send a master confirmation as soon as possible after they detect a master change. This is just to speed up and help detection of a possible network split, since if a master member receives a master confirmation from a non-member endpoint, it sends a "remove-me-from-your-cluster" request to the caller.

- Master confirmations should be reset only when a node becomes master after the old master disconnects.

- A member's UUID should not change when it merges into a cluster, since changing it makes it very hard to determine a member's UUID for UUID-sensitive operations like locks.

- The split-brain handling process has been improved and changed significantly. Instead of just comparing cluster member counts between clusters, the complete member lists are compared; if at least one member is common between the two clusters, they will refuse to merge. They will not try to merge until they have completely different member sets. Eventually both clusters will diverge because of heartbeat and/or master-confirmation timeouts.

- The heartbeat mechanism is changed to explicitly send heartbeats instead of relying on the read and write timestamps of other members. It can be that the other node is replying with an error response (like a caller-not-member exception), yet the caller node currently assumes it is heartbeating implicitly. This causes one side to assume both nodes are in the same cluster even though it practically cannot execute operations on the other member.
  • Loading branch information
mdogan committed Jul 28, 2015
1 parent bef95d3 commit 204bf49
Show file tree
Hide file tree
Showing 24 changed files with 409 additions and 292 deletions.
Expand Up @@ -30,7 +30,6 @@
import com.hazelcast.client.spi.impl.DefaultAddressTranslator;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.TestUtil;
import com.hazelcast.logging.ILogger;
Expand Down Expand Up @@ -165,11 +164,7 @@ void handleClientMessage(ClientMessage clientMessage) {
@Override
public boolean write(SocketWritable socketWritable) {
ClientMessage newPacket = readFromPacket((ClientMessage) socketWritable);
MemberImpl member = serverNodeEngine.getClusterService().getMember(remoteAddress);
lastWriteTime = System.currentTimeMillis();
if (member != null) {
member.didRead();
}
serverNodeEngine.getNode().clientEngine.handleClientMessage(newPacket, serverSideConnection);
return true;
}
Expand Down Expand Up @@ -251,10 +246,6 @@ public boolean write(SocketWritable socketWritable) {
final ClientMessage packet = (ClientMessage) socketWritable;
if (nodeEngine.getNode().isActive()) {
ClientMessage newPacket = readFromPacket(packet);
MemberImpl member = nodeEngine.getClusterService().getMember(localEndpoint);
if (member != null) {
member.didRead();
}
responseConnection.handleClientMessage(newPacket);
return true;
}
Expand Down
Expand Up @@ -29,7 +29,6 @@
import com.hazelcast.client.spi.impl.DefaultAddressTranslator;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.TestUtil;
import com.hazelcast.logging.ILogger;
Expand Down Expand Up @@ -166,11 +165,7 @@ void handlePacket(Packet packet) {
@Override
public boolean write(SocketWritable socketWritable) {
Packet newPacket = readFromPacket((Packet) socketWritable);
MemberImpl member = serverNodeEngine.getClusterService().getMember(remoteAddress);
lastWriteTime = System.currentTimeMillis();
if (member != null) {
member.didRead();
}
serverNodeEngine.getNode().clientEngine.handlePacket(newPacket);
return true;
}
Expand Down Expand Up @@ -272,10 +267,6 @@ public boolean write(SocketWritable socketWritable) {
final Packet packet = (Packet) socketWritable;
if (nodeEngine.getNode().isActive()) {
Packet newPacket = readFromPacket(packet);
MemberImpl member = nodeEngine.getClusterService().getMember(localEndpoint);
if (member != null) {
member.didRead();
}
responseConnection.handlePacket(newPacket);
return true;
}
Expand Down
172 changes: 119 additions & 53 deletions hazelcast/src/main/java/com/hazelcast/cluster/impl/AbstractJoiner.java
Expand Up @@ -17,25 +17,30 @@
package com.hazelcast.cluster.impl;

import com.hazelcast.cluster.Joiner;
import com.hazelcast.cluster.impl.operations.JoinCheckOperation;
import com.hazelcast.cluster.impl.operations.MemberRemoveOperation;
import com.hazelcast.cluster.impl.operations.MergeClustersOperation;
import com.hazelcast.cluster.impl.operations.PrepareMergeOperation;
import com.hazelcast.config.Config;
import com.hazelcast.core.Member;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.impl.OperationResponseHandlerFactory;
import com.hazelcast.util.Clock;
import com.hazelcast.util.EmptyStatement;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
Expand All @@ -47,6 +52,9 @@

public abstract class AbstractJoiner implements Joiner {

private static final int SPLIT_BRAIN_CONN_TIMEOUT = 5000;
private static final int SPLIT_BRAIN_SLEEP_TIME = 10;

private final ExceptionHandler whileWaitMergeExceptionHandler;
private final AtomicLong joinStartTime = new AtomicLong(Clock.currentTimeMillis());
private final AtomicInteger tryCount = new AtomicInteger(0);
Expand Down Expand Up @@ -158,63 +166,121 @@ protected final long getMaxJoinTimeToMasterNode() {
return (node.getGroupProperties().MAX_WAIT_SECONDS_BEFORE_JOIN.getInteger() + 10) * 1000L;
}

boolean shouldMerge(JoinMessage joinRequest) {
boolean shouldMerge = false;
if (joinRequest != null) {
boolean validJoinRequest;
try {
try {
validJoinRequest = node.getClusterService().validateJoinMessage(joinRequest);
} catch (Exception e) {
logger.finest(e.getMessage());
validJoinRequest = false;
boolean shouldMerge(JoinMessage joinMessage) {
if (joinMessage == null) {
return false;
}

ClusterServiceImpl clusterService = node.getClusterService();
try {
boolean validJoinRequest = clusterService.validateJoinMessage(joinMessage);
if (!validJoinRequest) {
logger.finest("Cannot process split brain merge message from " + joinMessage.getAddress()
+ ", since join-message could not be validated.");
return false;
}
} catch (Exception e) {
logger.finest(e.getMessage());
return false;
}

try {
if (clusterService.getMember(joinMessage.getAddress()) != null) {
if (logger.isFinestEnabled()) {
logger.finest("Should not merge to " + joinMessage.getAddress()
+ ", because it is already member of this cluster.");
}
if (validJoinRequest) {
for (Member member : node.getClusterService().getMembers()) {
MemberImpl memberImpl = (MemberImpl) member;
if (memberImpl.getAddress().equals(joinRequest.getAddress())) {
if (logger.isFinestEnabled()) {
logger.finest("Should not merge to " + joinRequest.getAddress()
+ ", because it is already member of this cluster.");
}
return false;
}
}
int currentMemberCount = node.getClusterService().getMembers().size();
if (joinRequest.getMemberCount() > currentMemberCount) {
// I should join the other cluster
logger.info(node.getThisAddress() + " is merging to " + joinRequest.getAddress()
+ ", because : joinRequest.getMemberCount() > currentMemberCount ["
+ (joinRequest.getMemberCount() + " > " + currentMemberCount) + "]");
if (logger.isFinestEnabled()) {
logger.finest(joinRequest.toString());
}
shouldMerge = true;
} else if (joinRequest.getMemberCount() == currentMemberCount) {
// compare the hashes
if (node.getThisAddress().hashCode() > joinRequest.getAddress().hashCode()) {
logger.info(node.getThisAddress() + " is merging to " + joinRequest.getAddress()
+ ", because : node.getThisAddress().hashCode() > joinRequest.address.hashCode() "
+ ", this node member count: " + currentMemberCount);
if (logger.isFinestEnabled()) {
logger.finest(joinRequest.toString());
}
shouldMerge = true;
} else {
if (logger.isFinestEnabled()) {
logger.finest(joinRequest.getAddress() + " should merge to this node "
+ ", because : node.getThisAddress().hashCode() < joinRequest.address.hashCode() "
+ ", this node member count: " + currentMemberCount);
}
}
return false;
}

Collection<Address> targetMemberAddresses = joinMessage.getMemberAddresses();
if (targetMemberAddresses.contains(node.getThisAddress())) {
node.nodeEngine.getOperationService()
.send(new MemberRemoveOperation(node.getThisAddress()), joinMessage.getAddress());
logger.info(node.getThisAddress() + " CANNOT merge to " + joinMessage.getAddress()
+ ", because it thinks this-node as its member.");
return false;
}

Collection<Address> thisMemberAddresses = clusterService.getMemberAddresses();
for (Address address : thisMemberAddresses) {
if (targetMemberAddresses.contains(address)) {
logger.info(node.getThisAddress() + " CANNOT merge to " + joinMessage.getAddress()
+ ", because it thinks " + address + " as its member. "
+ "But " + address + " is member of this cluster.");
return false;
}
}

int currentMemberCount = clusterService.getSize();
if (joinMessage.getMemberCount() > currentMemberCount) {
// I should join the other cluster
logger.info(node.getThisAddress() + " is merging to " + joinMessage.getAddress()
+ ", because : joinMessage.getMemberCount() > currentMemberCount ["
+ (joinMessage.getMemberCount() + " > " + currentMemberCount) + "]");
if (logger.isFinestEnabled()) {
logger.finest(joinMessage.toString());
}
return true;
} else if (joinMessage.getMemberCount() == currentMemberCount) {
// compare the hashes
if (node.getThisAddress().hashCode() > joinMessage.getAddress().hashCode()) {
logger.info(node.getThisAddress() + " is merging to " + joinMessage.getAddress()
+ ", because : node.getThisAddress().hashCode() > joinMessage.address.hashCode() "
+ ", this node member count: " + currentMemberCount);
if (logger.isFinestEnabled()) {
logger.finest(joinMessage.toString());
}
return true;
} else {
logger.info(joinMessage.getAddress() + " should merge to this node "
+ ", because : node.getThisAddress().hashCode() < joinMessage.address.hashCode() "
+ ", this node member count: " + currentMemberCount);
}
} catch (Throwable e) {
logger.severe(e);
return false;
} else {
logger.info(joinMessage.getAddress() + " should merge to this node "
+ ", because : currentMemberCount > joinMessage.getMemberCount() ["
+ (currentMemberCount + " > " + joinMessage.getMemberCount()) + "]");
}
} catch (Throwable e) {
logger.severe(e);
}
return false;
}

JoinMessage sendSplitBrainJoinMessage(Address target) {
if (logger.isFinestEnabled()) {
logger.finest(node.getThisAddress() + " is connecting to " + target);
}

Connection conn = node.connectionManager.getOrConnect(target, true);
int timeout = SPLIT_BRAIN_CONN_TIMEOUT;
while (conn == null) {
if ((timeout -= SPLIT_BRAIN_SLEEP_TIME) < 0) {
return null;
}
try {
//noinspection BusyWait
Thread.sleep(SPLIT_BRAIN_SLEEP_TIME);
} catch (InterruptedException e) {
EmptyStatement.ignore(e);
return null;
}
conn = node.connectionManager.getConnection(target);
}

NodeEngine nodeEngine = node.nodeEngine;
Future f = nodeEngine.getOperationService().createInvocationBuilder(ClusterServiceImpl.SERVICE_NAME,
new JoinCheckOperation(node.createSplitBrainJoinMessage()), target)
.setTryCount(1).invoke();
try {
return (JoinMessage) f.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
logger.finest("Timeout during join check!", e);
} catch (Exception e) {
logger.warning("Error during join check!", e);
}
return shouldMerge;
return null;
}

@Override
Expand Down

0 comments on commit 204bf49

Please sign in to comment.