Skip to content

Commit

Permalink
fixed most of the test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 11, 2016
1 parent 0ccdc1e commit e02405d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 114 deletions.
Expand Up @@ -37,6 +37,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


import static net.openhft.chronicle.bytes.Bytes.elasticByteBuffer; import static net.openhft.chronicle.bytes.Bytes.elasticByteBuffer;
import static net.openhft.chronicle.queue.ChronicleQueue.subIndex;


public class Excerpts { public class Excerpts {


Expand Down Expand Up @@ -266,6 +267,7 @@ public StoreTailer(@NotNull AbstractChronicleQueue queue) throws IOException {


toStart(); toStart();



// this.store = this.cycle > 0 ? queue.storeForCycle(this.cycle) : null; // this.store = this.cycle > 0 ? queue.storeForCycle(this.cycle) : null;
// this.index = this.cycle > 0 ? this.store.lastIndex() : -1; // this.index = this.cycle > 0 ? this.store.lastIndex() : -1;


Expand Down Expand Up @@ -360,14 +362,17 @@ public boolean index(long fullIndex) throws IOException {
if (nextCycle != queue.lastCycle()) if (nextCycle != queue.lastCycle())
cycle(nextCycle); cycle(nextCycle);


long index = ChronicleQueue.subIndex(fullIndex); final long position = this.store.moveToIndex(readContext, subIndex(fullIndex));
readContext.readPosition(position);
readContext.readLimit(readContext.capacity());


if (this.store.moveToIndex(readContext, index)) { if (position != -1) {
this.index = index - 1; this.index = fullIndex - 1;
return true; return true;
} }


return false; return false;

} }




Expand All @@ -384,6 +389,9 @@ public ExcerptTailer toStart() throws IOException {


this.index = -1; this.index = -1;


readContext.readPosition(0);
readContext.readLimit(readContext.capacity());

return this; return this;
} }


Expand Down Expand Up @@ -412,7 +420,7 @@ private StoreTailer cycle(long cycle) throws IOException {
} }
this.cycle = cycle; this.cycle = cycle;
this.index = -1; this.index = -1;
this.store = this.queue.storeForCycle(this.cycle, this.epoc); this.store = this.queue.storeForCycle(cycle, this.epoc);
this.readContext = store.mappedBytes(); this.readContext = store.mappedBytes();


if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
Expand All @@ -430,3 +438,4 @@ private StoreTailer context(@NotNull ThrowingAcceptor<Bytes, IOException> accept
} }
} }
} }

