Skip to content

Commit

Permalink
QUEUE-5 using the jetbrains @NotNull and adding "Index" via wire
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Feb 15, 2015
1 parent a2ab045 commit 056deeb
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 43 deletions.
5 changes: 5 additions & 0 deletions chronicle-queue/pom.xml
Expand Up @@ -95,6 +95,11 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>


<dependency>
<groupId>com.intellij</groupId>
<artifactId>annotations</artifactId>
</dependency>

<!-- for testing --> <!-- for testing -->
<dependency> <dependency>
<groupId>org.easymock</groupId> <groupId>org.easymock</groupId>
Expand Down
@@ -1,6 +1,7 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.lang.model.constraints.NotNull;
import org.jetbrains.annotations.NotNull;


import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
Expand Down
Expand Up @@ -18,8 +18,10 @@


package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;




import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.Wire;
import net.openhft.lang.model.constraints.NotNull; import org.jetbrains.annotations.NotNull;


/** /**
* The main data container of a {@link ChronicleQueue}, an extended version of {@link ExcerptTailer} which also facilitates * The main data container of a {@link ChronicleQueue}, an extended version of {@link ExcerptTailer} which also facilitates
Expand Down
Expand Up @@ -19,7 +19,8 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.wire.WireIn; import net.openhft.chronicle.wire.WireIn;
import net.openhft.lang.model.constraints.NotNull; import org.jetbrains.annotations.NotNull;



import java.util.function.Function; import java.util.function.Function;


Expand Down
@@ -1,6 +1,7 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.lang.model.constraints.NotNull;
import org.jetbrains.annotations.NotNull;


/** /**
* Created by peter on 29/01/15. * Created by peter on 29/01/15.
Expand Down
Expand Up @@ -36,8 +36,6 @@ public static void index(@NotNull final ChronicleQueue chronicle) throws Excepti
final Function<WireIn, Object> reader = wireIn -> { final Function<WireIn, Object> reader = wireIn -> {


long address = wireIn.bytes().position() - 4; long address = wireIn.bytes().position() - 4;
System.out.print("address=" + address);

recordAddress(index, address, single, index2Index); recordAddress(index, address, single, index2Index);
wireIn.bytes().skip(wireIn.bytes().remaining()); wireIn.bytes().skip(wireIn.bytes().remaining());
return null; return null;
Expand Down Expand Up @@ -65,20 +63,19 @@ private static void recordAddress(long index,
if (index % 64 != 0) if (index % 64 != 0)
return; return;


System.out.println("index2Index=" + index2Index);
long offset = IndexOffset.toAddress0(index); long offset = IndexOffset.toAddress0(index);
Bytes chronicleBytes = chronicleQueue.bytes(); Bytes chronicleBytes = chronicleQueue.bytes();
long rootOffset = index2Index + offset; long rootOffset = index2Index + offset;
System.out.println("rootOffset=" + rootOffset);
long refToSecondary = chronicleBytes.readVolatileLong(rootOffset); long refToSecondary = chronicleBytes.readVolatileLong(rootOffset);


if (refToSecondary == UNINITIALISED) { if (refToSecondary == UNINITIALISED) {
boolean success = chronicleBytes.compareAndSwapLong(rootOffset, UNINITIALISED, BUILDING); boolean success = chronicleBytes.compareAndSwapLong(rootOffset, UNINITIALISED, BUILDING);
if (!success) { if (!success) {
refToSecondary = chronicleBytes.readVolatileLong(rootOffset); refToSecondary = chronicleBytes.readVolatileLong(rootOffset);
} else { } else {
refToSecondary = chronicleQueue.newIndexToIndex(); refToSecondary = chronicleQueue.newIndex();
System.out.println("refToSecondary=" + refToSecondary);
chronicleBytes.writeOrderedLong(rootOffset, refToSecondary); chronicleBytes.writeOrderedLong(rootOffset, refToSecondary);
} }
} }
Expand All @@ -87,7 +84,6 @@ private static void recordAddress(long index,
assert l == UNINITIALISED; assert l == UNINITIALISED;


long offset1 = refToSecondary + IndexOffset.toAddress1(index); long offset1 = refToSecondary + IndexOffset.toAddress1(index);
System.out.println("offset1=" + offset1);
chronicleBytes.bytes().writeLong(offset1, address); chronicleBytes.bytes().writeLong(offset1, address);




Expand All @@ -96,7 +92,6 @@ private static void recordAddress(long index,
enum IndexOffset { enum IndexOffset {
; ;



static long toAddress0(long index) { static long toAddress0(long index) {


long siftedIndex = index >> (17L + 6L); long siftedIndex = index >> (17L + 6L);
Expand Down
Expand Up @@ -2,12 +2,12 @@


import net.openhft.chronicle.queue.*; import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.wire.BinaryWire; import net.openhft.chronicle.wire.BinaryWire;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.WireKey; import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.lang.Jvm; import net.openhft.lang.Jvm;
import net.openhft.lang.io.Bytes; import net.openhft.lang.io.*;
import net.openhft.lang.io.MappedFile; import net.openhft.lang.model.DataValueClasses;
import net.openhft.lang.io.MappedMemory;
import net.openhft.lang.io.MultiStoreBytes;
import net.openhft.lang.values.LongValue; import net.openhft.lang.values.LongValue;


import java.io.IOException; import java.io.IOException;
Expand All @@ -19,12 +19,13 @@
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;


import static net.openhft.chronicle.wire.BinaryWire.isDocument; import static net.openhft.chronicle.wire.BinaryWire.isDocument;


/** /**
* SingleChronicle implements Chronicle over a single streaming file * SingleChronicle implements Chronicle over a single streaming file
* * <p>
* Created by peter on 30/01/15. * Created by peter on 30/01/15.
*/ */
public class SingleChronicleQueue implements ChronicleQueue, DirectChronicleQueue { public class SingleChronicleQueue implements ChronicleQueue, DirectChronicleQueue {
Expand Down Expand Up @@ -247,7 +248,7 @@ public long firstBytes() {




/** /**
* @return gets the index2index, and creates on if it does not exist * @return gets the index2index, or creates it, if it does not exist.
*/ */
long indexToIndex() { long indexToIndex() {
for (; ; ) { for (; ; ) {
Expand All @@ -263,7 +264,7 @@ long indexToIndex() {
if (!header.index2Index.compareAndSwapValue(UNINITIALISED, NOT_READY)) if (!header.index2Index.compareAndSwapValue(UNINITIALISED, NOT_READY))
continue; continue;


long indexToIndex = newIndexToIndex(); long indexToIndex = newIndex();
header.index2Index.setOrderedValue(indexToIndex); header.index2Index.setOrderedValue(indexToIndex);
return indexToIndex; return indexToIndex;
} }
Expand All @@ -274,10 +275,11 @@ long indexToIndex() {
* *
* @return the address of the Excerpt * @return the address of the Excerpt
*/ */
long newIndexToIndex() { long newIndex() {


// the space required for 17bits // the space required for 17bits
long length = 8L * (1L << 17L); long wireLen = 6;
long length = wireLen + 8L * (1L << 17L);


long firstByte; long firstByte;
LongValue writeByte = header.writeByte; LongValue writeByte = header.writeByte;
Expand All @@ -286,11 +288,16 @@ long newIndexToIndex() {
for (; ; ) { for (; ; ) {


if (bytes.compareAndSwapInt(lastByte, 0, NOT_READY | (int) length)) { if (bytes.compareAndSwapInt(lastByte, 0, NOT_READY | (int) length)) {
firstByte = lastByte + 4;
new BinaryWire(bytes.bytes(lastByte + 4, wireLen)).write(() -> "Index");
firstByte = lastByte + 4 + wireLen;

long lastByte2 = firstByte + length; long lastByte2 = firstByte + length;
header.lastIndex.addAtomicValue(1); header.lastIndex.addAtomicValue(1);
writeByte.setValue(lastByte2); writeByte.setValue(lastByte2);
bytes.writeOrderedInt(lastByte, (int) length); bytes.writeOrderedInt(lastByte, (int) length);
long l = lastByte + 4 + wireLen + length;
bytes.skip(length30((int)l));
return firstByte; return firstByte;
} }


Expand Down
Expand Up @@ -66,31 +66,15 @@ public void testSimpleDirect() throws Exception {
appender.writeDocument(wire -> wire.write(() -> "key").text("value=" + j)); appender.writeDocument(wire -> wire.write(() -> "key").text("value=" + j));
} }


StringBuilder first = new StringBuilder();
StringBuilder surname = new StringBuilder();

final ExcerptTailer tailer = chronicle.createTailer(); final ExcerptTailer tailer = chronicle.createTailer();
final Bytes toRead = DirectStore.allocate(512).bytes();
ObjectSerializer objectSerializer = toRead.objectSerializer();
System.out.println(objectSerializer);


Bytes bytes = chronicle.bytes();


for (int i = 0; i < chronicle.lastIndex(); i++) { for (int i = 0; i < chronicle.lastIndex(); i++) {



tailer.readDocument(wireIn -> {
// System.out.println(AbstractBytes.toHex(toRead.flip())); Bytes bytes1 = wireIn.bytes();
tailer.readDocument(new Function<WireIn, Object>() { long remaining = bytes1.remaining();
@Override bytes1.skip(remaining);
public Object apply(WireIn wireIn) { return null;
Bytes bytes = wireIn.bytes();
long remaining = bytes.remaining();
System.out.println(bytes.position());
bytes.skip(remaining);
System.out.println(bytes.position());
return null;
}
}); });
} }


Expand Down Expand Up @@ -259,6 +243,44 @@ public void testReadAtIndexWithIndexes() throws Exception {
// creates the indexes // creates the indexes
Indexer.index(chronicle); Indexer.index(chronicle);


tailer.index(67);

StringBuilder sb = new StringBuilder();
tailer.readDocument(wire -> wire.read(() -> "key").text(sb));

Assert.assertEquals("value=67", sb.toString());

} finally {
file.delete();
}

}

@Test
public void testReadAtIndexWithIndexesAtStart() throws Exception {

File file = File.createTempFile("chronicle.", "q");
file.deleteOnExit();
try {

DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getName()).build();

final ExcerptAppender appender = chronicle.createAppender();
appender.writeDocument(wire -> wire.write(() -> "key").text("value=" + 0));

// creates the indexes
Indexer.index(chronicle);

// create 100 documents
for (int i = 1; i < 100; i++) {
final int j = i;
appender.writeDocument(wire -> wire.write(() -> "key").text("value=" + j));
}

final ExcerptTailer tailer = chronicle.createTailer();



tailer.index(67); tailer.index(67);


StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Expand Down

0 comments on commit 056deeb

Please sign in to comment.