From 0d0f012dc423d9068fdafeeb71aa99ea463e4854 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 19 Oct 2018 13:12:45 -0700 Subject: [PATCH] AMQ-7080 - Keep track of free pages - Update db.free file during checkpoints --- .../store/kahadb/disk/page/PageFile.java | 83 +++++- .../store/kahadb/disk/util/SequenceSet.java | 51 ++++ .../store/kahadb/disk/page/PageFileTest.java | 259 +++++++++++++++++- .../org/apache/activemq/bugs/AMQ3120Test.java | 2 +- .../org/apache/activemq/bugs/AMQ4323Test.java | 4 +- 5 files changed, 373 insertions(+), 26 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index fe79a2d6b10..d29f4fb5fb5 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -37,6 +38,10 @@ import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -78,6 +83,7 @@ public class PageFile { private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4; private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4; + // Recovery header is (long offset) private static final Logger LOG = LoggerFactory.getLogger(PageFile.class); @@ -93,6 +99,10 @@ public class PageFile { // File handle used for writing pages.. private RecoverableRandomAccessFile recoveryFile; + private RecoverableRandomAccessFile freePagesFile; + + private ExecutorService freePagesFlushExecutorService; + // The size of pages private int pageSize = DEFAULT_PAGE_SIZE; @@ -404,11 +414,18 @@ public void load() throws IOException, IllegalStateException { recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw"); } + freePagesFile = new RecoverableRandomAccessFile(getFreeFile(), "rw"); + + if(freePagesFlushExecutorService == null) { + freePagesFlushExecutorService = Executors.newSingleThreadExecutor(); + } + + if(freePagesFile.length() > 0) { + loadFreeList(); + } + if (metaData.isCleanShutdown()) { nextTxid.set(metaData.getLastTxId() + 1); - if (metaData.getFreePages() > 0) { - loadFreeList(); - } } else { LOG.debug(toString() + ", Recovering page file..."); nextTxid.set(redoRecoveryUpdates()); @@ -422,9 +439,10 @@ public void load() throws IOException, IllegalStateException { metaData.setCleanShutdown(false); storeMetaData(); - getFreeFile().delete(); + startWriter(); - if (needsFreePageRecovery) { + if (needsFreePageRecovery && freeList.getMetadata().getNextTxid() != nextTxid.get()) { + freeList.clear(); asyncFreePageRecovery(); } } else { @@ -501,6 +519,9 @@ public void unload() throws IOException { flush(); try { stopWriter(); + freePagesFlushExecutorService.shutdown(); + freePagesFlushExecutorService.awaitTermination(1, TimeUnit.SECONDS); + freePagesFlushExecutorService = null; } catch (InterruptedException e) { throw new InterruptedIOException(); } @@ -526,6 +547,11 @@ public void unload() throws IOException { readFile = null; writeFile.close(); writeFile = null; + + if (freePagesFile != null) { + freePagesFile.close(); + } + if (enableRecoveryFile) { recoveryFile.close(); recoveryFile = null; @@ -695,19 +721,43 @@ private void storeMetaData() throws IOException { writeFile.sync(); } + private void storeFreeListAsync() throws IOException { + freeList.getMetadata().setNextTxid(nextTxid.get()); + ByteArrayOutputStream bOutput = new ByteArrayOutputStream(12); + DataOutputStream out = new DataOutputStream(bOutput); + SequenceSet.Marshaller.INSTANCE.writePayload(freeList, out); + + freePagesFlushExecutorService.execute(() -> { + try { + freePagesFile.seek(0); + freePagesFile.write(bOutput.toByteArray()); + freePagesFile.sync(); + } catch (IOException ex) { + //ignored + } + }); + } + private void storeFreeList() throws IOException { - FileOutputStream os = new FileOutputStream(getFreeFile()); - DataOutputStream dos = new DataOutputStream(os); - SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos); - dos.close(); + freeList.getMetadata().setNextTxid(nextTxid.get()); + freePagesFile.seek(0); + SequenceSet.Marshaller.INSTANCE.writePayload(freeList, freePagesFile); + freePagesFile.sync(); } private void loadFreeList() throws IOException { freeList.clear(); - FileInputStream is = new FileInputStream(getFreeFile()); - DataInputStream dis = new DataInputStream(is); - freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis); - dis.close(); + try { + FileInputStream is = new FileInputStream(getFreeFile()); + DataInputStream dis = new DataInputStream(is); + freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis); + dis.close(); + } catch (EOFException ex) { + //Corrupted db.free file + // As this file is being written on every checkpoint now, We can now have a partial write + // where the size of the sequenceSet (firstLong) is written but not the whole sequenceSet + getFreeFile().delete(); + } } /////////////////////////////////////////////////////////////////// @@ -833,6 +883,11 @@ public long getFreePageCount() { return freeList.rangeSize(); } + public SequenceSet getFreePageList() { + assertLoaded(); + return freeList; + } + public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) { assertNotLoaded(); this.recoveryFileMinPageCount = recoveryFileMinPageCount; @@ -1158,6 +1213,7 @@ private void writeBatch() throws IOException { if (enableDiskSyncs) { recoveryFile.sync(); } + storeFreeListAsync(); } for (PageWrite w : batch) { @@ -1169,7 +1225,6 @@ private void writeBatch() throws IOException { if (enableDiskSyncs) { writeFile.sync(); } - } catch (IOException ioError) { LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError); // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java index fac831bf706..66a8121670b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java @@ -33,6 +33,31 @@ */ public class SequenceSet extends LinkedNodeList implements Iterable { + private final MetaData metadata; + + public static class MetaData { + long nextTxid = -1; + long freePageHashCheckpoint = -1; + + public long getFreePageHashCheckpoint() { + return freePageHashCheckpoint; + } + + public long getNextTxid() { + return nextTxid; + } + + public void setNextTxid(long value) { + nextTxid = value; + } + } + public SequenceSet() { + this.metadata = new MetaData(); + } + public MetaData getMetadata() { + return metadata; + } + public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller { public static final Marshaller INSTANCE = new Marshaller(); @@ -40,14 +65,33 @@ public static class Marshaller implements org.apache.activemq.store.kahadb.disk. public SequenceSet readPayload(DataInput in) throws IOException { SequenceSet value = new SequenceSet(); int count = in.readInt(); + long readCheckpointHash = 0; + for (int i = 0; i < count; i++) { if( in.readBoolean() ) { Sequence sequence = new Sequence(in.readLong(), in.readLong()); value.addLast(sequence); + readCheckpointHash = readCheckpointHash ^ sequence.first; + readCheckpointHash = readCheckpointHash ^ sequence.last; } else { Sequence sequence = new Sequence(in.readLong()); value.addLast(sequence); + readCheckpointHash = readCheckpointHash ^ sequence.first; + } + } + + try { + long savedCheckpointHash = in.readLong(); + //Check if i can was not a partial write - Check Hash + if (savedCheckpointHash == readCheckpointHash) { + value.metadata.freePageHashCheckpoint = savedCheckpointHash; + value.metadata.nextTxid = in.readLong(); + } else { + value.clear(); + value = new SequenceSet(); } + } catch (Exception ex) { + /* This is a new Matadata that was not present in previous versions */ } return value; } @@ -55,17 +99,24 @@ public SequenceSet readPayload(DataInput in) throws IOException { public void writePayload(SequenceSet value, DataOutput out) throws IOException { out.writeInt(value.size()); Sequence sequence = value.getHead(); + value.metadata.freePageHashCheckpoint = 0; + while (sequence != null ) { if( sequence.range() > 1 ) { out.writeBoolean(true); out.writeLong(sequence.first); out.writeLong(sequence.last); + value.metadata.freePageHashCheckpoint = value.metadata.freePageHashCheckpoint ^ sequence.first; + value.metadata.freePageHashCheckpoint = value.metadata.freePageHashCheckpoint ^ sequence.last; } else { out.writeBoolean(false); out.writeLong(sequence.first); + value.metadata.freePageHashCheckpoint = value.metadata.freePageHashCheckpoint ^ sequence.first; } sequence = sequence.getNext(); } + out.writeLong(value.metadata.freePageHashCheckpoint); + out.writeLong(value.metadata.nextTxid); } public int getFixedSize() { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java index 3a5cefd56ce..9e1335343d3 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java @@ -16,20 +16,28 @@ */ package org.apache.activemq.store.kahadb.disk.page; +import junit.framework.TestCase; +import org.apache.activemq.store.kahadb.disk.util.SequenceSet; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.apache.activemq.util.Wait; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.HashSet; - -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; - -import junit.framework.TestCase; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; @SuppressWarnings("rawtypes") public class PageFileTest extends TestCase { @@ -227,13 +235,12 @@ public void testFreePageRecoveryUncleanShutdown() throws Exception { assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - pf2.flush(); long freePages = pf2.getFreePageCount(); LOG.info("free page count: " + freePages); return freePages == 10l; } - }, 12000000)); + }, 100000)); } finally { pf.unload(); pf2.unload(); @@ -259,6 +266,240 @@ public void testAllocatedAndUnusedAreFree() throws Exception { tx.rollback(); assertEquals(10, pf.getPageCount()); assertEquals(pf.getFreePageCount(), 10); + } + + //Test for AMQ-7080 + public void testFreePageRecoveryCleanShutdownAndRecoverFromDbFreeFile() throws Exception { + + int numberOfFreePages = 100; + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(true); + pf.load(); + + //Allocate free pages + Page firstPage = pf.allocate(numberOfFreePages * 2); + for (int i = 0; i < numberOfFreePages; i++) { + pf.freePage(firstPage.pageId + i); + } + pf.flush(); + + assertEquals(pf.getFreePageCount(), numberOfFreePages); + + // Reload it... clean shutdown + pf.unload(); + pf.load(); + + assertEquals(pf.getFreePageCount(), numberOfFreePages); + pf.unload(); + } + + //Test for AMQ-7080 + public void testFreePageRecoveryUncleanShutdownAndRecoverFromDbFreeFile() throws Exception { + AtomicBoolean asyncRecovery = verifyAsyncRecovery(); + int numberOfFreePages = 100; + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(true); + pf.load(); + + //Allocate free pages + Transaction tx = pf.tx(); + tx.allocate(numberOfFreePages); + tx.commit(); + + pf.flush(); + + assertEquals(pf.getFreePageCount(), numberOfFreePages); + + waitFreePageFileFlush(pf); + + //Load a second instance on the same directory fo the page file which + //simulates an unclean shutdown from the previous run + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.load(); + + assertEquals(pf.getFreePageCount(), pf2.getFreePageCount()); + + pf2.unload(); + pf.unload(); + assertFalse(asyncRecovery.get()); + } + + //Test for AMQ-7080 + public void testFreePageRecoveryUncleanShutdownAndDirtyDbFree() throws Exception { + AtomicBoolean asyncRecovery = verifyAsyncRecovery(); + int numberOfFreePages = 100; + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(true); + pf.load(); + + //Allocate free pages + Transaction tx = pf.tx(); + tx.allocate(numberOfFreePages); + tx.commit(); + pf.flush(); + + FileInputStream is = new FileInputStream(pf.getFreeFile()); + DataInputStream dis = new DataInputStream(is); + + SequenceSet freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis); + + // Simulate outdated db.free + freeList.getMetadata().setNextTxid(freeList.getMetadata().getNextTxid() - 1); + freeList.remove(100); + FileOutputStream os = new FileOutputStream(pf.getFreeFile()); + DataOutputStream dos = new DataOutputStream(os); + SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos); + + //Load a second instance on the same directory fo the page file which + //simulates an unclean shutdown from the previous run + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.load(); + + try { + assertTrue("We have 100 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == numberOfFreePages; + } + }, 100000)); + } finally { + pf.unload(); + pf2.unload(); + } + + assertTrue(asyncRecovery.get()); + } + + //Test for AMQ-7080 + public void testFreePageRecoveryUncleanShutdownAndPartialWriteDbFree() throws Exception { + AtomicBoolean asyncRecovery = verifyAsyncRecovery(); + int numberOfFreePages = 100; + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(true); + pf.load(); + + //Allocate free pages + Transaction tx = pf.tx(); + tx.allocate(numberOfFreePages); + tx.commit(); + pf.flush(); + + waitFreePageFileFlush(pf); + // Simulate a partial write + RecoverableRandomAccessFile freeFile = new RecoverableRandomAccessFile(pf.getFreeFile(), "rw"); + freeFile.seek(10); + freeFile.writeLong(170); + freeFile.sync(); + + //Load a second instance on the same directory fo the page file which + //simulates an unclean shutdown from the previous run + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.load(); + + try { + assertTrue("We have 200 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == numberOfFreePages; + } + }, 100000)); + } finally { + pf.unload(); + pf2.unload(); + } + assertTrue(asyncRecovery.get()); + } + + //Test for AMQ-7080 + public void testFreePageRecoveryUncleanShutdownAndWrongFreePageSize() throws Exception { + AtomicBoolean asyncRecovery = verifyAsyncRecovery(); + int numberOfFreePages = 100; + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(true); + pf.load(); + + //Allocate free pages + Transaction tx = pf.tx(); + tx.allocate(numberOfFreePages); + tx.commit(); + pf.flush(); + + waitFreePageFileFlush(pf); + // Simulate a partial write with free count bigger than what is written + RecoverableRandomAccessFile freeFile = new RecoverableRandomAccessFile(pf.getFreeFile(), "rw"); + freeFile.seek(0); + freeFile.writeInt(1000000); + freeFile.sync(); + + //Load a second instance on the same directory fo the page file which + //simulates an unclean shutdown from the previous run + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.load(); + + try { + assertTrue("We have 200 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == numberOfFreePages; + } + }, 100000)); + } finally { + pf.unload(); + pf2.unload(); + } + assertTrue(asyncRecovery.get()); + } + + private void waitFreePageFileFlush(final PageFile pf) throws Exception { + assertTrue("Free Page file saved", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + FileInputStream is = new FileInputStream(pf.getFreeFile()); + DataInputStream dis = new DataInputStream(is); + SequenceSet freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis); + return freeList.getMetadata().getNextTxid() == pf.getFreePageList().getMetadata().getNextTxid(); + } catch (Exception ex) { + // This file is written async + return false; + } + } + }, 10000)); + } + + private AtomicBoolean verifyAsyncRecovery() { + AtomicBoolean asyncRecovery = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Recovering pageFile free list")) { + asyncRecovery.set(true); + } + } + }; + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(PageFile.class); + log4jLogger.addAppender(appender); + log4jLogger.setLevel(Level.DEBUG); + return asyncRecovery; } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java index 32e338dd5e2..c5c260f97a6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java @@ -116,7 +116,7 @@ public void testCleanupOfFiles() throws Exception { final int messageCount = 500; startBroker(true); int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); + assertEquals(5, fileCount); Connection connection = new ActiveMQConnectionFactory( broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java index efb3c715555..b5afd4972d3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java @@ -118,7 +118,7 @@ public void testCleanupOfFiles() throws Exception { final int messageCount = 500; startBroker(true); int fileCount = getFileCount(kahaDbDir); - assertEquals(4, fileCount); + assertEquals(5, fileCount); Connection connection = new ActiveMQConnectionFactory( broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); @@ -151,7 +151,7 @@ protected Message createMessage(int i) throws Exception { public boolean isSatisified() throws Exception { int fileCount = getFileCount(kahaDbDir); LOG.info("current filecount:" + fileCount); - return 4 == fileCount; + return 5 == fileCount; } }));