Skip to content

Commit

Permalink
Most tests passing.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Mar 7, 2016
1 parent ff7e885 commit 670a82c
Show file tree
Hide file tree
Showing 30 changed files with 2,289 additions and 1,347 deletions.
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
~ along with this program. If not, see <http://www.gnu.org/licenses />. ~ along with this program. If not, see <http://www.gnu.org/licenses />.
--> -->


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<properties> <properties>
<additionalparam>-Xdoclint:none</additionalparam> <additionalparam>-Xdoclint:none</additionalparam>
Expand All @@ -28,7 +29,7 @@
</parent> </parent>


<artifactId>chronicle-queue</artifactId> <artifactId>chronicle-queue</artifactId>
<version>4.0.10-SNAPSHOT</version> <version>4.1.0-SNAPSHOT</version>
<packaging>bundle</packaging> <packaging>bundle</packaging>
<name>OpenHFT/chronicle-queue</name> <name>OpenHFT/chronicle-queue</name>


Expand All @@ -47,7 +48,7 @@
<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId> <artifactId>chronicle-bom</artifactId>
<version>1.11.15</version> <version>1.11.17-SNAPSHOT</version>
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
Expand Down
38 changes: 22 additions & 16 deletions src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java
100644 → 100755
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/ */
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.wire.WireType; import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.Closeable;
import java.io.File; import java.io.File;


