From e0998482ec0edf080aa50ea70affcd332480fc59 Mon Sep 17 00:00:00 2001 From: Dmitry Pisklov Date: Tue, 24 Oct 2017 17:08:52 +0100 Subject: [PATCH] #389 More tests and some documentation --- README.adoc | 9 +++ .../queue/reader/ChronicleReader.java | 8 ++- .../{impl/single => reader}/RollEOFTest.java | 61 ++++++++++++++++--- 3 files changed, 67 insertions(+), 11 deletions(-) rename src/test/java/net/openhft/chronicle/queue/{impl/single => reader}/RollEOFTest.java (72%) diff --git a/README.adoc b/README.adoc index bd612757d8..da76e51712 100755 --- a/README.adoc +++ b/README.adoc @@ -799,6 +799,15 @@ An Appender is something like an iterator in Chronicle environment. You add data + Each Chronicle queue is composed of excerpts. Putting data to a Chronicle queue means starting a new excerpt, writing data into it, and finishing the excerpt at the end. +=== File Rolling + +Chronicle Queue is designed to roll its files depending on the roll cycle chosen when queue is created (see https://github.com/OpenHFT/Chronicle-Queue/blob/master/src/main/java/net/openhft/chronicle/queue/RollCycles.java[RollCycles]). +When the roll cycle reaches the point it should roll, appender will atomically writes `EOF` mark at the end of current file to indicate that no other appender should write to this file and no tailer should read further, and instead everyone +should use new file. + +If the process was shutdown, and restarted later when the roll cycle should be using a new file, appender will try to locate old file and write `EOF` mark in it to help tailers reading it. However, tailers are robust enough to understand that the `EOF` mark should +be present in the file from previous roll cycle even if it's not written, after certain timeout. + == Changes from Chronicle Queue v3 Chronicle Queue v4 solves a number of issues that existed in Chronicle Queue v3. diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java index 324044e11a..e3a07668b9 100755 --- a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java @@ -47,6 +47,7 @@ public final class ChronicleReader { private long startIndex = UNSET_VALUE; private boolean tailInputSource = false; private long maxHistoryRecords = UNSET_VALUE; + private boolean readOnly = true; private Consumer messageSink; private Function pollMethod = ExcerptTailer::readingDocument; private Supplier entryHandlerFactory = MessageToTextQueueEntryHandler::new; @@ -97,6 +98,11 @@ public void execute() { } } + ChronicleReader withReadOnly(boolean readOnly) { + this.readOnly = readOnly; + return this; + } + public ChronicleReader withMessageSink(final Consumer messageSink) { this.messageSink = messageSink; return this; @@ -184,7 +190,7 @@ private SingleChronicleQueue createQueue() { return SingleChronicleQueueBuilder .binary(basePath.toFile()) .testBlockSize() - .readOnly(true) + .readOnly(readOnly) .build(); } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/RollEOFTest.java b/src/test/java/net/openhft/chronicle/queue/reader/RollEOFTest.java similarity index 72% rename from src/test/java/net/openhft/chronicle/queue/impl/single/RollEOFTest.java rename to src/test/java/net/openhft/chronicle/queue/reader/RollEOFTest.java index 299706db04..1a9eb455db 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/RollEOFTest.java +++ b/src/test/java/net/openhft/chronicle/queue/reader/RollEOFTest.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.openhft.chronicle.queue.impl.single; +package net.openhft.chronicle.queue.reader; import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.MappedBytes; -import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.time.TimeProvider; import net.openhft.chronicle.queue.DirectoryUtils; import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.RollCycles; -import net.openhft.chronicle.queue.reader.ChronicleReader; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore; import net.openhft.chronicle.wire.DocumentContext; import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.WireType; @@ -34,20 +35,22 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; +import java.util.Calendar; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class RollEOFTest { - public static final long TIMEOUT = Jvm.isDebug() ? 500000L : 5000L; private final File path = DirectoryUtils.tempDir(getClass().getName()); @Test(timeout = 5000L) @@ -71,7 +74,7 @@ public void testRollWritesEOF() throws Exception { assertEquals(4, l.size()); } - @Test//(timeout = 5000L) + @Test(timeout = 5000L) public void testRollWithoutEOFDoesntBlowup() throws Exception { final MutableTimeProvider timeProvider = new MutableTimeProvider(); @@ -99,6 +102,34 @@ public void testRollWithoutEOFDoesntBlowup() throws Exception { assertEquals(4, l.size()); } + @Test(timeout = 5000L) + public void testRollWithoutEOF() throws Exception { + + final MutableTimeProvider timeProvider = new MutableTimeProvider(); + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.DAY_OF_MONTH, -3); + timeProvider.setTime(cal.getTimeInMillis()); + createQueueAndWriteData(timeProvider); + assertEquals(1, getNumberOfQueueFiles()); + + // adjust time + timeProvider.setTime(System.currentTimeMillis()); + createQueueAndWriteData(timeProvider); + assertEquals(2, getNumberOfQueueFiles()); + + Optional firstQueueFile = Files.list(path.toPath()).filter(p -> p.toString().endsWith(SUFFIX)).sorted().findFirst(); + + assertTrue(firstQueueFile.isPresent()); + + // remove EOF from first file + removeEOF(firstQueueFile.get()); + + List l = new LinkedList<>(); + new ChronicleReader().withMessageSink(l::add).withBasePath(path.toPath()).withReadOnly(false).execute(); + // 2 entries per message + assertEquals(4, l.size()); + } + private void removeEOF(Path path) throws IOException { long blockSize = 64 << 10; long chunkSize = OS.pageAlign(blockSize); @@ -111,7 +142,7 @@ private void removeEOF(Path path) throws IOException { bytes.readLimit(bytes.capacity()); bytes.readSkip(4); // move past header - try (final SingleChronicleQueueStore qs = SingleChronicleQueueBuilder.loadStore(wire)) { + try (final SingleChronicleQueueStore qs = loadStore(wire)) { assertNotNull(qs); long l = qs.writePosition(); long len = Wires.lengthOf(bytes.readVolatileInt(l)); @@ -124,6 +155,16 @@ private void removeEOF(Path path) throws IOException { } } + private SingleChronicleQueueStore loadStore(Wire wire) { + try { + Method loadStoreMethod = SingleChronicleQueueBuilder.class.getDeclaredMethod("loadStore", Wire.class); + loadStoreMethod.setAccessible(true); + return (SingleChronicleQueueStore) loadStoreMethod.invoke(null, wire); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private long getNumberOfQueueFiles() throws IOException { return getQueueFilesStream().count(); }