Skip to content

Commit

Permalink
ARTEMIS-4143 improve mitigation against split-brain with shared-storage
Browse files Browse the repository at this point in the history
Configurations employing shared-storage with NFS are susceptible to
split-brain in certain scenarios. For example:

  1) Primary loses network connection to NFS.
  2) Backup activates.
  3) Primary reconnects to NFS.
  4) Split-brain.

In reality this situation is pretty unlikely due to the timing involved,
but the possibility still exists. Currently the file lock held by the
primary broker on the NFS share is essentially worthless in this
situation. This commit adds logic by which the timestamp of the lock
file is updated during activation and then routinely checked during
runtime to ensure consistency. This effectively mitigates split-brain in
this situation (and likely others). Here's how it works now.

  1) Primary loses network connection to NFS.
  2) Backup activates.
  3) Primary reconnects to NFS.
  4) Primary detects that the lock file's timestamp has been updated and
     shuts itself down.

When the primary shuts down in step #4 the Topology on the backup can be
damaged. Protections were added for this via ARTEMIS-2868 but only for
the replicated use-case. This commit applies the protection for
removeMember() so that the Topology remains intact.

There are no tests for these changes as I cannot determine how to
properly simulate this use-case. However, there have never been robust,
automated tests for these kinds of NFS use-cases so this is not a
departure from the norm.
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 3, 2023
1 parent d1b3610 commit 8f30347
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;

