Skip to content

Commit

Permalink
CHange test to avoid co-ordinated omission and tune.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jan 18, 2016
1 parent 0d020df commit 15d0c73
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 26 deletions.
Expand Up @@ -23,4 +23,9 @@ public interface ExcerptCommon {
* @return the queue associated with this Excerpt
*/
ChronicleQueue queue();

/**
* Hint to the underlying store to get the next page(s)
*/
void prefetch();
}
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.bytes.ReadBytesMarshallable;
import net.openhft.chronicle.bytes.WriteBytesMarshallable;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.annotation.ForceInline;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
Expand All @@ -32,7 +33,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

import static net.openhft.chronicle.bytes.Bytes.elasticByteBuffer;
import static net.openhft.chronicle.queue.ChronicleQueue.toCycle;
Expand Down Expand Up @@ -139,19 +140,24 @@ public long writeBytes(@NotNull Bytes<?> bytes) throws IOException {
public long cycle() {
throw new UnsupportedOperationException("todo");
}

@Override
public void prefetch() {
throw new UnsupportedOperationException("todo");

}
}

/**
* StoreAppender
*/
public static class StoreAppender extends DefaultAppender<AbstractChronicleQueue> {


private Wire wire;
private long epoch;
private long cycle;
private long index = -1;
private WireStore store;
private long nextPrefetch = OS.pageSize();

public StoreAppender(@NotNull AbstractChronicleQueue queue) throws IOException {

Expand Down Expand Up @@ -231,6 +237,16 @@ private WireStore store() throws IOException {
return this.store;
}

@Override
public void prefetch() {
long position = wire.bytes().writePosition();
if (position < nextPrefetch)
return;
long prefetch = OS.mapAlign(position) + OS.pageSize();
// touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
}
}

// *************************************************************************
Expand All @@ -250,6 +266,7 @@ public static class StoreTailer implements ExcerptTailer {

private long index;
private WireStore store;
private long nextPrefetch = OS.pageSize();

public StoreTailer(@NotNull AbstractChronicleQueue queue) throws IOException {
this.queue = queue;
Expand All @@ -260,20 +277,20 @@ public StoreTailer(@NotNull AbstractChronicleQueue queue) throws IOException {

@Override
public boolean readDocument(@NotNull ReadMarshallable marshaller) throws IOException {
return readAtIndex(marshaller::readMarshallable);
return readAtIndex(marshaller, ReadMarshallable::readMarshallable);
}

@Override
public boolean readBytes(@NotNull Bytes using) throws IOException {
return readAtIndex(w -> using.write(w.bytes()));
return readAtIndex(using, (t, w) -> t.write(w.bytes()));
}

@Override
public boolean readBytes(@NotNull ReadBytesMarshallable using) throws IOException {
return readAtIndex(w -> using.readMarshallable(w.bytes()));
return readAtIndex(using, (t, w) -> t.readMarshallable(w.bytes()));
}

private boolean readAtIndex(@NotNull Consumer<Wire> c) throws IOException {
private <T> boolean readAtIndex(T t, @NotNull BiConsumer<T, Wire> c) throws IOException {

final long readPosition = wire.bytes().readPosition();
final long readLimit = wire.bytes().readLimit();
Expand All @@ -288,7 +305,7 @@ private boolean readAtIndex(@NotNull Consumer<Wire> c) throws IOException {
moveToIndex(firstIndex);
}

final boolean success = readAtSubIndex(c);
final boolean success = readAtSubIndex(t, c);

if (success) {

Expand All @@ -305,7 +322,7 @@ private boolean readAtIndex(@NotNull Consumer<Wire> c) throws IOException {
return false;
}

private boolean readAtSubIndex(@NotNull Consumer<Wire> c) throws IOException {
private <T> boolean readAtSubIndex(T t, @NotNull BiConsumer<T, Wire> c) throws IOException {

long roll;
for (; ; ) {
Expand All @@ -320,7 +337,7 @@ private boolean readAtSubIndex(@NotNull Consumer<Wire> c) throws IOException {
return false;

if (documentContext.isData()) {
c.accept(wire);
c.accept(t, wire);
return true;
}

Expand Down Expand Up @@ -458,6 +475,17 @@ private StoreTailer cycle(long cycle) throws IOException {
}


@Override
public void prefetch() {
long position = wire.bytes().readPosition();
if (position < nextPrefetch)
return;
long prefetch = OS.mapAlign(position) + OS.pageSize();
// touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
}

}
}

@@ -1,3 +1,21 @@
/*
*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package net.openhft.chronicle.queue;

import net.openhft.affinity.Affinity;
Expand Down
Expand Up @@ -32,26 +32,49 @@
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Results 27/10/2015 running on a MBP 50/90 99/99.9 99.99/99.999 - worst was 1.5 / 27 104 / 3,740
* 8,000 / 13,890 - 36,700
* Result on 18/1/2016 running on i7-3970X Ubuntu 10.04 with affinity writing to tmpfs
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.6 / 2.8 4.7 / 14 31 / 1,080 - 27,790
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 2.0 / 3.1 4.7 / 14 967 / 9,180 - 18,350
* <p>
* Result on 18/1/2016 running on i7-3970X Ubuntu 10.04 with affinity writing to ext4 on Samsung 840 SSD
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.6 / 2.2 4.7 / 28 36 / 160 - 29,880
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 2.1 / 2.5 5.8 / 113 160 / 1,670 - 20,450
* <p>
* Results 27/10/2015 running on a MBP 50/90 99/99.9 99.99/99.999 - worst
* was 1.5 / 27 104 / 3,740 8,000 / 13,890 - 36,700
*/
public class ChronicleQueueLatencyDistributionWithBytes extends ChronicleQueueTestBase {

public static final int STRING_LENGTH = 192;
private static final long INTERVAL_US = 20;
public static final int BLOCK_SIZE = 16 << 20;

@Ignore("long running")
@Test
public void test() throws Exception {
Histogram histogram = new Histogram();
Histogram writeHistogram = new Histogram();

String path = "target/deleteme" + System.nanoTime() + ".q"; /*getTmpDir()*/
new File(path).deleteOnExit();
ChronicleQueue rqueue = new SingleChronicleQueueBuilder(path)
.wireType(WireType.FIELDLESS_BINARY)
.blockSize(BLOCK_SIZE)
.build();

ChronicleQueue queue = new SingleChronicleQueueBuilder(getTmpDir())
.wireType(WireType.BINARY)
.blockSize(1_000_000_000)
ChronicleQueue wqueue = new SingleChronicleQueueBuilder(path)
.wireType(WireType.FIELDLESS_BINARY)
.blockSize(BLOCK_SIZE)
.build();

ExcerptAppender appender = queue.createAppender();
ExcerptTailer tailer = queue.createTailer();
ExcerptAppender appender = wqueue.createAppender();
ExcerptTailer tailer = rqueue.createTailer();

Thread tailerThread = new Thread(() -> {
MyReadMarshallable myReadMarshallable = new MyReadMarshallable(histogram);
Expand All @@ -63,7 +86,9 @@ public void test() throws Exception {

while (true) {
try {
tailer.readBytes(myReadMarshallable);
// tailer.readBytes(myReadMarshallable);
if (!tailer.readBytes(myReadMarshallable))
tailer.prefetch();
} catch (IOException e) {
e.printStackTrace();
break;
Expand All @@ -74,7 +99,7 @@ public void test() throws Exception {
lock.release();
}
}
});
}, "tailer thread");

Thread appenderThread = new Thread(() -> {
AffinityLock lock = null;
Expand All @@ -83,11 +108,22 @@ public void test() throws Exception {
lock = Affinity.acquireLock();
}

char[] value = new char[STRING_LENGTH];
Arrays.fill(value, 'X');
String id = new String(value);
TestTrade bt = new TestTrade();
for (int i = 0; i < 10_000_000; i++) {
Jvm.busyWaitMicros(5);
bt.setTime(System.nanoTime());
bt.setId(id);
long next = System.nanoTime() + INTERVAL_US * 1000;
for (int i = 0; i < 2_000_000; i++) {
while (System.nanoTime() < next)
/* busy wait*/ ;
long start = next;
bt.setTime(start);
appender.writeBytes(bt);
writeHistogram.sample(System.nanoTime() - start);
next += INTERVAL_US * 1000;
if (next > System.nanoTime())
appender.prefetch();
}
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -96,7 +132,7 @@ public void test() throws Exception {
lock.release();
}
}
});
}, "appender thread");

tailerThread.start();

Expand All @@ -106,7 +142,10 @@ public void test() throws Exception {
//Pause to allow tailer to catch up (if needed)
Jvm.pause(500);

System.out.println(histogram.toMicrosFormat());
System.out.println("write: " + writeHistogram.toMicrosFormat());
System.out.println("write-read: " + histogram.toMicrosFormat());
// rqueue.close();
// wqueue.close();
}

static class MyReadMarshallable implements ReadBytesMarshallable {
Expand All @@ -123,10 +162,10 @@ public void readMarshallable(Bytes in) throws IORuntimeException {
testTrade.readMarshallable(in);

long time = testTrade.getTime();
if (counter.get() > 1_000_000) {
if (counter.get() > 200_000) {
histogram.sample(System.nanoTime() - time);
}
if (counter.incrementAndGet() % 1_000_000 == 0) {
if (counter.incrementAndGet() % 200_000 == 0) {
System.out.println(counter.get());
}
}
Expand Down

0 comments on commit 15d0c73

Please sign in to comment.