diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 36e2f560ba4..306d4da334b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics; -public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener { +public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener { SimpleString getName(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index e71500fd17b..b835c38ace3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -555,7 +555,7 @@ public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImp */ @Override public boolean removeMember(final long uniqueEventID, final String nodeId) { - if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) { + if (nodeId.equals(nodeManager.getNodeId().toString())) { ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId); return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java index 834e4509fa6..f03676fc90d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java @@ -42,6 +42,7 @@ public abstract class FileBasedNodeManager extends NodeManager { public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence"; private static final String ACCESS_MODE = "rw"; private final File directory; + protected File serverLockFile; private final Path activationSequencePath; protected FileChannel channel; protected FileChannel activationSequenceChannel; @@ -134,7 +135,7 @@ public void writeNodeActivationSequence(long sequence) throws NodeManagerExcepti * */ protected synchronized void setUpServerLockFile() throws IOException { - File serverLockFile = newFile(SERVER_LOCK_NAME); + serverLockFile = newFile(SERVER_LOCK_NAME); boolean fileCreated = false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index 2733672a945..873eabccd23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; +import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -45,8 +46,6 @@ public class FileLockNodeManager extends FileBasedNodeManager { private static final int BACKUP_LOCK_POS = 2; - private static final long LOCK_LENGTH = 1; - private static final byte LIVE = 'L'; private static final byte FAILINGBACK = 'F'; @@ -65,6 +64,8 @@ public class FileLockNodeManager extends FileBasedNodeManager { private final FileChannel[] lockChannels = new FileChannel[3]; + private long serverLockLastModified; + private final long lockAcquisitionTimeoutNanos; protected boolean interrupted = false; @@ -110,6 +111,7 @@ protected synchronized void setUpServerLockFile() throws IOException { super.setUpServerLockFile(); lockChannels[0] = channel; + serverLockLastModified = serverLockFile.lastModified(); for (int i = 1; i < 3; i++) { if (lockChannels[i] != null && lockChannels[i].isOpen()) { @@ -189,33 +191,32 @@ public void awaitLiveNode() throws NodeManagerException, InterruptedException { logger.debug("awaiting live node..."); do { byte state = getState(); - while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) { - if (logger.isDebugEnabled()) { - logger.debug("awaiting live node startup state='{}'", state); - } + while (state == NOT_STARTED || state == FIRST_TIME_START) { + logger.debug("awaiting live node startup state = '{}'", (char) state); Thread.sleep(2000); state = getState(); } - liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS); + liveLock = lock(LIVE_LOCK_POS); if (interrupted) { interrupted = false; throw new InterruptedException("Lock was interrupted"); } state = getState(); - if (state == FileLockNodeManager.PAUSED) { + if (state == PAUSED) { liveLock.release(); logger.debug("awaiting live node restarting"); Thread.sleep(2000); - } else if (state == FileLockNodeManager.FAILINGBACK) { + } else if (state == FAILINGBACK) { liveLock.release(); logger.debug("awaiting live node failing back"); Thread.sleep(2000); - } else if (state == FileLockNodeManager.LIVE) { - if (logger.isDebugEnabled()) { - logger.debug("acquired live node lock state = {}", (char) state); - } + } else if (state == LIVE) { + // if the backup acquires the file lock and the state is 'L' that means the primary died + logger.debug("acquired live node lock state = {}", (char) state); + serverLockFile.setLastModified(System.currentTimeMillis()); + logger.debug("touched {}; new time: {}", serverLockFile.getAbsoluteFile(), serverLockFile.lastModified()); break; } } @@ -305,7 +306,7 @@ public void awaitLiveStatus() throws NodeManagerException, InterruptedException } private void setLive() throws NodeManagerException { - writeFileLockStatus(FileLockNodeManager.LIVE); + writeFileLockStatus(LIVE); } private void setFailingBack() throws NodeManagerException { @@ -318,16 +319,14 @@ private void setPaused() throws NodeManagerException { /** * @param status - * @throws ActiveMQLockAcquisitionTimeoutException,IOException + * @throws NodeManagerException */ private void writeFileLockStatus(byte status) throws NodeManagerException { if (replicatedBackup && channel == null) { return; } - if (logger.isDebugEnabled()) { - logger.debug("writing status: {}", status); - } + logger.debug("writing status: {}", (char) status); ByteBuffer bb = ByteBuffer.allocateDirect(1); bb.put(status); bb.position(0); @@ -345,6 +344,8 @@ private void writeFileLockStatus(byte status) throws NodeManagerException { lock.release(); } } + serverLockLastModified = serverLockFile.lastModified(); + logger.debug("Modified {} at {}", serverLockFile.getName(), serverLockLastModified); } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) { throw new NodeManagerException(e); } @@ -371,9 +372,7 @@ private byte getState() throws NodeManagerException { } } - if (logger.isDebugEnabled()) { - logger.debug("state: {}", result); - } + logger.debug("state: {}", (char) result); return result; } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) { throw new NodeManagerException(e); @@ -400,17 +399,13 @@ public final SimpleString readNodeId() throws NodeManagerException { protected FileLock tryLock(final int lockPos) throws IOException { try { - if (logger.isDebugEnabled()) { - logger.debug("trying to lock position: {}", lockPos); - } + logger.debug("trying to lock position: {}", lockPos); FileLock lock = lockChannels[lockPos].tryLock(); - if (logger.isDebugEnabled()) { - if (lock != null) { - logger.debug("locked position: {}", lockPos); - } else { - logger.debug("failed to lock position: {}", lockPos); - } + if (lock != null) { + logger.debug("locked position: {}", lockPos); + } else { + logger.debug("failed to lock position: {}", lockPos); } return lock; @@ -429,16 +424,19 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi FileLock lock = tryLock(lockPosition); isRecurringFailure = false; + logger.debug("lock: {}", lock); + + // even if the lock is valid it may have taken too long to acquire + if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) { + throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos)); + } + if (lock == null) { try { Thread.sleep(500); } catch (InterruptedException e) { return null; } - - if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) { - throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); - } } else { return lock; } @@ -456,7 +454,7 @@ protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTi if (this.lockAcquisitionTimeoutNanos != -1) { final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start); if (remainingTime <= 0) { - throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); + throw new ActiveMQLockAcquisitionTimeoutException("Timed out waiting for lock. Waited for " + TimeUnit.NANOSECONDS.toSeconds(lockAcquisitionTimeoutNanos)); } waitTime = Math.min(waitTime, remainingTime); } @@ -529,19 +527,23 @@ public void run() { } lostLock = isLiveLockLost(); if (!lostLock) { - logger.debug("Server still has the lock, double check status is live"); - // Java always thinks the lock is still valid even when there is no filesystem - // so we do another check - - // Should be able to retrieve the status unless something is wrong - // When EFS is gone, this locks. Which can be solved but is a lot of threading - // work where we need to - // manage the timeout ourselves and interrupt the thread used to claim the lock. + /* + * Java always thinks the lock is still valid even when there is no filesystem + * so we perform additional checks... + */ + + /* + * We should be able to retrieve the status unless something is wrong. When EFS is + * gone, this locks. Which can be solved but is a lot of threading work where we + * need to manage the timeout ourselves and interrupt the thread used to claim the + * lock. + */ + logger.debug("Lock appears to be valid; double check by reading status"); byte state = getState(); - if (state == LIVE) { - logger.debug("Status is set to live"); - } else { - logger.debug("Status is not live"); + + logger.debug("Lock appears to be valid; triple check by comparing timestamp"); + if (hasBeenModified(state)) { + lostLock = true; } } } catch (Exception exception) { @@ -554,9 +556,26 @@ public void run() { logger.warn("Lost the lock according to the monitor, notifying listeners"); notifyLostLock(); } - } - } + private boolean hasBeenModified(byte state) { + boolean modified = false; + + // Create a new instance of the File object so we can get the most up-to-date information on the file. + File freshServerLockFile = new File(serverLockFile.getAbsolutePath()); + if (freshServerLockFile.exists()) { + // the other broker competing for the lock may modify the state as 'F' when it starts so ensure the state is 'L' before returning true + if (freshServerLockFile.lastModified() > serverLockLastModified && state == LIVE) { + logger.debug("Lock file {} originally locked at {} was modified at {}", serverLockFile.getAbsolutePath(), new Date(serverLockLastModified), new Date(freshServerLockFile.lastModified())); + modified = true; + } + } else { + logger.debug("Lock file {} does not exist", serverLockFile.getAbsolutePath()); + modified = true; + } + + return modified; + } + } } diff --git a/pom.xml b/pom.xml index 390b5abb4f1..e47a8abe902 100644 --- a/pom.xml +++ b/pom.xml @@ -316,7 +316,7 @@ test - + org.easymock easymock diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FileLockTimeoutTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FileLockTimeoutTest.java index 86d32f37505..6308b55688b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FileLockTimeoutTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FileLockTimeoutTest.java @@ -96,6 +96,6 @@ public void run() { service.shutdown(); assertTrue("Expected to find AMQ224000", AssertionLoggerHandler.findText("AMQ224000")); - assertTrue("Expected to find \"timed out waiting for lock\"", AssertionLoggerHandler.findText("timed out waiting for lock")); + assertTrue("Expected to find \"Timed out waiting for lock\"", AssertionLoggerHandler.findText("Timed out waiting for lock")); } }