Skip to content

Commit

Permalink
Fix a test in Chronicle-Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Mar 23, 2015
1 parent ae33a13 commit de2c95e
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 57 deletions.
7 changes: 4 additions & 3 deletions chronicle-queue/pom.xml
Expand Up @@ -66,17 +66,18 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>chronicle-network</artifactId> <artifactId>chronicle-wire</artifactId>
</dependency> </dependency>


<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>chronicle-wire</artifactId> <artifactId>affinity</artifactId>
</dependency> </dependency>


<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>affinity</artifactId> <artifactId>chronicle-threads</artifactId>
<version>1.0.0-alpha-SNAPSHOT</version>
</dependency> </dependency>


<dependency> <dependency>
Expand Down
@@ -0,0 +1,36 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.wire.Wires;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;

public class QueueDumpMain {
static final String[] SBP_TYPES = "data,meta-data,not-ready-data,not-ready-meta-data".split(",");

public static void dump(File filename, PrintWriter pw) throws FileNotFoundException {
MappedFile mappedFile = MappedFile.mappedFile(filename, 64 << 20, 16 << 20);
Bytes bytes = mappedFile.bytes();
pw.print("# Magic: ");
for (int i = 0; i < 8; i++)
pw.print((char) bytes.readUnsignedByte());
pw.println();
while (true) {
long spb = bytes.readUnsignedInt();
if (!Wires.isKnownLength(spb))
break;
pw.print("--- !");
pw.print(SBP_TYPES[((int) (spb >>> 30))]);
pw.println();
long start = bytes.position();
BytesUtil.toString(bytes, pw, start, start, start + Wires.lengthOf(spb));
pw.println();
bytes.skip(Wires.lengthOf(spb));
}
pw.flush();
}
}
Expand Up @@ -67,15 +67,15 @@ enum Field implements WireKey {
@Override @Override
public void writeMarshallable(@NotNull WireOut out) { public void writeMarshallable(@NotNull WireOut out) {
out.write(Field.uuid).uuid(uuid) out.write(Field.uuid).uuid(uuid)
.write(Field.writeByte).int64(writeByte()) .write(Field.writeByte).int64forBinding(PADDED_SIZE)
.write(Field.created).zonedDateTime(created) .write(Field.created).zonedDateTime(created)
.write(Field.user).text(user) .write(Field.user).text(user)
.write(Field.host).text(host) .write(Field.host).text(host)
.write(Field.compression).text(compression) .write(Field.compression).text(compression)
.write(Field.indexCount).int32(indexCount) .write(Field.indexCount).int32(indexCount)
.write(Field.indexSpacing).int32(indexSpacing) .write(Field.indexSpacing).int32(indexSpacing)
.write(Field.index2Index).int64(index2Index()) .write(Field.index2Index).int64forBinding(0L)
.write(Field.lastIndex).int64(lastIndex()); .write(Field.lastIndex).int64forBinding(-1L);
out.addPadding((int) (PADDED_SIZE - out.bytes().position())); out.addPadding((int) (PADDED_SIZE - out.bytes().position()));
} }


Expand Down
Expand Up @@ -128,14 +128,8 @@ private void readHeader() throws IOException {


bytes.position(HEADER_OFFSET); bytes.position(HEADER_OFFSET);


Consumer<WireIn> nullConsumer = o -> { if (!wire.readDocument(w -> w.read().marshallable(header), null))
}; throw new AssertionError("No header!?");

Consumer<WireIn> dataConsumer = $ -> {
wire.read().marshallable(header);
};

wire.readDocument(dataConsumer, nullConsumer);
firstBytes = bytes.position(); firstBytes = bytes.position();
} }


