Skip to content

Commit

Permalink
QUEUE-33 : race on roll
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Feb 3, 2016
1 parent 845519f commit 89fe68f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 106 deletions.
172 changes: 68 additions & 104 deletions src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java
Expand Up @@ -44,7 +44,6 @@


import static net.openhft.chronicle.queue.ChronicleQueue.toCycle; import static net.openhft.chronicle.queue.ChronicleQueue.toCycle;
import static net.openhft.chronicle.queue.ChronicleQueue.toSequenceNumber; import static net.openhft.chronicle.queue.ChronicleQueue.toSequenceNumber;
import static net.openhft.chronicle.wire.Wires.toIntU30;


public class Excerpts { public class Excerpts {


Expand All @@ -64,7 +63,8 @@ long writeOrAdvanceIfNotEmpty(




private static final Logger LOG = LoggerFactory.getLogger(Excerpts.class); private static final Logger LOG = LoggerFactory.getLogger(Excerpts.class);
private static final int ROLL_KEY = BytesUtil.asInt("roll"); private static final String ROLL_STRING = "roll";
private static final int ROLL_KEY = BytesUtil.asInt(ROLL_STRING);
private static final int SPB_HEADER_SIZE = 4; private static final int SPB_HEADER_SIZE = 4;




Expand Down Expand Up @@ -107,12 +107,12 @@ public StoreAppender(@NotNull AbstractChronicleQueue queue) {


@Override @Override
public long writeDocument(@NotNull WriteMarshallable writer) { public long writeDocument(@NotNull WriteMarshallable writer) {
return internalWriteBytes(WireInternal::writeWireOrAdvanceIfNotEmpty, writer); return append(WireInternal::writeWireOrAdvanceIfNotEmpty, writer);
} }


@Override @Override
public long writeBytes(@NotNull Bytes bytes) { public long writeBytes(@NotNull Bytes bytes) {
return internalWriteBytes(WireInternal::writeWireOrAdvanceIfNotEmpty, bytes); return append(WireInternal::writeWireOrAdvanceIfNotEmpty, bytes);
} }


@Override @Override
Expand All @@ -134,38 +134,19 @@ public long cycle() {
return this.store.cycle(); return this.store.cycle();
} }


public boolean consumeBytes(BytesConsumer consumer) throws InterruptedException { @Override
@NotNull final Bytes<?> bytes = wire.bytes(); public void prefetch() {
final long start = bytes.writePosition(); long position = wire.bytes().writePosition();

if (position < nextPrefetch)
bytes.writeInt(Wires.NOT_READY); return;

long prefetch = OS.mapAlign(position);
if (!consumer.accept(bytes)) {
bytes.writeSkip(-4);
bytes.writeInt(bytes.writePosition(), 0);
return false;
}

final long len = bytes.writePosition() - start - 4;

// no data was read from the ring buffer, so we wont write any document
// to the appender
if (len == 0) {
bytes.writeSkip(-4);
bytes.writeInt(bytes.writePosition(), 0);
return false;
}

bytes.writeInt(start, toIntU30(len, "Document length %,d " +
"out of 30-bit int range."));

store().writePosition(bytes.writePosition())
.storeIndexLocation(wire, start, ++index);


return true; // touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
} }


private <T> long internalWriteBytes(@NotNull WireWriter<T> wireWriter, @NotNull T writer) { private <T> long append(@NotNull WireWriter<T> wireWriter, @NotNull T writer) {
WireStore store = store(); WireStore store = store();
Bytes<?> bytes = wire.bytes(); Bytes<?> bytes = wire.bytes();


Expand Down Expand Up @@ -225,18 +206,6 @@ private WireStore store() {


return store; return store;
} }

@Override
public void prefetch() {
long position = wire.bytes().writePosition();
if (position < nextPrefetch)
return;
long prefetch = OS.mapAlign(position);

// touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
}
} }


// ************************************************************************* // *************************************************************************
Expand Down Expand Up @@ -303,49 +272,6 @@ private <T> boolean readAtIndex(@NotNull final T t, @NotNull final BiConsumer<T,
return false; return false;
} }


private <T> boolean readAt(@NotNull final T t, @NotNull final BiConsumer<T, Wire> c) {

long roll;
for (; ; ) {
roll = Long.MIN_VALUE;
wire.bytes().readLimit(wire.bytes().capacity());
while (wire.bytes().readVolatileInt(wire.bytes().readPosition()) != 0) {

try (@NotNull final DocumentContext documentContext = wire.readingDocument()) {

if (!documentContext.isPresent())
return false;

if (documentContext.isData()) {
c.accept(t, wire);
return true;
}

// In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative)
final StringBuilder sb = Wires.acquireStringBuilder();

@NotNull
final ValueIn vi = wire.readEventName(sb);
if ("roll".contentEquals(sb)) {
roll = vi.int32();
break;
}
}
}

// we got to the end of the file and there is no roll information
if (roll == Long.MIN_VALUE)
return false;

// roll to the next file
cycle(roll);
if (store == null)
return false;
}

}

/** /**
* @return provides an index that includes the cycle number * @return provides an index that includes the cycle number
*/ */
Expand All @@ -359,16 +285,16 @@ public long index() {


@Override @Override
public boolean moveToIndex(final long index) { public boolean moveToIndex(final long index) {

if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(SingleChronicleQueueStore.IndexOffset.toBinaryString(index)); LOG.debug(SingleChronicleQueueStore.IndexOffset.toBinaryString(index));
LOG.debug(SingleChronicleQueueStore.IndexOffset.toScale()); LOG.debug(SingleChronicleQueueStore.IndexOffset.toScale());
} }


final long expectedCycle = toCycle(index); final long expectedCycle = toCycle(index);
if (expectedCycle != cycle) if (expectedCycle != cycle) {
// moves to the expected cycle // moves to the expected cycle
cycle(expectedCycle); cycle(expectedCycle);
}


cycle = expectedCycle; cycle = expectedCycle;


Expand All @@ -382,7 +308,7 @@ public boolean moveToIndex(final long index) {
return true; return true;
} }


final long position = this.store.moveToIndex(wire, ChronicleQueue.toSequenceNumber(index)); final long position = this.store.moveToIndex(wire, sequenceNumber);
if (position == -1) if (position == -1)
return false; return false;


Expand Down Expand Up @@ -417,6 +343,57 @@ public ExcerptTailer toEnd() throws IOException {
return this; return this;
} }


