From 4deecb3ba71bd94ef681f9d991f1b869f7f82e14 Mon Sep 17 00:00:00 2001 From: Rob Austin Date: Thu, 7 Jan 2016 18:00:04 +0000 Subject: [PATCH] CHRON-130 Create and Epoc, so something other than jan 1970 can be used --- chronicle-queue/pom.xml | 2 +- .../queue/impl/AbstractChronicleQueue.java | 4 +- .../chronicle/queue/impl/Excerpts.java | 8 ++- .../chronicle/queue/impl/WireStore.java | 6 ++ .../chronicle/queue/impl/WireStorePool.java | 50 +++++++++++--- .../queue/impl/WireStoreSupplier.java | 4 +- .../impl/single/SingleChronicleQueue.java | 14 ++-- .../single/SingleChronicleQueueBuilder.java | 21 +++++- .../single/SingleChronicleQueueStore.java | 69 +++++++++++++++---- 9 files changed, 140 insertions(+), 38 deletions(-) diff --git a/chronicle-queue/pom.xml b/chronicle-queue/pom.xml index 8ab97f9e32..39cf752492 100755 --- a/chronicle-queue/pom.xml +++ b/chronicle-queue/pom.xml @@ -49,7 +49,7 @@ net.openhft chronicle-bom - 1.10.41 + 1.10.43-SNAPSHOT pom import diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java index 8bed65e992..b36a5cc238 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java @@ -80,10 +80,12 @@ public WireType wireType() { /** * @param cycle + * @param epoc an epoc offset as the number of number of milliseconds since January + * 1, 1970, 00:00:00 GMT * @return * @throws IOException */ - protected abstract WireStore storeForCycle(long cycle) throws IOException; + protected abstract WireStore storeForCycle(long cycle, final long epoc) throws IOException; /** * @param store diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java index 280bfacd7a..cb197f128f 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java @@ -154,6 +154,7 @@ public long cycle() { public static class StoreAppender extends DefaultAppender { private final MappedBytes writeContext; + private long epoc; private long cycle; private long index = -1; private WireStore store; @@ -170,7 +171,7 @@ public StoreAppender(@NotNull AbstractChronicleQueue queue) throws IOException { if (this.cycle <= 0) throw new IllegalArgumentException(); - this.store = queue.storeForCycle(this.cycle); + this.store = queue.storeForCycle(this.cycle, this.epoc); this.index = this.store.lastIndex(); final MappedFile mappedFile = store.mappedFile(); @@ -230,7 +231,7 @@ private WireStore store() throws IOException { } this.cycle = nextCycle; - this.store = queue.storeForCycle(this.cycle); + this.store = queue.storeForCycle(this.cycle, epoc); //this.store.acquireBytesAtWritePositionForWrite(this.writeContext.bytes); } @@ -253,6 +254,7 @@ public static class StoreTailer implements ExcerptTailer { private MappedBytes readContext; private long cycle; + private long epoc; private long index; private WireStore store; @@ -411,7 +413,7 @@ private StoreTailer cycle(long cycle) throws IOException { } this.cycle = cycle; this.index = -1; - this.store = this.queue.storeForCycle(this.cycle); + this.store = this.queue.storeForCycle(this.cycle, this.epoc); final MappedFile mappedFile = store.mappedFile(); this.readContext = new MappedBytes(mappedFile); diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStore.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStore.java index 1ba8254243..f45170bdf7 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStore.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStore.java @@ -36,6 +36,12 @@ public interface WireStore extends ReferenceCounted, Marshallable { */ long cycle(); + /** + * @return an epoc offset as the number of number of milliseconds since January 1, 1970, + * 00:00:00 GMT + */ + long epoc(); + /** * @return the first readable position */ diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStorePool.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStorePool.java index e1ce49c6e3..222b8bc26a 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStorePool.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStorePool.java @@ -15,25 +15,57 @@ */ package net.openhft.chronicle.queue.impl; -import net.openhft.koloboke.collect.map.hash.HashLongObjMaps; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class WireStorePool { private final WireStoreSupplier supplier; - private final Map stores; + + private class RollDetails { + + private long cycle; + private long epoc; + + public RollDetails(long cycle, long epoc) { + this.cycle = cycle; + this.epoc = epoc; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof RollDetails)) return false; + + RollDetails rollDetails = (RollDetails) o; + + if (cycle != rollDetails.cycle) return false; + return epoc == rollDetails.epoc; + + } + + @Override + public int hashCode() { + int result = (int) (cycle ^ (cycle >>> 32)); + result = 31 * result + (int) (epoc ^ (epoc >>> 32)); + return result; + } + } + + private final Map stores; public WireStorePool(@NotNull WireStoreSupplier supplier) { this.supplier = supplier; - this.stores = HashLongObjMaps.newMutableMap(); + this.stores = new ConcurrentHashMap<>(); } - public synchronized WireStore acquire(long cycle) throws IOException { - WireStore store = stores.get(cycle); - if(store == null) { - stores.put(cycle, store = this.supplier.apply(cycle)); + public synchronized WireStore acquire(long cycle, final long epoc) throws IOException { + final RollDetails rollDetails = new RollDetails(cycle, epoc); + WireStore store = stores.get(rollDetails); + if (store == null) { + stores.put(rollDetails, store = this.supplier.apply(cycle, epoc)); } else { store.reserve(); } @@ -43,8 +75,8 @@ public synchronized WireStore acquire(long cycle) throws IOException { public synchronized void release(WireStore store) { store.release(); - if(store.refCount() <= 0) { - stores.remove(store.cycle()); + if (store.refCount() <= 0) { + stores.remove(new RollDetails(store.cycle(), store.epoc())); } } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStoreSupplier.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStoreSupplier.java index 3c1db2b681..605976f89a 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStoreSupplier.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/WireStoreSupplier.java @@ -15,8 +15,8 @@ */ package net.openhft.chronicle.queue.impl; -import java.util.function.LongFunction; +import java.util.function.BiFunction; @FunctionalInterface -public interface WireStoreSupplier extends LongFunction{ +public interface WireStoreSupplier extends BiFunction { } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index 4feb235df0..170f16d4e7 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -54,9 +54,10 @@ protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws this.cycle = builder.rollCycle(); this.dateCache = new RollDateCache(this.cycle); this.builder = builder; + this.pool = WireStorePool.withSupplier(this::newStore); this.firstCycle = -1; - this.pool.acquire(cycle()); + storeForCycle(cycle(), builder.epoc()); } @NotNull @@ -72,8 +73,8 @@ public ExcerptTailer createTailer() throws IOException { } @Override - protected WireStore storeForCycle(long cycle) throws IOException { - return this.pool.acquire(cycle); + protected WireStore storeForCycle(long cycle, final long epoc) throws IOException { + return this.pool.acquire(cycle, epoc); } @Override @@ -168,13 +169,12 @@ public WireType wireType() { // // ************************************************************************* - protected WireStore newStore(final long cycle) { + protected WireStore newStore(final long cycle, final long epoc) { final String cycleFormat = this.dateCache.formatFor(cycle); final File cycleFile = new File(this.builder.path(), cycleFormat + ".chronicle"); - - File parentFile = cycleFile.getParentFile(); + final File parentFile = cycleFile.getParentFile(); if (parentFile != null & !parentFile.exists()) { parentFile.mkdirs(); } @@ -191,7 +191,7 @@ protected WireStore newStore(final long cycle) { Function supplyStore = mappedFile -> new SingleChronicleQueueStore (SingleChronicleQueue.this.builder.rollCycle(), SingleChronicleQueue.this - .builder.wireType(), mappedFile); + .builder.wireType(), mappedFile, epoc); if (cycleFile.exists()) { diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java index 45937d44c8..74ba8b4b24 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java @@ -33,6 +33,7 @@ public class SingleChronicleQueueBuilder implements ChronicleQueueBuilder { private RollCycle rollCycle; private int appendTimeout; + private long epoc; public SingleChronicleQueueBuilder(String path) { this(new File(path)); @@ -44,6 +45,7 @@ public SingleChronicleQueueBuilder(File path) { this.wireType = WireType.BINARY; this.appendTimeout = 10_000; // 10 seconds; this.rollCycle = RollCycles.DAYS; + this.epoc = 0; } public File path() { @@ -82,6 +84,23 @@ public SingleChronicleQueueBuilder rollCycle(RollCycle rollCycle) { return this; } + + /** + * sets epoc offset in milliseconds + * + * @param epoc sets an epoc offset as the number of number of milliseconds since January 1, + * 1970, 00:00:00 GMT + * @return {@code this} + */ + public SingleChronicleQueueBuilder epoc(long epoc) { + this.epoc = epoc; + return this; + } + + public long epoc() { + return epoc; + } + public RollCycle rollCycle() { return this.rollCycle; } @@ -130,6 +149,6 @@ public static SingleChronicleQueueBuilder raw(File name) { public static SingleChronicleQueueBuilder raw(String name) { return new SingleChronicleQueueBuilder(name) - .wireType(WireType.RAW); + .wireType(WireType.RAW); } } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java index 5f935bf8a5..6e41130112 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java @@ -77,14 +77,23 @@ enum MetaDataField implements WireKey { */ SingleChronicleQueueStore() { this.wireType = WireType.BINARY; - this.roll = new Roll(null); + this.roll = new Roll(null, 0); } + /** + * @param rollCycle + * @param wireType + * @param mappedFile + * @param rollEpoc sets an epoc offset as the number of number of milliseconds since January + * 1, 1970, 00:00:00 GMT + */ SingleChronicleQueueStore(@Nullable RollCycle rollCycle, final WireType wireType, - @NotNull MappedFile mappedFile) { + @NotNull MappedFile mappedFile, + long rollEpoc) { + + this.roll = new Roll(rollCycle, rollEpoc); - this.roll = new Roll(rollCycle); this.resourceCleaner = null; this.builder = null; this.wireType = wireType; @@ -108,7 +117,16 @@ public long writePosition() { @Override public long cycle() { - return this.roll.getCycle(); + return this.roll.cycle(); + } + + /** + * @return an epoc offset as the number of number of milliseconds since January 1, 1970, + * 00:00:00 GMT + */ + @Override + public long epoc() { + return this.roll.epoc(); } @Override @@ -152,7 +170,7 @@ public boolean appendRollMeta(@NotNull MappedBytes context, long cycle) throws I (MappedBytes ctx, long position, int size, WriteMarshallable w) -> { // todo improve this line Wires.writeMeta(wireType.apply(context), w); - roll.setNextCycleMetaPosition(position); + roll.nextCycleMetaPosition(position); return WireConstants.NO_INDEX; }, marshallable @@ -203,7 +221,7 @@ public void install( if (created) { this.bounds.setWritePosition(length); this.bounds.setReadPosition(length); - this.roll.setCycle(cycle); + this.roll.cycle(cycle); } } @@ -929,10 +947,11 @@ public static String toScale() { // ************************************************************************* enum RollFields implements WireKey { - cycle, length, format, timeZone, nextCycle, nextCycleMetaPosition + cycle, length, format, timeZone, nextCycle, epoc, nextCycleMetaPosition } class Roll implements Marshallable { + private LongValue epoc; private int length; private String format; private ZoneId zoneId; @@ -940,11 +959,11 @@ class Roll implements Marshallable { private LongValue nextCycle; private LongValue nextCycleMetaPosition; - Roll(RollCycle rollCycle) { + Roll(RollCycle rollCycle, long rollEpoc) { this.length = rollCycle != null ? rollCycle.length() : -1; this.format = rollCycle != null ? rollCycle.format() : null; this.zoneId = rollCycle != null ? rollCycle.zone() : null; - + this.epoc = null; this.cycle = null; this.nextCycle = null; this.nextCycleMetaPosition = null; @@ -957,6 +976,7 @@ public void writeMarshallable(@NotNull WireOut wire) { .write(RollFields.format).text(format) .write(RollFields.timeZone).text(zoneId.getId()) .write(RollFields.nextCycle).int64forBinding(-1, nextCycle = wire.newLongReference()) + .write(RollFields.epoc).int64forBinding(-1, epoc = wire.newLongReference()) .write(RollFields.nextCycleMetaPosition).int64forBinding(-1, nextCycleMetaPosition = wire.newLongReference()); } @@ -967,28 +987,49 @@ public void readMarshallable(@NotNull WireIn wire) { .read(RollFields.format).text(this, (o, i) -> o.format = i) .read(RollFields.timeZone).text(this, (o, i) -> o.zoneId = ZoneId.of(i)) .read(RollFields.nextCycle).int64(this.nextCycle, this, (o, i) -> o.nextCycle = i) + .read(RollFields.epoc).int64(this.epoc, this, (o, i) -> o.epoc = i) .read(RollFields.nextCycleMetaPosition).int64(this.nextCycleMetaPosition, this, (o, i) -> o.nextCycleMetaPosition = i); } - public long getCycle() { + /** + * + * @return an epoc offset as the number of number of milliseconds since January + * 1, 1970, 00:00:00 GMT + */ + public long epoc() { + return this.epoc.getVolatileValue(); + } + + + /** + * @param epoc sets an epoc offset as the number of number of milliseconds since January 1, + * 1970, 00:00:00 GMT + * @return {@code this} + */ + public Roll epoc(long epoc) { + this.epoc.setOrderedValue(epoc); + return this; + } + + public long cycle() { return this.cycle.getVolatileValue(); } - public Roll setCycle(long rollCycle) { + public Roll cycle(long rollCycle) { this.cycle.setOrderedValue(rollCycle); return this; } - public Roll setNextCycleMetaPosition(long position) { + public Roll nextCycleMetaPosition(long position) { this.nextCycleMetaPosition.setOrderedValue(position); return this; } - public long getNextCycleMetaPosition() { + public long nextCycleMetaPosition() { return this.nextCycleMetaPosition.getVolatileValue(); } - public long getNextRollCycle() { + public long nextRollCycle() { return this.nextCycle.getVolatileValue(); }