Skip to content

Commit

Permalink
added ValueIn.skipValue() and ExcerptAppender.writingDocument(long i…
Browse files Browse the repository at this point in the history
…ndex)
  • Loading branch information
peter-lawrey committed May 30, 2016
1 parent e9cc18f commit 30230b0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 30 deletions.
12 changes: 12 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java
Expand Up @@ -16,6 +16,7 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.MarshallableOut;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -45,6 +46,17 @@ default void writeBytes(long index, Bytes<?> bytes) throws StreamCorruptedExcept
throw new UnsupportedOperationException();
}

/**
* Write an entry at a given index. This can use used for rebuilding a queue, or replication.
*
* @param index to write the byte to or fail.
* @return DocumentContext to write to.
* @throws StreamCorruptedException the write failed is was unable to write the data at the given index.
*/
default DocumentContext writeDocument(int index) {
throw new UnsupportedOperationException();
}

/**
* @return the index last written, this index includes the cycle and the sequence number
* @throws IllegalStateException if no index is available
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/net/openhft/chronicle/queue/impl/WireStore.java
Expand Up @@ -18,7 +18,10 @@
import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.queue.impl.single.ScanResult;
import net.openhft.chronicle.wire.*;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;

import java.io.EOFException;
Expand Down Expand Up @@ -62,5 +65,5 @@ public interface WireStore extends ReferenceCounted, Demarshallable, WriteMarsha

long writeHeader(Wire wire, int length, long timeoutMS) throws EOFException, UnrecoverableTimeoutException;

void writeEOF(AbstractWire wire, long timeoutMS) throws UnrecoverableTimeoutException;
void writeEOF(Wire wire, long timeoutMS) throws UnrecoverableTimeoutException;
}
Expand Up @@ -61,7 +61,8 @@ public static class StoreAppender implements ExcerptAppender {
private final int indexSpacingMask;
private int cycle = Integer.MIN_VALUE;
private WireStore store;
private AbstractWire wire;
private Wire wire;
private Wire bufferWire;
private long position = -1;
private volatile Thread appendingThread = null;
private long lastIndex = Long.MIN_VALUE;
Expand Down Expand Up @@ -114,7 +115,8 @@ private void setCycle2(int cycle, boolean createIfAbsent) {
}
this.cycle = cycle;

wire = (AbstractWire) queue.wireType().apply(store.bytes());
wire = queue.wireType().apply(store.bytes());

// only set the cycle after the wire is set.
this.cycle = cycle;

Expand Down Expand Up @@ -167,7 +169,9 @@ public DocumentContext writingDocument() throws UnrecoverableTimeoutException {
position = store.writeHeader(wire, Wires.UNKNOWN_LENGTH, queue.timeoutMS);
lastIndex = wire.headerNumber();
context.metaData = false;
context.wire = wire;
break;

} catch (EOFException theySeeMeRolling) {
// retry.
}
Expand All @@ -180,6 +184,13 @@ public DocumentContext writingDocument() throws UnrecoverableTimeoutException {
return context;
}

@Override
public DocumentContext writeDocument(int index) {
context.wire = acquireBufferWire();
context.wire.headerNumber(index);
return context;
}

@Override
public int sourceId() {
return queue.sourceId;
Expand All @@ -191,25 +202,22 @@ public void writeBytes(@NotNull Bytes bytes) throws UnrecoverableTimeoutExceptio
append(Maths.toUInt31(bytes.readRemaining()), (m, w) -> w.bytes().write(m), bytes);
}

Wire acquireBufferWire() {
if (bufferWire == null) {
bufferWire = queue.wireType().apply(Bytes.elasticByteBuffer());
} else {
bufferWire.clear();
}
return bufferWire;
}

@Override
public void writeBytes(long index, Bytes<?> bytes) throws StreamCorruptedException {
if (bytes.isEmpty())
throw new UnsupportedOperationException("Cannot append a zero length message");
assert checkAppendingThread();
try {
if (index != lastIndex + 1) {
int cycle = queue.rollCycle().toCycle(index);

ScanResult scanResult = moveToIndex(cycle, queue.rollCycle().toSequenceNumber(index));
switch (scanResult) {
case FOUND:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " as the index already exists");
case NOT_REACHED:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " beyond the end of the queue");
case NOT_FOUND:
break;
}
}
moveToIndexForWrite(index);

// only get the bytes after moveToIndex
Bytes<?> wireBytes = wire.bytes();
Expand Down Expand Up @@ -238,6 +246,22 @@ public void writeBytes(long index, Bytes<?> bytes) throws StreamCorruptedExcepti
}
}

void moveToIndexForWrite(long index) throws EOFException {
if (index != lastIndex + 1) {
int cycle = queue.rollCycle().toCycle(index);

ScanResult scanResult = moveToIndex(cycle, queue.rollCycle().toSequenceNumber(index));
switch (scanResult) {
case FOUND:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " as the index already exists");
case NOT_REACHED:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " beyond the end of the queue");
case NOT_FOUND:
break;
}
}
}

ScanResult moveToIndex(int cycle, long sequenceNumber) throws UnrecoverableTimeoutException, EOFException {
if (LOG.isDebugEnabled()) {
LOG.debug("moveToIndex: " + Long.toHexString(cycle) + " " + Long.toHexString(sequenceNumber));
Expand Down Expand Up @@ -366,7 +390,7 @@ void writeIndexForPosition(long index, long position, long timeoutMS) throws Unr
assert lazyIndexing || checkIndex(index, position, timeoutMS);

if (!lazyIndexing && (index & indexSpacingMask) == 0) {
store.setPositionForIndex(context.wire(),
store.setPositionForIndex(wire,
queue.rollCycle().toSequenceNumber(index),
position, timeoutMS);
}
Expand All @@ -386,6 +410,7 @@ boolean checkIndex(long index, long position, long timeoutMS) {
class StoreAppenderContext implements DocumentContext {

private boolean metaData = false;
private Wire wire;

@Override
public int sourceId() {
Expand Down Expand Up @@ -415,17 +440,20 @@ public void metaData(boolean metaData) {
@Override
public void close() {
try {
final long timeoutMS = queue.timeoutMS;
wire.updateHeader(position, metaData);
long index = wire.headerNumber() - 1;
long position2 = wire.bytes().writePosition();
store.writePosition(position2);
if (!metaData)
writeIndexForPosition(index, position, timeoutMS);

} catch (StreamCorruptedException e) {
throw new IllegalStateException(e);
} catch (UnrecoverableTimeoutException e) {
if (wire == StoreAppender.this.wire) {
final long timeoutMS = queue.timeoutMS;
wire.updateHeader(position, metaData);
long index = wire.headerNumber() - 1;
long position2 = wire.bytes().writePosition();
store.writePosition(position2);
if (!metaData)
writeIndexForPosition(index, position, timeoutMS);

} else {
writeBytes(wire.headerNumber(), wire.bytes());
}

} catch (StreamCorruptedException | UnrecoverableTimeoutException e) {
throw new IllegalStateException(e);
} finally {
assert resetAppendingThread();
Expand Down
Expand Up @@ -310,7 +310,7 @@ public long writeHeader(Wire wire, int length, long timeoutMS) throws EOFExcepti
}

@Override
public void writeEOF(AbstractWire wire, long timeoutMS) throws UnrecoverableTimeoutException {
public void writeEOF(Wire wire, long timeoutMS) throws UnrecoverableTimeoutException {
try {
wire.writeEndOfWire(timeoutMS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Expand Down

0 comments on commit 30230b0

Please sign in to comment.