From b3f0a87e0db4832585a0460633889e13ef4bcbf8 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 17 Jan 2019 15:27:56 -0500 Subject: [PATCH] ARTEMIS-2200 NPE when calling journal.delete from Multiple Threads --- .../core/journal/impl/JournalImpl.java | 6 ++ .../openwire/OpenWireLargeMessageTest.java | 2 +- .../journal/impl/JournalImplTestUnit.java | 74 ++++++++++++++++--- 3 files changed, 72 insertions(+), 10 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index c9d0dce6e8e..f46ad92def1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -964,6 +964,12 @@ record = records.remove(id); // computing the delete should be done after compacting is done if (record == null) { compactor.addCommandDelete(id, usedFile); + // JournalImplTestUni::testDoubleDelete was written to validate this condition: + if (compactor == null) { + logger.debug("Record " + id + " had been deleted already from a different call"); + } else { + compactor.addCommandDelete(id, usedFile); + } } else { record.delete(usedFile); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java index 01c56b7eb65..cdb646db95c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java @@ -167,7 +167,7 @@ public void testFastLargeMessageProducerDropOnPaging() throws Exception { } server.stop(); Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException")); - Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find record")); + Assert.assertFalse(AssertionLoggerHandler.findText("It was not possible to delete message")); } finally { AssertionLoggerHandler.stopCapture(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index ecc36a4e47d..25b2413845c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -19,6 +19,8 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; @@ -27,6 +29,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding; import org.apache.activemq.artemis.utils.RandomUtil; @@ -437,7 +440,10 @@ private int calculateRecordsPerFile(final int fileSize, final int alignment, int /** * Use: calculateNumberOfFiles (fileSize, numberOfRecords, recordSize, numberOfRecords2, recordSize2, , ...., numberOfRecordsN, recordSizeN); */ - private int calculateNumberOfFiles(TestableJournal journal, final int fileSize, final int alignment, final int... record) throws Exception { + private int calculateNumberOfFiles(TestableJournal journal, + final int fileSize, + final int alignment, + final int... record) throws Exception { if (journal != null) { journal.flush(); } @@ -1413,8 +1419,7 @@ public void testCommitRecordsInFileReclaim() throws Exception { @Test public void testCommitRecordsInFileNoReclaim() throws Exception { - setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + - 512, true); + setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true); createJournal(); startJournal(); load(); @@ -1497,8 +1502,7 @@ public void testCommitRecordsInFileNoReclaim() throws Exception { @Test public void testRollbackRecordsInFileNoReclaim() throws Exception { - setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + - 512, true); + setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true); createJournal(); startJournal(); load(); @@ -1589,8 +1593,7 @@ public void testRollbackRecordsInFileNoReclaim() throws Exception { @Test public void testEmptyPrepare() throws Exception { - setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + - 512, true); + setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true); createJournal(); startJournal(); load(); @@ -1624,8 +1627,7 @@ public void testEmptyPrepare() throws Exception { @Test public void testPrepareNoReclaim() throws Exception { - setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + - 512, true); + setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength, getAlignment()) + 512, true); createJournal(); startJournal(); load(); @@ -1998,6 +2000,60 @@ public void testMultipleAddUpdate() throws Exception { loadAndCheck(); } + @Test + public void testDoubleDelete() throws Exception { + + AssertionLoggerHandler.startCapture(); + try { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + + byte[] record = generateRecord(100); + + add(1); + + // I'm not adding that to the test assertion, as it will be deleted anyway. + // the test assertion doesn't support multi-thread, so I'm calling the journal directly here + journal.appendAddRecord(2, (byte) 0, record, sync); + + Thread[] threads = new Thread[100]; + CountDownLatch alignLatch = new CountDownLatch(threads.length); + CountDownLatch startFlag = new CountDownLatch(1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + alignLatch.countDown(); + try { + startFlag.await(5, TimeUnit.SECONDS); + journal.appendDeleteRecord(2, false); + } catch (java.lang.IllegalStateException expected) { + } catch (Exception e) { + e.printStackTrace(); + } + }); + threads[i].start(); + } + + Assert.assertTrue(alignLatch.await(5, TimeUnit.SECONDS)); + startFlag.countDown(); + + for (Thread t : threads) { + t.join(TimeUnit.SECONDS.toMillis(10)); + Assert.assertFalse(t.isAlive()); + } + journal.flush(); + + Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException")); + stopJournal(); + createJournal(); + startJournal(); + loadAndCheck(); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + @Test public void testMultipleAddUpdateAll() throws Exception { setup(10, 10 * 1024, true);