Skip to content

Commit

Permalink
QUEUE-24 implement append on SingleChronicleQueueFormat
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 15, 2015
1 parent 0de8d06 commit e7a85c8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 61 deletions.
Expand Up @@ -36,16 +36,12 @@ public abstract class AbstractChronicleQueueFormat implements ChronicleQueueForm
public static final long SPB_HEADER_BUILDING = 0x1;
public static final long SPB_HEADER_BUILT = WireUtil.asLong("QUEUE400");

private final Function<Bytes, Wire> wireSupplier;
protected final Function<Bytes, Wire> wireSupplier;

protected AbstractChronicleQueueFormat(WireType wireType) {
this.wireSupplier = WireUtil.wireSupplierFor(wireType);
}

protected Wire wireFor(Bytes bytes) {
return wireSupplier.apply(bytes);
}

protected Function<Bytes, Wire> wireSupplier() {
return this.wireSupplier;
}
Expand Down
Expand Up @@ -16,7 +16,7 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.MappedBytesStore;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.MappedFile;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.values.LongValue;
Expand All @@ -28,18 +28,19 @@
import java.time.ZonedDateTime;
import java.util.UUID;

import static net.openhft.chronicle.wire.WireUtil.wireCache;

class SingleChronicleQueueFormat extends AbstractChronicleQueueFormat {
static {
ClassAliasPool.CLASS_ALIASES.addAlias(Header.class, "Header");
ClassAliasPool.CLASS_ALIASES.addAlias(Header.class, Header.CLASS_ALIAS);
}

private final SingleChronicleQueueBuilder builder;
private final MappedFile mappedFile;
private final MappedBytesStore mappedStore;
private final BytesStore mappedStore;
private final Header header;

private final ThreadLocal<WireIn> wireInCache;
private final ThreadLocal<WireOut> wireOutCache;
private final ThreadLocal<Wire> wireInCache;
private final ThreadLocal<Wire> wireOutCache;

SingleChronicleQueueFormat(final SingleChronicleQueueBuilder builder) throws IOException {
super(builder.wireType());
Expand All @@ -48,24 +49,8 @@ class SingleChronicleQueueFormat extends AbstractChronicleQueueFormat {
this.mappedFile = MappedFile.mappedFile(this.builder.path(), this.builder.blockSize());
this.mappedStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE);
this.header = new Header();

// TODO: refactor
this.wireInCache = ThreadLocal.withInitial(() -> {
try {
return super.wireFor(this.mappedFile.acquireBytesForRead(0));
} catch(Exception e) {
throw new RuntimeException(e);
}
});

// TODO: refactor
this.wireOutCache = ThreadLocal.withInitial(() -> {
try {
return super.wireFor(this.mappedFile.acquireBytesForWrite(0));
} catch(Exception e) {
throw new RuntimeException(e);
}
});
this.wireInCache = wireCache(mappedStore::bytesForRead, wireSupplier());
this.wireOutCache = wireCache(mappedStore::bytesForWrite, wireSupplier());
}

// *************************************************************************
Expand All @@ -79,23 +64,23 @@ private SingleChronicleQueueFormat buildHeader() throws IOException {

@Override
public long append(@NotNull WriteMarshallable writer) throws IOException {
final LongValue writeByte = header.writeByte();
for (long lastByte = header.getWriteByte(); ; ) {
if(mappedStore.compareAndSwapInt(lastByte, WireUtil.FREE, WireUtil.BUILDING)) {
final WireOut wo = wireOutCache.get();
final Bytes wb = wo.bytes();

for (long lastByte = writeByte.getVolatileValue(); ; ) {
if(this.mappedStore.compareAndSwapInt(lastByte, WireUtil.FREE, WireUtil.BUILDING)) {
WireOut wo = wireOutCache.get();
wo.bytes().writePosition(lastByte);
wb.writePosition(lastByte);

WireUtil.writeData(wo, writer);
writeByte.setOrderedValue(wo.bytes().writePosition());
header.setWriteByteLazy(wb.writePosition());

return header.lastIndex().addAtomicValue(1);
return header.incrementLastIndex();
} else {
int lastState = this.mappedStore.readInt(lastByte);
int lastState = mappedStore.readInt(lastByte);
if(WireUtil.isKnownLength(lastState)) {
lastByte += Wires.lengthOf(lastState);
} else {
// todo need to wait
// TODO: need to wait, waiting strategy ?
}
}
}
Expand All @@ -122,6 +107,7 @@ public boolean read(@NotNull AtomicLong offset, @NotNull Bytes buffer) {
}
}
}
*/

protected boolean checkRemainingForAppend(@NotNull Bytes buffer) {
long remaining = buffer.writeRemaining();
Expand All @@ -131,7 +117,6 @@ protected boolean checkRemainingForAppend(@NotNull Bytes buffer) {

return true;
}
*/

// *************************************************************************
//
Expand All @@ -146,14 +131,15 @@ public static SingleChronicleQueueFormat from(
//
// *************************************************************************

enum Field implements WireKey {
private enum HeaderField implements WireKey {
type,
uuid, created, user, host,
indexCount, indexSpacing,
writeByte, index2Index, lastIndex
}

private class Header implements Marshallable {
public static final String CLASS_ALIAS = "Header";
public static final long PADDED_SIZE = 512;

// fields which can be serialized/deserialized in the normal way.
Expand Down Expand Up @@ -201,37 +187,41 @@ LongValue lastIndex() {

@Override
public void writeMarshallable(@NotNull WireOut out) {
out.write(Field.uuid).uuid(uuid)
.write(Field.writeByte).int64forBinding(PADDED_SIZE)
.write(Field.created).zonedDateTime(created)
.write(Field.user).text(user)
.write(Field.host).text(host)
.write(Field.indexCount).int32(indexCount)
.write(Field.indexSpacing).int32(indexSpacing)
.write(Field.index2Index).int64forBinding(0L)
.write(Field.lastIndex).int64forBinding(-1L);
out.write(HeaderField.uuid).uuid(uuid)
.write(HeaderField.writeByte).int64forBinding(PADDED_SIZE)
.write(HeaderField.created).zonedDateTime(created)
.write(HeaderField.user).text(user)
.write(HeaderField.host).text(host)
.write(HeaderField.indexCount).int32(indexCount)
.write(HeaderField.indexSpacing).int32(indexSpacing)
.write(HeaderField.index2Index).int64forBinding(0L)
.write(HeaderField.lastIndex).int64forBinding(-1L);
//out.addPadding((int) (PADDED_SIZE - out.bytes().writePosition()));
}

@Override
public void readMarshallable(@NotNull WireIn in) {
in.read(Field.uuid).uuid(this, (o, i) -> o.uuid = i)
.read(Field.writeByte).int64(this.writeByte, this, (o, i) -> o.writeByte = i)
.read(Field.created).zonedDateTime(this, (o, i) -> o.created = i)
.read(Field.user).text(this, (o, i) -> o.user = i)
.read(Field.host).text(this, (o, i) -> o.host = i)
.read(Field.indexCount).int32(this, (o, i) -> o.indexCount = i)
.read(Field.indexSpacing).int32(this, (o, i) -> o.indexSpacing = i)
.read(Field.index2Index).int64(this.index2Index, this, (o, i) -> o.index2Index = i)
.read(Field.lastIndex).int64(this.lastIndex, this, (o, i ) -> o.lastIndex = i);
in.read(HeaderField.uuid).uuid(this, (o, i) -> o.uuid = i)
.read(HeaderField.writeByte).int64(this.writeByte, this, (o, i) -> o.writeByte = i)
.read(HeaderField.created).zonedDateTime(this, (o, i) -> o.created = i)
.read(HeaderField.user).text(this, (o, i) -> o.user = i)
.read(HeaderField.host).text(this, (o, i) -> o.host = i)
.read(HeaderField.indexCount).int32(this, (o, i) -> o.indexCount = i)
.read(HeaderField.indexSpacing).int32(this, (o, i) -> o.indexSpacing = i)
.read(HeaderField.index2Index).int64(this.index2Index, this, (o, i) -> o.index2Index = i)
.read(HeaderField.lastIndex).int64(this.lastIndex, this, (o, i) -> o.lastIndex = i);
}

public long getWriteByte() {
return writeByte().getVolatileValue();
return writeByte.getVolatileValue();
}

public void setWriteByteLazy(long writeByte) {
this.writeByte().setOrderedValue(writeByte);
this.writeByte.setOrderedValue(writeByte);
}

public long incrementLastIndex() {
return lastIndex.addAtomicValue(1);
}
}
}
Expand Up @@ -13,7 +13,6 @@
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.wire;

import net.openhft.chronicle.bytes.Bytes;
Expand All @@ -27,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.function.Function;
import java.util.function.Supplier;

//TODO: workaround for protected access to WireInternal
public class WireUtil {
Expand Down Expand Up @@ -61,6 +61,18 @@ public static long asLong(@NotNull String str) {
.getLong();
}

public static ThreadLocal<Wire> wireCache(
@NotNull final Supplier<Bytes> bytesSupplier,
@NotNull final Function<Bytes, Wire> wireSupplier) {
return ThreadLocal.withInitial(() -> {
try {
return wireSupplier.apply(bytesSupplier.get());
} catch(Exception e) {
throw new RuntimeException(e);
}
});
}

// *************************************************************************
// WIRE
// *************************************************************************
Expand Down

0 comments on commit e7a85c8

Please sign in to comment.