Skip to content

Commit

Permalink
QUEUE-24 code tidy and initial code for index based access
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 20, 2015
1 parent e46e810 commit d761df4
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 161 deletions.
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireUtil;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,8 +120,20 @@ public boolean readDocument(ReadMarshallable reader) throws IOException {
}

@Override
public boolean index(long l) throws IOException {
throw new UnsupportedOperationException();
public boolean index(long index) throws IOException {
if(this.store == null) {
cycle(queue.lastCycle());
this.position = this.store.dataPosition();
}

long idxpos = this.store.positionForIndex(index);
if(idxpos != WireUtil.NO_INDEX) {
this.position = idxpos;

return true;
}

return false;
}

@Override
Expand Down
Expand Up @@ -78,15 +78,19 @@ private enum RollFields implements WireKey {
}

LongValue writePosition() {
return writePosition;
return this.writePosition;
}

LongValue index2Index() {
return index2Index;
return this.index2Index;
}

LongValue lastIndex() {
return lastIndex;
return this.lastIndex;
}

int indexCount() {
return this.indexCount;
}

@Override
Expand Down
Expand Up @@ -30,10 +30,7 @@
import static net.openhft.chronicle.wire.WireUtil.*;

/**
* Implementation of ChronicleQueueFormat based on a single file.
*
* TODO:
* - rolling
* - indexing
*/
class SingleChronicleQueueStore {
Expand All @@ -56,9 +53,9 @@ enum MetaDataField implements WireKey {
private final MappedFile mappedFile;
private final BytesStore bytesStore;
private final SingleChronicleQueueHeader header;
private final ThreadLocal<Wire> wireInCache;
private final ThreadLocal<Wire> wireOutCache;
private final ThreadLocal<Long[]> positionsCache;
private final WirePool wireInPool;
private final WirePool wireOutPool;
private final ThreadLocal<WirePosition> positionPool;

/**
*
Expand All @@ -82,10 +79,11 @@ enum MetaDataField implements WireKey {

this.mappedFile = MappedFile.mappedFile(this.file, this.builder.blockSize());
this.bytesStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE);
this.wireInPool = new WirePool(bytesStore::bytesForRead, wireSupplier);
this.wireOutPool = new WirePool(bytesStore::bytesForWrite, wireSupplier);
this.positionPool = ThreadLocal.withInitial(() -> new WirePosition());

this.header = new SingleChronicleQueueHeader(this.builder);
this.wireInCache = wireCache(bytesStore::bytesForRead, wireSupplier);
this.wireOutCache = wireCache(bytesStore::bytesForWrite, wireSupplier);
this.positionsCache = ThreadLocal.withInitial(() -> new Long[] { 0L, 0L} );
}

long dataPosition() {
Expand All @@ -107,15 +105,14 @@ long lastIndex() {

boolean appendRollMeta(int cycle) throws IOException {
if(header.casNextRollCycle(cycle)) {
// TODO: avoid new long[] { 0, 0 }
final Long positions[] = append(
positionsCache.get(),
final WirePosition position = append(
positionPool.get(),
true,
w -> w.write(MetaDataField.roll).int32(cycle)
);


header.setNextCycleMetaPosition(positions[0]);
header.setNextCycleMetaPosition(position.start);

return true;
}
Expand All @@ -130,10 +127,9 @@ boolean appendRollMeta(int cycle) throws IOException {
* @throws IOException
*/
long append(@NotNull WriteMarshallable writer) throws IOException {
// TODO: avoid new long[] { 0, 0 }
final Long positions[] = append(positionsCache.get(), false, writer);
final WirePosition position = append(positionPool.get(), false, writer);

header.setWritePosition(positions[1]);
header.setWritePosition(position.end);
return header.incrementLastIndex();
}

Expand All @@ -150,12 +146,12 @@ long read(long position, @NotNull ReadMarshallable reader) throws IOException {
}

if(Wires.isData(spbHeader)) {
return WireUtil.readDataAt(wireInCache.get(), position, reader);
return WireUtil.readData(wireInPool.acquireForReadAt(position), reader);
} else if (WireUtil.isKnownLength(spbHeader)) {
// In case of meta data, if we are found the position at which we have
// the roll meta data, we returns the next cycle (negative)
// In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative)
final StringBuilder sb = WireUtil.SBP.acquireStringBuilder();
final ValueIn vi = wireInAt(position + 4).read(sb);
final ValueIn vi = wireInPool.acquireForReadAt(position + 4).read(sb);

if("roll".contentEquals(sb)) {
return -vi.int32();
Expand All @@ -168,6 +164,27 @@ long read(long position, @NotNull ReadMarshallable reader) throws IOException {
return WireUtil.NO_DATA;
}

/**
*
* @param index
* @return
*/
long positionForIndex(long index) {
long position = dataPosition();
for(long i = 0; i <= index; i++) {
final int spbHeader = bytesStore.readVolatileInt(position);
if(WireUtil.isData(spbHeader) && WireUtil.isKnownLength(spbHeader)) {
if(index == i) {
return position;
} else {
position += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE;
}
}
}

return -1;
}

/**
* Check if there is room for append.
*
Expand All @@ -187,18 +204,16 @@ protected void checkRemainingForAppend() {
*/
protected SingleChronicleQueueStore buildHeader() throws IOException {
if(bytesStore.compareAndSwapLong(SPB_HEADER_BYTE, SPB_HEADER_UNSET, SPB_HEADER_BUILDING)) {
writeMetaAt(
wireOut(bytesStore.bytesForWrite()),
SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE,
writeMeta(
wireOutPool.acquireForWriteAt(SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE),
w -> w.write(MetaDataField.header).typedMarshallable(header)
);

// Needed because header.dataPosition, header.writePosition are initially
// null and initialized when needed. It may be better to initialize
// them upon header instantiation (?)
long readPosition = readMetaAt(
wireIn(bytesStore.bytesForRead()),
SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE,
long readPosition = readMeta(
wireInPool.acquireForReadAt(SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE),
w -> w.read().marshallable(header)
);

Expand All @@ -217,9 +232,8 @@ protected SingleChronicleQueueStore buildHeader() throws IOException {
} else {
waitForTheHeaderToBeBuilt();

readMetaAt(
wireIn(bytesStore.bytesForRead()),
SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE,
readMeta(
wireInPool.acquireForReadAt(SPB_HEADER_BYTE + SPB_HEADER_BYTE_SIZE),
w -> w.read().marshallable(header)
);
}
Expand Down Expand Up @@ -257,20 +271,20 @@ protected void waitForTheHeaderToBeBuilt() throws IOException {
* @return
* @throws IOException
*/
protected Long[] append(Long[] positions, boolean meta, @NotNull WriteMarshallable writer) throws IOException {
protected WirePosition append(WirePosition position, boolean meta, @NotNull WriteMarshallable writer) throws IOException {
checkRemainingForAppend();

final int delay = builder.appendWaitDelay();
long lastWritePosition = header.getWritePosition();

for (int i = builder.appendWaitLoops(); i >= 0; i--) {
if(bytesStore.compareAndSwapInt(lastWritePosition, WireUtil.FREE, WireUtil.BUILDING)) {
positions[0] = lastWritePosition;
positions[1] = !meta
? WireUtil.writeDataAt(wireOutCache.get(), lastWritePosition, writer)
: WireUtil.writeMetaAt(wireOutCache.get(), lastWritePosition, writer);
position.start = lastWritePosition;
position.end = !meta
? WireUtil.writeData(wireOutPool.acquireForWriteAt(lastWritePosition), writer)
: WireUtil.writeMeta(wireOutPool.acquireForWriteAt(lastWritePosition), writer);

return positions;
return position;
} else {
int spbHeader = bytesStore.readInt(lastWritePosition);
if(WireUtil.isKnownLength(spbHeader)) {
Expand All @@ -286,43 +300,4 @@ protected Long[] append(Long[] positions, boolean meta, @NotNull WriteMarshallab

throw new AssertionError("Timeout waiting to append");
}

// *************************************************************************
// Wire Helpers
//
// TODO: cleanup once interface has been stabilized
//
// *************************************************************************

protected WireOut wireOut(@NotNull Bytes bytes) throws IOException {
return this.wireSupplier.apply(bytes);
}
protected WireOut wireOutAt(@NotNull Bytes bytes, long position) throws IOException {
bytes.writePosition(position);
return this.wireSupplier.apply(bytes);
}

protected WireOut wireOut(@NotNull MappedFile file, long offset) throws IOException {
return wireOut(file.acquireBytesForWrite(offset));
}

protected WireIn wireIn(@NotNull Bytes bytes) throws IOException {
return this.wireSupplier.apply(bytes);
}

protected WireIn wireInAt(@NotNull Bytes bytes, long offset) throws IOException {
bytes.readPosition(offset);
return this.wireSupplier.apply(bytes);
}

protected WireIn wireInAt(long offset) throws IOException {
WireIn wi = wireInCache.get();
wi.bytes().readPosition(offset);

return wi;
}

protected WireIn wireIn(@NotNull MappedFile file, long offset) throws IOException {
return wireIn(file.acquireBytesForRead(offset));
}
}
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.annotation.ForceInline;
import org.jetbrains.annotations.NotNull;

import java.util.function.Function;
import java.util.function.Supplier;

public class WirePool {
private final ThreadLocal<Wire> pool;
private final Supplier<Bytes> bytesSupplier;
private final Function<Bytes, Wire> wireSupplier;

public WirePool(
@NotNull final Supplier<Bytes> bytesSupplier,
@NotNull final Function<Bytes, Wire> wireSupplier) {

this.pool = new ThreadLocal();
this.bytesSupplier = bytesSupplier;
this.wireSupplier= wireSupplier;
}

@ForceInline
public WireIn acquireForReadAt(long position) {
Wire wire = pool.get();
if (wire == null) {
pool.set(wire = wireSupplier.apply(bytesSupplier.get()));
}

wire.bytes().readPosition(position);
return wire;
}

@ForceInline
public WireOut acquireForWriteAt(long position) {
Wire wire = pool.get();
if (wire == null) {
pool.set(wire = wireSupplier.apply(bytesSupplier.get()));
}

wire.bytes().writePosition(position);
return wire;
}
}

0 comments on commit d761df4

Please sign in to comment.