Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;

public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {

SimpleString getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImp
*/
@Override
public boolean removeMember(final long uniqueEventID, final String nodeId) {
if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
if (nodeId.equals(nodeManager.getNodeId().toString())) {
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
private static final String ACCESS_MODE = "rw";
private final File directory;
protected File serverLockFile;
private final Path activationSequencePath;
protected FileChannel channel;
protected FileChannel activationSequenceChannel;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void writeNodeActivationSequence(long sequence) throws NodeManagerExcepti
* </ol>
*/
protected synchronized void setUpServerLockFile() throws IOException {
File serverLockFile = newFile(SERVER_LOCK_NAME);
serverLockFile = newFile(SERVER_LOCK_NAME);

boolean fileCreated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -45,8 +46,6 @@ public class FileLockNodeManager extends FileBasedNodeManager {

private static final int BACKUP_LOCK_POS = 2;

private static final long LOCK_LENGTH = 1;

private static final byte LIVE = 'L';

private static final byte FAILINGBACK = 'F';
Expand All @@ -65,6 +64,8 @@ public class FileLockNodeManager extends FileBasedNodeManager {

private final FileChannel[] lockChannels = new FileChannel[3];

private long serverLockLastModified;

private final long lockAcquisitionTimeoutNanos;

protected boolean interrupted = false;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected synchronized void setUpServerLockFile() throws IOException {
super.setUpServerLockFile();

lockChannels[0] = channel;
serverLockLastModified = serverLockFile.lastModified();

for (int i = 1; i < 3; i++) {
if (lockChannels[i] != null && lockChannels[i].isOpen()) {
Expand Down Expand Up @@ -189,33 +191,32 @@ public void awaitLiveNode() throws NodeManagerException, InterruptedException {
logger.debug("awaiting live node...");
do {
byte state = getState();
while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
if (logger.isDebugEnabled()) {
logger.debug("awaiting live node startup state='{}'", state);
}
while (state == NOT_STARTED || state == FIRST_TIME_START) {
logger.debug("awaiting live node startup state = '{}'", (char) state);

Thread.sleep(2000);
state = getState();
}

liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
liveLock = lock(LIVE_LOCK_POS);
if (interrupted) {
interrupted = false;
throw new InterruptedException("Lock was interrupted");
}
state = getState();
if (state == FileLockNodeManager.PAUSED) {
if (state == PAUSED) {
liveLock.release();
logger.debug("awaiting live node restarting");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.FAILINGBACK) {
} else if (state == FAILINGBACK) {
liveLock.release();
logger.debug("awaiting live node failing back");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.LIVE) {
if (logger.isDebugEnabled()) {
logger.debug("acquired live node lock state = {}", (char) state);
}
} else if (state == LIVE) {
// if the backup acquires the file lock and the state is 'L' that means the primary died
logger.debug("acquired live node lock state = {}", (char) state);
serverLockFile.setLastModified(System.currentTimeMillis());
logger.debug("touched {}; new time: {}", serverLockFile.getAbsoluteFile(), serverLockFile.lastModified());
break;
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ public void awaitLiveStatus() throws NodeManagerException, InterruptedException
}

private void setLive() throws NodeManagerException {
writeFileLockStatus(FileLockNodeManager.LIVE);
writeFileLockStatus(LIVE);
}

private void setFailingBack() throws NodeManagerException {
Expand All @@ -318,16 +319,14 @@ private void setPaused() throws NodeManagerException {

/**
* @param status
* @throws ActiveMQLockAcquisitionTimeoutException,IOException
* @throws NodeManagerException
*/
private void writeFileLockStatus(byte status) throws NodeManagerException {
if (replicatedBackup && channel == null) {
return;
}

if (logger.isDebugEnabled()) {
logger.debug("writing status: {}", status);
}
logger.debug("writing status: {}", (char) status);
ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status);
bb.position(0);
Expand All @@ -345,6 +344,8 @@ private void writeFileLockStatus(byte status) throws NodeManagerException {
lock.release();
}
}
serverLockLastModified = serverLockFile.lastModified();
logger.debug("Modified {} at {}", serverLockFile.getName(), serverLockLastModified);
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
Expand All @@ -371,9 +372,7 @@ private byte getState() throws NodeManagerException {
}
}

if (logger.isDebugEnabled()) {
logger.debug("state: {}", result);
}
logger.debug("state: {}", (char) result);
return result;
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
Expand All @@ -400,17 +399,13 @@ public final SimpleString readNodeId() throws NodeManagerException {

protected FileLock tryLock(final int lockPos) throws IOException {
try {
if (logger.isDebugEnabled()) {
logger.debug("trying to lock position: {}", lockPos);
}
logger.debug("trying to lock position: {}", lockPos);

FileLock lock = lockChannels[lockPos].tryLock();
if (logger.isDebugEnabled()) {
if (lock != null) {
logger.debug("locked position: {}", lockPos);
} else {
logger.debug("failed to lock position: {}", lockPos);
}
if (lock != null) {
logger.debug("locked position: {}", lockPos);
} else {
logger.debug("failed to lock position: {}", lockPos);
}

return lock;
Expand All @@ -429,16 +424,19 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi
FileLock lock = tryLock(lockPosition);
isRecurringFailure = false;

logger.debug("lock: {}", lock);

// even if the lock is valid it may have taken too long to acquire
if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
}

if (lock == null) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
return null;
}

if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
} else {
return lock;
}
Expand All @@ -456,7 +454,7 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi
if (this.lockAcquisitionTimeoutNanos != -1) {
final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
if (remainingTime <= 0) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos));
}
waitTime = Math.min(waitTime, remainingTime);
}
Expand Down Expand Up @@ -529,19 +527,23 @@ public void run() {
}
lostLock = isLiveLockLost();
if (!lostLock) {
logger.debug("Server still has the lock, double check status is live");
// Java always thinks the lock is still valid even when there is no filesystem
// so we do another check

// Should be able to retrieve the status unless something is wrong
// When EFS is gone, this locks. Which can be solved but is a lot of threading
// work where we need to
// manage the timeout ourselves and interrupt the thread used to claim the lock.
/*
* Java always thinks the lock is still valid even when there is no filesystem
* so we perform additional checks...
*/

/*
* We should be able to retrieve the status unless something is wrong. When EFS is
* gone, this locks. Which can be solved but is a lot of threading work where we
* need to manage the timeout ourselves and interrupt the thread used to claim the
* lock.
*/
logger.debug("Lock appears to be valid; double check by reading status");
byte state = getState();
if (state == LIVE) {
logger.debug("Status is set to live");
} else {
logger.debug("Status is not live");

logger.debug("Lock appears to be valid; triple check by comparing timestamp");
if (hasBeenModified(state)) {
lostLock = true;
}
}
} catch (Exception exception) {
Expand All @@ -554,9 +556,26 @@ public void run() {
logger.warn("Lost the lock according to the monitor, notifying listeners");
notifyLostLock();
}

}

}
/**
 * Checks whether the server lock file looks like it has been touched since this node last recorded its
 * modification time in {@code serverLockLastModified}.
 * <p>
 * The file is considered modified when it no longer exists, or when its last-modified time is newer than
 * {@code serverLockLastModified} while the supplied {@code state} is {@code LIVE}.
 *
 * @param state the status byte most recently read from the lock file by the caller
 * @return {@code true} if the lock file is missing, or is newer than the recorded timestamp while
 *         {@code state} is {@code LIVE}; {@code false} otherwise
 */
private boolean hasBeenModified(byte state) {
   // Create a new instance of the File object so we can get the most up-to-date information on the file.
   final File freshServerLockFile = new File(serverLockFile.getAbsolutePath());

   if (!freshServerLockFile.exists()) {
      logger.debug("Lock file {} does not exist", serverLockFile.getAbsolutePath());
      return true;
   }

   // Read the timestamp exactly once: this avoids a second filesystem stat and guarantees that the value
   // written to the log is the same value the decision below was based on.
   final long currentLastModified = freshServerLockFile.lastModified();

   // the other broker competing for the lock may modify the state as 'F' when it starts so ensure the
   // state is 'L' before returning true
   if (currentLastModified > serverLockLastModified && state == LIVE) {
      logger.debug("Lock file {} originally locked at {} was modified at {}", serverLockFile.getAbsolutePath(), new Date(serverLockLastModified), new Date(currentLastModified));
      return true;
   }

   return false;
}
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@
<scope>test</scope>
<!-- License: EPL 1.0 -->
</dependency>

<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ public void run() {
service.shutdown();

assertTrue("Expected to find AMQ224000", AssertionLoggerHandler.findText("AMQ224000"));
assertTrue("Expected to find \"timed out waiting for lock\"", AssertionLoggerHandler.findText("timed out waiting for lock"));
assertTrue("Expected to find \"Timed out waiting for lock\"", AssertionLoggerHandler.findText("Timed out waiting for lock"));
}
}