From e128cb1e9ce9dc3860bef98b07e211e5f35745b0 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 17 Apr 2015 19:49:33 +0100 Subject: [PATCH] Refactoring after a review of the code. --- .../ClientWiredChronicleQueueStateless.java | 13 +- .../client/internal/QueueWireHandler.java | 20 +- .../chronicle/queue/impl/ChronicleWire.java | 46 --- .../chronicle/queue/impl/ChronicleWireIn.java | 46 --- .../queue/impl/ChronicleWireOut.java | 47 --- .../openhft/chronicle/queue/impl/Indexer.java | 4 +- .../chronicle/queue/impl/SingleAppender.java | 12 +- .../queue/impl/SingleChronicleQueue.java | 280 +++++++++--------- .../chronicle/queue/impl/SingleTailer.java | 80 +++-- .../impl/ringbuffer/BytesRingBuffer.java | 6 +- .../chronicle/queue/BytesRingBufferTest.java | 102 ++++--- .../queue/DirectChronicleQueueStringTest.java | 45 ++- .../queue/impl/SimpleChronicleQueueTest.java | 16 +- 13 files changed, 277 insertions(+), 440 deletions(-) delete mode 100644 chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWire.java delete mode 100644 chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireIn.java delete mode 100644 chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireOut.java diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/ClientWiredChronicleQueueStateless.java b/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/ClientWiredChronicleQueueStateless.java index c867170c91..83cf4990e9 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/ClientWiredChronicleQueueStateless.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/ClientWiredChronicleQueueStateless.java @@ -25,13 +25,10 @@ import net.openhft.chronicle.queue.Excerpt; import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptTailer; -import net.openhft.chronicle.wire.TextWire; -import net.openhft.chronicle.wire.ValueOut; import net.openhft.chronicle.wire.WireKey; import org.jetbrains.annotations.NotNull; import java.io.IOException; -import java.util.function.Consumer; /** * Created by Rob Austin @@ -68,7 +65,8 @@ public ExcerptTailer createTailer() throws IOException { @NotNull @Override public ExcerptAppender createAppender() throws IOException { - return new ClientWiredExcerptAppenderStateless(this, hub, TextWire::new); + throw new UnsupportedOperationException(); +// return new ClientWiredExcerptAppenderStateless(this, hub, TextWire::new); } @Override @@ -86,16 +84,15 @@ public long firstAvailableIndex() { throw new UnsupportedOperationException("todo"); } + public long lastWrittenIndex() { + return proxyReturnLong(EventId.lastWrittenIndex); + } @Override public void close() throws IOException { // todo add ref count } - public long lastWrittenIndex() { - return proxyReturnLong(EventId.lastWrittenIndex); - } - enum EventId implements ParameterizeWireKey { lastWrittenIndex, createAppender, diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/QueueWireHandler.java b/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/QueueWireHandler.java index f3339450af..171c7aea3e 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/QueueWireHandler.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/engine/client/internal/QueueWireHandler.java @@ -29,7 +29,6 @@ import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueueBuilder; import net.openhft.chronicle.queue.ExcerptAppender; -import net.openhft.chronicle.queue.impl.ChronicleWire; import net.openhft.chronicle.wire.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +47,14 @@ */ public class QueueWireHandler implements WireHandler, Consumer { - private static final Logger LOG = LoggerFactory.getLogger(QueueWireHandler.class); public static final int SIZE_OF_SIZE = ClientWiredStatelessTcpConnectionHub.SIZE_OF_SIZE; - + private static final Logger LOG = LoggerFactory.getLogger(QueueWireHandler.class); + final StringBuilder cspText = new StringBuilder(); + final StringBuilder eventName = new StringBuilder(); + // assume there is a handler for each connection. + long tid = -1; + long cid = -1; + ChronicleQueue queue = null; private WireHandlers publishLater; private Wire inWire; private Wire outWire; @@ -60,13 +64,6 @@ public class QueueWireHandler implements WireHandler, Consumer { private AtomicInteger cidCounter = new AtomicInteger(); private Map queueToAppender = new ConcurrentHashMap<>(); - // assume there is a handler for each connection. - long tid = -1; - long cid = -1; - final StringBuilder cspText = new StringBuilder(); - final StringBuilder eventName = new StringBuilder(); - ChronicleQueue queue = null; - public QueueWireHandler() { } @@ -124,10 +121,13 @@ void onEvent() throws IOException { }); outWire.writeDocument(false, wireOut -> { + throw new UnsupportedOperationException(); +/* QueueAppenderResponse qar = new QueueAppenderResponse(); qar.setCid(cid); qar.setCsp(cspText); wireOut.write(reply).typedMarshallable(qar); +*/ }); } else if (EventId.submit.contentEquals(eventName)) { ExcerptAppender appender = queueToAppender.get(queue); diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWire.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWire.java deleted file mode 100644 index b24d8bd669..0000000000 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWire.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015 Higher Frequency Trading - * - * http://www.higherfrequencytrading.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.openhft.chronicle.queue.impl; - -import net.openhft.chronicle.wire.Wire; -import net.openhft.chronicle.wire.WireIn; -import net.openhft.chronicle.wire.WireOut; -import net.openhft.chronicle.wire.WrappedWire; -import org.jetbrains.annotations.NotNull; - -/** - * Created by peter.lawrey on 03/02/15. - */ -public class ChronicleWire extends WrappedWire implements Wire { - public ChronicleWire(Wire wire) { - super(wire); - } - - @NotNull - @Override - protected WireOut thisWireOut() { - return this; - } - - @NotNull - @Override - protected WireIn thisWireIn() { - return this; - } -} diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireIn.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireIn.java deleted file mode 100644 index 86d3cc53ea..0000000000 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireIn.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015 Higher Frequency Trading - * - * http://www.higherfrequencytrading.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.openhft.chronicle.queue.impl; - -import net.openhft.chronicle.wire.Wire; -import net.openhft.chronicle.wire.WireIn; -import net.openhft.chronicle.wire.WireOut; -import net.openhft.chronicle.wire.WrappedWire; -import org.jetbrains.annotations.NotNull; - -/** - * Created by peter.lawrey on 03/02/15. - */ -public class ChronicleWireIn extends WrappedWire implements WireIn { - public ChronicleWireIn(Wire wire) { - super(wire); - } - - @NotNull - @Override - protected WireOut thisWireOut() { - throw new UnsupportedOperationException(); - } - - @NotNull - @Override - protected WireIn thisWireIn() { - return this; - } -} diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireOut.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireOut.java deleted file mode 100644 index dcb7cf3b57..0000000000 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ChronicleWireOut.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 Higher Frequency Trading - * - * http://www.higherfrequencytrading.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.openhft.chronicle.queue.impl; - -import net.openhft.chronicle.wire.Wire; -import net.openhft.chronicle.wire.WireIn; -import net.openhft.chronicle.wire.WireOut; -import net.openhft.chronicle.wire.WrappedWire; -import org.jetbrains.annotations.NotNull; - -/** - * Created by peter.lawrey on 03/02/15. - */ -public class ChronicleWireOut extends WrappedWire implements WireOut { - public ChronicleWireOut(Wire wire) { - super(wire); - } - - @NotNull - @Override - protected WireOut thisWireOut() { - return this; - } - - @NotNull - @Override - protected WireIn thisWireIn() { - throw new UnsupportedOperationException(); - } - -} diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Indexer.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Indexer.java index 03a72de89c..b7f1b0013f 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Indexer.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Indexer.java @@ -49,9 +49,9 @@ public Indexer(@NotNull final AbstractChronicle chronicle) { newLongArrayValuesPool(Class wireType) { if (TextWire.class.isAssignableFrom(wireType)) - return withInitial(LongArrayTextReference::new); + return withInitial(TextLongArrayReference::new); if (BinaryWire.class.isAssignableFrom(wireType)) - return withInitial(LongArrayDirectReference::new); + return withInitial(BinaryLongArrayReference::new); else throw new IllegalStateException("todo, unsupported type=" + wireType); diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleAppender.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleAppender.java index be7627691a..c86d15d5b9 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleAppender.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleAppender.java @@ -22,13 +22,13 @@ import net.openhft.chronicle.bytes.NativeBytes; import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ExcerptAppender; -import net.openhft.chronicle.wire.BinaryWire; import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.WireOut; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.function.Consumer; +import java.util.function.Function; /** * Created by peter.lawrey on 30/01/15. @@ -37,22 +37,20 @@ public class SingleAppender implements ExcerptAppender { @NotNull private final DirectChronicleQueue chronicle; - @Nullable - private final ChronicleWireOut wireOut; private final Bytes buffer = NativeBytes.nativeBytes(); - private final Wire wire = new BinaryWire(buffer); + private final Wire wire; private long lastWrittenIndex = -1; - public SingleAppender(ChronicleQueue chronicle) { + public SingleAppender(ChronicleQueue chronicle, Function bytesToWire) { this.chronicle = (DirectChronicleQueue) chronicle; - wireOut = new ChronicleWireOut(wire); + wire = bytesToWire.apply(buffer); } @Nullable @Override public WireOut wire() { - return wireOut; + return wire; } @Override diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleChronicleQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleChronicleQueue.java index 539ee4e7e4..e32d0fa6c7 100755 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleChronicleQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleChronicleQueue.java @@ -36,21 +36,16 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import static net.openhft.chronicle.queue.impl.Indexer.NUMBER_OF_ENTRIES_IN_EACH_INDEX; import static net.openhft.chronicle.wire.Wires.isData; /** - * SingleChronicle implements Chronicle over a single streaming file

Created by peter.lawrey on - * 30/01/15. + * SingleChronicle implements Chronicle over a single streaming file

Created by peter.lawrey on 30/01/15. */ public class SingleChronicleQueue extends AbstractChronicle { - // don't write to this without reviewing net.openhft.chronicle.queue.impl.SingleChronicleQueue.casMagicOffset - private static final long MAGIC_OFFSET = 0L; - - private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueue.class.getName()); - static final long HEADER_OFFSET = 8L; static final long UNINITIALISED = 0L; static final long BUILDING = BytesUtil.asLong("BUILDING"); @@ -59,23 +54,23 @@ public class SingleChronicleQueue extends AbstractChronicle { static final int META_DATA = Wires.META_DATA; static final int LENGTH_MASK = Wires.LENGTH_MASK; static final int MAX_LENGTH = LENGTH_MASK; - + // don't write to this without reviewing net.openhft.chronicle.queue.impl.SingleChronicleQueue.casMagicOffset + private static final long MAGIC_OFFSET = 0L; + private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueue.class.getName()); + final Header header = new Header(); + @NotNull + final Wire wire; private final ThreadLocal localAppender = new ThreadLocal<>(); - @NotNull private final MappedFile mappedFile; private final Bytes headerMemory; - final Header header = new Header(); - @NotNull - final ChronicleWire wire; @NotNull private final Bytes bytes; private final Class wireType; - private long firstBytes = -1; - - + private final Function bytesToWireFunction; // used in the indexer private final ThreadLocal longArray; + private long firstBytes = -1; public SingleChronicleQueue(@NotNull final String filename, long blockSize, @@ -86,84 +81,26 @@ public SingleChronicleQueue(@NotNull final String filename, bytes = mappedFile.bytes(); this.wire = createWire(wireType, bytes); this.wireType = wireType; + this.bytesToWireFunction = byteToWireFor(wireType); longArray = Indexer.newLongArrayValuesPool(wireType()); initialiseHeader(); } - private static ChronicleWire createWire(@NotNull final Class wireType, + private static Wire createWire(@NotNull final Class wireType, @NotNull final Bytes bytes) { - final Wire rootWire; - - if (BinaryWire.class.isAssignableFrom(wireType)) { - rootWire = new BinaryWire(bytes); - } else if (TextWire.class.isAssignableFrom(wireType)) { - rootWire = new TextWire(bytes); - } else - throw new UnsupportedOperationException("todo"); - - return new ChronicleWire(rootWire); - } - - - @Override - public boolean readDocument(@NotNull AtomicLong offset, @NotNull Bytes buffer) { - buffer.clear(); - long lastByte = offset.get(); - for (; ; ) { - int length = bytes.readVolatileInt(lastByte); - int length2 = length30(length); - if (Wires.isReady(length)) { - lastByte += 4; - buffer.write(bytes, lastByte, length2); - lastByte += length2; - offset.set(lastByte); - return isData(length); - } - if (Thread.currentThread().isInterrupted()) - return false; - } - } - - @NotNull - @Override - public Bytes bytes() { - return bytes; - } - - @Override - public long lastIndex() { - long value = header.lastIndex().getVolatileValue(); - if (value == -1) - throw new IllegalStateException("No data has been written to chronicle."); - return value; - } - - @Override - public boolean index(long index, @NotNull BytesStoreBytes bytes) { - if (index == -1) { - bytes.bytesStore(headerMemory, HEADER_OFFSET, headerMemory.length() - HEADER_OFFSET); - return true; - } - return false; - } - - private int length30(int i) { - return i & LENGTH_MASK; - } - - @Override - Wire wire() { - return wire; - } - - @Override - public Class wireType() { - return wireType; + return byteToWireFor(wireType).apply(bytes); } - enum MetaDataKey implements WireKey { - header, index2index, index + static Function byteToWireFor(Class wireType) { + if (TextWire.class.isAssignableFrom(wireType)) + return TextWire::new; + else if (BinaryWire.class.isAssignableFrom(wireType)) + return BinaryWire::new; + else if (RawWire.class.isAssignableFrom(wireType)) + return RawWire::new; + else + throw new UnsupportedOperationException("todo"); } private void initialiseHeader() throws IOException { @@ -173,8 +110,19 @@ private void initialiseHeader() throws IOException { readHeader(); } + private void buildHeader() { + // skip the magic number. + bytes.position(HEADER_OFFSET); + + wire.writeDocument(true, w -> w + .write(MetaDataKey.header).marshallable(header.init(Compression.NONE))); + + if (!bytes.compareAndSwapLong(MAGIC_OFFSET, BUILDING, QUEUE_CREATED)) + throw new AssertionError("Concurrent writing of the header"); + } + private void readHeader() throws IOException { - // skip the magic number. + // skip the magic number. waitForTheHeaderToBeBuilt(bytes); bytes.position(HEADER_OFFSET); @@ -202,29 +150,6 @@ private void waitForTheHeaderToBeBuilt(@NotNull Bytes bytes) throws IOException throw new AssertionError("Timeout waiting to build the file " + name()); } - private void buildHeader() { - // skip the magic number. - bytes.position(HEADER_OFFSET); - - wire.writeDocument(true, w -> w - .write(MetaDataKey.header).marshallable(header.init(Compression.NONE))); - - if (!bytes.compareAndSwapLong(MAGIC_OFFSET, BUILDING, QUEUE_CREATED)) - throw new AssertionError("Concurrent writing of the header"); - } - - static String getHostName() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - try { - return Files.readAllLines(Paths.get("etc", "hostname")).get(0); - } catch (Exception e2) { - return "localhost"; - } - } - } - @Override public String name() { return mappedFile.name(); @@ -239,12 +164,7 @@ public Excerpt createExcerpt() throws IOException { @NotNull @Override public ExcerptTailer createTailer() throws IOException { - if (TextWire.class.isAssignableFrom(wireType())) - return new SingleTailer(this, TextWire::new); - else if (BinaryWire.class.isAssignableFrom(wireType())) - return new SingleTailer(this, BinaryWire::new); - else - throw new UnsupportedOperationException("todo"); + return new SingleTailer(this, bytesToWireFunction); } @NotNull @@ -252,7 +172,7 @@ else if (BinaryWire.class.isAssignableFrom(wireType())) public ExcerptAppender createAppender() throws IOException { ExcerptAppender appender = localAppender.get(); if (appender == null) - localAppender.set(appender = new SingleAppender(this)); + localAppender.set(appender = new SingleAppender(this, bytesToWireFunction)); return appender; } @@ -276,16 +196,27 @@ public long lastWrittenIndex() { return 0; } - @Override - public void close() { - throw new UnsupportedOperationException(); + static String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + try { + return Files.readAllLines(Paths.get("etc", "hostname")).get(0); + } catch (Exception e2) { + return "localhost"; + } + } } @Override - public long firstBytes() { - return firstBytes; + Wire wire() { + return wire; } + @Override + public Class wireType() { + return wireType; + } /** * @return gets the index2index, or creates it, if it does not exist. @@ -310,13 +241,11 @@ long indexToIndex() { } } - /** - * Creates a new Excerpt containing and index which will be 1L << 17L bytes long, This method is - * used for creating both the primary and secondary indexes. Chronicle Queue uses a root primary - * index ( each entry in the primary index points to a unique a secondary index. The secondary - * index only records the address of every 64th except, the except are linearly scanned from - * there on. + * Creates a new Excerpt containing and index which will be 1L << 17L bytes long, This method is used for creating + * both the primary and secondary indexes. Chronicle Queue uses a root primary index ( each entry in the primary + * index points to a unique a secondary index. The secondary index only records the address of every 64th except, + * the except are linearly scanned from there on. * * @return the address of the Excerpt containing the usable index, just after the header */ @@ -335,27 +264,28 @@ long newIndex() { } - - @Override - public long appendDocument(@NotNull Bytes buffer) { + /** + * This method does not update the index, as indexs are not used for meta data + * + * @param buffer + * @return the address of the appended data + */ + private long appendMetaDataReturnAddress(@NotNull Bytes buffer) { long length = buffer.remaining(); if (length > MAX_LENGTH) throw new IllegalStateException("Length too large: " + length); LongValue writeByte = header.writeByte(); - + long lastByte = writeByte.getVolatileValue(); for (; ; ) { - long lastByte = writeByte.getVolatileValue(); - if (bytes.compareAndSwapInt(lastByte, 0, NOT_READY | (int) length)) { long lastByte2 = lastByte + 4 + buffer.remaining(); bytes.write(lastByte + 4, buffer); - long lastIndex = header.lastIndex().addAtomicValue(1); writeByte.setOrderedValue(lastByte2); - bytes.writeOrderedInt(lastByte, (int) length); - return lastIndex; + bytes.writeOrderedInt(lastByte, (int) (META_DATA | length)); + return lastByte; } int length2 = length30(bytes.readVolatileInt()); bytes.skip(length2); @@ -367,28 +297,31 @@ public long appendDocument(@NotNull Bytes buffer) { } } - /** - * This method does not update the index, as indexs are not used for meta data - * - * @param buffer - * @return the address of the appended data - */ - private long appendMetaDataReturnAddress(@NotNull Bytes buffer) { + @Override + public void close() { + throw new UnsupportedOperationException(); + } + + @Override + public long appendDocument(@NotNull Bytes buffer) { long length = buffer.remaining(); if (length > MAX_LENGTH) throw new IllegalStateException("Length too large: " + length); LongValue writeByte = header.writeByte(); - long lastByte = writeByte.getVolatileValue(); + for (; ; ) { + long lastByte = writeByte.getVolatileValue(); + if (bytes.compareAndSwapInt(lastByte, 0, NOT_READY | (int) length)) { long lastByte2 = lastByte + 4 + buffer.remaining(); bytes.write(lastByte + 4, buffer); + long lastIndex = header.lastIndex().addAtomicValue(1); writeByte.setOrderedValue(lastByte2); - bytes.writeOrderedInt(lastByte, (int) (META_DATA | length)); - return lastByte; + bytes.writeOrderedInt(lastByte, (int) length); + return lastIndex; } int length2 = length30(bytes.readVolatileInt()); bytes.skip(length2); @@ -400,6 +333,61 @@ private long appendMetaDataReturnAddress(@NotNull Bytes buffer) { } } + @Override + public boolean readDocument(@NotNull AtomicLong offset, @NotNull Bytes buffer) { + buffer.clear(); + long lastByte = offset.get(); + for (; ; ) { + int length = bytes.readVolatileInt(lastByte); + int length2 = length30(length); + if (Wires.isReady(length)) { + lastByte += 4; + buffer.write(bytes, lastByte, length2); + lastByte += length2; + offset.set(lastByte); + return isData(length); + } + if (Thread.currentThread().isInterrupted()) + return false; + } + } + + @NotNull + @Override + public Bytes bytes() { + return bytes; + } + + @Override + public long lastIndex() { + long value = header.lastIndex().getVolatileValue(); + if (value == -1) + throw new IllegalStateException("No data has been written to chronicle."); + return value; + } + + @Override + public boolean index(long index, @NotNull BytesStoreBytes bytes) { + if (index == -1) { + bytes.bytesStore(headerMemory, HEADER_OFFSET, headerMemory.length() - HEADER_OFFSET); + return true; + } + return false; + } + + @Override + public long firstBytes() { + return firstBytes; + } + + private int length30(int i) { + return i & LENGTH_MASK; + } + + enum MetaDataKey implements WireKey { + header, index2index, index + } + } diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleTailer.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleTailer.java index 383aeefe3a..1004f98029 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleTailer.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/SingleTailer.java @@ -42,13 +42,17 @@ public class SingleTailer implements ExcerptTailer { @NotNull private final SingleChronicleQueue chronicle; - - private long index; - private final BytesStoreBytes bytes = new BytesStoreBytes(Bytes.elasticByteBuffer()); - private final Wire wire; - + /** + * reads an item in the index, the index is stored in meta data + * + * @param offset the address of the document + * @param index the index of of the array item + * @return returns a long at array {@code index} + */ + LongArrayValues values = null; + private long index; private ThreadLocal value; public SingleTailer(@NotNull final AbstractChronicle chronicle, @@ -61,7 +65,7 @@ public SingleTailer(@NotNull final AbstractChronicle chronicle, @Override public WireIn wire() { - return new ChronicleWireIn(null); + return wire; } @Override @@ -70,42 +74,6 @@ public boolean readDocument(Consumer reader) { return true; } - - /** - * reads an item in the index, the index is stored in meta data - * - * @param offset the address of the document - * @param index the index of of the array item - * @return returns a long at array {@code index} - */ - private long readIndexAt(long offset, long index) { - - if (offset == 0) - return 0; - - long pos = chronicle.bytes().position(); - - try { - - final LongArrayValues values = value.get(); - final long[] result = new long[1]; - - chronicle.bytes().position(offset); - chronicle.wire.readDocument(wireIn -> { - - wireIn.read(() -> "index").int64array(values, null); - result[0] = values.getVolatileValueAt(index); - - }, null); - - return result[0]; - - } finally { - chronicle.bytes().position(pos); - } - - } - /** * The indexes are stored in many excerpts, so the index2index tells chronicle where ( in other * words the address of where ) the root first level index is stored. The indexing works like a @@ -227,6 +195,34 @@ public ExcerptTailer toEnd() { return this; } + private long readIndexAt(long offset, long index) { + + if (offset == 0) + return 0; + + long pos = chronicle.bytes().position(); + + try { + +// final LongArrayValues values = value.get(); + final long[] result = new long[1]; + + chronicle.bytes().position(offset); + chronicle.wire.readDocument(wireIn -> { + + wireIn.read(() -> "index").int64array(values, v -> values = v); + result[0] = values.getVolatileValueAt(index); + + }, null); + + return result[0]; + + } finally { + chronicle.bytes().position(pos); + } + + } + @NotNull @Override public ChronicleQueue chronicle() { diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java index 601cb85b35..a0959e5260 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesRingBuffer.java @@ -252,13 +252,13 @@ private Header(@NotNull Bytes buffer) { // final AtomicLong readLocationAtomic = new AtomicLong(); // final AtomicLong writeUpToOffsetAtomic = new AtomicLong(); - private boolean compareAndSetWriteLocation(long expectedValue, long newValue) { + boolean compareAndSetWriteLocation(long expectedValue, long newValue) { //return writeLocationAtomic.compareAndSet(expectedValue, newValue); // todo replace the above with this : return buffer.compareAndSwapLong(writeLocationOffset, expectedValue, newValue); } - private long getWriteLocation() { + long getWriteLocation() { //return writeLocationAtomic.get(); // todo replace the above with this : return buffer.readVolatileLong(writeLocationOffset); @@ -267,7 +267,7 @@ private long getWriteLocation() { /** * @return the point at which you should not write any additional bits */ - private long getWriteUpTo() { + long getWriteUpTo() { //return writeUpToOffsetAtomic.get(); // todo replace the above with this : return buffer.readVolatileLong(writeUpToOffset); diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/BytesRingBufferTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/BytesRingBufferTest.java index 8ee52a868a..a56cedd8b7 100644 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/BytesRingBufferTest.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/BytesRingBufferTest.java @@ -22,8 +22,10 @@ import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.NativeBytesStore; import net.openhft.chronicle.queue.impl.ringbuffer.BytesRingBuffer; -import org.jetbrains.annotations.NotNull; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * @author Rob Austin. @@ -46,12 +49,18 @@ public class BytesRingBufferTest { Bytes output; Bytes out; NativeBytesStore outBuffer; + @Before public void setup() { - outBuffer = NativeBytesStore.nativeStoreWithFixedCapacity(12); - out = outBuffer.bytes(); - out.writeUTFΔ(EXPECTED); - output = out.flip().bytes(); + try { + outBuffer = NativeBytesStore.nativeStoreWithFixedCapacity(12); + out = outBuffer.bytes(); + out.writeUTFΔ(EXPECTED); + output = out.flip().bytes(); + } catch (Throwable e) { + e.printStackTrace(); + fail(); + } } @After @@ -67,13 +76,9 @@ public void testWriteAndRead3SingleThreadedWrite() throws Exception { for (int i = 0; i < 100; i++) { bytesRingBuffer.offer(data()); - Bytes bytes = bytesRingBuffer.take(new BytesRingBuffer.BytesProvider() { - @NotNull - @Override - public Bytes provide(long maxSize) { - Bytes clear = input.clear(); - return clear; - } + Bytes bytes = bytesRingBuffer.take(maxSize -> { + Bytes clear = input.clear(); + return clear; }); assertEquals(EXPECTED, bytes.readUTFΔ()); @@ -82,6 +87,10 @@ public Bytes provide(long maxSize) { } } + private Bytes data() { + output.clear(); + return output; + } @Test public void testSimpledSingleThreadedWriteRead() throws Exception { @@ -121,11 +130,6 @@ public void testWriteAndRead() throws Exception { } } - private Bytes data() { - output.clear(); - return output; - } - @Test public void testFlowAroundSingleThreadedWriteDifferentSizeBuffers() throws Exception { try (NativeBytesStore nativeStore = NativeBytesStore.nativeStoreWithFixedCapacity(150)) { @@ -165,13 +169,9 @@ public void testWrite3read3SingleThreadedWrite() throws Exception { // assertEquals(EXPECTED, bytesRingBuffer.take(maxSize -> input.clear()).readUTFΔ()); if (i == 29) System.out.println(""); - Bytes bytes = bytesRingBuffer.take(new BytesRingBuffer.BytesProvider() { - @NotNull - @Override - public Bytes provide(long maxSize) { - Bytes clear = input.clear(); - return clear; - } + Bytes bytes = bytesRingBuffer.take(maxSize -> { + Bytes clear = input.clear(); + return clear; }); try { @@ -191,16 +191,16 @@ public Bytes provide(long maxSize) { } } - // @Ignore("works in lang-bytes") + // @Ignore("works in lang-bytes") @Test - public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Exception { - + public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws InterruptedException { + System.out.println("Hello World"); try (NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(1000)) { final BytesRingBuffer bytesRingBuffer = new BytesRingBuffer(allocate.bytes()); //writer - int iterations = 20_000; + final int iterations = 100_000; { ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -219,8 +219,9 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except do { offer = bytesRingBuffer.offer(out); } while (!offer); + System.out.println("+"); - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); } }); @@ -229,6 +230,7 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except CountDownLatch count = new CountDownLatch(iterations); + System.out.println(count); //reader @@ -237,30 +239,26 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except for (int i = 0; i < iterations; i++) { executorService.submit(() -> { - try { - try (NativeBytesStore nativeStore = NativeBytesStore.nativeStoreWithFixedCapacity(25)) { - Bytes bytes = nativeStore.bytes(); - Bytes result = null; - do { - try { - result = bytesRingBuffer.poll(maxSize -> bytes); - } catch (InterruptedException e) { - return; - } - } while (result == null); + try (NativeBytesStore nativeStore = NativeBytesStore.nativeStoreWithFixedCapacity(25)) { + Bytes bytes = nativeStore.bytes(); + Bytes result = null; + do { + try { + result = bytesRingBuffer.poll(maxSize -> bytes); + } catch (InterruptedException e) { + return; + } + } while (result == null); - result.clear(); - String actual = result.readUTFΔ(); + result.clear(); + String actual = result.readUTFΔ(); - if (actual.startsWith(EXPECTED_VALUE)) - count.countDown(); - } catch (Error e) { - e.printStackTrace(); - } - } catch (Exception e) { + if (actual.startsWith(EXPECTED_VALUE)) + count.countDown(); + System.out.println("-"); + } catch (Throwable e) { e.printStackTrace(); } - }); } } @@ -270,8 +268,8 @@ public void testMultiThreadedCheckAllEntriesReturnedAreValidText() throws Except } } - // @Ignore("works in lang-bytes, appears to be a visibility issue that can be fixed by adding - // a" + + // @Ignore("works in lang-bytes, appears to be a visibility issue that can be fixed by adding + // a" + // " synchronized to ringbuffer.poll() and ringbuffer.offer()") @Test public void testMultiThreadedWithIntValues() throws Exception { diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/DirectChronicleQueueStringTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/DirectChronicleQueueStringTest.java index efccc6486a..b8509ced65 100644 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/DirectChronicleQueueStringTest.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/DirectChronicleQueueStringTest.java @@ -62,14 +62,14 @@ public void testCreateAppender() throws Exception { File file = new File(name); file.deleteOnExit(); - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(name) + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(name) .build(); writeSome(chronicle); long mid = System.nanoTime(); - DirectChronicleQueue chronicle2 = (DirectChronicleQueue) new ChronicleQueueBuilder(name) + DirectChronicleQueue chronicle2 = new ChronicleQueueBuilder(name) .build(); readSome(chronicle2); @@ -80,6 +80,26 @@ public void testCreateAppender() throws Exception { } } + private void writeSome(DirectChronicleQueue chronicle) { + NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(EXPECTED_BYTES.length); + final Bytes toWrite = allocate.bytes(); + for (int i = 0; i < RUNS; i++) { + toWrite.clear(); + toWrite.write(EXPECTED_BYTES); + toWrite.flip(); + chronicle.appendDocument(toWrite); + } + } + + private void readSome(DirectChronicleQueue chronicle) { + NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(EXPECTED_BYTES.length); + final Bytes toRead = allocate.bytes(); + AtomicLong offset = new AtomicLong(chronicle.firstBytes()); + for (int i = 0; i < RUNS; i++) { + toRead.clear(); + chronicle.readDocument(offset, toRead); + } + } @Test public void testCreateAppenderMT() throws Exception { @@ -129,25 +149,4 @@ public void testCreateAppenderMT() throws Exception { } } } - - private void readSome(DirectChronicleQueue chronicle) { - NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(EXPECTED_BYTES.length); - final Bytes toRead = allocate.bytes(); - AtomicLong offset = new AtomicLong(chronicle.firstBytes()); - for (int i = 0; i < RUNS; i++) { - toRead.clear(); - chronicle.readDocument(offset, toRead); - } - } - - private void writeSome(DirectChronicleQueue chronicle) { - NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(EXPECTED_BYTES.length); - final Bytes toWrite = allocate.bytes(); - for (int i = 0; i < RUNS; i++) { - toWrite.clear(); - toWrite.write(EXPECTED_BYTES); - toWrite.flip(); - chronicle.appendDocument(toWrite); - } - } } \ No newline at end of file diff --git a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/SimpleChronicleQueueTest.java b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/SimpleChronicleQueueTest.java index 27ef6f62bb..e56fbf2354 100644 --- a/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/SimpleChronicleQueueTest.java +++ b/chronicle-queue/src/test/java/net/openhft/chronicle/queue/impl/SimpleChronicleQueueTest.java @@ -25,7 +25,7 @@ public void testSimpleWire() throws Exception { try { final DirectChronicleQueue - chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); appender.writeDocument(wire -> wire.write(() -> "FirstName").text("Steve")); @@ -57,7 +57,7 @@ public void testSimpleDirect() throws Exception { file.deleteOnExit(); - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); @@ -91,7 +91,7 @@ public void testReadAtIndex() throws Exception { try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); @@ -125,7 +125,7 @@ public void testLastWrittenIndexPerAppender() throws Exception { file.deleteOnExit(); try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); @@ -144,7 +144,7 @@ public void testLastWrittenIndexPerAppenderNoData() throws Exception { File file = File.createTempFile("chronicle.", "q"); file.deleteOnExit(); try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); appender.lastWrittenIndex(); @@ -161,7 +161,7 @@ public void testLastIndexPerChronicle() throws Exception { file.deleteOnExit(); try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender(); appender.writeDocument(wire -> wire.write(() -> "key").text("test")); @@ -180,7 +180,7 @@ public void testLastIndexPerChronicleNoData() throws Exception { file.deleteOnExit(); try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); Assert.assertEquals(-1, chronicle.lastIndex()); } finally { @@ -196,7 +196,7 @@ public void testHeaderIndexReadAtIndex() throws Exception { file.deleteOnExit(); try { - DirectChronicleQueue chronicle = (DirectChronicleQueue) new ChronicleQueueBuilder(file.getAbsolutePath()).build(); + DirectChronicleQueue chronicle = new ChronicleQueueBuilder(file.getAbsolutePath()).build(); final ExcerptAppender appender = chronicle.createAppender();