Skip to content

Commit

Permalink
added - bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 13, 2016
1 parent 60fd7f4 commit 0ae0c3a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 65 deletions.
Expand Up @@ -157,7 +157,7 @@ public long cycle() {
public static class StoreAppender extends DefaultAppender<AbstractChronicleQueue> {


private final Wire wire;
private Wire wire;
private long epoch;
private long cycle;
private long index = -1;
Expand All @@ -182,6 +182,7 @@ public StoreAppender(@NotNull AbstractChronicleQueue queue) throws IOException {
LOG.debug("appender file=" + mappedBytes.mappedFile().file().getAbsolutePath());
//this.writeContext = mappedBytes;
wire = this.queue().wireType().apply(mappedBytes);

}

@Override
Expand Down Expand Up @@ -232,7 +233,7 @@ private WireStore store() throws IOException {

this.cycle = nextCycle;
this.store = queue.storeForCycle(this.cycle, epoch);
//this.store.acquireBytesAtWritePositionForWrite(this.writeContext.bytes);
this.wire = this.queue().wireType().apply(store.mappedBytes());
}

return this.store;
Expand All @@ -258,9 +259,6 @@ public static class StoreTailer implements ExcerptTailer {
private long index;
private WireStore store;

//TODO: refactor
private boolean toStart;

public StoreTailer(@NotNull AbstractChronicleQueue queue) throws IOException {
this.queue = queue;
this.cycle = -1;
Expand Down Expand Up @@ -294,6 +292,8 @@ public boolean readDocument(@NotNull ReadMarshallable reader) throws IOException
// roll detected, move to next cycle;
cycle(Math.abs(position));
//context(store::acquireBytesAtReadPositionForRead);
wire.bytes().readPosition(0);
wire.bytes().readLimit(store.writePosition());

// try to read from new cycle
return readDocument(reader);
Expand Down Expand Up @@ -362,18 +362,17 @@ public boolean moveToIndex(long index) throws IOException {

cycle = expectedCycle;

final long position = this.store.moveToIndex(wire, subIndex(index));
final long subIndex = subIndex(index);
final long position = this.store.moveToIndex(wire, index);

if (position == -1)
return false;

final Bytes<?> readContext = wire.bytes();
readContext.readPosition(position);
readContext.readLimit(readContext.capacity());
this.index = index - 1;
this.index = ChronicleQueue.index(cycle, subIndex - 1);
return true;


}


Expand All @@ -385,12 +384,12 @@ public ExcerptTailer toStart() throws IOException {
if (index == -1)
return this;

LOG.info("index=> subIndex=" + ChronicleQueue.subIndex(index) + ",cycle=" + ChronicleQueue
LOG.info("index=> index=" + ChronicleQueue.subIndex(index) + ",cycle=" + ChronicleQueue
.cycle(index));


if (!moveToIndex(index))
throw new IllegalStateException("unable to move to the start");
throw new IllegalStateException("unable to move to the start, cycle=" + cycle);

return this;
}
Expand Down Expand Up @@ -420,6 +419,7 @@ private StoreTailer cycle(long cycle) throws IOException {
this.store = this.queue.storeForCycle(cycle, queue.epoch());

wire = queue.wireType().apply(store.mappedBytes());
moveToIndex(ChronicleQueue.index(cycle, 0));
if (LOG.isDebugEnabled())
LOG.debug("tailer=" + ((MappedBytes) wire.bytes()).mappedFile().file().getAbsolutePath());

Expand Down
Expand Up @@ -52,7 +52,20 @@ public class SingleChronicleQueueStore implements WireStore {
ClassAliasPool.CLASS_ALIASES.addAlias(Roll.class, "Roll");
}

private final WireType wireType;
static Wire TEXT_TEMPLATE;
static Wire BINARY_TEMPLATE;

static {
TEXT_TEMPLATE = WireType.TEXT.apply(Bytes.elasticByteBuffer());
TEXT_TEMPLATE.writeDocument(true, w -> w.write(() -> "index")
.int64array(NUMBER_OF_ENTRIES_IN_EACH_INDEX));

BINARY_TEMPLATE = WireType.BINARY.apply(Bytes.elasticByteBuffer());
BINARY_TEMPLATE.writeDocument(true, w -> w.write(() -> "index")
.int64array(NUMBER_OF_ENTRIES_IN_EACH_INDEX));
}

private WireType wireType;
private final Roll roll;
Bounds bounds = new Bounds();
private MappedFile mappedFile;
Expand All @@ -67,7 +80,6 @@ public class SingleChronicleQueueStore implements WireStore {
* Default constructor needed for self boot-strapping
*/
SingleChronicleQueueStore() {
this.wireType = WireType.BINARY;
this.roll = new Roll(null, 0);
}

Expand Down Expand Up @@ -229,7 +241,10 @@ public void install(
*/
@Override
public MappedBytes mappedBytes() {
return new MappedBytes(mappedFile);//.withSizes(this.chunkSize, this.overlapSize);
final MappedBytes mappedBytes = new MappedBytes(mappedFile);//.withSizes(this.chunkSize, this.overlapSize);
mappedBytes.writePosition(bounds.getWritePosition());
mappedBytes.readPosition(bounds.getReadPosition());
return mappedBytes;
}


Expand Down Expand Up @@ -288,8 +303,7 @@ private <T> long read(


final Bytes<?> context = wire.bytes();
for (; wire.bytes().readRemaining() > 0; ) {

while (wire.bytes().readLong(wire.bytes().readPosition()) != 0) {

final int spbHeader = context.readVolatileInt(context.readPosition());

Expand All @@ -308,19 +322,15 @@ private <T> long read(
return wire.bytes().readPosition();
}


// In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative)
/* final StringBuilder sb = Wires.acquireStringBuilder();
final StringBuilder sb = Wires.acquireStringBuilder();

// todo improve this line
final ValueIn vi = wire.readEventName(sb);
if ("index".contentEquals(sb)) {
return read(wire, reader, marshaller);
} else if ("roll".contentEquals(sb)) {
if ("roll".contentEquals(sb)) {
return -vi.int32();
}*/
}

}

Expand Down Expand Up @@ -354,12 +364,15 @@ private long writeWireMarshallable(
@NotNull final WriteMarshallable marshallable) throws IOException {

final long positionDataWritten = Wires.writeData(wire, marshallable);
final Bytes<?> context = wire.bytes();


// todo improve this line
bounds.setWritePositionIfGreater(context.writePosition());
bounds.setWritePositionIfGreater(wire.bytes().writePosition());

final long index = indexing.incrementLastIndex();
indexing.storeIndexLocation(context, positionDataWritten, index);
indexing.storeIndexLocation(wire.bytes(), positionDataWritten, index);
System.out.println
("positionDataWritten=" + positionDataWritten + ",cycle=" + cycle() + ",index=" + index);
return index;
}

Expand Down Expand Up @@ -476,7 +489,7 @@ <T> long write(

@Override
public void writeMarshallable(@NotNull WireOut wire) {

wire.write(MetaDataField.wireType).object(wireType);
wire.write(MetaDataField.bounds).marshallable(this.bounds)
.write(MetaDataField.roll).object(this.roll)
.write(MetaDataField.chunkSize).int64(mappedFile.chunkSize())
Expand All @@ -488,7 +501,7 @@ public void writeMarshallable(@NotNull WireOut wire) {
public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {

// System.out.println(Wires.fromSizePrefixedBlobs(wire.bytes()));

wireType = wire.read(MetaDataField.wireType).object(WireType.class);
wire.read(MetaDataField.bounds).marshallable(this.bounds);
wire.read(MetaDataField.roll).marshallable(this.roll);
long chunkSize = wire.read(MetaDataField.chunkSize).int64();
Expand All @@ -499,6 +512,7 @@ public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
indexing = new Indexing(wireType, mappedBytes.withSizes(chunkSize,
overlapSize));
wire.read(MetaDataField.indexing).marshallable(indexing);

}

// *************************************************************************
Expand All @@ -510,7 +524,7 @@ enum MetaDataField implements WireKey {
indexing,
roll,
chunkSize,
overlapSize
wireType, overlapSize
}

enum BoundsField implements WireKey {
Expand Down Expand Up @@ -675,13 +689,13 @@ class Indexing implements Marshallable {
this.firstIndex = wireType.newLongReference().get();

this.lastIndex = wireType.newLongReference().get();

final Bytes b = Bytes.elasticByteBuffer();

templateIndex = wireType.apply(b);
templateIndex.writeDocument(true, w -> w.write(() -> "index")
.int64array(NUMBER_OF_ENTRIES_IN_EACH_INDEX));

if (wireType == WireType.TEXT)
templateIndex = TEXT_TEMPLATE;
else if (wireType == WireType.BINARY)
templateIndex = BINARY_TEMPLATE;
else {
throw new UnsupportedOperationException("type is not supported");
}
this.wireType = wireType;
this.longArray = withInitial(wireType.newLongArrayReference());
this.indexContext = mappedBytes;
Expand Down Expand Up @@ -753,6 +767,7 @@ long indexToIndex(@Nullable final Bytes writeContext) {
return -1;

final long index = newIndex(writeContext);
System.out.println("index2Index=" + index);
this.index2Index.setOrderedValue(index);
return index;
}
Expand Down Expand Up @@ -805,15 +820,11 @@ public void storeIndexLocation(Bytes context,
}

private LongArrayValues array(WireIn w, LongArrayValues using) {

// final StringBuilder sb = Wires.acquireStringBuilder();
// final ValueIn valueIn = w.readEventName(sb);

//if (!"index".contentEquals(sb))
// throw new IllegalStateException("expecting and index");


w.read(() -> "index").int64array(using, this, (o1, o2) -> {
final StringBuilder sb = Wires.acquireStringBuilder();
final ValueIn valueIn = w.readEventName(sb);
if (!"index".contentEquals(sb))
throw new IllegalStateException("expecting index");
valueIn.int64array(using, this, (o1, o2) -> {
});
return using;
}
Expand Down Expand Up @@ -882,11 +893,11 @@ private long writeIndexBytes(
public long moveToIndex(@NotNull final Wire wire, final long index) {
final LongArrayValues array = this.longArray.get();
final long indexToIndex0 = indexToIndex(wire.bytes());

System.out.println("indexToIndex=" + indexToIndex0);
final Bytes<?> bytes = wire.bytes();
bytes.readLimit(indexContext.capacity()).readPosition(indexToIndex0);
long startIndex = ((index / 64L)) * 64L;

// final long limit = wire.bytes().readLimit();
try (@NotNull final DocumentContext documentContext0 = wire.readingDocument()) {

if (!documentContext0.isPresent())
Expand All @@ -910,20 +921,21 @@ public long moveToIndex(@NotNull final Wire wire, final long index) {
continue;
}

//indexContext.readLimit(indexContext.capacity());
final Wire wire1 = wireType.apply(indexContext.readPosition(secondaryAddress));
wire.bytes().readPosition(secondaryAddress);

final long limit = wire1.bytes().readLimit();
//indexContext.readLimit(indexContext.capacity());
// final Wire wire1 = wireType.apply(indexContext.readPosition
// (secondaryAddress));

try (@NotNull final DocumentContext documentContext1 = wire1.readingDocument()) {
try (@NotNull final DocumentContext documentContext1 = wire.readingDocument()) {

if (!documentContext1.isPresent())
throw new IllegalStateException("document is not present");

if (documentContext1.isData())
continue;

final LongArrayValues array1 = array(wire1, array);
final LongArrayValues array1 = array(wire, array);
long secondaryOffset = toAddress1(index);

do {
Expand All @@ -938,8 +950,8 @@ public long moveToIndex(@NotNull final Wire wire, final long index) {
if (index == startIndex) {
return fromAddress;
} else {
wire1.bytes().readLimit(limit);
return linearScan(wire1, index, startIndex, fromAddress);
wire.bytes().readLimit(bounds.getWritePosition());
return linearScan(wire, index, startIndex, fromAddress);
}

} while (secondaryOffset >= 0);
Expand Down Expand Up @@ -1076,14 +1088,6 @@ public Roll nextCycleMetaPosition(long position) {
return this;
}

public long nextCycleMetaPosition() {
return this.nextCycleMetaPosition.getVolatileValue();
}

public long nextRollCycle() {
return this.nextCycle.getVolatileValue();
}

public boolean casNextRollCycle(long rollCycle) {
return this.nextCycle.compareAndSwapValue(-1, rollCycle);
}
Expand Down
Expand Up @@ -183,6 +183,7 @@ public void testAppendAndReadWithRolling() throws IOException {
}
}


@Ignore("rob to fix")
@Test
public void testAppendAndReadWithRolling2() throws IOException {
Expand All @@ -191,13 +192,13 @@ public void testAppendAndReadWithRolling2() throws IOException {
final ChronicleQueue queue = new SingleChronicleQueueBuilder(dir)
.wireType(this.wireType)
.rollCycle(RollCycles.SECONDS)
.epoch(System.currentTimeMillis())
.epoch(1452701442361L)
.build();


final ExcerptAppender appender = queue.createAppender();
for (int i = 0; i < 10; i++) {
final int n = i;
final ExcerptAppender appender = queue.createAppender();
appender.writeDocument(w -> w.write(TestKey.test).int32(n));
Jvm.pause(500);
}
Expand All @@ -210,6 +211,7 @@ public void testAppendAndReadWithRolling2() throws IOException {
@Override
public void readMarshallable(@NotNull WireIn r) throws IORuntimeException {
assertEquals(n, r.read(TestKey.test).int32());
System.out.println("**** read=" + n);
}
});
assertTrue(condition);
Expand Down Expand Up @@ -304,7 +306,7 @@ public void testReadAtIndex() throws Exception {

StringBuilder sb = new StringBuilder();

for (int i : new int[]{65}) {
for (int i : new int[]{0, 64, 65, 66}) {
tailer.moveToIndex(index(cycle, i));
tailer.readDocument(wire -> wire.read(() -> "key").text(sb));
Assert.assertEquals("value=" + i, sb.toString());
Expand Down

0 comments on commit 0ae0c3a

Please sign in to comment.