Skip to content

Commit

Permalink
Add a ComponentBenchamrk and fix reading blocks as text()
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Mar 25, 2016
1 parent bf2a287 commit 031451f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 18 deletions.
5 changes: 3 additions & 2 deletions pom.xml
Expand Up @@ -15,7 +15,8 @@
~ along with this program. If not, see <http://www.gnu.org/licenses />.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<additionalparam>-Xdoclint:none</additionalparam>
Expand Down Expand Up @@ -47,7 +48,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.11.18</version>
<version>1.11.19-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
18 changes: 4 additions & 14 deletions src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java
Expand Up @@ -17,16 +17,14 @@

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.WriteBytesMarshallable;
import net.openhft.chronicle.core.util.ObjectUtils;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;

import java.io.StreamCorruptedException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* The component that facilitates sequentially writing data to a {@link ChronicleQueue}.
Expand Down Expand Up @@ -82,16 +80,8 @@ default void writeBytes(long index, Bytes<?> bytes) throws StreamCorruptedExcept
*/
int cycle();

default <T> T methodWriter(Class<T> tClass, Class... iClasses) {
Class[] interfaces;
if (iClasses.length == 0) {
interfaces = new Class[]{tClass};
} else {
List<Class> classes = new ArrayList<>();
classes.add(tClass);
Collections.addAll(classes, iClasses);
interfaces = classes.toArray(new Class[0]);
}
default <T> T methodWriter(Class<T> tClass, Class... additional) {
Class[] interfaces = ObjectUtils.addAll(tClass, additional);

//noinspection unchecked
return (T) Proxy.newProxyInstance(tClass.getClassLoader(), interfaces, (proxy, method, args) -> {
Expand All @@ -112,7 +102,7 @@ default <T> T methodWriter(Class<T> tClass, Class... iClasses) {
throw new UnsupportedOperationException();
}
}
return (Void) null;
return ObjectUtils.defaultValue(method.getReturnType());
});
}
}
12 changes: 10 additions & 2 deletions src/main/java/net/openhft/chronicle/queue/MethodReader.java 100644 → 100755
Expand Up @@ -22,17 +22,25 @@ public class MethodReader {

public MethodReader(ExcerptTailer tailer, Object... objects) {
this.tailer = tailer;
WireParselet defaultParselet = (s, v, $) -> LOGGER.warn("Unknown message " + s + ' ' + v.text());
WireParselet defaultParselet = (s, v, $) ->
LOGGER.warn("Unknown message " + s + ' ' + v.text());
if (objects[0] instanceof WireParselet)
defaultParselet = (WireParselet) objects[0];
wireParser = WireParser.wireParser(defaultParselet);

Set<String> methodsHandled = new HashSet<>();
for (Object o : objects) {
for (Method m : o.getClass().getMethods()) {
if (m.getDeclaringClass() == Object.class || Modifier.isStatic(m.getModifiers()))
if (Modifier.isStatic(m.getModifiers()))
continue;

try {
Object.class.getMethod(m.getName(), m.getParameterTypes());
continue;
} catch (NoSuchMethodException e) {
// not an Object method.
}

if (!methodsHandled.add(m.getName()))
continue;

Expand Down
@@ -0,0 +1,98 @@
package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.util.ObjectUtils;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.AbstractMarshallable;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Created by Peter on 25/03/2016.
*/
public class MessageReaderWriterTest {

@Test
public void testWriteWhileReading() {
ClassAliasPool.CLASS_ALIASES.addAlias(Message1.class, "M1");
ClassAliasPool.CLASS_ALIASES.addAlias(Message2.class, "M2");

String path = OS.TARGET + "/testWriteWhileReading-" + System.nanoTime() + "-";

try (SingleChronicleQueue queue1 = SingleChronicleQueueBuilder.binary(path + "1").build();
SingleChronicleQueue queue2 = SingleChronicleQueueBuilder.binary(path + "2").build()) {
MethodReader reader2 = queue1.createTailer().methodReader(ObjectUtils.printAll(MessageListener.class));
MessageListener writer2 = queue2.createAppender().methodWriter(MessageListener.class);
MessageListener processor = new MessageProcessor(writer2);
MethodReader reader1 = queue1.createTailer().methodReader(processor);
MessageListener writer1 = queue1.createAppender().methodWriter(MessageListener.class);

for (int i = 0; i < 3; i++) {
// write a message
writer1.method1(new Message1("hello"));
writer1.method2(new Message2(234));

// read those messages
assertTrue(reader1.readOne());
assertTrue(reader1.readOne());
// System.out.println(queue1.dump());
assertFalse(reader1.readOne());

// read the produced messages
assertTrue(reader2.readOne());
assertTrue(reader2.readOne());
assertFalse(reader2.readOne());
}
} finally {
IOTools.shallowDeleteDirWithFiles(path + "1");
IOTools.shallowDeleteDirWithFiles(path + "2");
}
}

interface MessageListener {
void method1(Message1 message);

void method2(Message2 message);
}

static class Message1 extends AbstractMarshallable {
String text;

public Message1(String text) {
this.text = text;
}
}

static class Message2 extends AbstractMarshallable {
long number;

public Message2(long number) {
this.number = number;
}
}

static class MessageProcessor implements MessageListener {
private final MessageListener writer2;

public MessageProcessor(MessageListener writer2) {
this.writer2 = writer2;
}

@Override
public void method1(Message1 message) {
message.text += "-processed";
writer2.method1(message);
}

@Override
public void method2(Message2 message) {
message.number += 1000;
writer2.method2(message);
}
}
}

0 comments on commit 031451f

Please sign in to comment.