Skip to content

Commit

Permalink
Cleanup Read/Write Context
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 30, 2015
1 parent eb69f0c commit 58b16e8
Showing 1 changed file with 46 additions and 68 deletions.
Expand Up @@ -138,41 +138,19 @@ public long lastIndex() {
return this.indexing.getLastIndex(); return this.indexing.getLastIndex();
} }


@Override
public boolean appendRollMeta(@NotNull WriteContext context, long cycle) throws IOException {
if(roll.casNextRollCycle(cycle)) {
final WriteMarshallable marshallable = x -> x.write(MetaDataField.roll).int32(cycle);

writeWithLock(
context,
Wires.UNKNOWN_LENGTH,
(WriteContext ctx, long position, int size, WriteMarshallable w) -> {
Wires.writeMeta(context.wire, w);
roll.setNextCycleMetaPosition(position);
return WireConstants.NO_INDEX;
},
marshallable
);

return true;
}

return false;
}

@Override @Override
public long append(@NotNull WriteContext context, @NotNull final WriteMarshallable marshallable) throws IOException { public long append(@NotNull WriteContext context, @NotNull final WriteMarshallable marshallable) throws IOException {
return writeWithLock(context, Wires.UNKNOWN_LENGTH, this::writeWireMarshallable, marshallable); return write(context, Wires.UNKNOWN_LENGTH, this::writeWireMarshallable, marshallable);
} }


@Override @Override
public long append(@NotNull WriteContext context, @NotNull final WriteBytesMarshallable marshallable) throws IOException { public long append(@NotNull WriteContext context, @NotNull final WriteBytesMarshallable marshallable) throws IOException {
return writeWithLock(context, Wires.UNKNOWN_LENGTH, this::writeBytesMarshallable, marshallable); return write(context, Wires.UNKNOWN_LENGTH, this::writeBytesMarshallable, marshallable);
} }


@Override @Override
public long append(@NotNull WriteContext context, @NotNull final Bytes bytes) throws IOException { public long append(@NotNull WriteContext context, @NotNull final Bytes bytes) throws IOException {
return writeWithLock(context, toIntU30(bytes.length()), this::writeBytes, bytes); return write(context, toIntU30(bytes.length()), this::writeBytes, bytes);
} }


@Override @Override
Expand All @@ -185,6 +163,28 @@ public long read(@NotNull ReadContext context, @NotNull ReadBytesMarshallable re
return read(context, this::readBytesMarshallable, reader); return read(context, this::readBytesMarshallable, reader);
} }


@Override
public boolean appendRollMeta(@NotNull WriteContext context, long cycle) throws IOException {
if(roll.casNextRollCycle(cycle)) {
final WriteMarshallable marshallable = x -> x.write(MetaDataField.roll).int32(cycle);

write(
context,
Wires.UNKNOWN_LENGTH,
(WriteContext ctx, long position, int size, WriteMarshallable w) -> {
Wires.writeMeta(context.wire, w);
roll.setNextCycleMetaPosition(position);
return WireConstants.NO_INDEX;
},
marshallable
);

return true;
}

return false;
}

@Override @Override
public boolean moveToIndex(@NotNull ReadContext context, long index){ public boolean moveToIndex(@NotNull ReadContext context, long index){
long position = readPosition(); long position = readPosition();
Expand Down Expand Up @@ -270,7 +270,7 @@ private synchronized void performRelease() {
} }
} }


protected int toIntU30(long len) { private int toIntU30(long len) {
return Wires.toIntU30(len,"Document length %,d out of 30-bit int range."); return Wires.toIntU30(len,"Document length %,d out of 30-bit int range.");
} }


