Skip to content

Commit

Permalink
Adapt the tests to read/write Bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Feb 5, 2015
1 parent 2fbcf47 commit 0fe82ce
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 12 deletions.
Expand Up @@ -4,15 +4,21 @@
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.MultiStoreBytes;

import java.util.concurrent.atomic.AtomicLong;

/**
* Created by peter.lawrey on 03/02/15.
*/
public interface DirectChronicle extends Chronicle {
void appendDocument(Bytes buffer);

void readDocument(AtomicLong offset, Bytes buffer);

Bytes bytes();

long lastIndex();

boolean index(long index, MultiStoreBytes bytes);

long firstBytes();
}
Expand Up @@ -21,6 +21,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;

/**
* SingleChronicle implements Chronicle over a single streaming file
Expand All @@ -44,6 +45,7 @@ public class SingleChronicle implements Chronicle, DirectChronicle {
private final Header header = new Header();
private final ChronicleWire wire;
private final Bytes bytes;
private long firstBytes = -1;

public SingleChronicle(String filename, long blockSize) throws IOException {
file = new MappedFile(filename, blockSize);
Expand Down Expand Up @@ -74,6 +76,24 @@ public void appendDocument(Bytes buffer) {
}
}

@Override
public void readDocument(AtomicLong offset, Bytes buffer) {
buffer.clear();
long lastByte = offset.get();
for (; ; ) {
int length = bytes.readVolatileInt(lastByte);
int length2 = length30(length);
if (BinaryWire.isReady(length)) {
lastByte += 4;
buffer.write(bytes, lastByte, length2);
lastByte += length2;
offset.set(lastByte);
return;
}
Jvm.checkInterrupted();
}
}

@Override
public Bytes bytes() {
return bytes;
Expand Down Expand Up @@ -114,6 +134,7 @@ private void readHeader() throws IOException {

bytes.position(HEADER_OFFSET);
wire.readMetaData($ -> wire.read().readMarshallable(header));
firstBytes = bytes.position();
}

private void waitForTheHeaderToBeBuilt(Bytes bytes) throws IOException {
Expand Down Expand Up @@ -210,4 +231,8 @@ public void close() throws IOException {
throw new UnsupportedOperationException();
}

@Override
public long firstBytes() {
return firstBytes;
}
}
Expand Up @@ -3,6 +3,7 @@
import net.openhft.chronicle.queue.impl.DirectChronicle;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.DirectStore;
import org.junit.Test;

import java.io.File;
Expand All @@ -11,25 +12,38 @@
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicLong;

/**
* using direct chronicle to send a string
*/
/*
Threads: 1 - Write rate 24.4 M/s - Read rate 34.1 M/s
Threads: 2 - Write rate 31.8 M/s - Read rate 59.3 M/s
Threads: 3 - Write rate 47.3 M/s - Read rate 90.1 M/s
Threads: 4 - Write rate 62.8 M/s - Read rate 117.9 M/s
Threads: 5 - Write rate 77.5 M/s - Read rate 145.6 M/s
Threads: 6 - Write rate 92.0 M/s - Read rate 161.0 M/s
Threads: 7 - Write rate 107.2 M/s - Read rate 196.5 M/s
Threads: 8 - Write rate 120.2 M/s - Read rate 221.3 M/s
Threads: 9 - Write rate 136.8 M/s - Read rate 244.3 M/s
Threads: 10 - Write rate 143.6 M/s - Read rate 268.7 M/s
Threads: 11 - Write rate 161.7 M/s - Read rate 260.8 M/s
*/
public class DirectChronicleStringTest {

public static final int RUNS = 1000000;
public static final String EXPECTED_STRING = "Hello World23456789012345678901234567890";
public static final byte[] EXPECTED_BYTES = EXPECTED_STRING.getBytes();

@Test
public void testCreateAppender() throws Exception {
for (int r = 0; r < 2; r++) {
for (int t = 1; t <= Runtime.getRuntime().availableProcessors(); t++) {
for (int t = 1; t < Runtime.getRuntime().availableProcessors(); t++) {
List<Future<?>> futureList = new ArrayList<>();
long start = System.nanoTime();
for (int j = 0; j < 4; j++) {
String name = "single" + start + "-" + j + ".q";
for (int j = 0; j < t; j++) {
String name = "/tmp/single" + start + "-" + j + ".q";
new File(name).deleteOnExit();
DirectChronicle chronicle = (DirectChronicle) new ChronicleQueueBuilder(name)
.build();
Expand All @@ -44,8 +58,8 @@ public void testCreateAppender() throws Exception {
}
futureList.clear();
long mid = System.nanoTime();
for (int j = 0; j < 4; j++) {
String name = "single" + start + "-" + j + ".q";
for (int j = 0; j < t; j++) {
String name = "/tmp/single" + start + "-" + j + ".q";
new File(name).deleteOnExit();
DirectChronicle chronicle = (DirectChronicle) new ChronicleQueueBuilder(name)
.build();
Expand All @@ -65,17 +79,20 @@ public void testCreateAppender() throws Exception {
}

private void readSome(DirectChronicle chronicle) throws IOException {
final Bytes bytes = chronicle.bytes();
final Bytes toRead = DirectStore.allocate(EXPECTED_BYTES.length).bytes();
AtomicLong offset = new AtomicLong(chronicle.firstBytes());
for (int i = 0; i < RUNS; i++) {
assertEquals(EXPECTED_STRING, bytes.readUTF());
toRead.clear();
chronicle.readDocument(offset, toRead);
}
}

private void writeSome(DirectChronicle chronicle) throws IOException {

final Bytes bytes = chronicle.bytes();
final Bytes toWrite = DirectStore.allocate(EXPECTED_BYTES.length).bytes();
toWrite.write(EXPECTED_BYTES);
for (int i = 0; i < RUNS; i++) {
bytes.writeUTF(EXPECTED_STRING);
toWrite.clear();
chronicle.appendDocument(toWrite);
}
}

Expand Down

0 comments on commit 0fe82ce

Please sign in to comment.