Skip to content

Commit

Permalink
#389 More tests and some documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
dpisklov committed Oct 25, 2017
1 parent acdc0b4 commit e099848
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
9 changes: 9 additions & 0 deletions README.adoc
Expand Up @@ -799,6 +799,15 @@ An Appender is something like an iterator in Chronicle environment. You add data
+
Each Chronicle queue is composed of excerpts. Putting data to a Chronicle queue means starting a new excerpt, writing data into it, and finishing the excerpt at the end.

=== File Rolling

Chronicle Queue is designed to roll its files depending on the roll cycle chosen when queue is created (see https://github.com/OpenHFT/Chronicle-Queue/blob/master/src/main/java/net/openhft/chronicle/queue/RollCycles.java[RollCycles]).
When the roll cycle reaches the point it should roll, appender will atomically writes `EOF` mark at the end of current file to indicate that no other appender should write to this file and no tailer should read further, and instead everyone
should use new file.

If the process was shutdown, and restarted later when the roll cycle should be using a new file, appender will try to locate old file and write `EOF` mark in it to help tailers reading it. However, tailers are robust enough to understand that the `EOF` mark should
be present in the file from previous roll cycle even if it's not written, after certain timeout.

== Changes from Chronicle Queue v3

Chronicle Queue v4 solves a number of issues that existed in Chronicle Queue v3.
Expand Down
Expand Up @@ -47,6 +47,7 @@ public final class ChronicleReader {
private long startIndex = UNSET_VALUE;
private boolean tailInputSource = false;
private long maxHistoryRecords = UNSET_VALUE;
private boolean readOnly = true;
private Consumer<String> messageSink;
private Function<ExcerptTailer, DocumentContext> pollMethod = ExcerptTailer::readingDocument;
private Supplier<QueueEntryHandler> entryHandlerFactory = MessageToTextQueueEntryHandler::new;
Expand Down Expand Up @@ -97,6 +98,11 @@ public void execute() {
}
}

ChronicleReader withReadOnly(boolean readOnly) {
this.readOnly = readOnly;
return this;
}

public ChronicleReader withMessageSink(final Consumer<String> messageSink) {
this.messageSink = messageSink;
return this;
Expand Down Expand Up @@ -184,7 +190,7 @@ private SingleChronicleQueue createQueue() {
return SingleChronicleQueueBuilder
.binary(basePath.toFile())
.testBlockSize()
.readOnly(true)
.readOnly(readOnly)
.build();
}

Expand Down
Expand Up @@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.openhft.chronicle.queue.impl.single;
package net.openhft.chronicle.queue.reader;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.reader.ChronicleReader;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType;
Expand All @@ -34,20 +35,22 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.Calendar;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.SUFFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class RollEOFTest {

public static final long TIMEOUT = Jvm.isDebug() ? 500000L : 5000L;
private final File path = DirectoryUtils.tempDir(getClass().getName());

@Test(timeout = 5000L)
Expand All @@ -71,7 +74,7 @@ public void testRollWritesEOF() throws Exception {
assertEquals(4, l.size());
}

@Test//(timeout = 5000L)
@Test(timeout = 5000L)
public void testRollWithoutEOFDoesntBlowup() throws Exception {

final MutableTimeProvider timeProvider = new MutableTimeProvider();
Expand Down Expand Up @@ -99,6 +102,34 @@ public void testRollWithoutEOFDoesntBlowup() throws Exception {
assertEquals(4, l.size());
}

@Test(timeout = 5000L)
public void testRollWithoutEOF() throws Exception {

final MutableTimeProvider timeProvider = new MutableTimeProvider();
Calendar cal = Calendar.getInstance();
cal.add(Calendar.DAY_OF_MONTH, -3);
timeProvider.setTime(cal.getTimeInMillis());
createQueueAndWriteData(timeProvider);
assertEquals(1, getNumberOfQueueFiles());

// adjust time
timeProvider.setTime(System.currentTimeMillis());
createQueueAndWriteData(timeProvider);
assertEquals(2, getNumberOfQueueFiles());

Optional<Path> firstQueueFile = Files.list(path.toPath()).filter(p -> p.toString().endsWith(SUFFIX)).sorted().findFirst();

assertTrue(firstQueueFile.isPresent());

// remove EOF from first file
removeEOF(firstQueueFile.get());

List<String> l = new LinkedList<>();
new ChronicleReader().withMessageSink(l::add).withBasePath(path.toPath()).withReadOnly(false).execute();
// 2 entries per message
assertEquals(4, l.size());
}

private void removeEOF(Path path) throws IOException {
long blockSize = 64 << 10;
long chunkSize = OS.pageAlign(blockSize);
Expand All @@ -111,7 +142,7 @@ private void removeEOF(Path path) throws IOException {
bytes.readLimit(bytes.capacity());
bytes.readSkip(4);
// move past header
try (final SingleChronicleQueueStore qs = SingleChronicleQueueBuilder.loadStore(wire)) {
try (final SingleChronicleQueueStore qs = loadStore(wire)) {
assertNotNull(qs);
long l = qs.writePosition();
long len = Wires.lengthOf(bytes.readVolatileInt(l));
Expand All @@ -124,6 +155,16 @@ private void removeEOF(Path path) throws IOException {
}
}

private SingleChronicleQueueStore loadStore(Wire wire) {
try {
Method loadStoreMethod = SingleChronicleQueueBuilder.class.getDeclaredMethod("loadStore", Wire.class);
loadStoreMethod.setAccessible(true);
return (SingleChronicleQueueStore) loadStoreMethod.invoke(null, wire);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private long getNumberOfQueueFiles() throws IOException {
return getQueueFilesStream().count();
}
Expand Down

0 comments on commit e099848

Please sign in to comment.