Skip to content

Commit

Permalink
re-enable RollCycleMultiThreadStressTest (for short duration - 2 secs…
Browse files Browse the repository at this point in the history
…), make RollCycleTest.newRollCycleIgnored2 fail more consistently, speed up tests using TimeProvider, improve logic for skipping empty files in next0
  • Loading branch information
JerryShea committed Jan 10, 2018
1 parent 76f8890 commit 4b6519d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 122 deletions.
Expand Up @@ -1155,13 +1155,19 @@ private boolean next0(boolean includeMetaData) throws UnrecoverableTimeoutExcept
state = FOUND_CYCLE;
continue;
}
if (state != END_OF_CYCLE) {
// Winding back to the previous cycle results in a re-initialisation of all the objects => garbage
int nextCycle = queue.rollCycle().toCycle(nextIndex);
cycle(nextCycle, false);
if (state == END_OF_CYCLE)
continue;
if (cycle < queue.lastCycle()) {
// we have encountered an empty file without an EOF marker
// TODO: more work needed - I thought that the appender and/or tailer would write an EOF into this file
state = END_OF_CYCLE;
continue;
}
continue;
// We are here because we are waiting for an entry to be written to this file.
// Winding back to the previous cycle results in a re-initialisation of all the objects => garbage
int nextCycle = queue.rollCycle().toCycle(nextIndex);
cycle(nextCycle, false);
state = CYCLE_NOT_FOUND;
} else {
state = END_OF_CYCLE;
}
Expand Down
12 changes: 7 additions & 5 deletions src/test/java/net/openhft/chronicle/queue/LastAppendedTest.java
@@ -1,6 +1,7 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -11,10 +12,11 @@

