Skip to content

Commit

Permalink
CHRON-130 Create and Epoc, so something other than jan 1970 can be used
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 7, 2016
1 parent 2df4111 commit 4deecb3
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 38 deletions.
2 changes: 1 addition & 1 deletion chronicle-queue/pom.xml
Expand Up @@ -49,7 +49,7 @@
<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId> <artifactId>chronicle-bom</artifactId>
<version>1.10.41</version> <version>1.10.43-SNAPSHOT</version>
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
Expand Down
Expand Up @@ -80,10 +80,12 @@ public WireType wireType() {


/** /**
* @param cycle * @param cycle
* @param epoc an epoc offset as the number of number of milliseconds since January
* 1, 1970, 00:00:00 GMT
* @return * @return
* @throws IOException * @throws IOException
*/ */
protected abstract WireStore storeForCycle(long cycle) throws IOException; protected abstract WireStore storeForCycle(long cycle, final long epoc) throws IOException;


/** /**
* @param store * @param store
Expand Down
Expand Up @@ -154,6 +154,7 @@ public long cycle() {
public static class StoreAppender extends DefaultAppender<AbstractChronicleQueue> { public static class StoreAppender extends DefaultAppender<AbstractChronicleQueue> {


private final MappedBytes writeContext; private final MappedBytes writeContext;
private long epoc;
private long cycle; private long cycle;
private long index = -1; private long index = -1;
private WireStore store; private WireStore store;
Expand All @@ -170,7 +171,7 @@ public StoreAppender(@NotNull AbstractChronicleQueue queue) throws IOException {
if (this.cycle <= 0) if (this.cycle <= 0)
throw new IllegalArgumentException(); throw new IllegalArgumentException();


this.store = queue.storeForCycle(this.cycle); this.store = queue.storeForCycle(this.cycle, this.epoc);
this.index = this.store.lastIndex(); this.index = this.store.lastIndex();


final MappedFile mappedFile = store.mappedFile(); final MappedFile mappedFile = store.mappedFile();
Expand Down Expand Up @@ -230,7 +231,7 @@ private WireStore store() throws IOException {
} }


this.cycle = nextCycle; this.cycle = nextCycle;
this.store = queue.storeForCycle(this.cycle); this.store = queue.storeForCycle(this.cycle, epoc);
//this.store.acquireBytesAtWritePositionForWrite(this.writeContext.bytes); //this.store.acquireBytesAtWritePositionForWrite(this.writeContext.bytes);
} }


Expand All @@ -253,6 +254,7 @@ public static class StoreTailer implements ExcerptTailer {
private MappedBytes readContext; private MappedBytes readContext;


private long cycle; private long cycle;
private long epoc;
private long index; private long index;
private WireStore store; private WireStore store;


Expand Down Expand Up @@ -411,7 +413,7 @@ private StoreTailer cycle(long cycle) throws IOException {
} }
this.cycle = cycle; this.cycle = cycle;
this.index = -1; this.index = -1;
this.store = this.queue.storeForCycle(this.cycle); this.store = this.queue.storeForCycle(this.cycle, this.epoc);
final MappedFile mappedFile = store.mappedFile(); final MappedFile mappedFile = store.mappedFile();
this.readContext = new MappedBytes(mappedFile); this.readContext = new MappedBytes(mappedFile);


Expand Down
Expand Up @@ -36,6 +36,12 @@ public interface WireStore extends ReferenceCounted, Marshallable {
*/ */
long cycle(); long cycle();


/**
* @return an epoc offset as the number of number of milliseconds since January 1, 1970,
* 00:00:00 GMT
*/
long epoc();

/** /**
* @return the first readable position * @return the first readable position
*/ */
Expand Down
Expand Up @@ -15,25 +15,57 @@
*/ */
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import net.openhft.koloboke.collect.map.hash.HashLongObjMaps;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class WireStorePool { public class WireStorePool {
private final WireStoreSupplier supplier; private final WireStoreSupplier supplier;
private final Map<Long, WireStore> stores;
private class RollDetails {

private long cycle;
private long epoc;

public RollDetails(long cycle, long epoc) {
this.cycle = cycle;
this.epoc = epoc;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof RollDetails)) return false;

RollDetails rollDetails = (RollDetails) o;

if (cycle != rollDetails.cycle) return false;
return epoc == rollDetails.epoc;

}

@Override
public int hashCode() {
int result = (int) (cycle ^ (cycle >>> 32));
result = 31 * result + (int) (epoc ^ (epoc >>> 32));
return result;
}
}

private final Map<RollDetails, WireStore> stores;


public WireStorePool(@NotNull WireStoreSupplier supplier) { public WireStorePool(@NotNull WireStoreSupplier supplier) {
this.supplier = supplier; this.supplier = supplier;
this.stores = HashLongObjMaps.newMutableMap(); this.stores = new ConcurrentHashMap<>();
} }


public synchronized WireStore acquire(long cycle) throws IOException { public synchronized WireStore acquire(long cycle, final long epoc) throws IOException {
WireStore store = stores.get(cycle); final RollDetails rollDetails = new RollDetails(cycle, epoc);
if(store == null) { WireStore store = stores.get(rollDetails);
stores.put(cycle, store = this.supplier.apply(cycle)); if (store == null) {
stores.put(rollDetails, store = this.supplier.apply(cycle, epoc));
} else { } else {
store.reserve(); store.reserve();
} }
Expand All @@ -43,8 +75,8 @@ public synchronized WireStore acquire(long cycle) throws IOException {


public synchronized void release(WireStore store) { public synchronized void release(WireStore store) {
store.release(); store.release();
if(store.refCount() <= 0) { if (store.refCount() <= 0) {
stores.remove(store.cycle()); stores.remove(new RollDetails(store.cycle(), store.epoc()));
} }
} }


Expand Down
Expand Up @@ -15,8 +15,8 @@
*/ */
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import java.util.function.LongFunction; import java.util.function.BiFunction;


@FunctionalInterface @FunctionalInterface
public interface WireStoreSupplier extends LongFunction<WireStore>{ public interface WireStoreSupplier extends BiFunction<Long, Long, WireStore> {
} }
Expand Up @@ -54,9 +54,10 @@ protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws
this.cycle = builder.rollCycle(); this.cycle = builder.rollCycle();
this.dateCache = new RollDateCache(this.cycle); this.dateCache = new RollDateCache(this.cycle);
this.builder = builder; this.builder = builder;

this.pool = WireStorePool.withSupplier(this::newStore); this.pool = WireStorePool.withSupplier(this::newStore);
this.firstCycle = -1; this.firstCycle = -1;
this.pool.acquire(cycle()); storeForCycle(cycle(), builder.epoc());
} }


@NotNull @NotNull
Expand All @@ -72,8 +73,8 @@ public ExcerptTailer createTailer() throws IOException {
} }


@Override @Override
protected WireStore storeForCycle(long cycle) throws IOException { protected WireStore storeForCycle(long cycle, final long epoc) throws IOException {
return this.pool.acquire(cycle); return this.pool.acquire(cycle, epoc);
} }


@Override @Override
Expand Down Expand Up @@ -168,13 +169,12 @@ public WireType wireType() {
// //
// ************************************************************************* // *************************************************************************


protected WireStore newStore(final long cycle) { protected WireStore newStore(final long cycle, final long epoc) {


final String cycleFormat = this.dateCache.formatFor(cycle); final String cycleFormat = this.dateCache.formatFor(cycle);
final File cycleFile = new File(this.builder.path(), cycleFormat + ".chronicle"); final File cycleFile = new File(this.builder.path(), cycleFormat + ".chronicle");



final File parentFile = cycleFile.getParentFile();
File parentFile = cycleFile.getParentFile();
if (parentFile != null & !parentFile.exists()) { if (parentFile != null & !parentFile.exists()) {
parentFile.mkdirs(); parentFile.mkdirs();
} }
Expand All @@ -191,7 +191,7 @@ protected WireStore newStore(final long cycle) {


Function<MappedFile, WireStore> supplyStore = mappedFile -> new SingleChronicleQueueStore Function<MappedFile, WireStore> supplyStore = mappedFile -> new SingleChronicleQueueStore
(SingleChronicleQueue.this.builder.rollCycle(), SingleChronicleQueue.this (SingleChronicleQueue.this.builder.rollCycle(), SingleChronicleQueue.this
.builder.wireType(), mappedFile); .builder.wireType(), mappedFile, epoc);




if (cycleFile.exists()) { if (cycleFile.exists()) {
Expand Down
Expand Up @@ -33,6 +33,7 @@ public class SingleChronicleQueueBuilder implements ChronicleQueueBuilder {
private RollCycle rollCycle; private RollCycle rollCycle;


private int appendTimeout; private int appendTimeout;
private long epoc;


public SingleChronicleQueueBuilder(String path) { public SingleChronicleQueueBuilder(String path) {
this(new File(path)); this(new File(path));
Expand All @@ -44,6 +45,7 @@ public SingleChronicleQueueBuilder(File path) {
this.wireType = WireType.BINARY; this.wireType = WireType.BINARY;
this.appendTimeout = 10_000; // 10 seconds; this.appendTimeout = 10_000; // 10 seconds;
this.rollCycle = RollCycles.DAYS; this.rollCycle = RollCycles.DAYS;
this.epoc = 0;
} }


public File path() { public File path() {
Expand Down Expand Up @@ -82,6 +84,23 @@ public SingleChronicleQueueBuilder rollCycle(RollCycle rollCycle) {
return this; return this;
} }



/**
* sets epoc offset in milliseconds
*
* @param epoc sets an epoc offset as the number of number of milliseconds since January 1,
* 1970, 00:00:00 GMT
* @return {@code this}
*/
public SingleChronicleQueueBuilder epoc(long epoc) {
this.epoc = epoc;
return this;
}

public long epoc() {
return epoc;
}

public RollCycle rollCycle() { public RollCycle rollCycle() {
return this.rollCycle; return this.rollCycle;
} }
Expand Down Expand Up @@ -130,6 +149,6 @@ public static SingleChronicleQueueBuilder raw(File name) {


public static SingleChronicleQueueBuilder raw(String name) { public static SingleChronicleQueueBuilder raw(String name) {
return new SingleChronicleQueueBuilder(name) return new SingleChronicleQueueBuilder(name)
.wireType(WireType.RAW); .wireType(WireType.RAW);
} }
} }

0 comments on commit 4deecb3

Please sign in to comment.