/** /**
Expand Down Expand Up @@ -56,36 +56,31 @@
*/ */
public interface ChronicleQueue extends Closeable { public interface ChronicleQueue extends Closeable {
/** /**
* An Excerpt can be used access entries randomly and optionally change them. * @return a new ExcerptTailer to read sequentially.
*
* @return Excerpt
*/
@NotNull
Excerpt createExcerpt();

/**
* A Tailer can be used to read sequentially from the lower of a given position.
*
* @return ExcerptTailer
*/ */
@NotNull @NotNull
ExcerptTailer createTailer(); ExcerptTailer createTailer();


/** /**
* An Appender can be used to writeBytes new excerpts sequentially to the upper. * An Appender can be used to writeBytes new excerpts sequentially to the upper.
* *
* @return ExcerptAppender * @return A thread local Appender for writing new entries to the end.
*/ */
@NotNull @NotNull
ExcerptAppender createAppender(); ExcerptAppender createAppender();


/** /**
* @return the lowest valid index available, or sequenceNumber=0 if none are found * @return the lowest valid index available, or Long.MAX_VALUE if none are found
*/ */
long firstIndex(); long firstIndex();


/** /**
* @return the highest valid index immediately available. Or -1 if none available. * @return the first cycle number found, or Integer.MAX_VALUE is none found.
*/
int firstCycle();

/**
* @return the index one more than the highest valid index immediately available. Or Long.MIN_VALUE if none available.
* *
* The lowest 40bits of the index refer to the sequence number with the cycle, giving a maximum * The lowest 40bits of the index refer to the sequence number with the cycle, giving a maximum
* of 1099511627776 excerpt per cycle. Each cycle has its own file. Each file holds its own * of 1099511627776 excerpt per cycle. Each cycle has its own file. Each file holds its own
Expand All @@ -96,7 +91,12 @@ public interface ChronicleQueue extends Closeable {
long lastIndex(); long lastIndex();


/** /**
* @return the type of wire used, for example TEXT_WIRE or BINARY WIRE * @return the lastCycle available or Integer.MIN_VALUE if none is found.
*/
int lastCycle();

/**
* @return the type of wire used, for example WireTypes.TEXT or WireTypes.BINARY
*/ */
@NotNull @NotNull
WireType wireType(); WireType wireType();
Expand All @@ -112,4 +112,10 @@ public interface ChronicleQueue extends Closeable {
@NotNull @NotNull
File path(); File path();


/**
* Dump a Queue in YAML format.
*
* @return the contents of the Queue as YAML.
*/
String dump();
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,12 +15,80 @@
*/ */
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.bytes.BytesRingBufferStats;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.util.function.Consumer;


/** /**
* @author Rob Austin. * @author Rob Austin.
*/ */
public interface ChronicleQueueBuilder extends Cloneable { public interface ChronicleQueueBuilder<B extends ChronicleQueueBuilder, Q extends ChronicleQueue> extends Cloneable {
@NotNull
Q build();

@NotNull
B onRingBufferStats(@NotNull Consumer<BytesRingBufferStats> onRingBufferStats);

Consumer<BytesRingBufferStats> onRingBufferStats();

@NotNull
File path();

@NotNull
B blockSize(int blockSize);

long blockSize();

@NotNull
B wireType(@NotNull WireType wireType);

@NotNull
WireType wireType();

@NotNull
B rollCycle(@NotNull RollCycle rollCycle);

long bufferCapacity();

@NotNull
B bufferCapacity(long ringBufferSize);

@NotNull
B epoch(long epoch);

long epoch();

@NotNull
RollCycle rollCycle();

@NotNull
B onThrowable(@NotNull Consumer<Throwable> onThrowable);

@NotNull @NotNull
ChronicleQueue build(); B buffered(boolean isBuffered);

boolean buffered();

@Nullable
EventLoop eventLoop();

@NotNull
B eventLoop(EventLoop eventLoop);

@NotNull
B bufferCapacity(int bufferCapacity);

B indexCount(int indexCount);

int indexCount();

B indexSpacing(int indexSpacing);

int indexSpacing();

} }
33 changes: 33 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/DumpQueueMain.java
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,33 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.wire.Wires;

import java.io.File;
import java.io.IOException;

/**
* Created by Peter on 07/03/2016.
*/
public class DumpQueueMain {
public static void main(String[] args) {
dump(args[0]);
}

public static void dump(String dir) {
File[] files = new File(dir).listFiles();
if (files == null) {
System.err.println("Directory not found " + dir);
}
for (File file : files) {
if (file.getName().endsWith(SingleChronicleQueue.SUFFIX)) {
try (MappedBytes bytes = MappedBytes.mappedBytes(file, 4 << 20)) {
System.out.println(Wires.fromSizePrefixedBlobs(bytes));
} catch (IOException ioe) {
System.err.println("Failed to read " + file + " " + ioe);
}
}
}
}
}
9 changes: 5 additions & 4 deletions src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java
100644 → 100755
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,29 +34,30 @@ public interface ExcerptAppender extends ExcerptCommon {
* @param writer to write to excerpt. * @param writer to write to excerpt.
* @return the index last written or -1 if a buffered appender is being used * @return the index last written or -1 if a buffered appender is being used
*/ */
long writeDocument(@NotNull WriteMarshallable writer); void writeDocument(@NotNull WriteMarshallable writer);


/** /**
* @param marshallable to write to excerpt. * @param marshallable to write to excerpt.
* @return the index last written or -1 if a buffered appender is being used * @return the index last written or -1 if a buffered appender is being used
*/ */
long writeBytes(@NotNull WriteBytesMarshallable marshallable); void writeBytes(@NotNull WriteBytesMarshallable marshallable);


/** /**
* @param bytes to write to excerpt. * @param bytes to write to excerpt.
* @return the index last written -1 if a buffered appender is being used * @return the index last written -1 if a buffered appender is being used
*/ */
long writeBytes(@NotNull Bytes<?> bytes); void writeBytes(@NotNull Bytes<?> bytes);


/** /**
* @return the index last written, this index includes the cycle and the sequence number * @return the index last written, this index includes the cycle and the sequence number
* @throws IllegalStateException if no index is available * @throws IllegalStateException if no index is available
*/ */
long index(); long lastIndexAppended();


/** /**
* @return the cycle this tailer is on, usually with chronicle-queue each cycle will have * @return the cycle this tailer is on, usually with chronicle-queue each cycle will have
* its own unique data file to store the excerpt * its own unique data file to store the excerpt
*/ */
long cycle(); long cycle();

} }
5 changes: 0 additions & 5 deletions src/main/java/net/openhft/chronicle/queue/ExcerptCommon.java
100644 → 100755
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,9 +20,4 @@
*/ */
public interface ExcerptCommon { public interface ExcerptCommon {


/**
* Hint to the underlying store to get the next page(s)
*/
default void prefetch() {
}
} }
17 changes: 15 additions & 2 deletions src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java
100644 → 100755
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -77,18 +77,31 @@ public interface ExcerptTailer extends ExcerptCommon {
boolean moveToIndex(long index); boolean moveToIndex(long index);


/** /**
* Replay from the lower. * Replay from the first entry in the first cycle.
* *
* @return this Excerpt * @return this Excerpt
*/ */
@NotNull @NotNull
ExcerptTailer toStart(); ExcerptTailer toStart();


/** /**
* Wind to the upper. * Wind to the last entry int eh last entry
* *
* @return this Excerpt * @return this Excerpt
*/ */
@NotNull @NotNull
ExcerptTailer toEnd(); ExcerptTailer toEnd();

/**
* @return the direction of movement after reading an entry.
*/
TailerDirection direction();

/**
* Set the direction of movement.
*
* @param direction NONE, FORWARD, BACKWARD
* @return this
*/
ExcerptTailer direction(TailerDirection direction);
} }
31 changes: 3 additions & 28 deletions src/main/java/net/openhft/chronicle/queue/RollCycle.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,40 +15,15 @@
*/ */
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import org.jetbrains.annotations.NotNull;

import java.time.ZoneId;


public interface RollCycle { public interface RollCycle {


@NotNull
static RollCycle from(final int length, @NotNull final String format, @NotNull final ZoneId zone) {
return new RollCycle() {
@NotNull
@Override
public String format() {
return format;
}

@Override
public int length() {
return length;
}

@NotNull
@Override
public ZoneId zone() {
return zone;
}
};
}

String format(); String format();


int length(); int length();


ZoneId zone(); int defaultIndexCount();

int defaultIndexSpacing();


/** /**
* @param epoch and EPOCH offset, to all the user to define thier own epoch * @param epoch and EPOCH offset, to all the user to define thier own epoch
Expand Down
Loading

0 comments on commit 670a82c

Please sign in to comment.