Skip to content

Commit

Permalink
QUEUE-24 add cycle param for random index lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 21, 2015
1 parent 4e93413 commit 096ff2b
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 98 deletions.
Expand Up @@ -15,9 +15,10 @@
*/ */
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.wire.Wire;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.IOException;

/** /**
* The main data container of a {@link ChronicleQueue}, an extended version of {@link ExcerptTailer} which also facilitates * The main data container of a {@link ChronicleQueue}, an extended version of {@link ExcerptTailer} which also facilitates
* random access. * random access.
Expand Down Expand Up @@ -46,10 +47,19 @@ public interface Excerpt extends ExcerptTailer {
/** /**
* Randomly select an Excerpt. * Randomly select an Excerpt.
* *
* @param l index to look up * @param index index to look up
* @return true if this is a valid entries and not padding.
*/
boolean index(long index) throws IOException;

/**
* Randomly select an Excerpt.
*
* @param cycle cycle
* @param index index to look up
* @return true if this is a valid entries and not padding. * @return true if this is a valid entries and not padding.
*/ */
boolean index(long l); boolean index(int cycle, long index) throws IOException;


/** /**
* Replay from the lower. * Replay from the lower.
Expand Down
Expand Up @@ -37,10 +37,19 @@ public interface ExcerptTailer extends ExcerptCommon {
/** /**
* Randomly select an Excerpt. * Randomly select an Excerpt.
* *
* @param l index to look up * @param index index to look up
* @return true if this is a valid entries and not padding. * @return true if this is a valid entries and not padding.
*/ */
boolean index(long l) throws IOException;; boolean index(long index) throws IOException;

/**
* Randomly select an Excerpt.
*
* @param cycle cycle
* @param index index to look up
* @return true if this is a valid entries and not padding.
*/
boolean index(int cycle, long index) throws IOException;


