diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java index bccc00a16c..5903230876 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java @@ -104,5 +104,9 @@ public interface ChronicleQueue extends Closeable { */ long lastWrittenIndex(); + /** + * + * @return + */ WireType wireType(); } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java index e25b1a06ae..c2be2f63c3 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/AbstractChronicleQueue.java @@ -19,7 +19,7 @@ import net.openhft.chronicle.queue.Excerpt; import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptTailer; -import net.openhft.chronicle.wire.Wire; +import net.openhft.chronicle.wire.WireType; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -71,6 +71,11 @@ public void close() throws IOException { throw new UnsupportedOperationException("Not implemented"); } + @Override + public WireType wireType() { + throw new UnsupportedOperationException("Not implemented"); + } + /** * * @param cycle @@ -109,12 +114,6 @@ public void close() throws IOException { */ public abstract long indexToIndex(); - /** - * - * @return - */ - public abstract Wire wire(); - /** * * @return diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java index 7326612a1a..d67bfd50f1 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java @@ -19,7 +19,7 @@ import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.BytesStore; -import net.openhft.chronicle.bytes.NativeBytes; +import net.openhft.chronicle.bytes.NativeBytesStore; import net.openhft.chronicle.bytes.ReadBytesMarshallable; import net.openhft.chronicle.core.annotation.ForceInline; import net.openhft.chronicle.core.util.ThrowingFunction; @@ -76,7 +76,7 @@ public ChronicleQueue queue() { * Delegates the appender */ public static class DelegatedAppender extends DefaultAppender { - private final Bytes buffer; + private final BytesStore store; private final Wire wire; private final Consumer consumer; @@ -86,16 +86,16 @@ public DelegatedAppender( super(queue); - this.buffer = NativeBytes.nativeBytes(); - this.wire = queue.wireType().apply(this.buffer); + this.store = NativeBytesStore.nativeStoreWithFixedCapacity(1024); + this.wire = queue.wireType().apply(this.store.bytesForWrite()); this.consumer = consumer; } @Override public long writeDocument(WriteMarshallable writer) throws IOException { - buffer.clear(); + wire.bytes().clear(); writer.writeMarshallable(wire); - consumer.accept(buffer); + consumer.accept(wire.bytes()); return WireConstants.NO_INDEX; } @@ -138,8 +138,11 @@ public boolean write( final Bytes bytes = bytesFunction.apply(store::acquire); if(bytes != null && !bytes.isClear()) { bytes.writeVolatileInt( - bytes.start(), - Wires.toIntU30(bytes.writePosition() - bytes.start() + 4, "TODO")); + bytes.readPosition(), + Wires.toIntU30( + bytes.length() - 4, + "Document length %,d out of 30-bit int range.") + ); return true; } @@ -186,7 +189,7 @@ private WireStore store() throws IOException { this.store = queue.storeForCycle(this.cycle); } - return this.store(); + return this.store; } } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/async/AsyncChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/async/AsyncChronicleQueue.java index 456ccd4e0e..55047b0e02 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/async/AsyncChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/async/AsyncChronicleQueue.java @@ -42,7 +42,7 @@ public class AsyncChronicleQueue extends DelegatedChronicleQueue { public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws IOException { super(queue); - this.store = NativeBytesStore.nativeStoreWithFixedCapacity(capacity); + this.store = NativeBytesStore.nativeStoreWithFixedCapacity(capacity); this.buffer = new BytesRingBuffer(this.store.bytesForWrite()); this.appender = null; @@ -60,6 +60,8 @@ public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws return false; }); + + this.eventGroup.start(); } @NotNull diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java index a602493940..1f05573a91 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java @@ -63,7 +63,7 @@ public BytesRingBuffer(@NotNull final Bytes buffer) { * @return returning {@code true} upon success and {@code false} if this queue is full. */ public boolean offer(@NotNull Bytes bytes0) throws InterruptedException { - bytes0.readLimit(bytes0.writeLimit()); + //bytes0.readLimit(bytes0.writeLimit()); try { @@ -81,7 +81,7 @@ public boolean offer(@NotNull Bytes bytes0) throws InterruptedException { return false; // write the size - long len = bytes0.readLimit(); + long len = bytes0.length(); long messageLen = SIZE + FLAG + len; diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index e78357c41a..b0b0fb6ca7 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -22,7 +22,6 @@ import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; import net.openhft.chronicle.queue.impl.WireStore; import net.openhft.chronicle.queue.impl.WireStorePool; -import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.WireType; import net.openhft.chronicle.wire.WiredFile; import org.jetbrains.annotations.NotNull; @@ -140,7 +139,7 @@ protected long lastCycle() { @Override public WireType wireType() { - throw new UnsupportedOperationException("todo"); + return builder.wireType(); } @Override @@ -148,11 +147,6 @@ public long indexToIndex() { throw new UnsupportedOperationException("todo"); } - @Override - public Wire wire() { - throw new UnsupportedOperationException("todo"); - } - @Override public long newIndex() { throw new UnsupportedOperationException("todo"); diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java index e83c8803d3..006f441c2e 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java @@ -172,7 +172,11 @@ public Bytes acquire(long size) throws IOException { final NativeBytes bytes = WireConstants.NBP.get(); withLock( - (store, position) -> bytes.bytesStore(store, position + 4, position + 4 + size), + (store, position) -> { + bytes.bytesStore(store, position, 4 + size); + bytes.writePosition(position + 4); + bytes.writeLimit(position + 4 + size); + }, size30 ); @@ -310,37 +314,37 @@ public long refCount() { // Utilities // ************************************************************************* - //TODO move top wire + //TODO move to wire protected boolean acquireLock(BytesStore store, long position, int size) { return store.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size); } - protected void withLock(BytesStoreFunction function) + protected void withLock(@NotNull BytesStoreFunction function) throws IOException { withLock(function, 0x0); } - protected void withLock(BytesStoreFunction function, int size) + protected void withLock(@NotNull BytesStoreFunction function, int size) throws IOException { long TIMEOUT_MS = 10_000; // 10 seconds. long end = System.currentTimeMillis() + TIMEOUT_MS; - long lastWritePosition = writePosition(); + long writePosition = writePosition(); BytesStore store; for (; ;) { - checkRemainingForAppend(lastWritePosition); + checkRemainingForAppend(writePosition); //TODO: a byte store should be acquired only if lastWrittenPosition is out its limits - store = mappedFile.acquireByteStore(lastWritePosition); + store = mappedFile.acquireByteStore(writePosition); - if(acquireLock(store, lastWritePosition, size)) { - function.apply(store, lastWritePosition); + if(acquireLock(store, writePosition, size)) { + function.apply(store, writePosition); return; } else { - int spbHeader = store.readInt(lastWritePosition); + int spbHeader = store.readInt(writePosition); if (Wires.isKnownLength(spbHeader)) { - lastWritePosition += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE; + writePosition += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE; } else { // TODO: wait strategy if(System.currentTimeMillis() > end) { diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/work/in/progress/Indexer.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/work/in/progress/Indexer.java index d1408eab84..abc998955b 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/work/in/progress/Indexer.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/single/work/in/progress/Indexer.java @@ -18,17 +18,16 @@ package net.openhft.chronicle.queue.impl.single.work.in.progress; -import net.openhft.chronicle.bytes.IORuntimeException; -import net.openhft.chronicle.core.values.LongArrayValues; import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; -import net.openhft.chronicle.wire.*; +import net.openhft.chronicle.wire.BinaryLongArrayReference; +import net.openhft.chronicle.wire.ByteableLongArrayValues; +import net.openhft.chronicle.wire.TextLongArrayReference; +import net.openhft.chronicle.wire.WireType; import org.jetbrains.annotations.NotNull; import static java.lang.ThreadLocal.withInitial; -import static net.openhft.chronicle.queue.impl.single.work.in.progress.Indexer.IndexOffset.toAddress0; -import static net.openhft.chronicle.queue.impl.single.work.in.progress.Indexer.IndexOffset.toAddress1; /** @@ -96,6 +95,7 @@ public synchronized void index() throws Exception { */ private void recordAddress(long index, long address) { + /* if (index % 64 != 0) return; @@ -124,6 +124,7 @@ public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException { }, null); + */ } diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/ChronicleQueueTestBase.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/ChronicleQueueTestBase.java index 8b394c4ab9..b261b81d4d 100644 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/ChronicleQueueTestBase.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/ChronicleQueueTestBase.java @@ -16,6 +16,7 @@ package net.openhft.chronicle.queue; import net.openhft.chronicle.core.OS; +import net.openhft.chronicle.wire.WireKey; import org.junit.Rule; import org.junit.rules.*; import org.junit.runner.Description; @@ -62,6 +63,14 @@ protected void starting(Description description) { // // ************************************************************************* + public static enum TestKey implements WireKey { + test + } + + // ************************************************************************* + // + // ************************************************************************* + protected File getTmpDir() { try { final File tmpDir = Files.createTempDirectory(getClass().getSimpleName() + "-").toFile(); diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBufferTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBufferTest.java new file mode 100644 index 0000000000..c490f70dcb --- /dev/null +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBufferTest.java @@ -0,0 +1,42 @@ +/* + * + * Copyright (C) 2015 higherfrequencytrading.com + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + */ +package net.openhft.chronicle.queue.impl.ringbuffer; + +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.bytes.NativeBytesStore; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; + +public class BytesRingBufferTest { + + @Ignore + @Test + public void testOffer() throws IOException, InterruptedException { + NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(1024); + NativeBytesStore msgBytes = NativeBytesStore.nativeStoreWithFixedCapacity(150); + + BytesRingBuffer ring = new BytesRingBuffer(allocate.bytesForWrite()); + Bytes buffer = msgBytes.bytesForWrite(); + + buffer.clear(); + buffer.writeLong(1L); + ring.offer(buffer); + } +} diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/AsyncChronicleQueueTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/AsyncChronicleQueueTest.java index 392baafe01..913b6738bb 100644 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/AsyncChronicleQueueTest.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/AsyncChronicleQueueTest.java @@ -17,9 +17,37 @@ */ package net.openhft.chronicle.queue.impl.single; +import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueueTestBase; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.impl.async.AsyncChronicleQueueBuilder; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; public class AsyncChronicleQueueTest extends ChronicleQueueTestBase { + @Test + public void testAppendAndRead() throws IOException { + final ChronicleQueue queue = SingleChronicleQueueBuilder.text(getTmpDir()).build(); + final ChronicleQueue async = new AsyncChronicleQueueBuilder(queue).build(); + + final ExcerptAppender appender = async.createAppender(); + for (int i = 0; i < 10; i++) { + final int n = i; + appender.writeDocument(w -> w.write(TestKey.test).int32(n)); + } + + final ExcerptTailer tailer = queue.createTailer(); + for (int i = 0; i < 10;) { + final int n = i; + if(tailer.readDocument(r -> assertEquals(n, r.read(TestKey.test).int32()))) { + i++; + } + } + } } diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java index f2531339a3..563c91939e 100755 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java @@ -44,10 +44,6 @@ @RunWith(Parameterized.class) public class SingleChronicleQueueTest extends ChronicleQueueTestBase { - enum TestKey implements WireKey { - test - } - @Parameterized.Parameters public static Collection data() { return Arrays.asList(new Object[][]{