Expand All @@ -279,11 +279,11 @@ protected int toIntU30(long len) {
// ************************************************************************* // *************************************************************************


@FunctionalInterface @FunctionalInterface
interface Reader<T> { private interface Reader<T> {
long read(@NotNull ReadContext context, int len, @NotNull T reader) throws IOException; long read(@NotNull ReadContext context, int len, @NotNull T reader) throws IOException;
} }


protected long readWireMarshallable( private long readWireMarshallable(
@NotNull ReadContext context, @NotNull ReadContext context,
int len, int len,
@NotNull ReadMarshallable marshaller) { @NotNull ReadMarshallable marshaller) {
Expand All @@ -292,7 +292,7 @@ protected long readWireMarshallable(
return readWire(context.wire, len, marshaller); return readWire(context.wire, len, marshaller);
} }


protected long readBytesMarshallable( private long readBytesMarshallable(
@NotNull ReadContext context, @NotNull ReadContext context,
int len, int len,
@NotNull ReadBytesMarshallable marshaller) { @NotNull ReadBytesMarshallable marshaller) {
Expand All @@ -308,7 +308,11 @@ protected long readBytesMarshallable(
return readp + len; return readp + len;
} }


protected <T> long read(@NotNull ReadContext context, @NotNull Reader<T> reader, T marshaller) throws IOException { private <T> long read(
@NotNull ReadContext context,
@NotNull Reader<T> reader,
@NotNull T marshaller) throws IOException {

long position = context.bytes.readPosition(); long position = context.bytes.readPosition();
if(context.bytes.readRemaining() == 0) { if(context.bytes.readRemaining() == 0) {
mappedFile.acquireBytesForRead(position, context.bytes); mappedFile.acquireBytesForRead(position, context.bytes);
Expand Down Expand Up @@ -337,12 +341,12 @@ protected <T> long read(@NotNull ReadContext context, @NotNull Reader<T> reader,
return WireConstants.NO_DATA; return WireConstants.NO_DATA;
} }


//TODO move to wire //TODO : maybe move to wire
@ForceInline @ForceInline
protected long readWire(@NotNull WireIn wireIn, long len, @NotNull ReadMarshallable dataConsumer) { private long readWire(@NotNull WireIn wireIn, long size, @NotNull ReadMarshallable dataConsumer) {
final Bytes<?> bytes = wireIn.bytes(); final Bytes<?> bytes = wireIn.bytes();
final long limit0 = bytes.readLimit(); final long limit0 = bytes.readLimit();
final long limit = bytes.readPosition() + len; final long limit = bytes.readPosition() + size;
try { try {
bytes.readLimit(limit); bytes.readLimit(limit);
dataConsumer.readMarshallable(wireIn); dataConsumer.readMarshallable(wireIn);
Expand All @@ -359,11 +363,11 @@ protected long readWire(@NotNull WireIn wireIn, long len, @NotNull ReadMarshalla
// ************************************************************************* // *************************************************************************


@FunctionalInterface @FunctionalInterface
interface Writer<T> { private interface Writer<T> {
long write(@NotNull WriteContext context, long position, int size, @NotNull T writer) throws IOException; long write(@NotNull WriteContext context, long position, int size, @NotNull T writer) throws IOException;
} }


public long writeWireMarshallable( private long writeWireMarshallable(
@NotNull WriteContext context, @NotNull WriteContext context,
long position, long position,
int size, int size,
Expand All @@ -373,7 +377,7 @@ public long writeWireMarshallable(
return indexing.incrementLastIndex(); return indexing.incrementLastIndex();
} }


public long writeBytesMarshallable( private long writeBytesMarshallable(
@NotNull WriteContext context, @NotNull WriteContext context,
long position, long position,
int size, int size,
Expand All @@ -392,7 +396,7 @@ public long writeBytesMarshallable(
return indexing.incrementLastIndex(); return indexing.incrementLastIndex();
} }


public long writeBytes( private long writeBytes(
@NotNull WriteContext context, @NotNull WriteContext context,
long position, long position,
int size, int size,
Expand All @@ -405,8 +409,11 @@ public long writeBytes(
return indexing.incrementLastIndex(); return indexing.incrementLastIndex();
} }


protected <T> long writeWithLock(@NotNull WriteContext context, int size, @NotNull Writer<T> writer, T marshaller) private <T> long write(
throws IOException { @NotNull WriteContext context,
int size,
@NotNull Writer<T> writer,
@NotNull T marshaller) throws IOException {


final long end = System.currentTimeMillis() + builder.appendTimeout(); final long end = System.currentTimeMillis() + builder.appendTimeout();
long position = writePosition(); long position = writePosition();
Expand Down Expand Up @@ -434,35 +441,6 @@ protected <T> long writeWithLock(@NotNull WriteContext context, int size, @NotNu
} }
} }


protected WriteContext acquireLock(@NotNull WriteContext context, int size)
throws IOException {

final long end = System.currentTimeMillis() + builder.appendTimeout();
long position = writePosition();

for (; ;) {
if (position > context.bytes.safeLimit()) {
mappedFile.acquireBytesForWrite(position, context.bytes);
}

if(context.bytes.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size)) {
return context;
} else {
int spbHeader = context.bytes.readInt(position);
if (Wires.isKnownLength(spbHeader)) {
position += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE;
} else {
// TODO: wait strategy
if(System.currentTimeMillis() > end) {
throw new AssertionError("Timeout waiting to append");
}

Jvm.pause(1);
}
}
}
}

// ************************************************************************* // *************************************************************************
// Marshallable // Marshallable
// ************************************************************************* // *************************************************************************
Expand Down

0 comments on commit 58b16e8

Please sign in to comment.