Skip to content

Commit

Permalink
Improve the interface for MappedBytes for use in Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Dec 23, 2015
1 parent 63cde18 commit 20cd0e0
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 35 deletions.
2 changes: 1 addition & 1 deletion chronicle-queue/pom.xml
Expand Up @@ -49,7 +49,7 @@
<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId> <artifactId>chronicle-bom</artifactId>
<version>1.10.19</version> <version>1.10.28-SNAPSHOT</version>
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
Expand Down
2 changes: 1 addition & 1 deletion chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/Excerpts.java 100644 → 100755
Expand Up @@ -167,7 +167,7 @@ public StoreAppender(@NotNull AbstractChronicleQueue queue) throws IOException {




final MappedFile mappedFile = store.mappedFile(); final MappedFile mappedFile = store.mappedFile();
System.out.println("appender file=" + mappedFile.file().getAbsolutePath().toString()); System.out.println("appender file=" + mappedFile.file().getAbsolutePath());
this.writeContext = new MappedBytes(mappedFile); this.writeContext = new MappedBytes(mappedFile);
} }


Expand Down
Expand Up @@ -130,8 +130,8 @@ protected long lastCycle() {


if (files != null && files.length > 0) { if (files != null && files.length > 0) {
long lastDate = Long.MIN_VALUE; long lastDate = Long.MIN_VALUE;
long date = -1; long date;
String name = null; String name;


for (int i = files.length - 1; i >= 0; i--) { for (int i = files.length - 1; i >= 0; i--) {
try { try {
Expand Down
Expand Up @@ -60,7 +60,9 @@ enum MetaDataField implements WireKey {
bounds, bounds,
indexing, indexing,
roll, roll,
mappedFile mappedFile,
chunkSize,
overlapSize
} }




Expand Down Expand Up @@ -90,18 +92,13 @@ enum MetaDataField implements WireKey {
this.builder = null; this.builder = null;
this.wireType = wireType; this.wireType = wireType;
this.mappedFile = mappedFile; this.mappedFile = mappedFile;
init(); indexing = new Indexing(wireType, new MappedBytes(mappedFile));
} }


void mappedFile(@NotNull MappedFile mappedFile) { void mappedFile(@NotNull MappedFile mappedFile) {
this.mappedFile = mappedFile; this.mappedFile = mappedFile;
} }


private void init() {
indexing = new Indexing(wireType, mappedFile);
}


@Override @Override
public long readPosition() { public long readPosition() {
return this.bounds.getReadPosition(); return this.bounds.getReadPosition();
Expand Down Expand Up @@ -438,27 +435,24 @@ private <T> long write(
public void writeMarshallable(@NotNull WireOut wire) { public void writeMarshallable(@NotNull WireOut wire) {
; ;
wire.write(MetaDataField.bounds).marshallable(this.bounds) wire.write(MetaDataField.bounds).marshallable(this.bounds)
.write(MetaDataField.indexing).object(this.indexing)
.write(MetaDataField.roll).object(this.roll) .write(MetaDataField.roll).object(this.roll)
.write(MetaDataField.mappedFile).object(new MarshallableMappedFile(this .write(MetaDataField.chunkSize).int64(this.mappedFile.chunkSize())
.mappedFile)); .write(MetaDataField.overlapSize).int64(this.mappedFile.overlapSize())
.write(MetaDataField.indexing).object(this.indexing);
} }




@Override @Override
public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException { public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
wire.read(MetaDataField.bounds).marshallable(this.bounds) wire.read(MetaDataField.bounds).marshallable(this.bounds)
.read(MetaDataField.indexing).object(Indexing.class, this, (t, v) -> t.indexing = v) .read(MetaDataField.roll).marshallable(this.roll);
.read(MetaDataField.roll).marshallable(this.roll) long chunkSize = wire.read(MetaDataField.chunkSize).int64();
.read(MetaDataField.mappedFile).object(MarshallableMappedFile.class, long overlapSize = wire.read(MetaDataField.overlapSize).int64();
this, (t, v) -> {
t.mappedFile = v.mf();
t.init();
});

init();



MappedBytes mappedBytes = ((MappedBytes) wire.bytes()).withSizes(chunkSize, overlapSize);
this.mappedFile = mappedBytes.mappedFile();
indexing = new Indexing(wireType, mappedBytes);
wire.read(MetaDataField.indexing).marshallable(indexing);
} }




Expand Down Expand Up @@ -539,18 +533,16 @@ enum IndexingFields implements WireKey {
class Indexing implements Marshallable { class Indexing implements Marshallable {
private final WireType wireType; private final WireType wireType;
private final MappedBytes indexContext; private final MappedBytes indexContext;
private int indexCount; private int indexCount = 128 << 10;
private int indexSpacing; private int indexSpacing = 64;
private LongValue index2Index; private LongValue index2Index;
private LongValue lastIndex; private LongValue lastIndex;
private final Wire templateIndex; private final Wire templateIndex;
private ThreadLocal<LongArrayValues> longArray; private ThreadLocal<LongArrayValues> longArray;


Indexing(@NotNull WireType wireType, final MappedFile mappedFile) { Indexing(@NotNull WireType wireType, final MappedBytes mappedBytes) {
this.indexCount = 128 << 10; this.index2Index = wireType.newLongReference().get();
this.indexSpacing = 64; this.lastIndex = wireType.newLongReference().get();
this.index2Index = null;
this.lastIndex = null;


final Bytes b = Bytes.elasticByteBuffer(); final Bytes b = Bytes.elasticByteBuffer();


Expand All @@ -560,15 +552,15 @@ class Indexing implements Marshallable {


this.wireType = wireType; this.wireType = wireType;
this.longArray = withInitial(wireType.newLongArrayReference()); this.longArray = withInitial(wireType.newLongArrayReference());
this.indexContext = new MappedBytes(mappedFile); this.indexContext = mappedBytes;
} }


@Override @Override
public void writeMarshallable(@NotNull WireOut wire) { public void writeMarshallable(@NotNull WireOut wire) {
wire.write(IndexingFields.indexCount).int32(indexCount) wire.write(IndexingFields.indexCount).int32(indexCount)
.write(IndexingFields.indexSpacing).int32(indexSpacing) .write(IndexingFields.indexSpacing).int32(indexSpacing)
.write(IndexingFields.index2Index).int64forBinding(0L, index2Index = wire.newLongReference()) .write(IndexingFields.index2Index).int64forBinding(0L, index2Index)
.write(IndexingFields.lastIndex).int64forBinding(-1L, lastIndex = wire.newLongReference()); .write(IndexingFields.lastIndex).int64forBinding(-1L, lastIndex);
} }


@Override @Override
Expand Down Expand Up @@ -819,7 +811,7 @@ private void readPosition(MappedBytes context, long position) throws IOException
/** /**
* moves the context to the index of {@code toIndex} by doing a linear scans form a {@code * moves the context to the index of {@code toIndex} by doing a linear scans form a {@code
* fromKnownIndex} at {@code knownAddress} * fromKnownIndex} at {@code knownAddress}
* * <p/>
* note meta data is skipped and does not count to the indexes * note meta data is skipped and does not count to the indexes
* *
* @param context if successful, moves the context to an address relating to the * @param context if successful, moves the context to an address relating to the
Expand Down

0 comments on commit 20cd0e0

Please sign in to comment.