Skip to content

Commit

Permalink
Test with a busy waiter
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jan 21, 2016
1 parent 7c7fa19 commit 64990bb
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 43 deletions.
Expand Up @@ -65,7 +65,7 @@ protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws
storeForCycle(cycle(), builder.epoch());
epoch = builder.epoch();
bufferedAppends = builder.buffered();
eventloop = builder.eventGroup();
eventloop = builder.eventLoop();
ringBufferCapacity = BytesRingBuffer.sizeFor(builder.bufferCapacity());
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ public class SingleChronicleQueueBuilder implements ChronicleQueueBuilder {
private long epoch; // default is 1970-01-01 UTC
private boolean isBuffered;
private Consumer<Throwable> onThrowable = Throwable::printStackTrace;
private EventLoop eventGroup;
private EventLoop eventLoop;

private long bufferCapacity = 2 << 20;

Expand All @@ -54,6 +54,33 @@ public SingleChronicleQueueBuilder(File path) {
this.epoch = 0;
}

public static SingleChronicleQueueBuilder binary(File name) {
return binary(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder binary(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.BINARY);
}

public static SingleChronicleQueueBuilder text(File name) {
return text(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder text(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.TEXT);
}

public static SingleChronicleQueueBuilder raw(File name) {
return raw(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder raw(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.RAW);
}

public File path() {
return this.path;
}
Expand Down Expand Up @@ -88,6 +115,9 @@ public long bufferCapacity() {
return bufferCapacity;
}

// *************************************************************************
// HELPERS
// *************************************************************************

/**
* @param ringBufferSize sets the ring buffer capacity in bytes
Expand Down Expand Up @@ -124,8 +154,8 @@ public RollCycle rollCycle() {

@NotNull
public ChronicleQueue build() throws IOException {
if (isBuffered && eventGroup == null)
eventGroup = new EventGroup(true, onThrowable);
if (isBuffered && eventLoop == null)
eventLoop = new EventGroup(true, onThrowable);
return new SingleChronicleQueue(this.clone());
}

Expand All @@ -140,37 +170,6 @@ public SingleChronicleQueueBuilder clone() {
}
}

// *************************************************************************
// HELPERS
// *************************************************************************

public static SingleChronicleQueueBuilder binary(File name) {
return binary(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder binary(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.BINARY);
}

public static SingleChronicleQueueBuilder text(File name) {
return text(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder text(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.TEXT);
}

public static SingleChronicleQueueBuilder raw(File name) {
return raw(name.getAbsolutePath());
}

public static SingleChronicleQueueBuilder raw(String name) {
return new SingleChronicleQueueBuilder(name)
.wireType(WireType.RAW);
}

/**
* use this to trap exceptions that came from the other threads
*
Expand Down Expand Up @@ -203,8 +202,13 @@ public boolean buffered() {
return this.isBuffered;
}

public EventLoop eventGroup() {
return eventGroup;
public EventLoop eventLoop() {
return eventLoop;
}

public SingleChronicleQueueBuilder eventLoop(EventLoop eventLoop) {
this.eventLoop = eventLoop;
return this;
}

public SingleChronicleQueueBuilder bufferCapacity(int bufferCapacity) {
Expand Down
Expand Up @@ -24,6 +24,8 @@
import net.openhft.chronicle.bytes.NativeBytes;
import net.openhft.chronicle.core.util.Histogram;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.BusyPauser;
import net.openhft.chronicle.threads.EventGroup;
import net.openhft.chronicle.wire.WireType;
import org.junit.Test;

Expand Down Expand Up @@ -52,28 +54,30 @@ public class ChronicleQueueLatencyDistributionWithBytes extends ChronicleQueueTe

public static final int BYTES_LENGTH = 128;
public static final int BLOCK_SIZE = 16 << 20;
private static final long INTERVAL_US = 20;
public static final int BUFFER_CAPACITY = 1 << 20;
private static final long INTERVAL_US = 5;

// @Ignore("long running")
@Test
public void test() throws IOException, InterruptedException {
Histogram histogram = new Histogram();
Histogram writeHistogram = new Histogram();

String path = "target/deleteme" + System.nanoTime() + ".q"; /*getTmpDir()*/
// String path = getTmpDir() + "/deleteme.q";
// String path = "target/deleteme" + System.nanoTime() + ".q"; /*getTmpDir()*/
String path = getTmpDir() + "/deleteme.q";
new File(path).deleteOnExit();
ChronicleQueue rqueue = new SingleChronicleQueueBuilder(path)
.wireType(WireType.FIELDLESS_BINARY)
.blockSize(BLOCK_SIZE)
.bufferCapacity(64 << 10)
.build();

EventGroup eventLoop = new EventGroup(true, Throwable::printStackTrace, BusyPauser.INSTANCE, true);
ChronicleQueue wqueue = new SingleChronicleQueueBuilder(path)
.wireType(WireType.FIELDLESS_BINARY)
.blockSize(BLOCK_SIZE)
.bufferCapacity(64 << 10)
.bufferCapacity(BUFFER_CAPACITY)
.buffered(true)
.eventLoop(eventLoop)
.build();

ExcerptAppender appender = wqueue.createAppender();
Expand Down Expand Up @@ -118,7 +122,7 @@ public void test() throws IOException, InterruptedException {
Bytes bytes = Bytes.allocateDirect(BYTES_LENGTH).unchecked(true);

long next = System.nanoTime() + INTERVAL_US * 1000;
for (int i = 0; i < 1_000_000; i++) {
for (int i = 0; i < 2_000_000; i++) {
while (System.nanoTime() < next)
/* busy wait*/ ;
long start = next;
Expand Down Expand Up @@ -156,5 +160,6 @@ public void test() throws IOException, InterruptedException {
System.out.println("write-read: " + histogram.toMicrosFormat());
// rqueue.close();
// wqueue.close();
eventLoop.close();
}
}

0 comments on commit 64990bb

Please sign in to comment.