Skip to content

Commit

Permalink
QUEUE-27 async queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 26, 2015
1 parent 404e291 commit 97d44be
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 46 deletions.
Expand Up @@ -104,5 +104,9 @@ public interface ChronicleQueue extends Closeable {
*/ */
long lastWrittenIndex(); long lastWrittenIndex();


/**
*
* @return
*/
WireType wireType(); WireType wireType();
} }
Expand Up @@ -19,7 +19,7 @@
import net.openhft.chronicle.queue.Excerpt; import net.openhft.chronicle.queue.Excerpt;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -71,6 +71,11 @@ public void close() throws IOException {
throw new UnsupportedOperationException("Not implemented"); throw new UnsupportedOperationException("Not implemented");
} }


@Override
public WireType wireType() {
throw new UnsupportedOperationException("Not implemented");
}

/** /**
* *
* @param cycle * @param cycle
Expand Down Expand Up @@ -109,12 +114,6 @@ public void close() throws IOException {
*/ */
public abstract long indexToIndex(); public abstract long indexToIndex();


/**
*
* @return
*/
public abstract Wire wire();

/** /**
* *
* @return * @return
Expand Down
Expand Up @@ -19,7 +19,7 @@


import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.bytes.NativeBytes; import net.openhft.chronicle.bytes.NativeBytesStore;
import net.openhft.chronicle.bytes.ReadBytesMarshallable; import net.openhft.chronicle.bytes.ReadBytesMarshallable;
import net.openhft.chronicle.core.annotation.ForceInline; import net.openhft.chronicle.core.annotation.ForceInline;
import net.openhft.chronicle.core.util.ThrowingFunction; import net.openhft.chronicle.core.util.ThrowingFunction;
Expand Down Expand Up @@ -76,7 +76,7 @@ public ChronicleQueue queue() {
* Delegates the appender * Delegates the appender
*/ */
public static class DelegatedAppender extends DefaultAppender<ChronicleQueue> { public static class DelegatedAppender extends DefaultAppender<ChronicleQueue> {
private final Bytes buffer; private final BytesStore store;
private final Wire wire; private final Wire wire;
private final Consumer<Bytes> consumer; private final Consumer<Bytes> consumer;


Expand All @@ -86,16 +86,16 @@ public DelegatedAppender(


super(queue); super(queue);


this.buffer = NativeBytes.nativeBytes(); this.store = NativeBytesStore.nativeStoreWithFixedCapacity(1024);
this.wire = queue.wireType().apply(this.buffer); this.wire = queue.wireType().apply(this.store.bytesForWrite());
this.consumer = consumer; this.consumer = consumer;
} }


@Override @Override
public long writeDocument(WriteMarshallable writer) throws IOException { public long writeDocument(WriteMarshallable writer) throws IOException {
buffer.clear(); wire.bytes().clear();
writer.writeMarshallable(wire); writer.writeMarshallable(wire);
consumer.accept(buffer); consumer.accept(wire.bytes());


return WireConstants.NO_INDEX; return WireConstants.NO_INDEX;
} }
Expand Down Expand Up @@ -138,8 +138,11 @@ public boolean write(
final Bytes bytes = bytesFunction.apply(store::acquire); final Bytes bytes = bytesFunction.apply(store::acquire);
if(bytes != null && !bytes.isClear()) { if(bytes != null && !bytes.isClear()) {
bytes.writeVolatileInt( bytes.writeVolatileInt(
bytes.start(), bytes.readPosition(),
Wires.toIntU30(bytes.writePosition() - bytes.start() + 4, "TODO")); Wires.toIntU30(
bytes.length() - 4,
"Document length %,d out of 30-bit int range.")
);


return true; return true;
} }
Expand Down Expand Up @@ -186,7 +189,7 @@ private WireStore store() throws IOException {
this.store = queue.storeForCycle(this.cycle); this.store = queue.storeForCycle(this.cycle);
} }


return this.store(); return this.store;
} }
} }


Expand Down
Expand Up @@ -42,7 +42,7 @@ public class AsyncChronicleQueue extends DelegatedChronicleQueue {
public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws IOException { public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws IOException {
super(queue); super(queue);


this.store = NativeBytesStore.nativeStoreWithFixedCapacity(capacity); this.store = NativeBytesStore.nativeStoreWithFixedCapacity(capacity);
this.buffer = new BytesRingBuffer(this.store.bytesForWrite()); this.buffer = new BytesRingBuffer(this.store.bytesForWrite());
this.appender = null; this.appender = null;


Expand All @@ -60,6 +60,8 @@ public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws


return false; return false;
}); });

