From 94d546d075df121fab6e90cd1f44ae2fe1e406ab Mon Sep 17 00:00:00 2001 From: Rob Austin Date: Sun, 21 Aug 2016 10:30:42 +0100 Subject: [PATCH] add interface internalAdaptor --- .../openhft/chronicle/queue/ExcerptAppender.java | 5 ++--- .../impl/single/SingleChronicleQueueExcerpts.java | 13 +++++++------ .../openhft/chronicle/queue/CreateAtIndexTest.java | 9 +++++---- .../queue/impl/single/SingleChronicleQueueTest.java | 3 ++- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java index 823f9263d9..4c8aea0d7b 100755 --- a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java +++ b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java @@ -16,7 +16,6 @@ package net.openhft.chronicle.queue; import net.openhft.chronicle.bytes.Bytes; -import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.wire.DocumentContext; import net.openhft.chronicle.wire.MarshallableOut; import net.openhft.chronicle.wire.UnrecoverableTimeoutException; @@ -44,10 +43,10 @@ public interface ExcerptAppender extends ExcerptCommon, Marshal * @throws StreamCorruptedException the write failed is was unable to write the data at the * given index. */ - default void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException { + /* default void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException { throw new UnsupportedOperationException(); } - +*/ /** * Write an entry at a given index. This can use used for rebuilding a queue, or replication. * diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java index 61ef99a7f0..9521fdce74 100755 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueExcerpts.java @@ -60,10 +60,15 @@ interface WireWriter { // // ************************************************************************* + + public interface InternalAppender { + void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException; + } + /** * // StoreAppender */ - static class StoreAppender implements ExcerptAppender, ExcerptContext { + static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender { static final int HEAD_ROOM = 1 << 20; @NotNull private final SingleChronicleQueue queue; @@ -1114,12 +1119,8 @@ ScanResult moveToIndexResult(long index) { return ScanResult.NOT_REACHED; } - ScanResult scanResult = FOUND; - - index(index); - - scanResult = this.store.moveToIndexForRead(this, sequenceNumber); + ScanResult scanResult = this.store.moveToIndexForRead(this, sequenceNumber); Bytes bytes = wire().bytes(); if (scanResult == FOUND) { diff --git a/src/test/java/net/openhft/chronicle/queue/CreateAtIndexTest.java b/src/test/java/net/openhft/chronicle/queue/CreateAtIndexTest.java index faceae50f2..32452dc128 100755 --- a/src/test/java/net/openhft/chronicle/queue/CreateAtIndexTest.java +++ b/src/test/java/net/openhft/chronicle/queue/CreateAtIndexTest.java @@ -5,6 +5,7 @@ import net.openhft.chronicle.core.io.IORuntimeException; import net.openhft.chronicle.core.io.IOTools; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.InternalAppender; import net.openhft.chronicle.wire.DocumentContext; import org.junit.Assert; import org.junit.Test; @@ -23,14 +24,14 @@ public void testWriteBytesWithIndex() throws Exception { String tmp = OS.TARGET + "/CreateAtIndexTest-" + System.nanoTime(); try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp) .rollCycle(TEST_DAILY).build()) { - ExcerptAppender appender = queue.acquireAppender(); + InternalAppender appender = (InternalAppender) queue.acquireAppender(); appender.writeBytes(0x421d00000000L, Bytes.from("hello world")); appender.writeBytes(0x421d00000001L, Bytes.from("hello world")); } // try again and fail. try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) { - ExcerptAppender appender = queue.acquireAppender(); + InternalAppender appender = (InternalAppender) queue.acquireAppender(); // try { appender.writeBytes(0x421d00000000L, Bytes.from("hello world")); @@ -43,7 +44,7 @@ public void testWriteBytesWithIndex() throws Exception { // try too far try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) { - ExcerptAppender appender = queue.acquireAppender(); + InternalAppender appender = (InternalAppender) queue.acquireAppender(); try { appender.writeBytes(0x421d00000003L, Bytes.from("hello world")); @@ -55,7 +56,7 @@ public void testWriteBytesWithIndex() throws Exception { } try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) { - ExcerptAppender appender = queue.acquireAppender(); + InternalAppender appender = (InternalAppender) queue.acquireAppender(); appender.writeBytes(0x421d00000002L, Bytes.from("hello world")); appender.writeBytes(0x421d00000003L, Bytes.from("hello world")); diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java index 7d80c66e42..dbd6c5bbe1 100755 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java @@ -27,6 +27,7 @@ import net.openhft.chronicle.core.util.StringUtils; import net.openhft.chronicle.queue.*; import net.openhft.chronicle.queue.impl.RollingChronicleQueue; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.InternalAppender; import net.openhft.chronicle.wire.*; import org.jetbrains.annotations.NotNull; import org.junit.*; @@ -1654,8 +1655,8 @@ public void testMoveToWithAppender() throws TimeoutException, StreamCorruptedExc .wireType(this.wireType) .build()) { - ExcerptAppender sync = syncQ.acquireAppender(); + InternalAppender sync = (InternalAppender) syncQ.acquireAppender(); try (ChronicleQueue chronicle = SingleChronicleQueueBuilder.binary(getTmpDir()) .wireType(this.wireType) .build()) {