Skip to content

Commit

Permalink
QUEUE-24: implement appender
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 14, 2015
1 parent d3f5dd2 commit 0d8c802
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 21 deletions.
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.Nullable;

public abstract class AbstractExcerptAppender implements ExcerptAppender {
@Nullable
@Override
public WireOut wire() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void writeDocument(WriteMarshallable writer) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public long lastWrittenIndex() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public ChronicleQueue chronicle() {
throw new UnsupportedOperationException("Not implemented");
}
}
Expand Up @@ -16,5 +16,18 @@
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;




import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public interface ChronicleQueueFormat { public interface ChronicleQueueFormat {

/**
*
* @param writer
* @return
* @throws IOException
*/
long append(@NotNull WriteMarshallable writer) throws IOException;
} }
Expand Up @@ -16,7 +16,11 @@


package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single;


import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; import net.openhft.chronicle.queue.impl.AbstractChronicleQueue;
import net.openhft.chronicle.queue.impl.AbstractExcerptAppender;
import net.openhft.chronicle.wire.WriteMarshallable;


import java.io.IOException; import java.io.IOException;


Expand All @@ -25,4 +29,29 @@ class SingleChronicleQueue extends AbstractChronicleQueue<SingleChronicleQueueFo
protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException { protected SingleChronicleQueue(final SingleChronicleQueueBuilder builder) throws IOException {
super(SingleChronicleQueueFormat.from(builder)); super(SingleChronicleQueueFormat.from(builder));
} }

@Override
public ExcerptAppender createAppender() {
return new Appender();
}

// *************************************************************************
//
// *************************************************************************

private class Appender extends AbstractExcerptAppender {
@Override
public void writeDocument(WriteMarshallable writer) {
try {
format().append(writer);
} catch(IOException e) {
//TODO: should this method throw an exception ?
}
}

@Override
public ChronicleQueue chronicle() {
return SingleChronicleQueue.this;
}
}
} }
Expand Up @@ -38,13 +38,34 @@ class SingleChronicleQueueFormat extends AbstractChronicleQueueFormat {
private final MappedBytesStore mappedStore; private final MappedBytesStore mappedStore;
private final Header header; private final Header header;


private final ThreadLocal<WireIn> wireInCache;
private final ThreadLocal<WireOut> wireOutCache;

SingleChronicleQueueFormat(final SingleChronicleQueueBuilder builder) throws IOException { SingleChronicleQueueFormat(final SingleChronicleQueueBuilder builder) throws IOException {
super(builder.wireType()); super(builder.wireType());


this.builder = builder; this.builder = builder;
this.mappedFile = MappedFile.mappedFile(this.builder.path(), this.builder.blockSize()); this.mappedFile = MappedFile.mappedFile(this.builder.path(), this.builder.blockSize());
this.mappedStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE); this.mappedStore = mappedFile.acquireByteStore(SPB_HEADER_BYTE);
this.header = new Header(); this.header = new Header();

// TODO: refactor
this.wireInCache = ThreadLocal.withInitial(() -> {
try {
return super.wireFor(this.mappedFile.acquireBytesForRead(0));
} catch(Exception e) {
throw new RuntimeException(e);
}
});

// TODO: refactor
this.wireOutCache = ThreadLocal.withInitial(() -> {
try {
return super.wireFor(this.mappedFile.acquireBytesForWrite(0));
} catch(Exception e) {
throw new RuntimeException(e);
}
});
} }


// ************************************************************************* // *************************************************************************
Expand All @@ -56,15 +77,17 @@ private SingleChronicleQueueFormat buildHeader() throws IOException {
return this; return this;
} }


