Skip to content

Commit

Permalink
It the mappedFile that should be the property of the ChronicleStore, …
Browse files Browse the repository at this point in the history
…not the mappedBytes, the mapped bytes should be the property of an appender, tailer on indexer, and each instance of these should have a separate instance of mapped bytes
  • Loading branch information
Rob Austin committed Jan 12, 2016
1 parent dff8fa2 commit 00c376f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
Expand Up @@ -48,7 +48,7 @@ public interface WireStore extends ReferenceCounted, Marshallable {
/** /**
* @return the first readable position * @return the first readable position
*/ */
long readPosition(); // long readPosition();


/** /**
* @return the first writable position * @return the first writable position
Expand Down Expand Up @@ -85,4 +85,5 @@ void install(
throws IOException; throws IOException;


MappedBytes mappedBytes(); MappedBytes mappedBytes();

} }
Expand Up @@ -55,7 +55,7 @@ public class SingleChronicleQueueStore implements WireStore {
private final WireType wireType; private final WireType wireType;
private final Roll roll; private final Roll roll;
Bounds bounds = new Bounds(); Bounds bounds = new Bounds();
private MappedBytes mappedBytes; private MappedFile mappedFile;
private Closeable resourceCleaner; private Closeable resourceCleaner;
private final ReferenceCounter refCount = ReferenceCounter.onReleased(this::performRelease); private final ReferenceCounter refCount = ReferenceCounter.onReleased(this::performRelease);
private long appendTimeout = 1_000; private long appendTimeout = 1_000;
Expand Down Expand Up @@ -87,14 +87,10 @@ public class SingleChronicleQueueStore implements WireStore {
this.resourceCleaner = null; this.resourceCleaner = null;
this.builder = null; this.builder = null;
this.wireType = wireType; this.wireType = wireType;
this.mappedBytes = mappedBytes; this.mappedFile = mappedBytes.mappedFile();
this.indexing = new Indexing(wireType, mappedBytes); this.indexing = new Indexing(wireType, mappedBytes);
} }


@Override
public long readPosition() {
return this.bounds.getReadPosition();
}


@Override @Override
public long writePosition() { public long writePosition() {
Expand Down Expand Up @@ -198,7 +194,7 @@ public void install(
long length, long length,
boolean created, boolean created,
long cycle, long cycle,
ChronicleQueueBuilder builder, @NotNull ChronicleQueueBuilder builder,
@NotNull Function<Bytes, Wire> wireSupplier, @NotNull Function<Bytes, Wire> wireSupplier,
@Nullable Closeable closeable) throws IOException { @Nullable Closeable closeable) throws IOException {


Expand All @@ -211,11 +207,17 @@ public void install(
} }
} }



/**
* @return creates a new instance of mapped bytes, because, for example the tailer and appender
* can be at diffent locations.
*/
@Override @Override
public MappedBytes mappedBytes() { public MappedBytes mappedBytes() {
return new MappedBytes(mappedBytes.mappedFile()); return new MappedBytes(mappedFile);//.withSizes(this.chunkSize, this.overlapSize);
} }



// ************************************************************************* // *************************************************************************
// Utilities // Utilities
// ************************************************************************* // *************************************************************************
Expand Down Expand Up @@ -296,7 +298,7 @@ private <T> long read(
final StringBuilder sb = Wires.acquireStringBuilder(); final StringBuilder sb = Wires.acquireStringBuilder();


// todo improve this line // todo improve this line
final ValueIn vi = wireType.apply(context).read(sb); final ValueIn vi = wire.read(sb);


if ("index".contentEquals(sb)) { if ("index".contentEquals(sb)) {
return read(wire, reader, marshaller); return read(wire, reader, marshaller);
Expand Down Expand Up @@ -457,7 +459,7 @@ <T> long write(


@Override @Override
public void writeMarshallable(@NotNull WireOut wire) { public void writeMarshallable(@NotNull WireOut wire) {
MappedFile mappedFile = mappedBytes.mappedFile();
wire.write(MetaDataField.bounds).marshallable(this.bounds) wire.write(MetaDataField.bounds).marshallable(this.bounds)
.write(MetaDataField.roll).object(this.roll) .write(MetaDataField.roll).object(this.roll)
.write(MetaDataField.chunkSize).int64(mappedFile.chunkSize()) .write(MetaDataField.chunkSize).int64(mappedFile.chunkSize())
Expand All @@ -475,16 +477,16 @@ public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
long chunkSize = wire.read(MetaDataField.chunkSize).int64(); long chunkSize = wire.read(MetaDataField.chunkSize).int64();
long overlapSize = wire.read(MetaDataField.overlapSize).int64(); long overlapSize = wire.read(MetaDataField.overlapSize).int64();


this.mappedBytes = (MappedBytes) wire.bytes(); final MappedBytes mappedBytes = (MappedBytes) (wire.bytes());

this.mappedFile = mappedBytes.mappedFile();
final MappedBytes mappedBytes = new MappedBytes(this.mappedBytes.mappedFile()); indexing = new Indexing(wireType, mappedBytes.withSizes(chunkSize,
indexing = new Indexing(wireType, mappedBytes.withSizes(chunkSize, overlapSize)); overlapSize));
wire.read(MetaDataField.indexing).marshallable(indexing); wire.read(MetaDataField.indexing).marshallable(indexing);
} }


// ************************************************************************* // *************************************************************************
// Marshallable // Marshallable
// ************************************************************************* // *************************************************************************


enum MetaDataField implements WireKey { enum MetaDataField implements WireKey {
bounds, bounds,
Expand Down

0 comments on commit 00c376f

Please sign in to comment.