public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {

SimpleString getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImp
*/
@Override
public boolean removeMember(final long uniqueEventID, final String nodeId) {
if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
if (nodeId.equals(nodeManager.getNodeId().toString())) {
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
private static final String ACCESS_MODE = "rw";
private final File directory;
protected File serverLockFile;
private final Path activationSequencePath;
protected FileChannel channel;
protected FileChannel activationSequenceChannel;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void writeNodeActivationSequence(long sequence) throws NodeManagerExcepti
* </ol>
*/
protected synchronized void setUpServerLockFile() throws IOException {
File serverLockFile = newFile(SERVER_LOCK_NAME);
serverLockFile = newFile(SERVER_LOCK_NAME);

boolean fileCreated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -45,8 +46,6 @@ public class FileLockNodeManager extends FileBasedNodeManager {

private static final int BACKUP_LOCK_POS = 2;

private static final long LOCK_LENGTH = 1;

private static final byte LIVE = 'L';

private static final byte FAILINGBACK = 'F';
Expand All @@ -65,6 +64,8 @@ public class FileLockNodeManager extends FileBasedNodeManager {

private final FileChannel[] lockChannels = new FileChannel[3];

private long serverLockLastModified;

private final long lockAcquisitionTimeoutNanos;

protected boolean interrupted = false;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected synchronized void setUpServerLockFile() throws IOException {
super.setUpServerLockFile();

lockChannels[0] = channel;
serverLockLastModified = serverLockFile.lastModified();

for (int i = 1; i < 3; i++) {
if (lockChannels[i] != null && lockChannels[i].isOpen()) {
Expand Down Expand Up @@ -189,33 +191,32 @@ public void awaitLiveNode() throws NodeManagerException, InterruptedException {
logger.debug("awaiting live node...");
do {
byte state = getState();
while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
if (logger.isDebugEnabled()) {
logger.debug("awaiting live node startup state='{}'", state);
}
while (state == NOT_STARTED || state == FIRST_TIME_START) {
logger.debug("awaiting live node startup state = '{}'", (char) state);

Thread.sleep(2000);
state = getState();
}

liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
liveLock = lock(LIVE_LOCK_POS);
if (interrupted) {
interrupted = false;
throw new InterruptedException("Lock was interrupted");
}
state = getState();
if (state == FileLockNodeManager.PAUSED) {
if (state == PAUSED) {
liveLock.release();
logger.debug("awaiting live node restarting");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.FAILINGBACK) {
} else if (state == FAILINGBACK) {
liveLock.release();
logger.debug("awaiting live node failing back");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.LIVE) {
if (logger.isDebugEnabled()) {
logger.debug("acquired live node lock state = {}", (char) state);
}
} else if (state == LIVE) {
// if the backup acquires the file lock and the state is 'L' that means the primary died
logger.debug("acquired live node lock state = {}", (char) state);
serverLockFile.setLastModified(System.currentTimeMillis());
logger.debug("touched {}; new time: {}", serverLockFile.getAbsoluteFile(), serverLockFile.lastModified());
break;
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ public void awaitLiveStatus() throws NodeManagerException, InterruptedException
}

private void setLive() throws NodeManagerException {
writeFileLockStatus(FileLockNodeManager.LIVE);
writeFileLockStatus(LIVE);
}

private void setFailingBack() throws NodeManagerException {
Expand All @@ -318,16 +319,14 @@ private void setPaused() throws NodeManagerException {

/**
* @param status
* @throws ActiveMQLockAcquisitionTimeoutException,IOException
* @throws NodeManagerException
*/
private void writeFileLockStatus(byte status) throws NodeManagerException {
if (replicatedBackup && channel == null) {
return;
}

if (logger.isDebugEnabled()) {
logger.debug("writing status: {}", status);
}
logger.debug("writing status: {}", (char) status);
ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status);
bb.position(0);
Expand All @@ -345,6 +344,8 @@ private void writeFileLockStatus(byte status) throws NodeManagerException {
lock.release();
}
}
serverLockLastModified = serverLockFile.lastModified();
logger.debug("Modified {} at {}", serverLockFile.getName(), serverLockLastModified);
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
Expand All @@ -371,9 +372,7 @@ private byte getState() throws NodeManagerException {
}
}

if (logger.isDebugEnabled()) {
logger.debug("state: {}", result);
}
logger.debug("state: {}", (char) result);
return result;
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
Expand All @@ -400,17 +399,13 @@ public final SimpleString readNodeId() throws NodeManagerException {

protected FileLock tryLock(final int lockPos) throws IOException {
try {
if (logger.isDebugEnabled()) {
logger.debug("trying to lock position: {}", lockPos);
}
logger.debug("trying to lock position: {}", lockPos);

FileLock lock = lockChannels[lockPos].tryLock();
if (logger.isDebugEnabled()) {
if (lock != null) {
logger.debug("locked position: {}", lockPos);
} else {
logger.debug("failed to lock position: {}", lockPos);
}
if (lock != null) {
logger.debug("locked position: {}", lockPos);
} else {
logger.debug("failed to lock position: {}", lockPos);
}

return lock;
Expand All @@ -429,16 +424,19 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi
FileLock lock = tryLock(lockPosition);
isRecurringFailure = false;

logger.debug("lock: {}", lock);

// even if the lock is valid it may have taken too long to acquire
if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
}

if (lock == null) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
return null;
}

if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
} else {
return lock;
}
Expand All @@ -456,7 +454,7 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi
if (this.lockAcquisitionTimeoutNanos != -1) {
final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
if (remainingTime <= 0) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
}
waitTime = Math.min(waitTime, remainingTime);
}
Expand Down Expand Up @@ -529,19 +527,23 @@ public void run() {
}
lostLock = isLiveLockLost();
if (!lostLock) {
logger.debug("Server still has the lock, double check status is live");
// Java always thinks the lock is still valid even when there is no filesystem
// so we do another check

// Should be able to retrieve the status unless something is wrong
// When EFS is gone, this locks. Which can be solved but is a lot of threading
// work where we need to
// manage the timeout ourselves and interrupt the thread used to claim the lock.
/*
* Java always thinks the lock is still valid even when there is no filesystem
* so we perform additional checks...
*/

/*
* We should be able to retrieve the status unless something is wrong. When EFS is
* gone, this locks. Which can be solved but is a lot of threading work where we
* need to manage the timeout ourselves and interrupt the thread used to claim the
* lock.
*/
logger.debug("Lock appears to be valid; double check by reading status");
byte state = getState();
if (state == LIVE) {
logger.debug("Status is set to live");
} else {
logger.debug("Status is not live");

logger.debug("Lock appears to be valid; triple check by comparing timestamp");
if (hasBeenModified(state)) {
lostLock = true;
}
}
} catch (Exception exception) {
Expand All @@ -554,9 +556,26 @@ public void run() {
logger.warn("Lost the lock according to the monitor, notifying listeners");
notifyLostLock();
}

}

}
private boolean hasBeenModified(byte state) {
boolean modified = false;

// Create a new instance of the File object so we can get the most up-to-date information on the file.
File freshServerLockFile = new File(serverLockFile.getAbsolutePath());

if (freshServerLockFile.exists()) {
// the other broker competing for the lock may modify the state as 'F' when it starts so ensure the state is 'L' before returning true
if (freshServerLockFile.lastModified() > serverLockLastModified && state == LIVE) {
logger.debug("Lock file {} originally locked at {} was modified at {}", serverLockFile.getAbsolutePath(), new Date(serverLockLastModified), new Date(freshServerLockFile.lastModified()));
modified = true;
}
} else {
logger.debug("Lock file {} does not exist", serverLockFile.getAbsolutePath());
modified = true;
}

return modified;
}
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@
<scope>test</scope>
<!-- License: EPL 1.0 -->
</dependency>

<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ public void run() {
service.shutdown();

assertTrue("Expected to find AMQ224000", AssertionLoggerHandler.findText("AMQ224000"));
assertTrue("Expected to find \"timed out waiting for lock\"", AssertionLoggerHandler.findText("timed out waiting for lock"));
assertTrue("Expected to find \"Timed out waiting for lock\"", AssertionLoggerHandler.findText("Timed out waiting for lock"));
}
}

0 comments on commit 8f30347

Please sign in to comment.