Skip to content

Commit

Permalink
Add perf tests for ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jan 19, 2016
1 parent 18a41f5 commit acf21fc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 116 deletions.
Expand Up @@ -48,7 +48,7 @@ public class BytesRingBuffer {
private final Header header;

public BytesRingBuffer(@NotNull final BytesStore byteStore) {
capacity = byteStore.writeLimit() - Header.size();
capacity = byteStore.realCapacity() - Header.size();

if (byteStore.writeRemaining() <= Header.size()) {
throw new IllegalStateException("The byteStore is too small, the minimum recommended " +
Expand Down
Expand Up @@ -21,39 +21,35 @@
import net.openhft.affinity.Affinity;
import net.openhft.affinity.AffinityLock;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesMarshallable;
import net.openhft.chronicle.bytes.ReadBytesMarshallable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.util.Histogram;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Result on 18/1/2016 running on i7-3970X Ubuntu 10.04 with affinity writing to tmpfs
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.6 / 2.8 4.7 / 14 31 / 1,080 - 27,790
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 2.0 / 3.1 4.7 / 14 967 / 9,180 - 18,350
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.7 / 1.8 1.9 / 2.4 184 / 1,800 - 2,160
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 4.7 / 7.3 418 / 5,900 12,320 / 14,940 - 14,940
* <p>
* Result on 18/1/2016 running on i7-3970X Ubuntu 10.04 with affinity writing to ext4 on Samsung 840 SSD
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.6 / 2.2 4.7 / 28 36 / 160 - 29,880
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 2.1 / 2.5 5.8 / 113 160 / 1,670 - 20,450
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.7 / 1.8 1.8 / 2.4 1,610 / 4,330 - 4,590
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 4.7 / 7.6 20,450 / 155,190 188,740 / 188,740 - 188,740
* <p>
* Result with out co-ordinated omission
* write: 50/90 99/99.9 99.99/99.999 - worst was 2.8 / 4.5 9,180 / 222,300 247,460 / 255,850 - 255,850
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 3.4 / 5.0 2,290 / 222,300 247,460 / 255,850 - 255,850
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.8 / 2.6 5.2 / 13 121 / 319 - 672
* write-read: 50/90 99/99.9 99.99/99.999 - worst was 2.2 / 3.8 5.8 / 13 258 / 516 - 1,210
* <p>
* Results 27/10/2015 running on a MBP 50/90 99/99.9 99.99/99.999 - worst
* was 1.5 / 27 104 / 3,740 8,000 / 13,890 - 36,700
* write: 50/90 99/99.9 99.99/99.999 - worst was 0.49 / 1.1 3.6 / 80 8,650 / 20,450 - 22,540
* write-read: 50/90 99/99.9 99.99/99.999 99.9999/worst was 0.53 / 1.1 3.8 / 6,160 17,300 / 21,500 23,590 / 23,590
* <p>
* write: 50/90 99/99.9 99.99/99.999 - worst was 1.5 / 1.5 1.6 / 2.2 65 / 1,740 - 3,600
* write-read: 50/90 99/99.9 99.99/99.999 99.9999/worst was 3.9 / 6.5 225 / 15,990 106,950 / 115,340 115,340 / 115,340
*/
public class ChronicleQueueLatencyDistributionWithBytes extends ChronicleQueueTestBase {

public static final int STRING_LENGTH = 192;
public static final int BYTES_LENGTH = 16;
private static final long INTERVAL_US = 20;
public static final int BLOCK_SIZE = 16 << 20;

Expand All @@ -63,7 +59,8 @@ public void test() throws IOException, InterruptedException {
Histogram histogram = new Histogram();
Histogram writeHistogram = new Histogram();

String path = "target/deleteme" + System.nanoTime() + ".q"; /*getTmpDir()*/
// String path = "target/deleteme" + System.nanoTime() + ".q"; /*getTmpDir()*/
String path = getTmpDir() + "/deleteme.q";
new File(path).deleteOnExit();
ChronicleQueue rqueue = new SingleChronicleQueueBuilder(path)
.wireType(WireType.FIELDLESS_BINARY)
Expand All @@ -80,7 +77,7 @@ public void test() throws IOException, InterruptedException {
ExcerptTailer tailer = rqueue.createTailer();

Thread tailerThread = new Thread(() -> {
MyReadMarshallable myReadMarshallable = new MyReadMarshallable(histogram);
Bytes bytes = Bytes.allocateDirect(BYTES_LENGTH).unchecked(true);
AffinityLock lock = null;
try {
if (Boolean.getBoolean("enableTailerAffinity")) {
Expand All @@ -89,9 +86,12 @@ public void test() throws IOException, InterruptedException {

while (true) {
try {
// tailer.readBytes(myReadMarshallable);
if (!tailer.readBytes(myReadMarshallable))
bytes.clear();
if (tailer.readBytes(bytes)) {
histogram.sample(System.nanoTime() - bytes.readLong(0));
} else {
tailer.prefetch();
}
} catch (IOException e) {
e.printStackTrace();
break;
Expand All @@ -103,40 +103,27 @@ public void test() throws IOException, InterruptedException {
}
}
}, "tailer thread");
/*
Pauser pauser = new LongPauser(0, 0, 1, 1, TimeUnit.MILLISECONDS);
Thread prefetcher = new Thread(() -> {
while(!Thread.currentThread().isInterrupted()) {
appender.prefetch();
pauser.pause();
}
}, "prefetcher");
prefetcher.start();
*/

Thread appenderThread = new Thread(() -> {
AffinityLock lock = null;
try {
if (Boolean.getBoolean("enableAppenderAffinity")) {
lock = Affinity.acquireLock();
}
Bytes bytes = Bytes.allocateDirect(BYTES_LENGTH).unchecked(true);

char[] value = new char[STRING_LENGTH];
Arrays.fill(value, 'X');
String id = new String(value);
TestTrade bt = new TestTrade();
bt.setId(id);
long next = System.nanoTime() + INTERVAL_US * 1000;
for (int i = 0; i < 2_000_000; i++) {
for (int i = 0; i < 10_000_000; i++) {
while (System.nanoTime() < next)
/* busy wait*/ ;
long start = next;
bt.setTime(start);
appender.writeBytes(bt);
bytes.readPosition(0);
bytes.readLimit(BYTES_LENGTH);
bytes.writeLong(0L, System.nanoTime());
appender.writeBytes(bytes);
if (i > 500000)
writeHistogram.sample(System.nanoTime() - start);
next += INTERVAL_US * 1000;
// pauser.unpause();
}
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -148,6 +135,7 @@ public void test() throws IOException, InterruptedException {
}, "appender thread");

tailerThread.start();
Thread.sleep(100);

appenderThread.start();
appenderThread.join();
Expand All @@ -162,80 +150,4 @@ public void test() throws IOException, InterruptedException {
// rqueue.close();
// wqueue.close();
}

static class MyReadMarshallable implements ReadBytesMarshallable {
private AtomicInteger counter = new AtomicInteger(0);
private TestTrade testTrade = new TestTrade();
private Histogram histogram;

public MyReadMarshallable(Histogram histogram) {
this.histogram = histogram;
}

@Override
public void readMarshallable(Bytes in) throws IORuntimeException {
testTrade.readMarshallable(in);

long time = testTrade.getTime();
if (counter.get() > 200_000) {
histogram.sample(System.nanoTime() - time);
}
if (counter.incrementAndGet() % 200_000 == 0) {
System.out.println(counter.get());
}
}
}

static class TestTrade implements BytesMarshallable {
private int price;
private String id;
private long time;

public long getTime() {
return time;
}

public void setTime(long time) {
this.time = time;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public int getPrice() {
return price;
}

public void setPrice(int price) {
this.price = price;
}

@Override
public void readMarshallable(@NotNull Bytes in) {
this.price = in.readInt();
this.id = in.readUtf8();
this.time = in.readLong();
}

@Override
public void writeMarshallable(@NotNull Bytes out) {
out.writeInt(this.price);
out.writeUtf8(this.id);
out.writeLong(this.time);
}

@Override
public String toString() {
return "TestTrade{" +
"price=" + price +
", id='" + id + '\'' +
", time=" + time +
'}';
}
}
}
Expand Up @@ -19,7 +19,9 @@
package net.openhft.chronicle.queue.impl.ringbuffer;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.NativeBytes;
import net.openhft.chronicle.bytes.NativeBytesStore;
import net.openhft.chronicle.core.util.Histogram;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -31,6 +33,7 @@
import static net.openhft.chronicle.bytes.BytesStore.wrap;
import static net.openhft.chronicle.bytes.NativeBytesStore.nativeStoreWithFixedCapacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* @author Rob Austin.
Expand Down Expand Up @@ -235,4 +238,24 @@ public void testMultiThreadedFasterReaderThanWriter() throws Throwable {
}
}

@Test
public void perfTest() throws InterruptedException {
BytesRingBuffer brb = new BytesRingBuffer(NativeBytes.nativeBytes(2 << 20).unchecked(true));
Bytes bytes = NativeBytes.nativeBytes(64).unchecked(true);
for (int t = 0; t < 5; t++) {
Histogram hist = new Histogram();
for (int j = 0; j < 10_000_000; j += 20_000) {
for (int i = 0; i < 20_000; i++) {
bytes.readPosition(0);
bytes.readLimit(bytes.realCapacity());
long start = System.nanoTime();
assertTrue(brb.offer(bytes));
hist.sample(System.nanoTime() - start);
}
brb.clear();
}
System.out.println(hist.toMicrosFormat());
}
}

}

0 comments on commit acf21fc

Please sign in to comment.