Skip to content

Commit

Permalink
Cleanup Read/Write Context
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 29, 2015
1 parent f4e646e commit 8cb0ba8
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 115 deletions.
Expand Up @@ -18,7 +18,9 @@




import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.VanillaBytes;
import net.openhft.chronicle.core.annotation.ForceInline; import net.openhft.chronicle.core.annotation.ForceInline;
import net.openhft.chronicle.core.util.ThrowingAcceptor;
import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
Expand All @@ -30,7 +32,6 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.ToLongFunction;


import static net.openhft.chronicle.bytes.Bytes.elasticByteBuffer; import static net.openhft.chronicle.bytes.Bytes.elasticByteBuffer;


Expand Down Expand Up @@ -208,18 +209,19 @@ public boolean readDocument(ReadMarshallable reader) throws IOException {
} }


//TODO: what should be done at the beginning ? toEnd/toStart //TODO: what should be done at the beginning ? toEnd/toStart
cycle(lastCycle, WireStore::readPosition); cycle(lastCycle);
context(store::acquireBytesAtReadPositionForRead);
} }


long position = store.read(this.context, reader); long position = store.read(this.context, reader);
if(position > 0) { if(position > 0) {
this.context.position(position);
this.index++; this.index++;


return true; return true;
} else if(position < 0) { } else if(position < 0) {
// roll detected, move to next cycle; // roll detected, move to next cycle;
cycle(Math.abs(position), WireStore::readPosition); cycle(Math.abs(position));
context(store::acquireBytesAtReadPositionForRead);


// try to read from new cycle // try to read from new cycle
return readDocument(reader); return readDocument(reader);
Expand Down Expand Up @@ -250,14 +252,13 @@ public long cycle() {
@Override @Override
public boolean index(long index) throws IOException { public boolean index(long index) throws IOException {
if(this.store == null) { if(this.store == null) {
cycle(queue.lastCycle(), WireStore::readPosition); cycle(queue.lastCycle());
context(store::acquireBytesAtReadPositionForRead);
} }


long idxpos = this.store.positionForIndex(index); this.context.clear();
if(idxpos != WireConstants.NO_INDEX) { if(this.store.moveToIndex(this.context, index)) {
this.context.position(idxpos);
this.index = index - 1; this.index = index - 1;

return true; return true;
} }


Expand All @@ -266,15 +267,18 @@ public boolean index(long index) throws IOException {


@Override @Override
public boolean index(long cycle, long index) throws IOException { public boolean index(long cycle, long index) throws IOException {
cycle(cycle, WireStore::readPosition); cycle(cycle);
context(store::acquireBytesAtReadPositionForRead);

return index(index); return index(index);
} }


@Override @Override
public ExcerptTailer toStart() throws IOException { public ExcerptTailer toStart() throws IOException {
long firstCycle = queue.firstCycle(); long firstCycle = queue.firstCycle();
if(firstCycle > 0) { if(firstCycle > 0) {
cycle(firstCycle, WireStore::readPosition); cycle(firstCycle);
context(store::acquireBytesAtReadPositionForRead);
this.toStart = false; this.toStart = false;
} else { } else {
this.toStart = true; this.toStart = true;
Expand All @@ -287,7 +291,8 @@ public ExcerptTailer toStart() throws IOException {
public ExcerptTailer toEnd() throws IOException { public ExcerptTailer toEnd() throws IOException {
long firstCycle = queue.firstCycle(); long firstCycle = queue.firstCycle();
if(firstCycle > 0) { if(firstCycle > 0) {
cycle(firstCycle, WireStore::writePosition); cycle(firstCycle);
context(store::acquireBytesAtWritePositionForRead);
} }


this.toStart = false; this.toStart = false;
Expand All @@ -300,7 +305,7 @@ public ChronicleQueue queue() {
return this.queue; return this.queue;
} }


private void cycle(long cycle, @NotNull ToLongFunction<WireStore> positionSupplier) throws IOException { private StoreTailer cycle(long cycle) throws IOException {
if(this.cycle != cycle) { if(this.cycle != cycle) {
if(null != this.store) { if(null != this.store) {
this.queue.release(this.store); this.queue.release(this.store);
Expand All @@ -310,8 +315,14 @@ private void cycle(long cycle, @NotNull ToLongFunction<WireStore> positionSuppli
this.index = -1; this.index = -1;
this.store = this.queue.storeForCycle(this.cycle); this.store = this.queue.storeForCycle(this.cycle);
this.context.clear(); this.context.clear();
this.context.position(positionSupplier.applyAsLong(this.store));
} }

return this;
}

private StoreTailer context(@NotNull ThrowingAcceptor<VanillaBytes, IOException> acceptor) throws IOException{
acceptor.accept(this.context.bytes);
return this;
} }
} }
} }
Expand Up @@ -29,12 +29,9 @@ public class ReadContext {
public final VanillaBytes bytes; public final VanillaBytes bytes;
public final WireIn wire; public final WireIn wire;


private long position;

public ReadContext(@NotNull WireType wireType) { public ReadContext(@NotNull WireType wireType) {
this.bytes = VanillaBytes.vanillaBytes(); this.bytes = VanillaBytes.vanillaBytes();
this.wire = wireType.apply(this.bytes); this.wire = wireType.apply(this.bytes);
this.position = 0;
} }


public WireIn wire(long position, long size) { public WireIn wire(long position, long size) {
Expand All @@ -45,32 +42,7 @@ public WireIn wire(long position, long size) {
return this.wire; return this.wire;
} }


public long position() {
return this.position;
}

public ReadContext position(long position) {
this.position = position;
return this;
}

public void clear() { public void clear() {
this.bytes.bytesStore(noBytesStore(), 0, 0); this.bytes.bytesStore(noBytesStore(), 0, 0);
} }

/*
public ReadContext store(@NotNull BytesStore store, long position) {
return store(store, position, store.writeLimit() - position);
}
public ReadContext store(@NotNull BytesStore store, long position, long size) {
if(store != this.bytes.bytesStore()) {
this.bytes.bytesStore(store, position, size);
}
this.position = position;
return this;
}
*/
} }
Expand Up @@ -17,6 +17,7 @@


import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedFile; import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.bytes.VanillaBytes;
import net.openhft.chronicle.core.ReferenceCounted; import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.queue.ChronicleQueueBuilder; import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.wire.Marshallable; import net.openhft.chronicle.wire.Marshallable;
Expand Down Expand Up @@ -44,12 +45,44 @@ public interface WireStore extends ReferenceCounted, Marshallable {
*/ */
long readPosition(); long readPosition();


/**
*
* @param bytes
* @return
* @throws IOException
*/
void acquireBytesAtReadPositionForRead(@NotNull VanillaBytes<?> bytes) throws IOException;

/**
*
* @param bytes
* @return
* @throws IOException
*/
void acquireBytesAtReadPositionForWrite(@NotNull VanillaBytes<?> bytes) throws IOException;

/** /**
* *
* @return the first writable position * @return the first writable position
*/ */
long writePosition(); long writePosition();


/**
*
* @param bytes
* @return
* @throws IOException
*/
void acquireBytesAtWritePositionForRead(@NotNull VanillaBytes<?> bytes) throws IOException;

/**
*
* @param bytes
* @return
* @throws IOException
*/
void acquireBytesAtWritePositionForWrite(@NotNull VanillaBytes<?> bytes) throws IOException;

/** /**
* *
* @return the last index * @return the last index
Expand Down Expand Up @@ -94,10 +127,11 @@ public interface WireStore extends ReferenceCounted, Marshallable {


/** /**
* *
* @param context
* @param index * @param index
* @return * @return
*/ */
long positionForIndex(long index); boolean moveToIndex(@NotNull ReadContext context, long index);


/** /**
* *
Expand Down
Expand Up @@ -19,8 +19,10 @@
import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.IORuntimeException; import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.bytes.MappedFile; import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.bytes.VanillaBytes;
import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.ReferenceCounter; import net.openhft.chronicle.core.ReferenceCounter;
import net.openhft.chronicle.core.annotation.ForceInline;
import net.openhft.chronicle.core.pool.ClassAliasPool; import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.values.LongValue; import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder; import net.openhft.chronicle.queue.ChronicleQueueBuilder;
Expand Down Expand Up @@ -100,11 +102,31 @@ public long readPosition() {
return this.bounds.getReadPosition(); return this.bounds.getReadPosition();
} }


@Override
public void acquireBytesAtReadPositionForRead(@NotNull VanillaBytes<?> bytes) throws IOException {
this.mappedFile.acquireBytesForRead(readPosition(), bytes);
}

@Override
public void acquireBytesAtReadPositionForWrite(@NotNull VanillaBytes<?> bytes) throws IOException {
this.mappedFile.acquireBytesForWrite(readPosition(), bytes);
}

@Override @Override
public long writePosition() { public long writePosition() {
return this.bounds.getWritePosition(); return this.bounds.getWritePosition();
} }


@Override
public void acquireBytesAtWritePositionForRead(@NotNull VanillaBytes<?> bytes) throws IOException {
this.mappedFile.acquireBytesForRead(writePosition(), bytes);
}

@Override
public void acquireBytesAtWritePositionForWrite(@NotNull VanillaBytes<?> bytes) throws IOException {
this.mappedFile.acquireBytesForWrite(writePosition(), bytes);
}

@Override @Override
public long cycle() { public long cycle() {
return this.roll.getCycle(); return this.roll.getCycle();
Expand Down Expand Up @@ -160,6 +182,12 @@ public long append(@NotNull WriteContext context, @NotNull final Bytes bytes) th
*/ */
@Override @Override
public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader) throws IOException { public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader) throws IOException {
long position = context.bytes.readPosition();
if(context.bytes.readRemaining() == 0) {
mappedFile.acquireBytesForRead(position, context.bytes);
}

/*
long position = context.position(); long position = context.position();
if (position > context.bytes.safeLimit()) { if (position > context.bytes.safeLimit()) {
mappedFile.acquireBytesForRead(position, context.bytes); mappedFile.acquireBytesForRead(position, context.bytes);
Expand All @@ -168,11 +196,14 @@ public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader)
mappedFile.acquireBytesForRead(position, context.bytes); mappedFile.acquireBytesForRead(position, context.bytes);
context.position(position); context.position(position);
} }
*/


final int spbHeader = context.bytes.readVolatileInt(position); final int spbHeader = context.bytes.readVolatileInt(position);
if(!Wires.isNotInitialized(spbHeader) && Wires.isReady(spbHeader)) { if(!Wires.isNotInitialized(spbHeader) && Wires.isReady(spbHeader)) {
int len = Wires.lengthOf(spbHeader);
if(Wires.isData(spbHeader)) { if(Wires.isData(spbHeader)) {
return Wires.readData(context.wire(position, builder.blockSize()), reader); context.bytes.readSkip(SPB_DATA_HEADER_SIZE);
return readWire(context.wire, len, reader);
} else { } else {
// In case of meta data, if we are found the "roll" meta, we returns // In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative) // the next cycle (negative)
Expand All @@ -182,8 +213,7 @@ public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader)
if("roll".contentEquals(sb)) { if("roll".contentEquals(sb)) {
return -vi.int32(); return -vi.int32();
} else { } else {
// it it is meta-data and length is know, try a new read context.bytes.readPosition(position + len + SPB_DATA_HEADER_SIZE);
context.position(position + Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE);
return read(context, reader); return read(context, reader);
} }
} }
Expand All @@ -198,24 +228,34 @@ public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader)
* @return * @return
*/ */
@Override @Override
public long positionForIndex(long index) { public boolean moveToIndex(@NotNull ReadContext context, long index){
long position = readPosition(); long position = readPosition();
try { try {
for (long i = 0; i <= index; i++) { for (long i = 0; i <= index;) {
final int spbHeader = mappedFile.acquireByteStore(position).readVolatileInt(position); if(context.bytes.readRemaining() == 0 || position > context.bytes.safeLimit()) {
if (Wires.isData(spbHeader) && Wires.isKnownLength(spbHeader)) { mappedFile.acquireBytesForRead(position, context.bytes);
if (index == i) { }
return position;
} else { final int spbHeader = context.bytes.readVolatileInt(position);
position += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE; if(Wires.isReady(spbHeader)) {
if(Wires.isData(spbHeader)) {
if (index == i) {
return true;
}

i++;
} }

context.bytes.readSkip(Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE);
} else {
return false;
} }
} }
} catch(IOException e) { } catch(IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }


return -1; return false;
} }


@Override @Override
Expand Down Expand Up @@ -288,6 +328,24 @@ protected void checkRemainingForAppend(long position, long size) {
} }
} }



//TODO move to wire
@ForceInline
static long readWire(@NotNull WireIn wireIn, int len, @NotNull ReadMarshallable dataConsumer) {
final Bytes<?> bytes = wireIn.bytes();
final long limit0 = bytes.readLimit();
final long limit = bytes.readPosition() + (long) len;
try {
bytes.readLimit(limit);
dataConsumer.readMarshallable(wireIn);
} finally {
bytes.readLimit(limit0);
bytes.readPosition(limit);
}

return bytes.readPosition();
}

//TODO move to wire //TODO move to wire
protected boolean acquireLock(BytesStore store, long position, int size) { protected boolean acquireLock(BytesStore store, long position, int size) {
return store.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size); return store.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size);
Expand Down

0 comments on commit 8cb0ba8

Please sign in to comment.