Expand Down Expand Up @@ -296,13 +290,13 @@ long newIndex() {
final long lastByte = writeByte.getVolatileValue(); final long lastByte = writeByte.getVolatileValue();


for (; ; ) { for (; ; ) {
if (bytes.compareAndSwapInt(lastByte, 0, NOT_READY | (int) length)) { if (bytes.compareAndSwapInt(lastByte, 0, META_DATA | NOT_READY | (int) length)) {
long lastByte2 = lastByte + 4 + buffer.remaining() + indexSize; long lastByte2 = lastByte + 4 + buffer.remaining() + indexSize;
bytes.write(lastByte + 4, buffer); bytes.write(lastByte + 4, buffer);


header.lastIndex().addAtomicValue(1); header.lastIndex().addAtomicValue(1);
writeByte.setOrderedValue(lastByte2); writeByte.setOrderedValue(lastByte2);
bytes.writeOrderedInt(lastByte, (int) (6 + indexSize)); bytes.writeOrderedInt(lastByte, META_DATA | (int) (6 + indexSize));
long start = lastByte + 4; long start = lastByte + 4;
bytes.zeroOut(start + keyLen, start + keyLen + length); bytes.zeroOut(start + keyLen, start + keyLen + length);
return start + keyLen; return start + keyLen;
Expand Down
Expand Up @@ -40,13 +40,7 @@ public WireIn wire() {


@Override @Override
public boolean readDocument(Consumer<WireIn> reader) { public boolean readDocument(Consumer<WireIn> reader) {
Consumer<WireIn> metaDataConsumer = new Consumer<WireIn>() { wire.readDocument(null, reader);
@Override
public void accept(WireIn wireIn) {
// skip the meta data
}
};
wire.readDocument(metaDataConsumer, reader);
return true; return true;
} }


Expand Down Expand Up @@ -105,7 +99,7 @@ public boolean index(final long index) {


} }


final LongValue position = new LongDirectReference(); final LongDirectReference position = new LongDirectReference();
long last = chronicle.lastIndex(); long last = chronicle.lastIndex();




Expand All @@ -125,7 +119,7 @@ public boolean index(final long index) {
wire.readDocument(metaDataConsumer, dataConsumer); wire.readDocument(metaDataConsumer, dataConsumer);




if (position.getValue() != 0) { if (position.bytes() != null) {
wire.bytes().position(position.getValue()); wire.bytes().position(position.getValue());
return true; return true;
} }
Expand Down
@@ -1,8 +1,8 @@
package net.openhft.chronicle.queue.impl.ringbuffer; package net.openhft.chronicle.queue.impl.ringbuffer;


import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.network.internal.NamedThreadFactory;
import net.openhft.chronicle.queue.impl.DirectChronicleQueue; import net.openhft.chronicle.queue.impl.DirectChronicleQueue;
import net.openhft.chronicle.threads.NamedThreadFactory;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down
Expand Up @@ -71,7 +71,7 @@ public Bytes provide(long maxSize) {
@Test @Test
public void testSimpledSingleThreadedWriteRead() throws Exception { public void testSimpledSingleThreadedWriteRead() throws Exception {


try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(150)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(150)) {
final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes()); final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes());


bytesRingBuffer.offer(data()); bytesRingBuffer.offer(data());
Expand All @@ -82,7 +82,7 @@ public void testSimpledSingleThreadedWriteRead() throws Exception {


@Test @Test
public void testPollWithNoData() throws Exception { public void testPollWithNoData() throws Exception {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(150)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(150)) {


assert nativeStore.isNative(); assert nativeStore.isNative();


Expand All @@ -96,7 +96,7 @@ public void testPollWithNoData() throws Exception {


@Test @Test
public void testWriteAndRead() throws Exception { public void testWriteAndRead() throws Exception {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(150)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(150)) {
assert nativeStore.isNative(); assert nativeStore.isNative();
final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes()); final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes());
data(); data();
Expand All @@ -113,7 +113,7 @@ private Bytes<ByteBuffer> data() {


@Test @Test
public void testFlowAroundSingleThreadedWriteDifferentSizeBuffers() throws Exception { public void testFlowAroundSingleThreadedWriteDifferentSizeBuffers() throws Exception {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(150)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(150)) {


System.out.println(nativeStore.realCapacity()); System.out.println(nativeStore.realCapacity());
System.out.println(nativeStore.capacity()); System.out.println(nativeStore.capacity());
Expand All @@ -137,7 +137,7 @@ public void testFlowAroundSingleThreadedWriteDifferentSizeBuffers() throws Excep


@Test @Test
public void testWrite3read3SingleThreadedWrite() throws Exception { public void testWrite3read3SingleThreadedWrite() throws Exception {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(150)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(150)) {
final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes()); final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(nativeStore.bytes());


assert nativeStore.bytes().capacity() < (1 << 12); assert nativeStore.bytes().capacity() < (1 << 12);
Expand Down Expand Up @@ -180,7 +180,7 @@ public Bytes provide(long maxSize) {
@Test @Test
public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Exception { public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Exception {


try (NativeByteStore allocate = NativeByteStore.nativeStore(1000)) { try (NativeBytesStore allocate = NativeBytesStore.nativeStore(1000)) {
final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(allocate.bytes()); final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(allocate.bytes());




Expand All @@ -193,7 +193,7 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
final int j = i; final int j = i;
executorService.submit(() -> { executorService.submit(() -> {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(iterations)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(iterations)) {
final Bytes out = nativeStore.bytes(); final Bytes out = nativeStore.bytes();
String expected = EXPECTED_VALUE + j; String expected = EXPECTED_VALUE + j;
out.clear(); out.clear();
Expand Down Expand Up @@ -223,7 +223,7 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except
executorService.submit(() -> { executorService.submit(() -> {


try { try {
try (NativeByteStore<Void> nativeStore = NativeByteStore.nativeStore(25)) { try (NativeBytesStore<Void> nativeStore = NativeBytesStore.nativeStore(25)) {
Bytes bytes = nativeStore.bytes(); Bytes bytes = nativeStore.bytes();
Bytes result = null; Bytes result = null;
do { do {
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except
@Test @Test
public void testMultiThreadedWithIntValues() throws Exception { public void testMultiThreadedWithIntValues() throws Exception {


try (NativeByteStore allocate = NativeByteStore.nativeStore(1000)) { try (NativeBytesStore allocate = NativeBytesStore.nativeStore(1000)) {




final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(allocate.bytes()); final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(allocate.bytes());
Expand All @@ -276,7 +276,7 @@ public void testMultiThreadedWithIntValues() throws Exception {
final int j = i; final int j = i;
executorService.submit(() -> { executorService.submit(() -> {


try (NativeByteStore allocate2 = NativeByteStore.nativeStore(iterations)) { try (NativeBytesStore allocate2 = NativeBytesStore.nativeStore(iterations)) {
final Bytes out = allocate2.bytes(); final Bytes out = allocate2.bytes();


out.clear(); out.clear();
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testMultiThreadedWithIntValues() throws Exception {
executorService.submit(() -> { executorService.submit(() -> {


try { try {
try (NativeByteStore allocate3 = NativeByteStore.nativeStore(25)) { try (NativeBytesStore allocate3 = NativeBytesStore.nativeStore(25)) {
final Bytes bytes = allocate3.bytes(); final Bytes bytes = allocate3.bytes();
Bytes result = null; Bytes result = null;
do { do {
Expand Down
@@ -1,10 +1,10 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.NativeBytesStore;
import net.openhft.chronicle.queue.impl.DirectChronicleQueue; import net.openhft.chronicle.queue.impl.DirectChronicleQueue;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
Expand Down Expand Up @@ -35,10 +35,38 @@ public class DirectChronicleQueueStringTest {
public static final String EXPECTED_STRING = "Hello World23456789012345678901234567890"; public static final String EXPECTED_STRING = "Hello World23456789012345678901234567890";
public static final byte[] EXPECTED_BYTES = EXPECTED_STRING.getBytes(); public static final byte[] EXPECTED_BYTES = EXPECTED_STRING.getBytes();
public static final String TMP = new File("/tmp").isDirectory() ? "/tmp" : System.getProperty("java.io.tmpdir"); public static final String TMP = new File("/tmp").isDirectory() ? "/tmp" : System.getProperty("java.io.tmpdir");
private static final Logger LOG = LoggerFactory.getLogger(DirectChronicleQueueStringTest.class.getName());


@Test @Test
public void testCreateAppender() throws Exception { public void testCreateAppender() throws Exception {
for (int r = 0; r < 2; r++) {
long start = System.nanoTime();

String name = TMP + "/single" + start + ".q";
File file = new File(name);
file.deleteOnExit();

DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(name)
.build();

writeSome(chronicle);

long mid = System.nanoTime();

DirectChronicleQueue chronicle2 = (DirectChronicleQueue) new ChronicleQueueBuilder(name)
.build();

readSome(chronicle2);

long end = System.nanoTime();
System.out.printf("Write rate %.1f M/s - Read rate %.1f M/s%n",
RUNS * 1e3 / (mid - start), RUNS * 1e3 / (end - mid));
}
}


@Test
@Ignore
public void testCreateAppenderMT() throws Exception {
for (int r = 0; r < 2; r++) { for (int r = 0; r < 2; r++) {
for (int t = 1; t < Runtime.getRuntime().availableProcessors(); t++) { for (int t = 1; t < Runtime.getRuntime().availableProcessors(); t++) {
List<Future<?>> futureList = new ArrayList<>(); List<Future<?>> futureList = new ArrayList<>();
Expand Down Expand Up @@ -87,7 +115,7 @@ public void testCreateAppender() throws Exception {
} }


private void readSome(DirectChronicleQueue chronicle) { private void readSome(DirectChronicleQueue chronicle) {
NativeByteStore allocate = NativeByteStore.nativeStore(EXPECTED_BYTES.length); NativeBytesStore allocate = NativeBytesStore.nativeStore(EXPECTED_BYTES.length);
final Bytes toRead = allocate.bytes(); final Bytes toRead = allocate.bytes();
AtomicLong offset = new AtomicLong(chronicle.firstBytes()); AtomicLong offset = new AtomicLong(chronicle.firstBytes());
for (int i = 0; i < RUNS; i++) { for (int i = 0; i < RUNS; i++) {
Expand All @@ -97,11 +125,12 @@ private void readSome(DirectChronicleQueue chronicle) {
} }


private void writeSome(DirectChronicleQueue chronicle) { private void writeSome(DirectChronicleQueue chronicle) {
NativeByteStore allocate = NativeByteStore.nativeStore(EXPECTED_BYTES.length); NativeBytesStore allocate = NativeBytesStore.nativeStore(EXPECTED_BYTES.length);
final Bytes toWrite = allocate.bytes(); final Bytes toWrite = allocate.bytes();
toWrite.write(EXPECTED_BYTES);
for (int i = 0; i < RUNS; i++) { for (int i = 0; i < RUNS; i++) {
toWrite.clear(); toWrite.clear();
toWrite.write(EXPECTED_BYTES);
toWrite.flip();
chronicle.appendDocument(toWrite); chronicle.appendDocument(toWrite);
} }
} }
Expand Down
Expand Up @@ -26,35 +26,37 @@ public void testMappingReferenceCount() throws Exception {
File tempFile = File.createTempFile("chronicle", "q"); File tempFile = File.createTempFile("chronicle", "q");


try { try {
int BLOCK_SIZE = 64; int BLOCK_SIZE = 4096;
final MappedFile mappedFile = MappedFile.mappedFile(tempFile.getName(), BLOCK_SIZE, 8); final MappedFile mappedFile = MappedFile.mappedFile(tempFile.getName(), BLOCK_SIZE, 8);
final Bytes bytes = mappedFile.bytes(); final Bytes bytes = mappedFile.bytes();



// write into block 1 // write into block 1
bytes.writeLong(64 + 8, Long.MAX_VALUE); bytes.writeLong(4096 + 8, Long.MAX_VALUE);
// Assert.assertEquals(1, mappedFile.getRefCount(1)); // Assert.assertEquals(1, mappedFile.getRefCount(1));
assertEquals("", mappedFile.referenceCounts()); assertEquals("refCount: 2, 0, 2", mappedFile.referenceCounts());


// we move from block 1 to block 2 // we move from block 1 to block 2
bytes.writeLong((64 * 2) + 8, Long.MAX_VALUE); bytes.writeLong((4096 * 2) + 8, Long.MAX_VALUE);
// assertEquals(0, mappedFile.getRefCount(1)); // assertEquals(0, mappedFile.getRefCount(1));
// assertEquals(1, mappedFile.getRefCount(2)); // assertEquals(1, mappedFile.getRefCount(2));
assertEquals("", mappedFile.referenceCounts()); assertEquals("refCount: 3, 0, 1, 2", mappedFile.referenceCounts());




// we move from block 2 back to block 1 // we move from block 2 back to block 1
bytes.writeLong((64 * 1) + 8, Long.MAX_VALUE); bytes.writeLong((4096 * 1) + 8, Long.MAX_VALUE);
// assertEquals(1, mappedFile.getRefCount(1)); // assertEquals(1, mappedFile.getRefCount(1));
// assertEquals(0, mappedFile.getRefCount(2)); // assertEquals(0, mappedFile.getRefCount(2));
assertEquals("", mappedFile.referenceCounts()); assertEquals("refCount: 3, 0, 2, 1", mappedFile.referenceCounts());


// we move from block 2 back to block 1 // we move from block 2 back to block 1
bytes.writeLong((64 * 3) + 8, Long.MAX_VALUE); bytes.writeLong((4096 * 3) + 8, Long.MAX_VALUE);
// assertEquals(1, mappedFile.getRefCount(3)); // assertEquals(1, mappedFile.getRefCount(3));
assertEquals("", mappedFile.referenceCounts()); assertEquals("refCount: 4, 0, 1, 1, 2", mappedFile.referenceCounts());


bytes.release();
mappedFile.close();


assertEquals("refCount: 0, 0, 0, 0, 0", mappedFile.referenceCounts());
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
Expand Down

0 comments on commit de2c95e

Please sign in to comment.