public class LastAppendedTest extends ChronicleQueueTestBase {
@Test
public void testLastWritten() throws InterruptedException {
public void testLastWritten() {
SetTimeProvider timeProvider = new SetTimeProvider();

try (SingleChronicleQueue outQueue = single(getTmpDir()).rollCycle(RollCycles.TEST_SECONDLY).sourceId(1).build()) {
try (SingleChronicleQueue inQueue = single(getTmpDir()).rollCycle(RollCycles.TEST_SECONDLY).sourceId(2).build()) {
try (SingleChronicleQueue outQueue = single(getTmpDir()).rollCycle(RollCycles.TEST_SECONDLY).sourceId(1).timeProvider(timeProvider).build()) {
try (SingleChronicleQueue inQueue = single(getTmpDir()).rollCycle(RollCycles.TEST_SECONDLY).sourceId(2).timeProvider(timeProvider).build()) {

// write some initial data to the inqueue
final Msg msg = inQueue.acquireAppender()
Expand All @@ -24,7 +26,7 @@ public void testLastWritten() throws InterruptedException {

msg.msg("somedata-0");

Thread.sleep(1000);
timeProvider.advanceMillis(1000);

// write data into the inQueue
msg.msg("somedata-1");
Expand All @@ -47,7 +49,7 @@ public void testLastWritten() throws InterruptedException {
// write data into the inQueue
msg.msg("somedata-2");

Thread.sleep(2000);
timeProvider.advanceMillis(2000);

msg.msg("somedata-3");
msg.msg("somedata-4");
Expand Down
Expand Up @@ -45,7 +45,7 @@ public static Collection<Object[]> data() {
}

@Test
public void testCountExcerptsWhenTheCycleIsRolled() throws Exception {
public void testCountExcerptsWhenTheCycleIsRolled() {

final AtomicLong time = new AtomicLong();

Expand Down Expand Up @@ -347,12 +347,12 @@ public void testCountExcerptsWhenTheCycleIsRolled() throws Exception {
}

@Test
public void testTailingWithEmptyCycles() throws Exception {
public void testTailingWithEmptyCycles() {
testTailing(p -> { p.execute(); return 1; });
}

@Test
public void testTailingWithMissingCycles() throws Exception {
public void testTailingWithMissingCycles() {
testTailing(p -> 0);
}

Expand Down
Expand Up @@ -8,14 +8,12 @@
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.ValueOut;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -36,12 +34,11 @@
public class RollCycleMultiThreadStressTest {
private static final Logger LOG = LoggerFactory.getLogger(RollCycleMultiThreadStressTest.class);
private static final long SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 10_000);
private static final int TEST_TIME = Integer.getInteger("testTime", 90);
private static final int TEST_TIME = Integer.getInteger("testTime", 2);
static final int NUMBER_OF_INTS = 18;//1060 / 4;

@Ignore("long running")
@Test
public void stress() throws Exception {
public void stress() {
final File path = Optional.ofNullable(System.getProperty("stress.test.dir")).
map(s -> new File(s, UUID.randomUUID().toString())).
orElse(DirectoryUtils.tempDir("rollCycleStress"));
Expand Down
Expand Up @@ -2,6 +2,7 @@

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
Expand All @@ -12,7 +13,6 @@
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
Expand All @@ -21,12 +21,13 @@
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RollCycleTest {
@Test
public void newRollCycleIgnored() throws Exception {
File path = DirectoryUtils.tempDir("newRollCycleIgnored");
TestTimeProvider timeProvider = new TestTimeProvider();
SetTimeProvider timeProvider = new SetTimeProvider();
ParallelQueueObserver observer = new ParallelQueueObserver(timeProvider, path.toPath());

try (SingleChronicleQueue queue = SingleChronicleQueueBuilder
Expand All @@ -43,12 +44,14 @@ public void newRollCycleIgnored() throws Exception {
observer.await();

// two days pass
timeProvider.add(TimeUnit.DAYS.toMillis(2));
timeProvider.advanceMillis(TimeUnit.DAYS.toMillis(2));

appender.writeText("Day 3 data");

// allow parallel tailer to finish iteration
Thread.sleep(2000);
for (int i = 0; i < 5_000 && observer.documentsRead != 1; i++) {
Thread.sleep(1);
}

thread.interrupt();
}
Expand All @@ -57,47 +60,47 @@ public void newRollCycleIgnored() throws Exception {
observer.queue.close();
}

@Ignore("seems to have been broken by 77823f6")
@Test
public void newRollCycleIgnored2() throws Exception {
File path = DirectoryUtils.tempDir("newRollCycleIgnored2");

TestTimeProvider timeProvider = new TestTimeProvider();
SetTimeProvider timeProvider = new SetTimeProvider();
ParallelQueueObserver observer = new ParallelQueueObserver(timeProvider, path.toPath());

int cyclesToWrite = 100;
try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.fieldlessBinary(path)
.testBlockSize()
.rollCycle(RollCycles.DAILY)
.timeProvider(timeProvider)
.build()) {
ExcerptAppender appender = queue.acquireAppender();
// uncomment next line to make the test pass
appender.writeText("Day 1 data");
appender.writeText("0");

Thread thread = new Thread(observer);
thread.start();

observer.await();

// two days pass
timeProvider.add(TimeUnit.DAYS.toMillis(2));

appender.writeText("Day 3 data");
for (int i=1; i<=cyclesToWrite; i++) {
// two days pass
timeProvider.advanceMillis(TimeUnit.DAYS.toMillis(2));
appender.writeText(Integer.toString(i));
}

// allow parallel tailer to finish iteration
for (int i = 0; i < 5_000 && observer.documentsRead != 2; i++) {
for (int i = 0; i < 5_000 && observer.documentsRead != 1+cyclesToWrite; i++) {
Thread.sleep(1);
}

thread.interrupt();
}

assertEquals(2, observer.documentsRead);
assertEquals(1+cyclesToWrite, observer.documentsRead);
observer.queue.close();
}

@Test
public void testWriteToCorruptedFile() throws Exception {
public void testWriteToCorruptedFile() {

File dir = DirectoryUtils.tempDir("testWriteToCorruptedFile");
try (SingleChronicleQueue queue = SingleChronicleQueueBuilder
Expand Down Expand Up @@ -136,21 +139,6 @@ public void checkMappedFiles() {
MappedFile.checkMappedFiles();
}

class TestTimeProvider implements TimeProvider {

volatile long add = 0;

@Override
public long currentTimeMillis() {
return System.currentTimeMillis() + add;
}

public void add(long addInMs) {
add += addInMs;
}

}

class ParallelQueueObserver implements Runnable, StoreFileListener {
SingleChronicleQueue queue;
CountDownLatch progressLatch;
Expand All @@ -175,12 +163,16 @@ public void run() {

progressLatch.countDown();

int lastDocId = -1;
while (!Thread.currentThread().isInterrupted()) {

String readText = tailer.readText();
if (readText != null) {
System.out.println("Read a document " + readText);
documentsRead++;
int docId = Integer.parseInt(readText);
assertTrue(docId == lastDocId + 1);
lastDocId = docId;
}
}
}
Expand Down

This file was deleted.

0 comments on commit 4b6519d

Please sign in to comment.