Skip to content

Commit

Permalink
improve stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed Dec 11, 2017
1 parent 082b12f commit c1dfce2
Showing 1 changed file with 48 additions and 21 deletions.
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;


public class RollCycleMultiThreadStressTest { public class RollCycleMultiThreadStressTest {
Expand All @@ -40,7 +39,7 @@ public class RollCycleMultiThreadStressTest {
@Test @Test
public void stress() throws Exception { public void stress() throws Exception {
final File path = DirectoryUtils.tempDir("rollCycleStress"); final File path = DirectoryUtils.tempDir("rollCycleStress");
LOG.warn("using path {} now is {}", path, LocalDateTime.now()); LOG.warn("Using path: {}, current time is: {}", path, LocalDateTime.now());
final int numThreads = Runtime.getRuntime().availableProcessors(); final int numThreads = Runtime.getRuntime().availableProcessors();
final int numWriters = numThreads / 4 + 1; final int numWriters = numThreads / 4 + 1;
final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
Expand Down Expand Up @@ -80,16 +79,18 @@ public void stress() throws Exception {
System.out.printf("Writer has written %d of %d messages after %ds. Readers at %s. Waiting...%n", System.out.printf("Writer has written %d of %d messages after %ds. Readers at %s. Waiting...%n",
wrote.get() + 1, expectedNumberOfMessages, wrote.get() + 1, expectedNumberOfMessages,
i * 10, readersLastRead); i * 10, readersLastRead);
readers.forEach(reader -> { readers.stream().filter(r -> !r.isMakingProgress()).findAny().ifPresent(reader -> {
if ((wrote.get() - reader.lastRead) > 1_000_000) { if (reader.exception != null) {
if (reader.exception != null) { throw new AssertionError("Reader encountered exception, so stopped reading messages",
throw new AssertionError("Reader encountered exception, so stopped reading messages", reader.exception);
reader.exception);
}
throw new AssertionError("Reader is stuck");
} }
throw new AssertionError("Reader is stuck");

}); });
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L)); final long waitUntil = System.currentTimeMillis() + 10_000L;
while (wrote.get() < expectedNumberOfMessages && System.currentTimeMillis() < waitUntil) {
LockSupport.parkNanos(1L);
}
} else { } else {
break; break;
} }
Expand Down Expand Up @@ -146,7 +147,7 @@ public void stress() throws Exception {
private boolean areAllReadersComplete(final int expectedNumberOfMessages, final List<Reader> readers) { private boolean areAllReadersComplete(final int expectedNumberOfMessages, final List<Reader> readers) {
boolean allReadersComplete = true; boolean allReadersComplete = true;


int count=0; int count = 0;
for (Reader reader : readers) { for (Reader reader : readers) {
++count; ++count;
if (reader.lastRead < expectedNumberOfMessages - 1) { if (reader.lastRead < expectedNumberOfMessages - 1) {
Expand All @@ -162,31 +163,56 @@ private static final class Reader implements Callable<Throwable> {
private final int expectedNumberOfMessages; private final int expectedNumberOfMessages;
private volatile int lastRead = -1; private volatile int lastRead = -1;
private volatile Throwable exception; private volatile Throwable exception;
private int readSequenceAtLastProgressCheck = -1;


Reader(final File path, final int expectedNumberOfMessages) { Reader(final File path, final int expectedNumberOfMessages) {
this.path = path; this.path = path;
this.expectedNumberOfMessages = expectedNumberOfMessages; this.expectedNumberOfMessages = expectedNumberOfMessages;
} }


boolean isMakingProgress() {
if (readSequenceAtLastProgressCheck == -1) {
return true;
}

final boolean makingProgress = lastRead > readSequenceAtLastProgressCheck;
readSequenceAtLastProgressCheck = lastRead;

return makingProgress;
}

@Override @Override
public Throwable call() { public Throwable call() {


try (final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path) try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
.testBlockSize() .testBlockSize()
.rollCycle(RollCycles.TEST_SECONDLY) .rollCycle(RollCycles.TEST_SECONDLY)
.build()) { .build()) {


final ExcerptTailer tailer = queue.createTailer(); final ExcerptTailer tailer = queue.createTailer();

int lastTailerCycle = -1;
int lastQueueCycle = -1;
while (lastRead != expectedNumberOfMessages - 1) { while (lastRead != expectedNumberOfMessages - 1) {
try (DocumentContext rd = tailer.readingDocument()) { try (DocumentContext rd = tailer.readingDocument()) {
if (rd.isPresent()) { if (rd.isPresent()) {
int v = -1; int v = -1;
for (int i = 0; i< NUMBER_OF_INTS; i++) { for (int i = 0; i < NUMBER_OF_INTS; i++) {
v = rd.wire().getValueIn().int32(); v = rd.wire().getValueIn().int32();
assertEquals(lastRead + 1, v); if (lastRead + 1 != v) {
String failureMessage = "Expected: " + (lastRead + 1) +
", actual: " + v;
if (lastTailerCycle != -1) {
failureMessage += ". Tailer cycle at last read: " + lastTailerCycle +
" (current: " + (tailer.cycle()) +
"), queue cycle at last read: " + lastQueueCycle +
" (current: " + queue.cycle() + ")";
}
throw new AssertionError(failureMessage);
}
} }
lastRead = v; lastRead = v;
lastTailerCycle = tailer.cycle();
lastQueueCycle = queue.cycle();
} }
} }
} }
Expand Down Expand Up @@ -221,23 +247,24 @@ public Throwable call() {
.rollCycle(RollCycles.TEST_SECONDLY) .rollCycle(RollCycles.TEST_SECONDLY)
.build()) { .build()) {
final ExcerptAppender appender = queue.acquireAppender(); final ExcerptAppender appender = queue.acquireAppender();
long nextTime = System.nanoTime(); final long startTime = System.nanoTime();
int loopIteration = 0;
while (true) { while (true) {
final int value; final int value;


try (DocumentContext writingDocument = appender.writingDocument()) { try (DocumentContext writingDocument = appender.writingDocument()) {
value = wrote.getAndIncrement(); value = wrote.getAndIncrement();
ValueOut valueOut = writingDocument.wire().getValueOut(); ValueOut valueOut = writingDocument.wire().getValueOut();
// make the message longer // make the message longer
for (int i = 0; i< NUMBER_OF_INTS; i++) { for (int i = 0; i < NUMBER_OF_INTS; i++) {
valueOut.int32(value); valueOut.int32(value);
} }
long delay = nextTime - System.nanoTime(); while (System.nanoTime() < (startTime + (loopIteration * SLEEP_PER_WRITE_NANOS))) {
if (delay > 0) { Thread.yield();
LockSupport.parkNanos(delay);
} }
nextTime += SLEEP_PER_WRITE_NANOS * 0.99;
} }
loopIteration++;

if (value >= expectedNumberOfMessages) { if (value >= expectedNumberOfMessages) {
return null; return null;
} }
Expand Down

0 comments on commit c1dfce2

Please sign in to comment.