Skip to content

Commit

Permalink
QUEUE-24 cleanup implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 23, 2015
1 parent 096ff2b commit 3f3ba85
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 757 deletions.
659 changes: 3 additions & 656 deletions chronicle-queue/README.md

Large diffs are not rendered by default.

Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public interface WireStore extends ReferenceCounted {

/**
*
* @return
*/
int cycle();

/**
*
* @return
*/
long dataPosition();

/**
*
* @return
*/
long writePosition();

/**
*
* @return
*/
long lastIndex();


/**
*
* @param cycle
* @return
* @throws IOException
*/
boolean appendRollMeta(int cycle) throws IOException;

/**
*
* @param writer
* @return
* @throws IOException
*/
long append(@NotNull WriteMarshallable writer) throws IOException;

/**
*
* @param position
* @param reader
* @return
* @throws IOException
*/
long read(long position, @NotNull ReadMarshallable reader) throws IOException;

/**
*
* @param index
* @return
*/
long positionForIndex(long index);
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollDateCache; import net.openhft.chronicle.queue.RollDateCache;
import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; import net.openhft.chronicle.queue.impl.AbstractChronicleQueue;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.koloboke.collect.map.hash.HashIntObjMaps; import net.openhft.koloboke.collect.map.hash.HashIntObjMaps;


import java.io.File; import java.io.File;
Expand All @@ -31,7 +32,7 @@ class SingleChronicleQueue extends AbstractChronicleQueue {


private final SingleChronicleQueueBuilder builder; private final SingleChronicleQueueBuilder builder;
private final RollDateCache dateCache; private final RollDateCache dateCache;
private final Map<Integer, SingleChronicleQueueStore> stores; private final Map<Integer, SingleChronicleQueueWireStore> stores;
private int firstCycle; private int firstCycle;


protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException { protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException {
Expand Down Expand Up @@ -59,12 +60,12 @@ SingleChronicleQueueBuilder builder() {
return this.builder; return this.builder;
} }


synchronized SingleChronicleQueueStore storeForCycle(int cycle) throws IOException { synchronized WireStore storeForCycle(int cycle) throws IOException {
SingleChronicleQueueStore format = stores.get(cycle); SingleChronicleQueueWireStore format = stores.get(cycle);
if(null == format) { if(null == format) {
stores.put( stores.put(
cycle, cycle,
format = new SingleChronicleQueueStore( format = new SingleChronicleQueueWireStore(
builder, builder,
cycle, cycle,
this.dateCache.formatFor(cycle)).buildHeader() this.dateCache.formatFor(cycle)).buildHeader()
Expand All @@ -76,7 +77,7 @@ synchronized SingleChronicleQueueStore storeForCycle(int cycle) throws IOExcepti
return format; return format;
} }


synchronized void release(SingleChronicleQueueStore store) { synchronized void release(WireStore store) {
store.release(); store.release();
if(store.refCount() <= 0) { if(store.refCount() <= 0) {
stores.remove(store.cycle()); stores.remove(store.cycle());
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.wire.ReadMarshallable; import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireUtil; import net.openhft.chronicle.wire.WireUtil;
import net.openhft.chronicle.wire.WriteMarshallable; import net.openhft.chronicle.wire.WriteMarshallable;
Expand All @@ -35,7 +36,7 @@ static class Appender implements ExcerptAppender {


private int cycle; private int cycle;
private long index; private long index;
private SingleChronicleQueueStore store; private WireStore store;


Appender(SingleChronicleQueue queue) throws IOException { Appender(SingleChronicleQueue queue) throws IOException {
this.queue = queue; this.queue = queue;
Expand Down Expand Up @@ -84,7 +85,7 @@ static class Tailer implements ExcerptTailer {


private int cycle; private int cycle;
private long position; private long position;
private SingleChronicleQueueStore store; private WireStore store;


Tailer(SingleChronicleQueue queue) throws IOException { Tailer(SingleChronicleQueue queue) throws IOException {
this.queue = queue; this.queue = queue;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single;


import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.values.IntValue;
import net.openhft.chronicle.core.values.LongValue; import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
Expand All @@ -28,7 +29,7 @@
class SingleChronicleQueueHeader implements Marshallable { class SingleChronicleQueueHeader implements Marshallable {


private enum Fields implements WireKey { private enum Fields implements WireKey {
type, type, version,
uuid, created, user, host, uuid, created, user, host,
indexCount, indexSpacing, indexCount, indexSpacing,
writePosition, dataPosition, index2Index, lastIndex, writePosition, dataPosition, index2Index, lastIndex,
Expand All @@ -39,12 +40,14 @@ private enum RollFields implements WireKey {
cycle, length, format, timeZone, nextCycle, nextCycleMetaPosition cycle, length, format, timeZone, nextCycle, nextCycleMetaPosition
} }


public static final String QUEUE_TYPE = "SCV4"; public static final String QUEUE_TYPE = "SingleChronicleQueue";
public static final String QUEUE_VERSION = "4.0";
public static final String CLASS_ALIAS = "Header"; public static final String CLASS_ALIAS = "Header";
public static final long PADDED_SIZE = 512; public static final long PADDED_SIZE = 512;


// fields which can be serialized/deserialized in the normal way. // fields which can be serialized/deserialized in the normal way.
private String type; private String type;
private String version;
private UUID uuid; private UUID uuid;
private ZonedDateTime created; private ZonedDateTime created;
private String user; private String user;
Expand All @@ -62,6 +65,7 @@ private enum RollFields implements WireKey {


SingleChronicleQueueHeader(SingleChronicleQueueBuilder builder) { SingleChronicleQueueHeader(SingleChronicleQueueBuilder builder) {
this.type = QUEUE_TYPE; this.type = QUEUE_TYPE;
this.version = QUEUE_VERSION;
this.uuid = UUID.randomUUID(); this.uuid = UUID.randomUUID();
this.created = ZonedDateTime.now(); this.created = ZonedDateTime.now();
this.user = System.getProperty("user.name"); this.user = System.getProperty("user.name");
Expand Down Expand Up @@ -98,9 +102,10 @@ int indexCount() {
@Override @Override
public void writeMarshallable(@NotNull WireOut out) { public void writeMarshallable(@NotNull WireOut out) {
out.write(Fields.type).text(type) out.write(Fields.type).text(type)
.write(Fields.version).text(version)
.write(Fields.uuid).uuid(uuid) .write(Fields.uuid).uuid(uuid)
.write(Fields.writePosition).int64forBinding(WireUtil.SPB_HEADER_BYTE_SIZE) .write(Fields.writePosition).int64forBinding(WireUtil.HEADER_OFFSET)
.write(Fields.dataPosition).int64forBinding(WireUtil.SPB_HEADER_BYTE_SIZE) .write(Fields.dataPosition).int64forBinding(WireUtil.HEADER_OFFSET)
.write(Fields.created).zonedDateTime(created) .write(Fields.created).zonedDateTime(created)
.write(Fields.user).text(user) .write(Fields.user).text(user)
.write(Fields.host).text(host) .write(Fields.host).text(host)
Expand All @@ -109,13 +114,12 @@ public void writeMarshallable(@NotNull WireOut out) {
.write(Fields.index2Index).int64forBinding(0L) .write(Fields.index2Index).int64forBinding(0L)
.write(Fields.lastIndex).int64forBinding(-1L) .write(Fields.lastIndex).int64forBinding(-1L)
.write(Fields.roll).marshallable(roll); .write(Fields.roll).marshallable(roll);

//out.addPadding((int) (PADDED_SIZE - out.bytes().writePosition()));
} }


@Override @Override
public void readMarshallable(@NotNull WireIn in) { public void readMarshallable(@NotNull WireIn in) {
in.read(Fields.type).text(this, (o, i) -> o.type = i) in.read(Fields.type).text(this, (o, i) -> o.type = i)
.read(Fields.version).text(this, (o, i) -> o.version = i)
.read(Fields.uuid).uuid(this, (o, i) -> o.uuid = i) .read(Fields.uuid).uuid(this, (o, i) -> o.uuid = i)
.read(Fields.writePosition).int64(this.writePosition, this, (o, i) -> o.writePosition = i) .read(Fields.writePosition).int64(this.writePosition, this, (o, i) -> o.writePosition = i)
.read(Fields.dataPosition).int64(this.dataPosition, this, (o, i) -> o.dataPosition = i) .read(Fields.dataPosition).int64(this.dataPosition, this, (o, i) -> o.dataPosition = i)
Expand Down Expand Up @@ -178,7 +182,7 @@ public int getNextRollCycle() {
} }


public boolean casNextRollCycle(int rollCycle) { public boolean casNextRollCycle(int rollCycle) {
return this.roll.nextCycle.compareAndSwapValue(-1L, rollCycle); return this.roll.nextCycle.compareAndSwapValue(-1, rollCycle);
} }


private class Roll implements Marshallable { private class Roll implements Marshallable {
Expand All @@ -187,9 +191,8 @@ private class Roll implements Marshallable {
private String format; private String format;
private ZoneId zoneId; private ZoneId zoneId;


//TODO: it appears there is a problem with IntValue (LongValue as workaround) private IntValue cycle;
private LongValue cycle; private IntValue nextCycle;
private LongValue nextCycle;


// LongValue is right here // LongValue is right here
private LongValue nextCycleMetaPosition; private LongValue nextCycleMetaPosition;
Expand All @@ -206,21 +209,21 @@ private class Roll implements Marshallable {


@Override @Override
public void writeMarshallable(@NotNull WireOut out) { public void writeMarshallable(@NotNull WireOut out) {
out.write(RollFields.cycle).int64forBinding(-1) out.write(RollFields.cycle).int32forBinding(-1)
.write(RollFields.length).int32(length) .write(RollFields.length).int32(length)
.write(RollFields.format).text(format) .write(RollFields.format).text(format)
.write(RollFields.timeZone).text(zoneId.getId()) .write(RollFields.timeZone).text(zoneId.getId())
.write(RollFields.nextCycle).int64forBinding(-1) .write(RollFields.nextCycle).int32forBinding(-1)
.write(RollFields.nextCycleMetaPosition).int64forBinding(-1); .write(RollFields.nextCycleMetaPosition).int64forBinding(-1);
} }


@Override @Override
public void readMarshallable(@NotNull WireIn in) { public void readMarshallable(@NotNull WireIn in) {
in.read(RollFields.cycle).int64(this.cycle, this, (o, i) -> o.cycle = i) in.read(RollFields.cycle).int32(this.cycle, this, (o, i) -> o.cycle = i)
.read(RollFields.length).int32(this, (o, i) -> o.length = i) .read(RollFields.length).int32(this, (o, i) -> o.length = i)
.read(RollFields.format).text(this, (o, i) -> o.format = i) .read(RollFields.format).text(this, (o, i) -> o.format = i)
.read(RollFields.timeZone).text(this, (o, i) -> o.zoneId = ZoneId.of(i)) .read(RollFields.timeZone).text(this, (o, i) -> o.zoneId = ZoneId.of(i))
.read(RollFields.nextCycle).int64(this.nextCycle, this, (o, i) -> o.nextCycle = i) .read(RollFields.nextCycle).int32(this.nextCycle, this, (o, i) -> o.nextCycle = i)
.read(RollFields.nextCycleMetaPosition).int64(this.nextCycleMetaPosition, this, (o, i) -> o.nextCycleMetaPosition = i); .read(RollFields.nextCycleMetaPosition).int64(this.nextCycleMetaPosition, this, (o, i) -> o.nextCycleMetaPosition = i);
} }
} }
Expand Down
Loading

0 comments on commit 3f3ba85

Please sign in to comment.