Skip to content

Commit

Permalink
add interface internalAdaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Aug 21, 2016
1 parent 90764fd commit 94d546d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 14 deletions.
Expand Up @@ -16,7 +16,6 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.MarshallableOut;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
Expand Down Expand Up @@ -44,10 +43,10 @@ public interface ExcerptAppender extends ExcerptCommon<ExcerptAppender>, Marshal
* @throws StreamCorruptedException the write failed is was unable to write the data at the
* given index.
*/
default void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException {
/* default void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException {
throw new UnsupportedOperationException();
}

*/
/**
* Write an entry at a given index. This can use used for rebuilding a queue, or replication.
*
Expand Down
Expand Up @@ -60,10 +60,15 @@ interface WireWriter<T> {
//
// *************************************************************************


public interface InternalAppender {
void writeBytes(long index, BytesStore bytes) throws StreamCorruptedException;
}

/**
* // StoreAppender
*/
static class StoreAppender implements ExcerptAppender, ExcerptContext {
static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender {
static final int HEAD_ROOM = 1 << 20;
@NotNull
private final SingleChronicleQueue queue;
Expand Down Expand Up @@ -1114,12 +1119,8 @@ ScanResult moveToIndexResult(long index) {
return ScanResult.NOT_REACHED;
}

ScanResult scanResult = FOUND;


index(index);

scanResult = this.store.moveToIndexForRead(this, sequenceNumber);
ScanResult scanResult = this.store.moveToIndexForRead(this, sequenceNumber);

Bytes<?> bytes = wire().bytes();
if (scanResult == FOUND) {
Expand Down
Expand Up @@ -5,6 +5,7 @@
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.InternalAppender;
import net.openhft.chronicle.wire.DocumentContext;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -23,14 +24,14 @@ public void testWriteBytesWithIndex() throws Exception {
String tmp = OS.TARGET + "/CreateAtIndexTest-" + System.nanoTime();
try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp)
.rollCycle(TEST_DAILY).build()) {
ExcerptAppender appender = queue.acquireAppender();
InternalAppender appender = (InternalAppender) queue.acquireAppender();

appender.writeBytes(0x421d00000000L, Bytes.from("hello world"));
appender.writeBytes(0x421d00000001L, Bytes.from("hello world"));
}
// try again and fail.
try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) {
ExcerptAppender appender = queue.acquireAppender();
InternalAppender appender = (InternalAppender) queue.acquireAppender();

// try {
appender.writeBytes(0x421d00000000L, Bytes.from("hello world"));
Expand All @@ -43,7 +44,7 @@ public void testWriteBytesWithIndex() throws Exception {

// try too far
try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) {
ExcerptAppender appender = queue.acquireAppender();
InternalAppender appender = (InternalAppender) queue.acquireAppender();

try {
appender.writeBytes(0x421d00000003L, Bytes.from("hello world"));
Expand All @@ -55,7 +56,7 @@ public void testWriteBytesWithIndex() throws Exception {
}

try (SingleChronicleQueue queue = ChronicleQueueBuilder.single(tmp).build()) {
ExcerptAppender appender = queue.acquireAppender();
InternalAppender appender = (InternalAppender) queue.acquireAppender();

appender.writeBytes(0x421d00000002L, Bytes.from("hello world"));
appender.writeBytes(0x421d00000003L, Bytes.from("hello world"));
Expand Down
Expand Up @@ -27,6 +27,7 @@
import net.openhft.chronicle.core.util.StringUtils;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.InternalAppender;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;
import org.junit.*;
Expand Down Expand Up @@ -1654,8 +1655,8 @@ public void testMoveToWithAppender() throws TimeoutException, StreamCorruptedExc
.wireType(this.wireType)
.build()) {

ExcerptAppender sync = syncQ.acquireAppender();

InternalAppender sync = (InternalAppender) syncQ.acquireAppender();
try (ChronicleQueue chronicle = SingleChronicleQueueBuilder.binary(getTmpDir())
.wireType(this.wireType)
.build()) {
Expand Down

0 comments on commit 94d546d

Please sign in to comment.