Expand Up @@ -72,7 +72,7 @@ public interface WireStore extends ReferenceCounted, Marshallable {


long read(@NotNull MappedBytes context, @NotNull ReadBytesMarshallable reader) throws IOException; long read(@NotNull MappedBytes context, @NotNull ReadBytesMarshallable reader) throws IOException;


boolean moveToIndex(@NotNull MappedBytes context, long index); long moveToIndex(@NotNull MappedBytes context, long index);


void install( void install(
@NotNull MappedBytes mappedBytes, @NotNull MappedBytes mappedBytes,
Expand Down
Expand Up @@ -74,8 +74,8 @@ public class SingleChronicleQueueStore implements WireStore {
* @param rollCycle * @param rollCycle
* @param wireType * @param wireType
* @param mappedBytes * @param mappedBytes
* @param rollEpoc sets an epoc offset as the number of number of milliseconds since January * @param rollEpoc sets an epoc offset as the number of number of milliseconds since January
* 1, 1970, 00:00:00 GMT * 1, 1970, 00:00:00 GMT
*/ */
SingleChronicleQueueStore(@Nullable RollCycle rollCycle, SingleChronicleQueueStore(@Nullable RollCycle rollCycle,
final WireType wireType, final WireType wireType,
Expand Down Expand Up @@ -140,6 +140,10 @@ public long read(@NotNull MappedBytes context, @NotNull ReadMarshallable reader)
return read(context, this::readWireMarshallable, reader); return read(context, this::readWireMarshallable, reader);
} }


private <T> long readWireMarshallable(MappedBytes mappedBytes, long l, T t) {
return 0;
}

@Override @Override
public long read(@NotNull MappedBytes context, @NotNull ReadBytesMarshallable reader) throws IOException { public long read(@NotNull MappedBytes context, @NotNull ReadBytesMarshallable reader) throws IOException {
return read(context, this::readBytesMarshallable, reader); return read(context, this::readBytesMarshallable, reader);
Expand Down Expand Up @@ -169,7 +173,7 @@ public boolean appendRollMeta(@NotNull MappedBytes context, long cycle) throws I
} }


@Override @Override
public boolean moveToIndex(@NotNull MappedBytes context, long index) { public long moveToIndex(@NotNull MappedBytes context, long index) {
return indexing.moveToIndex(context, index); return indexing.moveToIndex(context, index);
} }


Expand Down Expand Up @@ -213,7 +217,7 @@ public void install(


@Override @Override
public MappedBytes mappedBytes() { public MappedBytes mappedBytes() {
return mappedBytes; return new MappedBytes(mappedBytes.mappedFile());
} }


// ************************************************************************* // *************************************************************************
Expand Down Expand Up @@ -269,14 +273,31 @@ private <T> long read(
@NotNull Reader<T> reader, @NotNull Reader<T> reader,
@NotNull T marshaller) throws IOException { @NotNull T marshaller) throws IOException {


long position = context.readPosition(); final Wire wire = wireType.apply(context);

for (; wire.bytes().readRemaining() > 0; ) {

final int spbHeader = context.readVolatileInt(context.readPosition());

if (Wires.isNotInitialized(spbHeader) || !Wires.isReady(spbHeader)) {
Thread.yield();
continue;
}


final long start = wire.bytes().readPosition();
//boolean wasDocument =false
try (@NotNull final DocumentContext _ = wire.readingDocument()) {
if (!_.isPresent())
throw new IllegalStateException("document not present");

if (_.isData()) {
// wasDocument = true;
// wire.bytes().readPosition(start);
((ReadMarshallable) marshaller).readMarshallable(wire);
return wire.bytes().readPosition();
}


final int spbHeader = context.readVolatileInt(position);
if (!Wires.isNotInitialized(spbHeader) && Wires.isReady(spbHeader)) {
int len = Wires.lengthOf(spbHeader);
if (Wires.isData(spbHeader)) {
return reader.read(context, len, marshaller);
} else {
// In case of meta data, if we are found the "roll" meta, we returns // In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative) // the next cycle (negative)
final StringBuilder sb = Wires.acquireStringBuilder(); final StringBuilder sb = Wires.acquireStringBuilder();
Expand All @@ -288,20 +309,19 @@ private <T> long read(
return read(context, reader, marshaller); return read(context, reader, marshaller);
} else if ("roll".contentEquals(sb)) { } else if ("roll".contentEquals(sb)) {
return -vi.int32(); return -vi.int32();
} else {
context.readLimit(context.capacity());
context.readPosition(position + len + SPB_DATA_HEADER_SIZE);
return read(context, reader, marshaller);
} }

} }

} }


return WireConstants.NO_DATA; return WireConstants.NO_DATA;
} }


//TODO : maybe move to wire //TODO : maybe move to wire
@ForceInline @ForceInline
private long readWire(@NotNull WireIn wireIn, long size, @NotNull ReadMarshallable dataConsumer) { private long readWire(@NotNull WireIn wireIn, long size,
@NotNull ReadMarshallable dataConsumer) {
final Bytes<?> bytes = wireIn.bytes(); final Bytes<?> bytes = wireIn.bytes();
final long limit0 = bytes.readLimit(); final long limit0 = bytes.readLimit();
final long limit = bytes.readPosition() + size; final long limit = bytes.readPosition() + size;
Expand Down Expand Up @@ -424,10 +444,8 @@ public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
long chunkSize = wire.read(MetaDataField.chunkSize).int64(); long chunkSize = wire.read(MetaDataField.chunkSize).int64();
long overlapSize = wire.read(MetaDataField.overlapSize).int64(); long overlapSize = wire.read(MetaDataField.overlapSize).int64();


final Bytes<?> bytes1 = wire.bytes(); final MappedBytes mappedBytes = new MappedBytes(this.mappedBytes.mappedFile());
final MappedBytes bytes = (MappedBytes) bytes1; indexing = new Indexing(wireType, mappedBytes.withSizes(chunkSize, overlapSize));
MappedBytes mappedBytes = bytes.withSizes(chunkSize, overlapSize);
indexing = new Indexing(wireType, mappedBytes);
wire.read(MetaDataField.indexing).marshallable(indexing); wire.read(MetaDataField.indexing).marshallable(indexing);
} }


Expand All @@ -439,7 +457,6 @@ enum MetaDataField implements WireKey {
bounds, bounds,
indexing, indexing,
roll, roll,
mappedFile,
chunkSize, chunkSize,
overlapSize overlapSize
} }
Expand Down Expand Up @@ -593,8 +610,6 @@ class Indexing implements Marshallable {
private LongValue index2Index; private LongValue index2Index;
private LongValue lastIndex; private LongValue lastIndex;
private ThreadLocal<LongArrayValues> longArray; private ThreadLocal<LongArrayValues> longArray;
private LongArrayValues values;
private boolean indexSuccess;
private long startIndex; private long startIndex;


Indexing(@NotNull WireType wireType, final MappedBytes mappedBytes) { Indexing(@NotNull WireType wireType, final MappedBytes mappedBytes) {
Expand Down Expand Up @@ -785,22 +800,27 @@ private long writeIndexBytes(
* occurs. The indexes are only built when the indexer is run, this could be on a background * occurs. The indexes are only built when the indexer is run, this could be on a background
* thread. Each targetIndex is created into chronicle as an excerpt. * thread. Each targetIndex is created into chronicle as an excerpt.
*/ */
public boolean moveToIndex(MappedBytes context, final long targetIndex) { public long moveToIndex(MappedBytes context, final long targetIndex) {
final LongArrayValues array = this.longArray.get(); final LongArrayValues array = this.longArray.get();
final long indexToIndex0 = indexToIndex(context); final long indexToIndex0 = indexToIndex(context);


final MappedBytes indexBytes = indexContext; final Wire w = wireType.apply(indexContext);
indexBytes.readLimit(indexBytes.capacity()).readPosition(indexToIndex0); final Bytes<?> bytes = w.bytes();
bytes.readLimit(indexContext.capacity()).readPosition(indexToIndex0);


final Wire w = wireType.apply(indexBytes);
indexSuccess = false;
this.startIndex = ((targetIndex / 64L)) * 64L; this.startIndex = ((targetIndex / 64L)) * 64L;
long result = bytes.readPosition();


w.readDocument(d -> { try (@NotNull final DocumentContext documentContext0 = w.readingDocument()) {
// todo improve this


if (!documentContext0.isPresent())
throw new IllegalStateException("document is not present");


final LongArrayValues primaryIndex = array(d, array); if (documentContext0.isData())
throw new IllegalStateException("Invalid index, expecting and index at " +
"pos=" + indexToIndex0 + ", but found data instead.");

final LongArrayValues primaryIndex = array(w, array);
long primaryOffset = toAddress0(targetIndex); long primaryOffset = toAddress0(targetIndex);


do { do {
Expand All @@ -814,18 +834,23 @@ public boolean moveToIndex(MappedBytes context, final long targetIndex) {
continue; continue;
} }


indexBytes.readLimit(indexBytes.capacity()); //indexContext.readLimit(indexContext.capacity());
final Bytes bytes = indexBytes.readPosition(secondaryAddress); final Wire wire1 = wireType.apply(indexContext.readPosition(secondaryAddress));
final Wire wire1 = wireType.apply(bytes);


wire1.readDocument(document -> { final long limit = wire1.bytes().readLimit();


final LongArrayValues array1 = array(document, array); try (@NotNull final DocumentContext documentContext1 = wire1.readingDocument()) {
long secondaryOffset = toAddress1(targetIndex);


do { if (!documentContext1.isPresent())
throw new IllegalStateException("document is not present");

if (documentContext1.isData())
continue;


final LongArrayValues array1 = array(wire1, array);
long secondaryOffset = toAddress1(targetIndex);


do {
long fromAddress = array1.getValueAt(secondaryOffset); long fromAddress = array1.getValueAt(secondaryOffset);
if (fromAddress == 0) { if (fromAddress == 0) {
secondaryOffset--; secondaryOffset--;
Expand All @@ -835,32 +860,27 @@ public boolean moveToIndex(MappedBytes context, final long targetIndex) {
} }


if (targetIndex == startIndex) { if (targetIndex == startIndex) {
readPosition(context, fromAddress); return fromAddress;
indexSuccess = true; } else {

wire1.bytes().readLimit(limit);
} else return linearScan(wire1, targetIndex, startIndex, fromAddress);
indexSuccess = linearScan(context, targetIndex, startIndex, fromAddress); }



break;
} while (secondaryOffset >= 0); } while (secondaryOffset >= 0);


}, null); }

break; break;


} while (primaryOffset >= 0); } while (primaryOffset >= 0);
}, null); }

return indexSuccess;
}




private void readPosition(MappedBytes context, long position) { return result;


context.readLimit(context.capacity());
context.readPosition(position);
} }



/** /**
* moves the context to the index of {@code toIndex} by doing a linear scans form a {@code * moves the context to the index of {@code toIndex} by doing a linear scans form a {@code
* fromKnownIndex} at {@code knownAddress} <p/> note meta data is skipped and does not * fromKnownIndex} at {@code knownAddress} <p/> note meta data is skipped and does not
Expand All @@ -871,41 +891,44 @@ private void readPosition(MappedBytes context, long position) {
* @param toIndex the index that we wish to move the context to * @param toIndex the index that we wish to move the context to
* @param fromKnownIndex a know index ( used as a starting point ) * @param fromKnownIndex a know index ( used as a starting point )
* @param knownAddress a know address ( used as a starting point ) * @param knownAddress a know address ( used as a starting point )
* @return {@code true} if successful * @return > -1, if successful
* @see net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.Indexing#moveToIndex * @see net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.Indexing#moveToIndex
*/ */
private boolean linearScan(MappedBytes context, long toIndex, long fromKnownIndex, long knownAddress) { private long linearScan(Wire context, long toIndex, long fromKnownIndex, long knownAddress) {

long position = knownAddress;


for (long i = fromKnownIndex; i <= toIndex; ) { final Bytes<?> bytes = context.bytes();


readPosition(context, position); final long p = bytes.readPosition();
final long l = bytes.readLimit();


final int spbHeader = context.readVolatileInt(position); bytes.readPosition(knownAddress);
if (Wires.isReady(spbHeader)) {
if (Wires.isData(spbHeader)) {
if (toIndex == i) {
return true;
}


i++; for (long i = fromKnownIndex; bytes.readRemaining() > 0; ) {
}


// todo // wait until ready - todo add timeout
// if (i % 64 == 0) for (; ; ) {
// storeIndexLocation(context,context.readPosition(),i); if (Wires.isReady(bytes.readVolatileInt(bytes.readPosition()))) {


final int len = Wires.lengthOf(spbHeader); break;
context.readSkip(len + SPB_DATA_HEADER_SIZE); } else
position = context.readPosition(); Thread.yield();
} else {
return false;
} }
}


try (@NotNull final DocumentContext documentContext = context.readingDocument()) {

if (!documentContext.isPresent())
throw new IllegalStateException("document is not present");


return false; if (!documentContext.isData())
continue;

if (toIndex == i)
return context.bytes().readPosition() - 4;
i++;
}
}
bytes.readLimit(l).readPosition(p);
return -1;
} }
} }


Expand Down

0 comments on commit e02405d

Please sign in to comment.