From 031451ffbe4c160e1570f1739eb6b0bbedd46b10 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 25 Mar 2016 14:51:10 +0000 Subject: [PATCH] Add a ComponentBenchamrk and fix reading blocks as text() --- pom.xml | 5 +- .../chronicle/queue/ExcerptAppender.java | 18 +--- .../openhft/chronicle/queue/MethodReader.java | 12 ++- .../queue/MessageReaderWriterTest.java | 98 +++++++++++++++++++ 4 files changed, 115 insertions(+), 18 deletions(-) mode change 100644 => 100755 src/main/java/net/openhft/chronicle/queue/MethodReader.java create mode 100755 src/test/java/net/openhft/chronicle/queue/MessageReaderWriterTest.java diff --git a/pom.xml b/pom.xml index 045fc60f84..cf85f59f3d 100755 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,8 @@ ~ along with this program. If not, see . --> - + 4.0.0 -Xdoclint:none @@ -47,7 +48,7 @@ net.openhft chronicle-bom - 1.11.18 + 1.11.19-SNAPSHOT pom import diff --git a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java index 14b0c5731a..5177303b73 100755 --- a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java +++ b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java @@ -17,6 +17,7 @@ import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.WriteBytesMarshallable; +import net.openhft.chronicle.core.util.ObjectUtils; import net.openhft.chronicle.wire.DocumentContext; import net.openhft.chronicle.wire.ValueOut; import net.openhft.chronicle.wire.WriteMarshallable; @@ -24,9 +25,6 @@ import java.io.StreamCorruptedException; import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; /** * The component that facilitates sequentially writing data to a {@link ChronicleQueue}. @@ -82,16 +80,8 @@ default void writeBytes(long index, Bytes bytes) throws StreamCorruptedExcept */ int cycle(); - default T methodWriter(Class tClass, Class... iClasses) { - Class[] interfaces; - if (iClasses.length == 0) { - interfaces = new Class[]{tClass}; - } else { - List classes = new ArrayList<>(); - classes.add(tClass); - Collections.addAll(classes, iClasses); - interfaces = classes.toArray(new Class[0]); - } + default T methodWriter(Class tClass, Class... additional) { + Class[] interfaces = ObjectUtils.addAll(tClass, additional); //noinspection unchecked return (T) Proxy.newProxyInstance(tClass.getClassLoader(), interfaces, (proxy, method, args) -> { @@ -112,7 +102,7 @@ default T methodWriter(Class tClass, Class... iClasses) { throw new UnsupportedOperationException(); } } - return (Void) null; + return ObjectUtils.defaultValue(method.getReturnType()); }); } } diff --git a/src/main/java/net/openhft/chronicle/queue/MethodReader.java b/src/main/java/net/openhft/chronicle/queue/MethodReader.java old mode 100644 new mode 100755 index 269c451b30..3252decdd6 --- a/src/main/java/net/openhft/chronicle/queue/MethodReader.java +++ b/src/main/java/net/openhft/chronicle/queue/MethodReader.java @@ -22,7 +22,8 @@ public class MethodReader { public MethodReader(ExcerptTailer tailer, Object... objects) { this.tailer = tailer; - WireParselet defaultParselet = (s, v, $) -> LOGGER.warn("Unknown message " + s + ' ' + v.text()); + WireParselet defaultParselet = (s, v, $) -> + LOGGER.warn("Unknown message " + s + ' ' + v.text()); if (objects[0] instanceof WireParselet) defaultParselet = (WireParselet) objects[0]; wireParser = WireParser.wireParser(defaultParselet); @@ -30,9 +31,16 @@ public MethodReader(ExcerptTailer tailer, Object... objects) { Set methodsHandled = new HashSet<>(); for (Object o : objects) { for (Method m : o.getClass().getMethods()) { - if (m.getDeclaringClass() == Object.class || Modifier.isStatic(m.getModifiers())) + if (Modifier.isStatic(m.getModifiers())) continue; + try { + Object.class.getMethod(m.getName(), m.getParameterTypes()); + continue; + } catch (NoSuchMethodException e) { + // not an Object method. + } + if (!methodsHandled.add(m.getName())) continue; diff --git a/src/test/java/net/openhft/chronicle/queue/MessageReaderWriterTest.java b/src/test/java/net/openhft/chronicle/queue/MessageReaderWriterTest.java new file mode 100755 index 0000000000..3ce7ddde19 --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/MessageReaderWriterTest.java @@ -0,0 +1,98 @@ +package net.openhft.chronicle.queue; + +import net.openhft.chronicle.core.OS; +import net.openhft.chronicle.core.io.IOTools; +import net.openhft.chronicle.core.pool.ClassAliasPool; +import net.openhft.chronicle.core.util.ObjectUtils; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import net.openhft.chronicle.wire.AbstractMarshallable; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Created by Peter on 25/03/2016. + */ +public class MessageReaderWriterTest { + + @Test + public void testWriteWhileReading() { + ClassAliasPool.CLASS_ALIASES.addAlias(Message1.class, "M1"); + ClassAliasPool.CLASS_ALIASES.addAlias(Message2.class, "M2"); + + String path = OS.TARGET + "/testWriteWhileReading-" + System.nanoTime() + "-"; + + try (SingleChronicleQueue queue1 = SingleChronicleQueueBuilder.binary(path + "1").build(); + SingleChronicleQueue queue2 = SingleChronicleQueueBuilder.binary(path + "2").build()) { + MethodReader reader2 = queue1.createTailer().methodReader(ObjectUtils.printAll(MessageListener.class)); + MessageListener writer2 = queue2.createAppender().methodWriter(MessageListener.class); + MessageListener processor = new MessageProcessor(writer2); + MethodReader reader1 = queue1.createTailer().methodReader(processor); + MessageListener writer1 = queue1.createAppender().methodWriter(MessageListener.class); + + for (int i = 0; i < 3; i++) { + // write a message + writer1.method1(new Message1("hello")); + writer1.method2(new Message2(234)); + + // read those messages + assertTrue(reader1.readOne()); + assertTrue(reader1.readOne()); +// System.out.println(queue1.dump()); + assertFalse(reader1.readOne()); + + // read the produced messages + assertTrue(reader2.readOne()); + assertTrue(reader2.readOne()); + assertFalse(reader2.readOne()); + } + } finally { + IOTools.shallowDeleteDirWithFiles(path + "1"); + IOTools.shallowDeleteDirWithFiles(path + "2"); + } + } + + interface MessageListener { + void method1(Message1 message); + + void method2(Message2 message); + } + + static class Message1 extends AbstractMarshallable { + String text; + + public Message1(String text) { + this.text = text; + } + } + + static class Message2 extends AbstractMarshallable { + long number; + + public Message2(long number) { + this.number = number; + } + } + + static class MessageProcessor implements MessageListener { + private final MessageListener writer2; + + public MessageProcessor(MessageListener writer2) { + this.writer2 = writer2; + } + + @Override + public void method1(Message1 message) { + message.text += "-processed"; + writer2.method1(message); + } + + @Override + public void method2(Message2 message) { + message.number += 1000; + writer2.method2(message); + } + } +}