Skip to content

Commit

Permalink
Work around for misaligned volatileInt bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Jul 14, 2016
1 parent 8d83e63 commit 716aed3
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 72 deletions.
5 changes: 3 additions & 2 deletions pom.xml
Expand Up @@ -15,7 +15,8 @@
~ along with this program. If not, see <http://www.gnu.org/licenses />.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<additionalparam>-Xdoclint:none</additionalparam>
Expand Down Expand Up @@ -47,7 +48,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.13.1</version>
<version>1.13.2-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
29 changes: 21 additions & 8 deletions src/main/java/net/openhft/chronicle/queue/DumpQueueMain.java
Expand Up @@ -21,35 +21,44 @@
import net.openhft.chronicle.wire.WireDumper;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;

import static java.lang.System.err;
import static java.lang.System.out;

/**
* Created by Peter on 07/03/2016.
*/
public class DumpQueueMain {
public static void main(String[] args) {
static final String FILE = System.getProperty("file");

public static void main(String[] args) throws FileNotFoundException {
dump(args[0]);
}

public static void dump(String path) {
public static void dump(String path) throws FileNotFoundException {
File path2 = new File(path);
if (path2.isDirectory()) {
File[] files = path2.listFiles();
PrintStream out = FILE == null ? System.out : new PrintStream(new File(FILE));
long upperLimit = Long.MAX_VALUE;
dump(path2, out, upperLimit);
}

public static void dump(File path, PrintStream out, long upperLimit) {
if (path.isDirectory()) {
File[] files = path.listFiles();
if (files == null)
err.println("Directory not found " + path);

for (File file : files)
dumpFile(file);
dumpFile(file, out, upperLimit);

} else {
dumpFile(path2);
dumpFile(path, out, upperLimit);
}
}

private static void dumpFile(File file) {
public static void dumpFile(File file, PrintStream out, long upperLimit) {
if (file.getName().endsWith(SingleChronicleQueue.SUFFIX)) {
try (MappedBytes bytes = MappedBytes.mappedBytes(file, 4 << 20)) {
bytes.readLimit(bytes.realCapacity());
Expand All @@ -62,6 +71,10 @@ private static void dumpFile(File file) {
out.println(sb);
if (last)
break;
if (bytes.readPosition() > upperLimit) {
out.println("# limit reached.");
return;
}
}
} catch (IOException ioe) {
err.println("Failed to read " + file + " " + ioe);
Expand Down
Expand Up @@ -59,7 +59,7 @@ public interface WireStore extends ReferenceCounted, Demarshallable, WriteMarsha
* @param position of the start of the message
* @return index in this store.
*/
long sequenceForPosition(ExcerptContext ec, long position) throws EOFException, UnrecoverableTimeoutException, StreamCorruptedException;
long sequenceForPosition(ExcerptContext ec, long position, boolean inclusive) throws EOFException, UnrecoverableTimeoutException, StreamCorruptedException;

String dump();

Expand Down
Expand Up @@ -354,7 +354,8 @@ private ScanResult linearScan(@NotNull final Wire wire,
long linearScanByPosition(@NotNull final Wire wire,
final long toPosition,
final long indexOfNext,
final long startAddress) throws EOFException {
final long startAddress,
boolean inclusive) throws EOFException {
assert toPosition >= 0;
Bytes<?> bytes = wire.bytes();

Expand All @@ -363,11 +364,21 @@ long linearScanByPosition(@NotNull final Wire wire,
while (bytes.readPosition() <= toPosition) {
WireIn.HeaderType headerType = wire.readDataHeader(true);

if (!inclusive && toPosition == bytes.readPosition())
return i;

switch (headerType) {
case NONE:
if (toPosition == Long.MAX_VALUE)
return i + 1;
throw new IllegalArgumentException("You can't know the index for an entry which hasn't been written. pos: " + toPosition);

int header = bytes.readVolatileInt(bytes.readPosition());
throw new IllegalArgumentException(
"You can't know the index for an entry which hasn't been written. " +
"start: " + startAddress +
", at: " + bytes.readPosition() +
", header: " + Integer.toHexString(header) +
", toPos: " + toPosition);
case META_DATA:
break;
case DATA:
Expand All @@ -394,7 +405,8 @@ public long nextEntryToBeIndexed() {

long sequenceForPosition(@NotNull StoreRecovery recovery,
@NotNull ExcerptContext ec,
final long position)
final long position,
boolean inclusive)
throws EOFException, StreamCorruptedException {

long timeoutMS = ((AbstractWire) ec.wire()).isInsideHeader() ? 0 : ec.timeoutMS();
Expand All @@ -403,7 +415,7 @@ long sequenceForPosition(@NotNull StoreRecovery recovery,
long indexOfNext = 0;
long lastKnownAddress = 0;
if (((Byteable) index2indexArr).bytesStore() == null)
return linearScanByPosition(ec.wireForIndex(), position, indexOfNext, lastKnownAddress);
return linearScanByPosition(ec.wireForIndex(), position, indexOfNext, lastKnownAddress, inclusive);

int used2 = Maths.toUInt31(index2indexArr.getUsed());
if (used2 == 0) {
Expand Down Expand Up @@ -446,7 +458,7 @@ long sequenceForPosition(@NotNull StoreRecovery recovery,
}
}

return linearScanByPosition(ec.wireForIndex(), position, indexOfNext, lastKnownAddress);
return linearScanByPosition(ec.wireForIndex(), position, indexOfNext, lastKnownAddress, inclusive);

}

Expand Down Expand Up @@ -521,7 +533,6 @@ private long getSecondaryAddress0(ExcerptContext ec, long timeoutMS, LongArrayVa
* @param ec the wire that used to store the data
* @param sequenceNumber the sequenceNumber that the data will be stored to
* @param position the position the data is at
* @throws EOFException
* @throws UnrecoverableTimeoutException
* @throws StreamCorruptedException
Expand Down
Expand Up @@ -53,6 +53,8 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
final Supplier<Pauser> pauserSupplier;
final long timeoutMS;
@NotNull
final File path;
@NotNull
private final RollCycle rollCycle;
@NotNull
private final RollingResourcesCache dateCache;
Expand All @@ -61,8 +63,6 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
private final long epoch;
private final boolean isBuffered;
@NotNull
private final File path;
@NotNull
private final WireType wireType;
private final long blockSize;
@NotNull
Expand Down
Expand Up @@ -192,20 +192,21 @@ private void resetPosition() throws UnrecoverableTimeoutException {

if (store == null || wire == null)
return;
position = store.writePosition();
position(store.writePosition());

Bytes<?> bytes = wire.bytes();
int header = bytes.readInt(position);
if (Wires.isReadyData(header))
bytes.writePosition(position + 4 + Wires.lengthOf(header));
assert position == 0 || Wires.isReadyData(header);
bytes.writePosition(position + 4 + Wires.lengthOf(header));
if (lazyIndexing) {
wire.headerNumber(Long.MIN_VALUE);
return;
}

final long headerNumber = store.sequenceForPosition(this, position);
Thread.yield();
long headerNumber2 = store.sequenceForPosition(this, position);
assert headerNumber == headerNumber2;
final long headerNumber = store.sequenceForPosition(this, position, true);
// Thread.yield();
// long headerNumber2 = store.sequenceForPosition(this, position);
// assert headerNumber == headerNumber2;
// System.err.println("==== " + Thread.currentThread().getName()+" pos: "+position+" hdr: "+headerNumber);
wire.headerNumber(queue.rollCycle().toIndex(cycle, headerNumber + 1) - 1);
assert lazyIndexing || checkIndex(wire.headerNumber(), position);
Expand Down Expand Up @@ -269,12 +270,9 @@ boolean checkWritePositionHeaderNumber() {
throw new AssertionError(message);
}*/
long seq1 = queue.rollCycle().toSequenceNumber(wire.headerNumber() + 1) - 1;
long seq2;
try {
seq2 = store.sequenceForPosition(this, pos1);
} catch (RuntimeException e) {
throw new IllegalStateException("While checking the seq of " + pos1, e);
}
long seq2 = store.sequenceForPosition(this, pos1, true);


if (seq1 != seq2) {
// System.out.println(queue.dump());
String message = "~~~~~~~~~~~~~~ " +
Expand Down Expand Up @@ -375,7 +373,9 @@ public void writeBytes(long index, BytesStore bytes) throws StreamCorruptedExcep
}

private void position(long position) {
// System.err.println("----- "+Thread.currentThread().getName()+" pos: "+position);
if (position > store.writePosition() + queue.blockSize())
throw new IllegalArgumentException("pos: " + position);
// System.err.println("----- "+Thread.currentThread().getName()+" pos: "+position);
this.position = position;
}

Expand Down Expand Up @@ -432,7 +432,7 @@ public long lastIndexAppended() {
}

try {
long sequenceNumber = store.sequenceForPosition(this, lastPosition);
long sequenceNumber = store.sequenceForPosition(this, lastPosition, true);
long index = queue.rollCycle().toIndex(lastCycle, sequenceNumber);
lastIndex(index);
return index;
Expand Down Expand Up @@ -487,26 +487,12 @@ private <T> void append(int length, WireWriter<T> wireWriter, T writer) throws U
}
}

private long headerNumber() {
if (wire.headerNumber() == Long.MIN_VALUE)
try {
long headerNumber = store.sequenceForPosition(this, position);
wire.headerNumber(queue.rollCycle().toIndex(cycle, headerNumber));
} catch (Exception e) {
Jvm.rethrow(e);
}

return wire.headerNumber();
}

private void rollCycleTo(int cycle) throws UnrecoverableTimeoutException {
if (this.cycle == cycle)
throw new AssertionError();
if (wire != null)
store.writeEOF(wire, timeoutMS());
setCycle2(cycle, true);
resetPosition();

}

private <T> void append2(int length, WireWriter<T> wireWriter, T writer) throws UnrecoverableTimeoutException, EOFException, StreamCorruptedException {
Expand Down Expand Up @@ -547,10 +533,11 @@ void writeIndexForPosition(long index, long position) throws UnrecoverableTimeou
boolean checkIndex(long index, long position) {
try {
final long seq1 = queue.rollCycle().toSequenceNumber(index + 1) - 1;
final long seq2 = store.sequenceForPosition(this, position);
final long seq2 = store.sequenceForPosition(this, position, true);

if (seq1 != seq2) {
final long seq3 = ((SingleChronicleQueueStore) store).indexing.linearScanByPosition(wireForIndex(), position, 0, 0);
final long seq3 = ((SingleChronicleQueueStore) store).indexing
.linearScanByPosition(wireForIndex(), position, 0, 0, true);
System.out.println("Thread=" + Thread.currentThread().getName() +
" pos: " + position +
" seq1: " + Long.toHexString(seq1) +
Expand Down Expand Up @@ -616,12 +603,13 @@ public void close() {
lastPosition = position;
lastCycle = cycle;

if (!metaData && lastIndex != Long.MIN_VALUE)
writeIndexForPosition(lastIndex, position);
else
assert lazyIndexing || checkIndex(lastIndex, position);
if (!metaData)
if (!metaData) {
store.writePosition(position);
if (lastIndex != Long.MIN_VALUE)
writeIndexForPosition(lastIndex, position);
else
assert lazyIndexing || checkIndex(lastIndex, position);
}

} else if (wire != null) {
isClosed = true;
Expand All @@ -645,7 +633,7 @@ public long index() throws IORuntimeException {
if (wire.headerNumber() == Long.MIN_VALUE) {
try {
long headerNumber0 = queue.rollCycle().toIndex(cycle, store
.sequenceForPosition(StoreAppender.this, position));
.sequenceForPosition(StoreAppender.this, position, false));
assert (((AbstractWire) wire).isInsideHeader());
wire.headerNumber(headerNumber0 - 1);
} catch (IOException e) {
Expand Down Expand Up @@ -888,7 +876,7 @@ private long approximateLastIndex() {
}
// give the position of the last entry and
// flag we want to count it even though we don't know if it will be meta data or not.
long sequenceNumber = store.sequenceForPosition(this, Long.MAX_VALUE);
long sequenceNumber = store.sequenceForPosition(this, Long.MAX_VALUE, false);
return rollCycle.toIndex(lastCycle, sequenceNumber);

} catch (EOFException | StreamCorruptedException | UnrecoverableTimeoutException e) {
Expand Down Expand Up @@ -935,19 +923,6 @@ public RollingChronicleQueue queue() {
return queue;
}

private <T> boolean __read(@NotNull final T t, @NotNull final BiConsumer<T, Wire> c) {
if (this.store == null) {
toStart();
if (this.store == null) return false;
}

if (read0(t, c)) {
incrementIndex();
return true;
}
return false;
}

private void incrementIndex() {
RollCycle rollCycle = queue.rollCycle();
long seq = rollCycle.toSequenceNumber(this.index);
Expand Down
Expand Up @@ -170,6 +170,7 @@ public long writePosition() {
@Override
public WireStore writePosition(long position) {

assert writePosition.getVolatileValue() + mappedFile.chunkSize() > position;
int header = mappedBytes.readVolatileInt(position);
if (Wires.isReadyData(header))
writePosition.setMaxValue(position);
Expand Down Expand Up @@ -236,9 +237,9 @@ public MappedBytes bytes() {
}

@Override
public long sequenceForPosition(final ExcerptContext ec, final long position) throws
public long sequenceForPosition(final ExcerptContext ec, final long position, boolean inclusive) throws
EOFException, UnrecoverableTimeoutException, StreamCorruptedException {
return indexing.sequenceForPosition(recovery, ec, position);
return indexing.sequenceForPosition(recovery, ec, position, inclusive);
}

@Override
Expand Down

0 comments on commit 716aed3

Please sign in to comment.