this.eventGroup.start();
} }


@NotNull @NotNull
Expand Down
Expand Up @@ -63,7 +63,7 @@ public BytesRingBuffer(@NotNull final Bytes buffer) {
* @return returning {@code true} upon success and {@code false} if this queue is full. * @return returning {@code true} upon success and {@code false} if this queue is full.
*/ */
public boolean offer(@NotNull Bytes bytes0) throws InterruptedException { public boolean offer(@NotNull Bytes bytes0) throws InterruptedException {
bytes0.readLimit(bytes0.writeLimit()); //bytes0.readLimit(bytes0.writeLimit());


try { try {


Expand All @@ -81,7 +81,7 @@ public boolean offer(@NotNull Bytes bytes0) throws InterruptedException {
return false; return false;


// write the size // write the size
long len = bytes0.readLimit(); long len = bytes0.length();


long messageLen = SIZE + FLAG + len; long messageLen = SIZE + FLAG + len;


Expand Down
Expand Up @@ -22,7 +22,6 @@
import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; import net.openhft.chronicle.queue.impl.AbstractChronicleQueue;
import net.openhft.chronicle.queue.impl.WireStore; import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.WireStorePool; import net.openhft.chronicle.queue.impl.WireStorePool;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireType; import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.WiredFile; import net.openhft.chronicle.wire.WiredFile;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -140,19 +139,14 @@ protected long lastCycle() {


@Override @Override
public WireType wireType() { public WireType wireType() {
throw new UnsupportedOperationException("todo"); return builder.wireType();
} }


@Override @Override
public long indexToIndex() { public long indexToIndex() {
throw new UnsupportedOperationException("todo"); throw new UnsupportedOperationException("todo");
} }


@Override
public Wire wire() {
throw new UnsupportedOperationException("todo");
}

@Override @Override
public long newIndex() { public long newIndex() {
throw new UnsupportedOperationException("todo"); throw new UnsupportedOperationException("todo");
Expand Down
Expand Up @@ -172,7 +172,11 @@ public Bytes<?> acquire(long size) throws IOException {
final NativeBytes bytes = WireConstants.NBP.get(); final NativeBytes bytes = WireConstants.NBP.get();


withLock( withLock(
(store, position) -> bytes.bytesStore(store, position + 4, position + 4 + size), (store, position) -> {
bytes.bytesStore(store, position, 4 + size);
bytes.writePosition(position + 4);
bytes.writeLimit(position + 4 + size);
},
size30 size30
); );


Expand Down Expand Up @@ -310,37 +314,37 @@ public long refCount() {
// Utilities // Utilities
// ************************************************************************* // *************************************************************************


//TODO move top wire //TODO move to wire
protected boolean acquireLock(BytesStore store, long position, int size) { protected boolean acquireLock(BytesStore store, long position, int size) {
return store.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size); return store.compareAndSwapInt(position, Wires.NOT_INITIALIZED, Wires.NOT_READY | size);
} }


protected void withLock(BytesStoreFunction function) protected void withLock(@NotNull BytesStoreFunction function)
throws IOException { throws IOException {
withLock(function, 0x0); withLock(function, 0x0);
} }


protected void withLock(BytesStoreFunction function, int size) protected void withLock(@NotNull BytesStoreFunction function, int size)
throws IOException { throws IOException {


long TIMEOUT_MS = 10_000; // 10 seconds. long TIMEOUT_MS = 10_000; // 10 seconds.
long end = System.currentTimeMillis() + TIMEOUT_MS; long end = System.currentTimeMillis() + TIMEOUT_MS;
long lastWritePosition = writePosition(); long writePosition = writePosition();
BytesStore store; BytesStore store;


for (; ;) { for (; ;) {
checkRemainingForAppend(lastWritePosition); checkRemainingForAppend(writePosition);


//TODO: a byte store should be acquired only if lastWrittenPosition is out its limits //TODO: a byte store should be acquired only if lastWrittenPosition is out its limits
store = mappedFile.acquireByteStore(lastWritePosition); store = mappedFile.acquireByteStore(writePosition);


if(acquireLock(store, lastWritePosition, size)) { if(acquireLock(store, writePosition, size)) {
function.apply(store, lastWritePosition); function.apply(store, writePosition);
return; return;
} else { } else {
int spbHeader = store.readInt(lastWritePosition); int spbHeader = store.readInt(writePosition);
if (Wires.isKnownLength(spbHeader)) { if (Wires.isKnownLength(spbHeader)) {
lastWritePosition += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE; writePosition += Wires.lengthOf(spbHeader) + SPB_DATA_HEADER_SIZE;
} else { } else {
// TODO: wait strategy // TODO: wait strategy
if(System.currentTimeMillis() > end) { if(System.currentTimeMillis() > end) {
Expand Down
Expand Up @@ -18,17 +18,16 @@


package net.openhft.chronicle.queue.impl.single.work.in.progress; package net.openhft.chronicle.queue.impl.single.work.in.progress;


import net.openhft.chronicle.bytes.IORuntimeException;
import net.openhft.chronicle.core.values.LongArrayValues;
import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.AbstractChronicleQueue; import net.openhft.chronicle.queue.impl.AbstractChronicleQueue;
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.BinaryLongArrayReference;
import net.openhft.chronicle.wire.ByteableLongArrayValues;
import net.openhft.chronicle.wire.TextLongArrayReference;
import net.openhft.chronicle.wire.WireType;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import static java.lang.ThreadLocal.withInitial; import static java.lang.ThreadLocal.withInitial;
import static net.openhft.chronicle.queue.impl.single.work.in.progress.Indexer.IndexOffset.toAddress0;
import static net.openhft.chronicle.queue.impl.single.work.in.progress.Indexer.IndexOffset.toAddress1;




/** /**
Expand Down Expand Up @@ -96,6 +95,7 @@ public synchronized void index() throws Exception {
*/ */
private void recordAddress(long index, long address) { private void recordAddress(long index, long address) {


/*
if (index % 64 != 0) if (index % 64 != 0)
return; return;
Expand Down Expand Up @@ -124,6 +124,7 @@ public void readMarshallable(@NotNull WireIn wire) throws IORuntimeException {
}, null); }, null);
*/




} }
Expand Down
Expand Up @@ -16,6 +16,7 @@
package net.openhft.chronicle.queue; package net.openhft.chronicle.queue;


import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.wire.WireKey;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.*; import org.junit.rules.*;
import org.junit.runner.Description; import org.junit.runner.Description;
Expand Down Expand Up @@ -62,6 +63,14 @@ protected void starting(Description description) {
// //
// ************************************************************************* // *************************************************************************


public static enum TestKey implements WireKey {
test
}

// *************************************************************************
//
// *************************************************************************

protected File getTmpDir() { protected File getTmpDir() {
try { try {
final File tmpDir = Files.createTempDirectory(getClass().getSimpleName() + "-").toFile(); final File tmpDir = Files.createTempDirectory(getClass().getSimpleName() + "-").toFile();
Expand Down
@@ -0,0 +1,42 @@
/*
*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package net.openhft.chronicle.queue.impl.ringbuffer;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.NativeBytesStore;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;

public class BytesRingBufferTest {

@Ignore
@Test
public void testOffer() throws IOException, InterruptedException {
NativeBytesStore allocate = NativeBytesStore.nativeStoreWithFixedCapacity(1024);
NativeBytesStore msgBytes = NativeBytesStore.nativeStoreWithFixedCapacity(150);

BytesRingBuffer ring = new BytesRingBuffer(allocate.bytesForWrite());
Bytes buffer = msgBytes.bytesForWrite();

buffer.clear();
buffer.writeLong(1L);
ring.offer(buffer);
}
}
Expand Up @@ -17,9 +17,37 @@
*/ */
package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single;


import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase; import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.async.AsyncChronicleQueueBuilder;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;




public class AsyncChronicleQueueTest extends ChronicleQueueTestBase { public class AsyncChronicleQueueTest extends ChronicleQueueTestBase {


@Test
public void testAppendAndRead() throws IOException {
final ChronicleQueue queue = SingleChronicleQueueBuilder.text(getTmpDir()).build();
final ChronicleQueue async = new AsyncChronicleQueueBuilder(queue).build();

final ExcerptAppender appender = async.createAppender();
for (int i = 0; i < 10; i++) {
final int n = i;
appender.writeDocument(w -> w.write(TestKey.test).int32(n));
}

final ExcerptTailer tailer = queue.createTailer();
for (int i = 0; i < 10;) {
final int n = i;
if(tailer.readDocument(r -> assertEquals(n, r.read(TestKey.test).int32()))) {
i++;
}
}
}
} }
Expand Up @@ -44,10 +44,6 @@
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class SingleChronicleQueueTest extends ChronicleQueueTestBase { public class SingleChronicleQueueTest extends ChronicleQueueTestBase {


enum TestKey implements WireKey {
test
}

@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{ return Arrays.asList(new Object[][]{
Expand Down

0 comments on commit 97d44be

Please sign in to comment.