Skip to content

Commit

Permalink
ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock whil…
Browse files Browse the repository at this point in the history
…e stopping

AbstractJournalStorageManager::performCachedLargeMessageDeletes
must enforce acquisition of manager write lock (as documented)
to avoid unlucky racing calls of stopReplication while stopping
to deadlock.
  • Loading branch information
franz1981 authored and clebertsuconic committed May 29, 2019
1 parent a7641e6 commit 0d273d2
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 120 deletions.
Expand Up @@ -323,19 +323,24 @@ public void run() {
*/
@Override
protected void performCachedLargeMessageDeletes() {
largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
try {
msg.delete();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
}
if (replicator != null) {
replicator.largeMessageDelete(messageId, JournalStorageManager.this);
}
confirmLargeMessage(largeServerMessage);
});
largeMessagesToDelete.clear();
storageManagerLock.writeLock().lock();
try {
largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
try {
msg.delete();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
}
if (replicator != null) {
replicator.largeMessageDelete(messageId, JournalStorageManager.this);
}
confirmLargeMessage(largeServerMessage);
});
largeMessagesToDelete.clear();
} finally {
storageManagerLock.writeLock().unlock();
}
}

protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) {
Expand Down
Expand Up @@ -15,134 +15,153 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

public class JournalStorageManagerTest {

ScheduledExecutorService dumbScheduler = new ScheduledExecutorService() {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return null;
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return null;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return null;
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return null;
}

@Override
public void shutdown() {

}

@Override
public List<Runnable> shutdownNow() {
return null;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return null;
}

@Override
public Future<?> submit(Runnable task) {
return null;
}
import static java.util.stream.Collectors.toList;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException {
return null;
}
@RunWith(Parameterized.class)
public class JournalStorageManagerTest {

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Parameterized.Parameter
public JournalType journalType;

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Parameterized.Parameters(name = "journal type={0}")
public static Collection<Object[]> getParams() {
final JournalType[] values = JournalType.values();
return Stream.of(JournalType.values())
.map(journalType -> new Object[]{journalType})
.collect(toList());
}

@Override
public void execute(Runnable command) {
private static ExecutorService executor;
private static ExecutorService ioExecutor;
private static ExecutorService testExecutor;

}
};
@BeforeClass
public static void initExecutors() {
executor = Executors.newSingleThreadExecutor();
//to allow concurrent compaction and I/O operations
ioExecutor = Executors.newFixedThreadPool(2);
testExecutor = Executors.newSingleThreadExecutor();
}

ExecutorFactory dumbExecutor = new ExecutorFactory() {
@Override
public ArtemisExecutor getExecutor() {
return new ArtemisExecutor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
}
};
@AfterClass
public static void destroyExecutors() {
ioExecutor.shutdownNow();
executor.shutdownNow();
testExecutor.shutdownNow();
}

/**
* Test of fixJournalFileSize method, of class JournalStorageManager.
*/
@Test
public void testFixJournalFileSize() {
JournalStorageManager manager = new JournalStorageManager(new ConfigurationImpl(), null, dumbExecutor, dumbScheduler, dumbExecutor);
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
}

@Test(timeout = 20_000)
public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception {
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
manager.start();
manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
final PostOffice postOffice = mock(PostOffice.class);
final JournalLoader journalLoader = mock(JournalLoader.class);
manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
final ReplicationManager replicationManager = mock(ReplicationManager.class);
final PagingManager pagingManager = mock(PagingManager.class);
when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]);
manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0);
final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage());
largeMessage.setDurable(true);
when(replicationManager.isSynchronizing()).thenReturn(true);
largeMessage.deleteFile();
final long pendingRecordID = largeMessage.getPendingRecordID();
final AtomicReference<CompletableFuture<Void>> stopReplication = new AtomicReference<>();
doAnswer(invocation -> {
final CompletableFuture<Void> finished = new CompletableFuture<>();
final CountDownLatch beforeStopReplication;
if (stopReplication.compareAndSet(null, finished)) {
beforeStopReplication = new CountDownLatch(1);
//before the deadlock fix:
//manager::stop is already owning the large message lock here
//but not yet the manager read lock
testExecutor.execute(() -> {
beforeStopReplication.countDown();
try {
//it needs to acquire the manager write lock
//and large message lock next
manager.stopReplication();
finished.complete(null);
} catch (Exception e) {
finished.completeExceptionally(e);
}
});
} else {
beforeStopReplication = null;
}
if (beforeStopReplication != null) {
beforeStopReplication.await();
//do not remove this sleep: before the deadlock fix
//it was needed to give manager::stopReplication the chance to acquire
//the manager write lock before manager::stop
TimeUnit.MILLISECONDS.sleep(500);
}
//confirmPendingLargeMessage will acquire manager read lock
return invocation.callRealMethod();
}).when(manager).confirmPendingLargeMessage(pendingRecordID);
manager.stop();
final CompletableFuture<Void> stoppedReplication = stopReplication.get();
Assert.assertNotNull(stoppedReplication);
stoppedReplication.get();
}

}

0 comments on commit 0d273d2

Please sign in to comment.