Skip to content

Commit

Permalink
QUEUE-27 async queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 27, 2015
1 parent d37e717 commit bd36f68
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 44 deletions.
Expand Up @@ -59,7 +59,7 @@ public ReadContext store(@NotNull BytesStore store, long position) {


public ReadContext store(@NotNull BytesStore store, long position, long size) { public ReadContext store(@NotNull BytesStore store, long position, long size) {
if(store != this.bytes.bytesStore()) { if(store != this.bytes.bytesStore()) {
((VanillaBytes) this.wire.bytes()).bytesStore(store, position, size); this.bytes.bytesStore(store, position, size);
} }


this.position = position; this.position = position;
Expand Down
Expand Up @@ -15,11 +15,7 @@
*/ */
package net.openhft.chronicle.queue.impl; package net.openhft.chronicle.queue.impl;


import net.openhft.chronicle.core.pool.StringBuilderPool;

public class WireConstants { public class WireConstants {
public static final StringBuilderPool SBP = new StringBuilderPool();

public static final long NO_DATA = 0L; public static final long NO_DATA = 0L;
public static final long NO_INDEX = -1L; public static final long NO_INDEX = -1L;
public static final long HEADER_OFFSET = 0L; public static final long HEADER_OFFSET = 0L;
Expand Down
Expand Up @@ -146,9 +146,8 @@ public long append(@NotNull WriteContext context, @NotNull final Bytes bytes) th
final int size = toIntU30(bytes.length()); final int size = toIntU30(bytes.length());
final long position = acquireLock(context, size).position(); final long position = acquireLock(context, size).position();


context.store() context.bytes.write(position + 4, bytes);
.write(position + 4, bytes) context.bytes.compareAndSwapInt(position, size | Wires.NOT_READY, size);
.compareAndSwapInt(position, size | Wires.NOT_READY, size);


return indexing.incrementLastIndex(); return indexing.incrementLastIndex();
} }
Expand All @@ -169,17 +168,13 @@ public long read(@NotNull ReadContext context, @NotNull ReadMarshallable reader)
} }


final int spbHeader = context.bytes.readVolatileInt(position); final int spbHeader = context.bytes.readVolatileInt(position);
if(Wires.isNotInitialized(spbHeader)) { if(!Wires.isNotInitialized(spbHeader) && Wires.isReady(spbHeader)) {
return WireConstants.NO_DATA;
}

if(Wires.isReady(spbHeader)) {
if(Wires.isData(spbHeader)) { if(Wires.isData(spbHeader)) {
return Wires.readData(context.wire(position, builder.blockSize()), reader); return Wires.readData(context.wire(position, builder.blockSize()), reader);
} else { } else {
// In case of meta data, if we are found the "roll" meta, we returns // In case of meta data, if we are found the "roll" meta, we returns
// the next cycle (negative) // the next cycle (negative)
final StringBuilder sb = WireConstants.SBP.acquireStringBuilder(); final StringBuilder sb = Wires.acquireStringBuilder();
final ValueIn vi = context.wire(position + SPB_DATA_HEADER_SIZE, builder.blockSize()).read(sb); final ValueIn vi = context.wire(position + SPB_DATA_HEADER_SIZE, builder.blockSize()).read(sb);


if("roll".contentEquals(sb)) { if("roll".contentEquals(sb)) {
Expand Down
Expand Up @@ -38,7 +38,6 @@


import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;


@RunWith(Parameterized.class) @RunWith(Parameterized.class)
Expand All @@ -52,10 +51,6 @@ public static Collection<Object[]> data() {
}); });
} }


// *************************************************************************
//
// *************************************************************************

private final WireType wireType; private final WireType wireType;


/** /**
Expand All @@ -67,6 +62,8 @@ public SingleChronicleQueueTest(WireType wireType) {


// ************************************************************************* // *************************************************************************
// //
// TESTS
//
// ************************************************************************* // *************************************************************************


@Test @Test
Expand Down Expand Up @@ -113,30 +110,6 @@ public void testAppendAndRead() throws IOException {
} }
} }


/*
* Tailer doesn't work if created before the appender
*
* See https://higherfrequencytrading.atlassian.net/browse/QUEUE-28
*/
@Test
public void testQUEUE28() throws IOException {
final ChronicleQueue queue = new SingleChronicleQueueBuilder(getTmpDir())
.wireType(this.wireType)
.build();

final ExcerptTailer tailer = queue.createTailer();
assertFalse(tailer.readDocument(r ->
r.read(TestKey.test).int32()
));

final ExcerptAppender appender = queue.createAppender();
appender.writeDocument(w -> w.write(TestKey.test).int32(1));

assertTrue(tailer.readDocument(r ->
r.read(TestKey.test).int32()
));
}

@Test @Test
public void testReadAndAppend() throws IOException { public void testReadAndAppend() throws IOException {
final ChronicleQueue queue = new SingleChronicleQueueBuilder(getTmpDir()) final ChronicleQueue queue = new SingleChronicleQueueBuilder(getTmpDir())
Expand Down
@@ -0,0 +1,74 @@
/*
*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package net.openhft.chronicle.queue.impl.single.jira;

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.WireType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class Queue28 extends ChronicleQueueTestBase {

@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{WireType.TEXT},
{WireType.BINARY}
});
}

private final WireType wireType;

public Queue28(WireType wireType) {
this.wireType = wireType;
}

/*
* Tailer doesn't work if created before the appender
*
* See https://higherfrequencytrading.atlassian.net/browse/QUEUE-28
*/
@Test
public void test() throws IOException {
final ChronicleQueue queue = new SingleChronicleQueueBuilder(getTmpDir())
.wireType(this.wireType)
.build();

final ExcerptTailer tailer = queue.createTailer();
assertFalse(tailer.readDocument(r -> r.read(TestKey.test).int32()));

final ExcerptAppender appender = queue.createAppender();
appender.writeDocument(w -> w.write(TestKey.test).int32(1));

assertTrue(tailer.readDocument(r -> r.read(TestKey.test).int32()));
}
}
Expand Up @@ -15,11 +15,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
* *
*/ */
package net.openhft.chronicle.queue.impl.single; package net.openhft.chronicle.queue.impl.single.jira;


import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase; import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.WireType; import net.openhft.chronicle.wire.WireType;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
Expand Down

0 comments on commit bd36f68

Please sign in to comment.