protected long append(@NotNull WriteMarshallable writer) throws IOException { @Override
public long append(@NotNull WriteMarshallable writer) throws IOException {
final LongValue writeByte = header.writeByte(); final LongValue writeByte = header.writeByte();


for (long lastByte = writeByte.getVolatileValue(); ; ) { for (long lastByte = writeByte.getVolatileValue(); ; ) {
if(this.mappedStore.compareAndSwapInt(lastByte, WireUtil.FREE, WireUtil.BUILDING)) { if(this.mappedStore.compareAndSwapInt(lastByte, WireUtil.FREE, WireUtil.BUILDING)) {
Bytes wb = this.mappedStore.bytesForWrite().writePosition(lastByte); WireOut wo = wireOutCache.get();
wo.bytes().writePosition(lastByte);


WireUtil.writeData(wireOut(wb), writer); WireUtil.writeData(wo, writer);
writeByte.setOrderedValue(wb.writePosition()); writeByte.setOrderedValue(wo.bytes().writePosition());


return header.lastIndex().addAtomicValue(1); return header.lastIndex().addAtomicValue(1);
} else { } else {
Expand All @@ -75,14 +98,6 @@ protected long append(@NotNull WriteMarshallable writer) throws IOException {
// todo need to wait // todo need to wait
} }
} }

/*
try {
Jvm.checkInterrupted();
} catch (InterruptedException e) {
throw new InterruptedRuntimeException(e);
}
*/
} }
} }


Expand All @@ -108,15 +123,13 @@ public boolean read(@NotNull AtomicLong offset, @NotNull Bytes buffer) {
} }
} }
protected boolean checkRemainingForAppend(@NotNull Bytes buffer) {
long remaining = buffer.writeRemaining();
protected long checkRemainingForAppend(@NotNull Bytes buffer) { if (remaining > WireUtil.LENGTH_MASK) {
long remaining = buffer.remaining();
if (remaining > MAX_LENGTH) {
throw new IllegalStateException("Length too large: " + remaining); throw new IllegalStateException("Length too large: " + remaining);
} }
return remaining; return true;
} }
*/ */


Expand Down
Expand Up @@ -35,6 +35,7 @@ public class WireUtil {
public static final int NOT_READY = Wires.NOT_READY; public static final int NOT_READY = Wires.NOT_READY;
public static final int META_DATA = Wires.META_DATA; public static final int META_DATA = Wires.META_DATA;
public static final int UNKNOWN_LENGTH = Wires.UNKNOWN_LENGTH; public static final int UNKNOWN_LENGTH = Wires.UNKNOWN_LENGTH;
public static final int MAX_LENGTH = LENGTH_MASK;
public static final int FREE = 0x0; public static final int FREE = 0x0;
public static final int BUILDING = WireUtil.NOT_READY | WireUtil.UNKNOWN_LENGTH; public static final int BUILDING = WireUtil.NOT_READY | WireUtil.UNKNOWN_LENGTH;


Expand Down
Expand Up @@ -15,7 +15,10 @@
*/ */
package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single;


import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase; import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.ChronicleQueueFormat;
import net.openhft.chronicle.wire.WireKey; import net.openhft.chronicle.wire.WireKey;
import org.junit.Test; import org.junit.Test;


Expand All @@ -29,16 +32,27 @@ public void testHeaderCreation() throws IOException {
} }


@Test @Test
public void testSimpleAppend() throws IOException { public void testAppendViaFormat() throws IOException {
SingleChronicleQueueFormat format = final SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.text(getTmpFile());
SingleChronicleQueueFormat.from(SingleChronicleQueueBuilder.text(getTmpFile())); final ChronicleQueueFormat format = SingleChronicleQueueFormat.from(builder);


for(int i=0; i<10; i++) { for(int i=0; i<10; i++) {
final int n = i; final int n = i;
format.append(w -> w.write(TestKey.test).text("event " + n)); format.append(w -> w.write(TestKey.test).text("event " + n));
} }
} }


@Test
public void testAppendViaAppender() throws IOException {
final ChronicleQueue queue = SingleChronicleQueueBuilder.text(getTmpFile()).build();

final ExcerptAppender appender = queue.createAppender();
for(int i=0; i<10; i++) {
final int n = i;
appender.writeDocument(w -> w.write(TestKey.test).text("event " + n));
}
}

enum TestKey implements WireKey { enum TestKey implements WireKey {
test test
} }
Expand Down

0 comments on commit 0d8c802

Please sign in to comment.