@Override
public void prefetch() {
long position = wire.bytes().readPosition();
if (position < nextPrefetch)
return;
long prefetch = OS.mapAlign(position) + OS.pageSize();
// touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
}

private <T> boolean readAt(@NotNull final T t, @NotNull final BiConsumer<T, Wire> c) {
long roll;
for (; ; ) {
roll = Long.MIN_VALUE;
wire.bytes().readLimit(wire.bytes().capacity());
while (wire.bytes().readVolatileInt(wire.bytes().readPosition()) != 0) {

try (@NotNull final DocumentContext documentContext = wire.readingDocument()) {

if (!documentContext.isPresent())
return false;

if (documentContext.isData()) {
c.accept(t, wire);
return true;
}

// In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative)
final StringBuilder sb = Wires.acquireStringBuilder();

@NotNull
final ValueIn vi = wire.readEventName(sb);
if (ROLL_STRING.contentEquals(sb)) {
roll = vi.int32();
break;
}
}
}

// we got to the end of the file and there is no roll information
if (roll == Long.MIN_VALUE)
return false;

// roll to the next file
cycle(roll);
if (store == null)
return false;
}
}


@NotNull @NotNull
private StoreTailer cycle(final long cycle) { private StoreTailer cycle(final long cycle) {
Expand All @@ -433,19 +410,6 @@ private StoreTailer cycle(final long cycle) {
} }
return this; return this;
} }


@Override
public void prefetch() {
long position = wire.bytes().readPosition();
if (position < nextPrefetch)
return;
long prefetch = OS.mapAlign(position) + OS.pageSize();
// touch the page without modifying it.
wire.bytes().compareAndSwapInt(prefetch, ~0, ~0);
nextPrefetch = prefetch + OS.pageSize();
}

} }
} }


Expand Up @@ -571,7 +571,7 @@ public long moveToIndex(@NotNull final Wire wire, final long index) {
final LongArrayValues array = this.longArray.get(); final LongArrayValues array = this.longArray.get();
@NotNull final Bytes<?> bytes = wire.bytes(); @NotNull final Bytes<?> bytes = wire.bytes();
final long indexToIndex0 = indexToIndex(wire); final long indexToIndex0 = indexToIndex(wire);
final long readPostion = bytes.readPosition(); final long readPosition = bytes.readPosition();
bytes.readLimit(bytes.capacity()).readPosition(indexToIndex0); bytes.readLimit(bytes.capacity()).readPosition(indexToIndex0);
long startIndex = ((index / 64L)) * 64L; long startIndex = ((index / 64L)) * 64L;


Expand Down Expand Up @@ -632,7 +632,7 @@ public long moveToIndex(@NotNull final Wire wire, final long index) {
} while (primaryOffset >= 0); } while (primaryOffset >= 0);
} }


bytes.readPosition(readPostion); bytes.readPosition(readPosition);
return -1; return -1;
} }


Expand Down

0 comments on commit 89fe68f

Please sign in to comment.