/** /**
* Replay from the lower. * Replay from the lower.
Expand Down
Expand Up @@ -32,6 +32,7 @@ class SingleChronicleQueue extends AbstractChronicleQueue {
private final SingleChronicleQueueBuilder builder; private final SingleChronicleQueueBuilder builder;
private final RollDateCache dateCache; private final RollDateCache dateCache;
private final Map<Integer, SingleChronicleQueueStore> stores; private final Map<Integer, SingleChronicleQueueStore> stores;
private int firstCycle;


protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException { protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException {
this.dateCache = new RollDateCache( this.dateCache = new RollDateCache(
Expand All @@ -41,6 +42,7 @@ protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws


this.builder = builder; this.builder = builder;
this.stores = HashIntObjMaps.newMutableMap(); this.stores = HashIntObjMaps.newMutableMap();
this.firstCycle = -1;
} }


@Override @Override
Expand All @@ -67,17 +69,30 @@ synchronized SingleChronicleQueueStore storeForCycle(int cycle) throws IOExcepti
cycle, cycle,
this.dateCache.formatFor(cycle)).buildHeader() this.dateCache.formatFor(cycle)).buildHeader()
); );
} else {
format.reserve();
} }


return format; return format;
} }


synchronized void release(SingleChronicleQueueStore store) {
store.release();
if(store.refCount() <= 0) {
stores.remove(store.cycle());
}
}

int cycle() { int cycle() {
return (int) (System.currentTimeMillis() / builder.rollCycleLength()); return (int) (System.currentTimeMillis() / builder.rollCycleLength());
} }


//TODO: reduce garbage //TODO: reduce garbage
int firstCycle() { synchronized int firstCycle() {
if(-1 != firstCycle ) {
return firstCycle;
}

final String basePath = builder.path().getAbsolutePath(); final String basePath = builder.path().getAbsolutePath();
final File[] files = builder.path().listFiles(); final File[] files = builder.path().listFiles();


Expand All @@ -103,10 +118,10 @@ int firstCycle() {
} }
} }


return (int)firstDate; firstCycle = (int)firstDate;
} }


return -1; return firstCycle;
} }


//TODO: reduce garbage //TODO: reduce garbage
Expand Down
Expand Up @@ -22,13 +22,10 @@
import net.openhft.chronicle.wire.ReadMarshallable; import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireUtil; import net.openhft.chronicle.wire.WireUtil;
import net.openhft.chronicle.wire.WriteMarshallable; import net.openhft.chronicle.wire.WriteMarshallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.IOException; import java.io.IOException;


public class SingleChronicleQueueExcerpts { public class SingleChronicleQueueExcerpts {
private static final Logger LOGGER = LoggerFactory.getLogger(SingleChronicleQueueExcerpts.class);


/** /**
* Appender * Appender
Expand All @@ -55,6 +52,7 @@ public long writeDocument(WriteMarshallable writer) throws IOException {
int nextCycle = queue.cycle(); int nextCycle = queue.cycle();
if(this.store != null) { if(this.store != null) {
this.store.appendRollMeta(nextCycle); this.store.appendRollMeta(nextCycle);
this.queue.release(this.store);
} }


this.cycle = nextCycle; this.cycle = nextCycle;
Expand Down Expand Up @@ -136,6 +134,14 @@ public boolean index(long index) throws IOException {
return false; return false;
} }


@Override
public boolean index(int cycle, long index) throws IOException {
cycle(cycle);
this.position = this.store.dataPosition();

return index(index);
}

@Override @Override
public ExcerptTailer toStart() throws IOException { public ExcerptTailer toStart() throws IOException {
cycle(queue.firstCycle()); cycle(queue.firstCycle());
Expand All @@ -158,8 +164,14 @@ public ChronicleQueue queue() {
} }


private void cycle(int cycle) throws IOException { private void cycle(int cycle) throws IOException {
this.cycle = cycle; if(this.cycle != cycle) {
this.store = queue.storeForCycle(this.cycle); if(null != this.store) {
this.queue.release(this.store);
}

this.cycle = cycle;
this.store = this.queue.storeForCycle(this.cycle);
}
} }
} }
} }
Expand Up @@ -19,6 +19,8 @@
import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.MappedFile; import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.core.ReferenceCounter;
import net.openhft.chronicle.core.pool.ClassAliasPool; import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
Expand All @@ -33,7 +35,7 @@
* TODO: * TODO:
* - indexing * - indexing
*/ */
class SingleChronicleQueueStore { class SingleChronicleQueueStore implements ReferenceCounted {
static { static {
ClassAliasPool.CLASS_ALIASES.addAlias( ClassAliasPool.CLASS_ALIASES.addAlias(
SingleChronicleQueueHeader.class, SingleChronicleQueueHeader.class,
Expand All @@ -55,6 +57,7 @@ enum MetaDataField implements WireKey {
private final SingleChronicleQueueHeader header; private final SingleChronicleQueueHeader header;
private final WirePool wirePool; private final WirePool wirePool;
private final ThreadLocal<WireBounds> positionPool; private final ThreadLocal<WireBounds> positionPool;
private final ReferenceCounter refCount;


/** /**
* *
Expand All @@ -80,6 +83,7 @@ enum MetaDataField implements WireKey {
this.bytesStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE); this.bytesStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE);
this.wirePool = new WirePool(bytesStore, wireSupplier); this.wirePool = new WirePool(bytesStore, wireSupplier);
this.positionPool = ThreadLocal.withInitial(() -> new WireBounds()); this.positionPool = ThreadLocal.withInitial(() -> new WireBounds());
this.refCount = ReferenceCounter.onReleased(this::performRelease);


this.header = new SingleChronicleQueueHeader(this.builder); this.header = new SingleChronicleQueueHeader(this.builder);
} }
Expand Down Expand Up @@ -299,4 +303,23 @@ protected WireBounds append(WireBounds bounds, boolean meta, @NotNull WriteMarsh


throw new AssertionError("Timeout waiting to append"); throw new AssertionError("Timeout waiting to append");
} }

private synchronized void performRelease() {
this.mappedFile.close();
}

@Override
public void reserve() throws IllegalStateException {
this.refCount.reserve();
}

@Override
public void release() throws IllegalStateException {
this.refCount.release();
}

@Override
public long refCount() {
return this.refCount.get();
}
} }
Expand Up @@ -72,23 +72,7 @@ public static <T extends ReadMarshallable> long readData(


// We assume that check on data readiness and type has been done by the // We assume that check on data readiness and type has been done by the
// caller // caller
WireInternal.rawReadData(wireIn, reader); return rawRead(wireIn, reader);

return wireIn.bytes().readPosition();
}

public static <T extends ReadMarshallable> long readDataAt(
@NotNull WireIn wireIn,
long position,
@NotNull T reader) {

final Bytes rb = wireIn.bytes().readPosition(position);
boolean result = WireInternal.readData(wireIn, null, reader);
if (result) {
return rb.readPosition();
}

return NO_DATA;
} }


@ForceInline @ForceInline
Expand All @@ -101,18 +85,6 @@ public static <T extends WriteMarshallable> long writeData(
return wireOut.bytes().writePosition(); return wireOut.bytes().writePosition();
} }



public static <T extends WriteMarshallable> long writeDataAt(
@NotNull WireOut wireOut,
long position,
@NotNull T writer) {

final Bytes wb = wireOut.bytes().writePosition(position);
WireInternal.writeData(wireOut, false, false, writer);

return wb.writePosition();
}

@ForceInline @ForceInline
public static <T extends WriteMarshallable> long writeMeta( public static <T extends WriteMarshallable> long writeMeta(
@NotNull WireOut wireOut, @NotNull WireOut wireOut,
Expand All @@ -123,41 +95,35 @@ public static <T extends WriteMarshallable> long writeMeta(
return wireOut.bytes().writePosition(); return wireOut.bytes().writePosition();
} }


public static <T extends WriteMarshallable> long writeMetaAt(
@NotNull WireOut wireOut,
long position,
@NotNull T writer) {

final Bytes wb = wireOut.bytes().writePosition(position);
WireInternal.writeData(wireOut, true, false, writer);

return wb.writePosition();
}

@ForceInline @ForceInline
public static <T extends ReadMarshallable> long readMeta( public static <T extends ReadMarshallable> long readMeta(
@NotNull WireIn wireIn, @NotNull WireIn wireIn,
@NotNull T reader) { @NotNull T reader) {


boolean result = WireInternal.readData(wireIn, reader, null); // We assume that check on meta-data readiness and type has been done by
if (result) { // the caller
return wireIn.bytes().readPosition(); return rawRead(wireIn, reader);
}

return NO_DATA;
} }


public static <T extends ReadMarshallable> long readMetaAt( @ForceInline
@NotNull WireIn wireIn, static long rawRead(@NotNull WireIn wireIn, @NotNull ReadMarshallable dataConsumer) {
long position,
@NotNull T reader) { final Bytes<?> bytes = wireIn.bytes();

final int header = bytes.readVolatileInt(bytes.readPosition());
final Bytes rb = wireIn.bytes().readPosition(position); final int len = Wires.lengthOf(header);
boolean result = WireInternal.readData(wireIn, reader, null);
if(result) { bytes.readSkip(4);
return wireIn.bytes().readPosition();
final long limit0 = bytes.readLimit();
final long limit = bytes.readPosition() + (long) len;
try {
bytes.readLimit(limit);
dataConsumer.readMarshallable(wireIn);
} finally {
bytes.readLimit(limit0);
bytes.readPosition(limit);
} }


return NO_DATA; return bytes.readPosition();
} }
} }

0 comments on commit 096ff2b

Please sign in to comment.