From ee3c6637607678b7e8546f3f5a11362be79ab75e Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Sat, 8 Sep 2018 00:25:14 +0300 Subject: [PATCH 1/5] Add atom cache --- .../io/appulse/encon/common/LruCache.java | 42 ++++++ .../java/io/appulse/encon/terms/Erlang.java | 6 +- .../io/appulse/encon/terms/ErlangTerm.java | 2 +- .../encon/terms/term/CollectionTerm.java | 4 +- .../appulse/encon/terms/type/ErlangAtom.java | 120 ++++++++++++++---- .../appulse/encon/terms/type/ErlangPid.java | 3 +- .../appulse/encon/terms/type/ErlangPort.java | 3 +- .../encon/terms/type/ErlangReference.java | 3 +- 8 files changed, 148 insertions(+), 35 deletions(-) create mode 100644 encon-common/src/main/java/io/appulse/encon/common/LruCache.java diff --git a/encon-common/src/main/java/io/appulse/encon/common/LruCache.java b/encon-common/src/main/java/io/appulse/encon/common/LruCache.java new file mode 100644 index 0000000..e718862 --- /dev/null +++ b/encon-common/src/main/java/io/appulse/encon/common/LruCache.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.encon.common; + +import java.util.LinkedHashMap; +import java.util.Map.Entry; + +/** + * + * @since 1.6.2 + * @author Artem Labazin + */ +public class LruCache extends LinkedHashMap { + + private static final long serialVersionUID = -1100634446524987320L; + + private final int maxSize; + + public LruCache (int maxSize) { + super(maxSize + 1, 1.0F, true); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry (Entry eldest) { + return size() > maxSize; + } +} diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java index 6ccc046..80568ee 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java @@ -53,7 +53,7 @@ public final class Erlang { /** * Cached enpty (0-size string) {@link ErlangAtom} instance. */ - public static final ErlangAtom EMPTY_ATOM = new ErlangAtom(""); + public static final ErlangAtom EMPTY_ATOM = ErlangAtom.cached(""); /** * Creates new {@link ErlangAtom} instance from {@code boolean} (true/false value) . @@ -63,7 +63,7 @@ public final class Erlang { * @return {@link ErlangAtom} new instance */ public static ErlangAtom atom (boolean value) { - return new ErlangAtom(value); + return ErlangAtom.cached(value); } /** @@ -74,7 +74,7 @@ public static ErlangAtom atom (boolean value) { * @return {@link ErlangAtom} new instance */ public static ErlangAtom atom (@NonNull String value) { - return new ErlangAtom(value); + return ErlangAtom.cached(value); } /** diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java index 38d64cb..ddb368e 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java @@ -133,7 +133,7 @@ public static T newInstance (@NonNull ByteBuf buffer) { case SMALL_ATOM_UTF8: case ATOM: case SMALL_ATOM: - return (T) new ErlangAtom(type, buffer); + return (T) ErlangAtom.cached(type, buffer); default: val message = String.format("Unknown term type %s (%d)", type.name(), typeByte); throw new ErlangTermDecodeException(message); diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java index 77bcad1..001a946 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java @@ -28,8 +28,8 @@ import java.util.Map; import java.util.Optional; +import io.appulse.encon.terms.Erlang; import io.appulse.encon.terms.ErlangTerm; -import io.appulse.encon.terms.type.ErlangAtom; import io.appulse.encon.terms.type.ErlangList; import io.appulse.encon.terms.type.ErlangMap; import io.appulse.encon.terms.type.ErlangTuple; @@ -212,7 +212,7 @@ default ErlangTerm getUnsafe (int index) { * key term. {@link Optional#empty} otherwise. */ default Optional getByAtom (@NonNull String fieldName) { - val term = new ErlangAtom(fieldName); + val term = Erlang.atom(fieldName); return get(term); } diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java index 7517984..e411482 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java @@ -21,21 +21,25 @@ import static io.appulse.encon.terms.TermType.SMALL_ATOM_UTF8; import static java.nio.charset.StandardCharsets.ISO_8859_1; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Locale.ENGLISH; import static lombok.AccessLevel.PRIVATE; import java.nio.charset.Charset; +import java.util.Arrays; +import io.appulse.encon.common.LruCache; import io.appulse.encon.terms.ErlangTerm; import io.appulse.encon.terms.TermType; import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; import io.netty.buffer.ByteBuf; +import io.netty.util.ByteProcessor; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; import lombok.ToString; import lombok.experimental.FieldDefaults; import lombok.experimental.NonFinal; -import lombok.val; /** * An atom is a literal, a constant with name. An atom is to be enclosed in @@ -53,44 +57,56 @@ public class ErlangAtom extends ErlangTerm { private static final long serialVersionUID = -2748345367418129439L; - private static final byte[] TRUE_BYTES = Boolean.TRUE.toString().getBytes(UTF_8); - - private static final byte[] FALSE_BYTES = Boolean.FALSE.toString().getBytes(UTF_8); - private static final int MAX_ATOM_CODE_POINTS_LENGTH = 255; private static final int MAX_SMALL_ATOM_BYTES_LENGTH = 255; - @NonFinal - String value; + private static final LruCache CACHE = new LruCache<>(100); - byte[] bytes; + private static final ErlangAtom ATOM_TRUE = cached(Boolean.TRUE.toString().toLowerCase(ENGLISH)); - transient Charset charset; + private static final ErlangAtom ATOM_FALSE = cached(Boolean.FALSE.toString().toLowerCase(ENGLISH)); + + public static ErlangAtom cached (boolean value) { + return value + ? ATOM_TRUE + : ATOM_FALSE; + } + + public static ErlangAtom cached (String value) { + Charset charset = UTF_8; + byte[] bytes = value.getBytes(charset); + int hashCode = Arrays.hashCode(bytes); + return CACHE.computeIfAbsent(hashCode, key -> new ErlangAtom(value, charset, bytes)); + } - /** - * Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}. - * - * @param type object's type - * - * @param buffer byte buffer - */ @SuppressWarnings("deprecation") - public ErlangAtom (TermType type, @NonNull ByteBuf buffer) { - super(type); + public static ErlangAtom cached (TermType type, ByteBuf buffer) { + ByteArrayHashCode byteProcessor = new ByteArrayHashCode(); - val length = type == SMALL_ATOM || type == SMALL_ATOM_UTF8 + int length = type == SMALL_ATOM || type == SMALL_ATOM_UTF8 ? buffer.readUnsignedByte() : buffer.readUnsignedShort(); - charset = type == SMALL_ATOM_UTF8 || type == ATOM_UTF8 - ? UTF_8 - : ISO_8859_1; + buffer.forEachByte(buffer.readerIndex(), length, byteProcessor); - bytes = new byte[length]; - buffer.readBytes(bytes); + return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> { + if (value == null) { + return new ErlangAtom(type, buffer, length); + } else { + buffer.skipBytes(length); + return value; + } + }); } + @NonFinal + String value; + + byte[] bytes; + + transient Charset charset; + /** * Constructs Erlang's atom object with specific {@link String} value. * @@ -124,8 +140,47 @@ public ErlangAtom (boolean value) { charset = UTF_8; this.value = Boolean.toString(value); bytes = value - ? TRUE_BYTES - : FALSE_BYTES; + ? Boolean.TRUE.toString().getBytes(charset) + : Boolean.FALSE.toString().getBytes(charset); + } + + /** + * Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}. + * + * @param type object's type + * + * @param buffer byte buffer + * + * @param length amount of useful bytes + */ + private ErlangAtom (TermType type, ByteBuf buffer, int length) { + super(type); + + charset = type == SMALL_ATOM_UTF8 || type == ATOM_UTF8 + ? UTF_8 + : ISO_8859_1; + + bytes = new byte[length]; + buffer.readBytes(bytes); + } + + @SuppressWarnings("PMD.ArrayIsStoredDirectly") + private ErlangAtom (String value, Charset charset, byte[] bytes) { + super(); + + this.value = value.codePointCount(0, value.length()) <= MAX_ATOM_CODE_POINTS_LENGTH + ? value + // Throwing an exception would be better I think, but truncation + // seems to be the way it has been done in other parts of OTP... + : new String(value.codePoints().toArray(), 0, MAX_ATOM_CODE_POINTS_LENGTH); + + this.charset = charset; + this.bytes = bytes; + if (bytes.length > MAX_SMALL_ATOM_BYTES_LENGTH) { + setType(ATOM_UTF8); + } else { + setType(SMALL_ATOM_UTF8); + } } @Override @@ -175,4 +230,17 @@ protected void serialize (ByteBuf buffer) { } buffer.writeBytes(bytes); } + + @Getter + @FieldDefaults(level = PRIVATE) + private static class ByteArrayHashCode implements ByteProcessor { + + int hashCode = 1; + + @Override + public boolean process (byte value) throws Exception { + hashCode = 31 * hashCode + value; + return true; + } + } } diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java index 9f6da5e..efc783f 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java @@ -21,6 +21,7 @@ import static lombok.AccessLevel.PRIVATE; import io.appulse.encon.common.NodeDescriptor; +import io.appulse.encon.terms.Erlang; import io.appulse.encon.terms.ErlangTerm; import io.appulse.encon.terms.TermType; import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; @@ -87,7 +88,7 @@ public ErlangPid (TermType type, @NonNull ByteBuf buffer) { @Builder private ErlangPid (TermType type, @NonNull String node, int id, int serial, int creation) { super(ofNullable(type).orElse(PID)); - this.node = new ErlangAtom(node); + this.node = Erlang.atom(node); if (getType() == PID) { this.id = id & 0x7FFF; // 15 bits diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java index 2ff68a0..056e7b0 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java @@ -21,6 +21,7 @@ import static lombok.AccessLevel.PRIVATE; import io.appulse.encon.common.NodeDescriptor; +import io.appulse.encon.terms.Erlang; import io.appulse.encon.terms.ErlangTerm; import io.appulse.encon.terms.TermType; import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; @@ -94,7 +95,7 @@ public ErlangPort (TermType type, @NonNull ByteBuf buffer) { @Builder private ErlangPort (TermType type, @NonNull String node, int id, int creation) { super(ofNullable(type).orElse(PORT)); - this.node = new ErlangAtom(node); + this.node = Erlang.atom(node); if (getType() == PORT) { this.id = id & 0xFFFFFFF; // 28 bits diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java index ddd5c63..646c671 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java @@ -23,6 +23,7 @@ import java.util.stream.LongStream; import io.appulse.encon.common.NodeDescriptor; +import io.appulse.encon.terms.Erlang; import io.appulse.encon.terms.ErlangTerm; import io.appulse.encon.terms.TermType; import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; @@ -108,7 +109,7 @@ public ErlangReference (TermType type, @NonNull ByteBuf buffer) { private ErlangReference (TermType type, @NonNull String node, long id, long[] ids, int creation) { super(ofNullable(type).orElse(NEW_REFERENCE)); - this.node = new ErlangAtom(node); + this.node = Erlang.atom(node); this.ids = ofNullable(ids) .map(it -> it.clone()) .map(it -> { From a6f2c4e4c46c056ebf781d815a3628a817a87507 Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Sat, 8 Sep 2018 00:42:06 +0300 Subject: [PATCH 2/5] Increase atom cache --- .../src/main/java/io/appulse/encon/terms/type/ErlangAtom.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java index e411482..d5f2d62 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java @@ -61,7 +61,7 @@ public class ErlangAtom extends ErlangTerm { private static final int MAX_SMALL_ATOM_BYTES_LENGTH = 255; - private static final LruCache CACHE = new LruCache<>(100); + private static final LruCache CACHE = new LruCache<>(1000); private static final ErlangAtom ATOM_TRUE = cached(Boolean.TRUE.toString().toLowerCase(ENGLISH)); From f6edb3b0092951deed1653425db4104105d9fb76 Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Sat, 8 Sep 2018 00:42:54 +0300 Subject: [PATCH 3/5] Constant time access for TermType values --- .../java/io/appulse/encon/terms/TermType.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java b/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java index feed999..85b4c73 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java @@ -16,8 +16,13 @@ package io.appulse.encon.terms; +import static java.util.stream.Collectors.toMap; import static lombok.AccessLevel.PRIVATE; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Stream; + import io.appulse.encon.terms.type.ErlangAtom; import io.appulse.encon.terms.type.ErlangBinary; import io.appulse.encon.terms.type.ErlangBitString; @@ -36,7 +41,6 @@ import lombok.Getter; import lombok.experimental.FieldDefaults; -import lombok.val; /** * Enumeration of all available term types. @@ -945,6 +949,13 @@ public enum TermType { */ UNKNOWN(-1); + private static final Map VALUES; + + static { + VALUES = Stream.of(TermType.values()) + .collect(toMap(it -> (int) it.getCode(), Function.identity())); + } + @Getter byte code; @@ -967,12 +978,7 @@ public enum TermType { * * @return parsed {@link TermType} instance */ - public static TermType of (byte code) { - for (val type : values()) { - if (type.getCode() == code) { - return type; - } - } - return UNKNOWN; + public static TermType of (int code) { + return VALUES.getOrDefault(code, UNKNOWN); } } From cad5d9172c42dd56de2c21152dab85baa5f091e5 Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Sat, 8 Sep 2018 01:15:06 +0300 Subject: [PATCH 4/5] Cache pid --- .../io/appulse/encon/terms/ErlangTerm.java | 2 +- .../appulse/encon/terms/type/ErlangAtom.java | 2 +- .../appulse/encon/terms/type/ErlangPid.java | 45 +++++++++++++++++++ 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java index ddb368e..1ca8da3 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java @@ -108,7 +108,7 @@ public static T newInstance (@NonNull ByteBuf buffer) { return (T) new ErlangPort(type, buffer); case PID: case NEW_PID: - return (T) new ErlangPid(type, buffer); + return (T) ErlangPid.cached(type, buffer); case SMALL_TUPLE: case LARGE_TUPLE: return (T) new ErlangTuple(type, buffer); diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java index d5f2d62..e8138b7 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java @@ -61,7 +61,7 @@ public class ErlangAtom extends ErlangTerm { private static final int MAX_SMALL_ATOM_BYTES_LENGTH = 255; - private static final LruCache CACHE = new LruCache<>(1000); + private static final LruCache CACHE = new LruCache<>(1000); private static final ErlangAtom ATOM_TRUE = cached(Boolean.TRUE.toString().toLowerCase(ENGLISH)); diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java index efc783f..cf2abd9 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java @@ -17,9 +17,12 @@ package io.appulse.encon.terms.type; import static io.appulse.encon.terms.TermType.PID; +import static io.appulse.encon.terms.TermType.SMALL_ATOM; +import static io.appulse.encon.terms.TermType.SMALL_ATOM_UTF8; import static java.util.Optional.ofNullable; import static lombok.AccessLevel.PRIVATE; +import io.appulse.encon.common.LruCache; import io.appulse.encon.common.NodeDescriptor; import io.appulse.encon.terms.Erlang; import io.appulse.encon.terms.ErlangTerm; @@ -27,6 +30,7 @@ import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; import io.netty.buffer.ByteBuf; +import io.netty.util.ByteProcessor; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -47,6 +51,34 @@ public class ErlangPid extends ErlangTerm { private static final long serialVersionUID = 7083159089429831665L; + private static final LruCache CACHE = new LruCache<>(1000); + + @SuppressWarnings("deprecation") + public static ErlangPid cached (TermType type, ByteBuf buffer) { + int index = buffer.readerIndex(); + byte nodeNameType = buffer.readByte(); + int nodeNameLength = nodeNameType == SMALL_ATOM.getCode() || nodeNameType == SMALL_ATOM_UTF8.getCode() + ? buffer.readUnsignedByte() + : buffer.readUnsignedShort(); + + int length = type == PID + ? nodeNameLength + 9 + : nodeNameLength + 12; + + ByteArrayHashCode byteProcessor = new ByteArrayHashCode(); + buffer.forEachByte(buffer.readerIndex(), length, byteProcessor); + + return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> { + if (value == null) { + buffer.readerIndex(index); + return new ErlangPid(type, buffer); + } else { + buffer.skipBytes(length); + return value; + } + }); + } + @NonFinal NodeDescriptor descriptor; @@ -151,4 +183,17 @@ protected void serialize (ByteBuf buffer) { throw new IllegalErlangTermTypeException(getClass(), getType()); } } + + @Getter + @FieldDefaults(level = PRIVATE) + private static class ByteArrayHashCode implements ByteProcessor { + + int hashCode = 1; + + @Override + public boolean process (byte value) throws Exception { + hashCode = 31 * hashCode + value; + return true; + } + } } From cc4e2f2876c92e496a85e8898b74ccd65bb7fef8 Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Mon, 10 Sep 2018 20:35:34 +0300 Subject: [PATCH 5/5] Speed up performance --- CHANGELOG.md | 8 + README.md | 4 +- benchmark/README.md | 26 ++- benchmark/pom.xml | 2 +- .../encon/benchmark/EnconBenchmark.java | 186 ---------------- .../benchmark/Encon_Node2NodeBenchmarks.java | 159 +++++++++++++ .../benchmark/Encon_SimpleBenchmarks.java | 178 +++++++++++++++ .../encon/benchmark/JInterfaceBenchmark.java | 184 --------------- .../JInterface_Node2NodeBenchmarks.java | 138 ++++++++++++ .../JInterface_SimpleBenchmarks.java | 154 +++++++++++++ .../java/io/appulse/encon/benchmark/Main.java | 5 +- benchmark/src/main/resources/logback.xml | 24 ++ encon-common/pom.xml | 2 +- encon-config/README.md | 4 +- encon-config/pom.xml | 2 +- encon-databind/README.md | 8 +- encon-databind/pom.xml | 2 +- encon-handler/README.md | 4 +- encon-handler/pom.xml | 2 +- encon-spring/pom.xml | 2 +- encon-terms/pom.xml | 4 +- .../java/io/appulse/encon/terms/Erlang.java | 12 +- .../io/appulse/encon/terms/ErlangTerm.java | 2 +- .../encon/terms/type/ErlangInteger.java | 209 +++++++++++++++--- .../encon/terms/type/ErlangString.java | 2 +- encon/README.md | 4 +- encon/pom.xml | 2 +- .../java/io/appulse/encon/ModuleClient.java | 4 + .../io/appulse/encon/ModuleConnection.java | 6 + .../java/io/appulse/encon/ModuleServer.java | 10 +- .../connection/control/ControlMessage.java | 2 +- .../connection/regular/ConnectionHandler.java | 125 +++++++++-- .../encon/connection/regular/Message.java | 12 + .../connection/regular/MessageDecoder.java | 81 ------- .../connection/regular/MessageEncoder.java | 67 ------ .../connection/regular/RegularPipeline.java | 40 +--- .../connection/regular/TickTockHandler.java | 64 ------ examples/custom-queue/pom.xml | 2 +- examples/databind/pom.xml | 2 +- examples/echo-server-spring/pom.xml | 2 +- examples/echo-server/pom.xml | 2 +- examples/handler-advanced/pom.xml | 2 +- examples/handler-basic/pom.xml | 2 +- examples/load-config-spring/pom.xml | 2 +- examples/load-config/pom.xml | 2 +- examples/pom.xml | 2 +- examples/simple/pom.xml | 2 +- pom.xml | 89 ++++---- 48 files changed, 1100 insertions(+), 749 deletions(-) delete mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java create mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java create mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java delete mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java create mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java create mode 100644 benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java create mode 100644 benchmark/src/main/resources/logback.xml delete mode 100644 encon/src/main/java/io/appulse/encon/connection/regular/MessageDecoder.java delete mode 100644 encon/src/main/java/io/appulse/encon/connection/regular/MessageEncoder.java delete mode 100644 encon/src/main/java/io/appulse/encon/connection/regular/TickTockHandler.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 542ec23..e36e98d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Turn on checkstyle JavaDocs module. - Add updates to the protocol, like new `ControlMessage`. +## [1.6.3](https://github.com/appulse-projects/encon-java/releases/tag/1.6.3) - 2018-09-11 + +### Changed + +- Removed multi-handlers `netty` pipeline, use single one instead; +- Cache `atom` and `pid` values; +- Improve benchmarks. + ## [1.6.2](https://github.com/appulse-projects/encon-java/releases/tag/1.6.2) - 2018-09-06 ### Added diff --git a/README.md b/README.md index a091d79..ebcded2 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ $> mvn clean compile [INFO] ------------------------------------------------------------------------ [INFO] Reactor Summary: [INFO] -[INFO] encon 1.6.2 ........................................ SUCCESS [ 1.210 s] +[INFO] encon 1.6.3 ........................................ SUCCESS [ 1.210 s] [INFO] encon-common ....................................... SUCCESS [ 25.693 s] [INFO] encon-terms ........................................ SUCCESS [ 27.517 s] [INFO] encon-config ....................................... SUCCESS [ 18.707 s] @@ -64,7 +64,7 @@ $> mvn clean compile [INFO] handler-advanced ................................... SUCCESS [ 11.289 s] [INFO] load-config ........................................ SUCCESS [ 3.725 s] [INFO] load-config-spring ................................. SUCCESS [ 6.420 s] -[INFO] benchmark 1.6.2 .................................... SUCCESS [ 5.594 s] +[INFO] benchmark 1.6.3 .................................... SUCCESS [ 5.594 s] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ diff --git a/benchmark/README.md b/benchmark/README.md index 8a1f186..67c7610 100644 --- a/benchmark/README.md +++ b/benchmark/README.md @@ -22,9 +22,23 @@ | Benchmark | Mode | Cnt | Score | Error | Units | |----------------------------------------------------------------------------------------------------------------|-------|-----|------------:|------------:|-------| -| [EnconBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L103) | thrpt | 25 | 4611737.930 | ± 54595.578 | ops/s | -| [EnconBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L177) | thrpt | 25 | 13969.833 | ± 164.199 | ops/s | -| [EnconBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L167) | thrpt | 25 | 28028.764 | ± 262.720 | ops/s | -| [JInterfaceBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L99) | thrpt | 25 | 4370604.847 | ± 15769.278 | ops/s | -| [JInterfaceBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L175) | thrpt | 25 | 17196.521 | ± 113.159 | ops/s | -| [JInterfaceBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L165) | thrpt | 25 | 34529.556 | ± 207.977 | ops/s | +| [EnconBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L103) | thrpt | 25 | 4562837.232 | ± 48730.020 | ops/s | +| [EnconBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L177) | thrpt | 25 | 13744.084 | ± 160.906 | ops/s | +| [EnconBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L167) | thrpt | 25 | 27665.670 | ± 230.607 | ops/s | +| [JInterfaceBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L99) | thrpt | 25 | 4345167.985 | ± 22392.570 | ops/s | +| [JInterfaceBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L175) | thrpt | 25 | 13850.978 | ± 126.660 | ops/s | +| [JInterfaceBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L165) | thrpt | 25 | 27590.545 | ± 253.874 | ops/s | + +How to run the benchmarks: + +```bash +$> mvn clean package \ + -DskipTests \ + -Dgpg.skip \ + -Dfindbugs.skip=true \ + -Dpmd.skip=true \ + -Dcheckstyle.skip \ + -Dmaven.test.skip=true \ + -pl benchmark -am; and \ + java -Xms1G -Xmx2G -jar benchmark/target/benchmarks.jar +``` diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 7de3aa8..3da57f9 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 benchmark diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java b/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java deleted file mode 100644 index e03d194..0000000 --- a/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.encon.benchmark; - -import static io.appulse.encon.terms.Erlang.atom; -import static io.appulse.encon.terms.Erlang.binary; -import static io.appulse.encon.terms.Erlang.list; -import static io.appulse.encon.terms.Erlang.number; -import static io.appulse.encon.terms.Erlang.string; -import static io.appulse.encon.terms.Erlang.tuple; -import static java.lang.Boolean.TRUE; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.openjdk.jmh.annotations.Level.Trial; -import static org.openjdk.jmh.annotations.Mode.Throughput; -import static org.openjdk.jmh.annotations.Scope.Thread; - -import io.appulse.encon.Node; -import io.appulse.encon.Nodes; -import io.appulse.encon.config.NodeConfig; -import io.appulse.encon.connection.regular.Message; -import io.appulse.encon.mailbox.Mailbox; -import io.appulse.encon.terms.ErlangTerm; -import io.appulse.encon.terms.type.ErlangPid; - -import lombok.val; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -/** - * - * @since 1.6.0 - * @author Artem Labazin - */ -public class EnconBenchmark { - - @State(Thread) - public static class MailboxesState { - - @Setup(Trial) - public void setup () { - val config = NodeConfig.builder() - .shortName(TRUE) - .build(); - - node = Nodes.singleNode("node-" + System.nanoTime(), config); - - mailbox1 = node.mailbox().build(); - pid1 = mailbox1.getPid(); - - mailbox2 = node.mailbox().build(); - pid2 = mailbox2.getPid(); - - message = tuple( - atom("ok"), - tuple( - binary(new byte[] { 56, 16, 78 }), - number(42), - string("Hello world"), - list(number(1), number(2), number(3)) - ) - ); - } - - @TearDown(Trial) - public void tearDown () { - node.close(); - } - - Node node; - - Mailbox mailbox1; - - ErlangPid pid1; - - Mailbox mailbox2; - - ErlangPid pid2; - - ErlangTerm message; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public Message mailbox2mailbox (MailboxesState state) { - state.mailbox1.send(state.pid2, state.message); - - Message one = state.mailbox2.receive(); - state.mailbox2.send(state.pid1, one.getBody()); - - Message two = state.mailbox1.receive(); - return two; - } - - @State(Thread) - public static class NodesState { - - @Setup(Trial) - public void setup () { - val config = NodeConfig.builder() - .shortName(TRUE) - .build(); - - node1 = Nodes.singleNode("node-" + System.nanoTime(), config); - - mailbox1 = node1.mailbox().build(); - pid1 = mailbox1.getPid(); - - node2 = Nodes.singleNode("node-" + System.nanoTime(), config); - - mailbox2 = node2.mailbox().build(); - pid2 = mailbox2.getPid(); - - message = tuple( - atom("ok"), - tuple( - binary(new byte[] { 56, 16, 78 }), - number(42), - string("Hello world"), - list(number(1), number(2), number(3)) - ) - ); - } - - @TearDown(Trial) - public void tearDown () { - node1.close(); - node2.close(); - } - - Node node1; - - Mailbox mailbox1; - - ErlangPid pid1; - - Node node2; - - Mailbox mailbox2; - - ErlangPid pid2; - - ErlangTerm message; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public Message oneDirection (NodesState nodes) { - nodes.mailbox1.send(nodes.pid2, nodes.message); - - Message request = nodes.mailbox2.receive(); - return request; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public Message node2node (NodesState nodes) { - nodes.mailbox1.send(nodes.pid2, nodes.message); - - Message one = nodes.mailbox2.receive(); - nodes.mailbox2.send(nodes.pid1, one.getBody()); - - Message two = nodes.mailbox1.receive(); - return two; - } -} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java new file mode 100644 index 0000000..d510c21 --- /dev/null +++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java @@ -0,0 +1,159 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.encon.benchmark; + +import static io.appulse.encon.terms.Erlang.binary; +import static java.lang.Boolean.TRUE; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.openjdk.jmh.annotations.Level.Trial; +import static org.openjdk.jmh.annotations.Mode.Throughput; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import java.util.stream.IntStream; + +import io.appulse.encon.Node; +import io.appulse.encon.Nodes; +import io.appulse.encon.config.NodeConfig; +import io.appulse.encon.config.ServerConfig; +import io.appulse.encon.mailbox.Mailbox; +import io.appulse.encon.terms.ErlangTerm; +import io.appulse.encon.terms.type.ErlangPid; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.infra.ThreadParams; + +/** + * + * @since 1.6.0 + * @author Artem Labazin + */ +@State(Benchmark) +@OutputTimeUnit(SECONDS) +@Warmup(iterations = 1) +@BenchmarkMode(Throughput) +@Measurement(iterations = 1) +public class Encon_Node2NodeBenchmarks { + + Node serverNode; + + Mailbox serverMailbox; + + ErlangPid serverMailboxPid; + + Thread serverThread; + + ErlangTerm data; + + Node clientNode; + + Mailbox[] clientMailboxes; + + @Setup(Trial) + public void setup () throws Exception { + serverNode = Nodes.singleNode("node-server-" + System.nanoTime(), NodeConfig.builder() + .shortName(TRUE) + .server(ServerConfig.builder() + .bossThreads(1) + .workerThreads(2) + .build() + ) + .build() + ); + + serverMailbox = serverNode.mailbox().build(); + serverMailboxPid = serverMailbox.getPid(); + data = binary(new byte[] { 1, 2, 3, 4, 5 }); + + serverThread = new Thread(() -> { + try { + while (!java.lang.Thread.interrupted()) { + ErlangTerm payload = serverMailbox.receive().getBody(); + serverMailbox.send(payload.asPid(), data); + } + } catch (Throwable ex) { + } + }); + serverThread.start(); + + clientNode = Nodes.singleNode("node-client-" + System.nanoTime(), NodeConfig.builder().shortName(TRUE) + .server(ServerConfig.builder() + .bossThreads(1) + .workerThreads(8) + .build() + ) + .build()); + + clientMailboxes = IntStream.range(0, 8) + .boxed() + .map(it -> clientNode.mailbox().build()) + .toArray(Mailbox[]::new); + } + + @TearDown(Trial) + public void tearDown () { + for (Mailbox mailbox : clientMailboxes) { + mailbox.close(); + } + clientNode.close(); + + serverMailbox.close(); + serverNode.close(); + + serverThread.interrupt(); + } + + @Threads(1) + @Benchmark + public void client_1 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + Mailbox mailbox = clientMailboxes[0]; + mailbox.send(serverMailboxPid, mailbox.getPid()); + blackHole.consume(mailbox.receive()); + } + + @Threads(2) + @Benchmark + public void clients_2 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.getPid()); + blackHole.consume(mailbox.receive()); + } + + @Threads(4) + @Benchmark + public void clients_4 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.getPid()); + blackHole.consume(mailbox.receive()); + } + + @Threads(8) + @Benchmark + public void clients_8 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.getPid()); + blackHole.consume(mailbox.receive()); + } +} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java new file mode 100644 index 0000000..dd27743 --- /dev/null +++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java @@ -0,0 +1,178 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.encon.benchmark; + +import static io.appulse.encon.terms.Erlang.binary; +import static java.lang.Boolean.TRUE; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.openjdk.jmh.annotations.Level.Trial; +import static org.openjdk.jmh.annotations.Mode.Throughput; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import io.appulse.encon.Node; +import io.appulse.encon.Nodes; +import io.appulse.encon.config.NodeConfig; +import io.appulse.encon.config.ServerConfig; +import io.appulse.encon.connection.regular.Message; +import io.appulse.encon.mailbox.Mailbox; +import io.appulse.encon.terms.ErlangTerm; +import io.appulse.encon.terms.type.ErlangPid; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * + * @since 1.6.0 + * @author Artem Labazin + */ +@OutputTimeUnit(SECONDS) +@Warmup(iterations = 1) +@BenchmarkMode(Throughput) +@Measurement(iterations = 1) +public class Encon_SimpleBenchmarks { + + @Benchmark + public void mailbox2mailboxAndBack (Mailbox2MailboxAndBackState state, Blackhole blackHole) { + state.mailbox1.send(state.pid2, state.data); + Message message = state.mailbox2.receive(); + state.mailbox2.send(state.pid1, message.getBody()); + blackHole.consume(state.mailbox1.receive()); + } + + @Benchmark + public void oneDirectionSend (OneDirectionSendState state, Blackhole blackHole) { + state.clientMailbox.send(state.serverMailboxPid, state.data); + blackHole.consume(state.serverMailbox.receive()); + } + + @State(Benchmark) + public static class Mailbox2MailboxAndBackState { + + Node node1; + + Mailbox mailbox1; + + ErlangPid pid1; + + Node node2; + + Mailbox mailbox2; + + ErlangPid pid2; + + ErlangTerm data; + + @Setup(Trial) + public void setup () throws Exception { + NodeConfig config = NodeConfig.builder() + .shortName(TRUE) + .server(ServerConfig.builder() + .bossThreads(1) + .workerThreads(1) + .build() + ) + .build(); + + node1 = Nodes.singleNode("node-1-" + System.nanoTime(), config); + mailbox1 = node1.mailbox().build(); + pid1 = mailbox1.getPid(); + + node2 = Nodes.singleNode("node-2-" + System.nanoTime(), config); + mailbox2 = node2.mailbox().build(); + pid2 = mailbox2.getPid(); + + data = binary(new byte[] { 1, 2, 3, 4, 5 }); + } + + @TearDown(Trial) + public void tearDown () { + mailbox1.close(); + node1.close(); + + mailbox2.close(); + node2.close(); + } + } + + @State(Benchmark) + public static class OneDirectionSendState { + + Node serverNode; + + Mailbox serverMailbox; + + ErlangPid serverMailboxPid; + + Thread serverThread; + + Node clientNode; + + Mailbox clientMailbox; + + ErlangTerm data; + + @Setup(Trial) + public void setup () throws Exception { + NodeConfig config = NodeConfig.builder() + .shortName(TRUE) + .server(ServerConfig.builder() + .bossThreads(1) + .workerThreads(1) + .build() + ) + .build(); + + serverNode = Nodes.singleNode("node-server-" + System.nanoTime(), config); + serverMailbox = serverNode.mailbox().build(); + serverMailboxPid = serverMailbox.getPid(); + + serverThread = new Thread(() -> { + try { + while (!java.lang.Thread.interrupted()) { + serverMailbox.receive(); + } + } catch (Throwable ex) { + } + }); + serverThread.start(); + + clientNode = Nodes.singleNode("node-client-" + System.nanoTime(), config); + clientMailbox = clientNode.mailbox().build(); + + data = binary(new byte[] { 1, 2, 3, 4, 5 }); + } + + @TearDown(Trial) + public void tearDown () { + clientMailbox.close(); + clientNode.close(); + + serverMailbox.close(); + serverNode.close(); + + serverThread.interrupt(); + } + } +} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java deleted file mode 100644 index 43a881f..0000000 --- a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.encon.benchmark; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.openjdk.jmh.annotations.Level.Trial; -import static org.openjdk.jmh.annotations.Mode.Throughput; -import static org.openjdk.jmh.annotations.Scope.Thread; - -import com.ericsson.otp.erlang.OtpErlangAtom; -import com.ericsson.otp.erlang.OtpErlangBinary; -import com.ericsson.otp.erlang.OtpErlangInt; -import com.ericsson.otp.erlang.OtpErlangList; -import com.ericsson.otp.erlang.OtpErlangObject; -import com.ericsson.otp.erlang.OtpErlangPid; -import com.ericsson.otp.erlang.OtpErlangString; -import com.ericsson.otp.erlang.OtpErlangTuple; -import com.ericsson.otp.erlang.OtpMbox; -import com.ericsson.otp.erlang.OtpNode; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -/** - * - * @since 1.6.0 - * @author Artem Labazin - */ -public class JInterfaceBenchmark { - - @State(Thread) - public static class MailboxesState { - - @Setup(Trial) - public void setup () throws Exception { - node = new OtpNode("node-" + System.nanoTime() + "@localhost"); - - mailbox1 = node.createMbox(); - pid1 = mailbox1.self(); - - mailbox2 = node.createMbox(); - pid2 = mailbox2.self(); - - message = new OtpErlangTuple(new OtpErlangObject[] { - new OtpErlangAtom("ok"), - new OtpErlangTuple(new OtpErlangObject[] { - new OtpErlangBinary(new byte[] { 56, 16, 78 }), - new OtpErlangInt(42), - new OtpErlangString("Hello world"), - new OtpErlangList(new OtpErlangObject[] { - new OtpErlangInt(1), - new OtpErlangInt(2), - new OtpErlangInt(3) - }) - }) - }); - } - - @TearDown(Trial) - public void tearDown () { - mailbox1.close(); - mailbox2.close(); - node.close(); - } - - OtpNode node; - - OtpMbox mailbox1; - - OtpErlangPid pid1; - - OtpMbox mailbox2; - - OtpErlangPid pid2; - - OtpErlangObject message; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public OtpErlangObject mailbox2mailbox (MailboxesState state) throws Exception { - state.mailbox1.send(state.pid2, state.message); - - OtpErlangObject one = state.mailbox2.receive(); - state.mailbox2.send(state.pid1, one); - - OtpErlangObject two = state.mailbox1.receive(); - return two; - } - - @State(Thread) - public static class NodesState { - - @Setup(Trial) - public void setup () throws Exception { - node1 = new OtpNode("node-" + System.nanoTime() + "@localhost"); - - mailbox1 = node1.createMbox(); - pid1 = mailbox1.self(); - - node2 = new OtpNode("node-" + System.nanoTime() + "@localhost"); - - mailbox2 = node2.createMbox(); - pid2 = mailbox2.self(); - - message = new OtpErlangTuple(new OtpErlangObject[] { - new OtpErlangAtom("ok"), - new OtpErlangTuple(new OtpErlangObject[] { - new OtpErlangBinary(new byte[] { 56, 16, 78 }), - new OtpErlangInt(42), - new OtpErlangString("Hello world"), - new OtpErlangList(new OtpErlangObject[] { - new OtpErlangInt(1), - new OtpErlangInt(2), - new OtpErlangInt(3) - }) - }) - }); - } - - @TearDown(Trial) - public void tearDown () { - mailbox1.close(); - node1.close(); - mailbox2.close(); - node2.close(); - } - - OtpNode node1; - - OtpMbox mailbox1; - - OtpErlangPid pid1; - - OtpNode node2; - - OtpMbox mailbox2; - - OtpErlangPid pid2; - - OtpErlangObject message; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public OtpErlangObject oneDirection (NodesState nodes) throws Exception { - nodes.mailbox1.send(nodes.pid2, nodes.message); - - OtpErlangObject request = nodes.mailbox2.receive(); - return request; - } - - @Benchmark - @BenchmarkMode(Throughput) - @OutputTimeUnit(SECONDS) - public OtpErlangObject node2node (NodesState state) throws Exception { - state.mailbox1.send(state.pid2, state.message); - - OtpErlangObject one = state.mailbox2.receive(); - state.mailbox2.send(state.pid1, one); - - OtpErlangObject two = state.mailbox1.receive(); - return two; - } -} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java new file mode 100644 index 0000000..8cee0be --- /dev/null +++ b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java @@ -0,0 +1,138 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.encon.benchmark; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.openjdk.jmh.annotations.Level.Trial; +import static org.openjdk.jmh.annotations.Mode.Throughput; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import java.util.stream.IntStream; + +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangObject; +import com.ericsson.otp.erlang.OtpErlangPid; +import com.ericsson.otp.erlang.OtpMbox; +import com.ericsson.otp.erlang.OtpNode; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.infra.ThreadParams; + +/** + * + * @since 1.6.0 + * @author Artem Labazin + */ +@State(Benchmark) +@OutputTimeUnit(SECONDS) +@Warmup(iterations = 1) +@BenchmarkMode(Throughput) +@Measurement(iterations = 1) +public class JInterface_Node2NodeBenchmarks { + + OtpNode serverNode; + + OtpMbox serverMailbox; + + OtpErlangPid serverMailboxPid; + + Thread serverThread; + + OtpErlangObject data; + + OtpNode clientNode; + + OtpMbox[] clientMailboxes; + + @Setup(Trial) + public void setup () throws Exception { + serverNode = new OtpNode("node-server-" + System.nanoTime() + "@localhost"); + serverMailbox = serverNode.createMbox(); + serverMailboxPid = serverMailbox.self(); + data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 }); + + serverThread = new Thread(() -> { + try { + while (!java.lang.Thread.currentThread().isInterrupted()) { + OtpErlangObject message = serverMailbox.receive(); + serverMailbox.send((OtpErlangPid) message, data); + } + } catch (Throwable ex) { + } + }); + serverThread.start(); + + clientNode = new OtpNode("node-client-" + System.nanoTime() + "@localhost"); + clientMailboxes = IntStream.range(0, 8) + .boxed() + .map(it -> clientNode.createMbox()) + .toArray(OtpMbox[]::new); + } + + @TearDown(Trial) + public void tearDown () throws Exception { + for (OtpMbox mailbox : clientMailboxes) { + mailbox.close(); + } + clientNode.close(); + + serverMailbox.close(); + serverNode.close(); + + serverThread.interrupt(); + } + + @Threads(1) + @Benchmark + public void client_1 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + OtpMbox mailbox = clientMailboxes[0]; + mailbox.send(serverMailboxPid, mailbox.self()); + blackHole.consume(mailbox.receive()); + } + + @Threads(2) + @Benchmark + public void clients_2 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.self()); + blackHole.consume(mailbox.receive()); + } + + @Threads(4) + @Benchmark + public void clients_4 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.self()); + blackHole.consume(mailbox.receive()); + } + + @Threads(8) + @Benchmark + public void clients_8 (ThreadParams thredParams, Blackhole blackHole) throws Exception { + OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()]; + mailbox.send(serverMailboxPid, mailbox.self()); + blackHole.consume(mailbox.receive()); + } +} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java new file mode 100644 index 0000000..dac0a09 --- /dev/null +++ b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java @@ -0,0 +1,154 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.encon.benchmark; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.openjdk.jmh.annotations.Level.Trial; +import static org.openjdk.jmh.annotations.Mode.Throughput; +import static org.openjdk.jmh.annotations.Scope.Benchmark; + +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangObject; +import com.ericsson.otp.erlang.OtpErlangPid; +import com.ericsson.otp.erlang.OtpMbox; +import com.ericsson.otp.erlang.OtpNode; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * + * @since 1.6.0 + * @author Artem Labazin + */ +@OutputTimeUnit(SECONDS) +@Warmup(iterations = 1) +@BenchmarkMode(Throughput) +@Measurement(iterations = 1) +public class JInterface_SimpleBenchmarks { + + @Benchmark + public void mailbox2mailboxAndBack (Mailbox2MailboxAndBackState state, Blackhole blackHole) throws Exception { + state.mailbox1.send(state.pid2, state.data); + OtpErlangObject message = state.mailbox2.receive(); + state.mailbox2.send(state.pid1, message); + blackHole.consume(state.mailbox1.receive()); + } + + @Benchmark + public void oneDirectionSend (OneDirectionSendState state, Blackhole blackHole) throws Exception { + state.clientMailbox.send(state.serverMailboxPid, state.data); + blackHole.consume(state.serverMailbox.receive()); + } + + @State(Benchmark) + public static class Mailbox2MailboxAndBackState { + + OtpNode node1; + + OtpMbox mailbox1; + + OtpErlangPid pid1; + + OtpNode node2; + + OtpMbox mailbox2; + + OtpErlangPid pid2; + + OtpErlangObject data; + + @Setup(Trial) + public void setup () throws Exception { + node1 = new OtpNode("node-1-" + System.nanoTime() + "@localhost"); + mailbox1 = node1.createMbox(); + pid1 = mailbox1.self(); + + node2 = new OtpNode("node-2-" + System.nanoTime() + "@localhost"); + mailbox2 = node2.createMbox(); + pid2 = mailbox2.self(); + + data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 }); + } + + @TearDown(Trial) + public void tearDown () { + mailbox1.close(); + node1.close(); + + mailbox2.close(); + node2.close(); + } + } + + @State(Benchmark) + public static class OneDirectionSendState { + + OtpNode serverNode; + + OtpMbox serverMailbox; + + OtpErlangPid serverMailboxPid; + + Thread serverThread; + + OtpNode clientNode; + + OtpMbox clientMailbox; + + OtpErlangObject data; + + @Setup(Trial) + public void setup () throws Exception { + serverNode = new OtpNode("node-server-" + System.nanoTime() + "@localhost"); + serverMailbox = serverNode.createMbox(); + serverMailboxPid = serverMailbox.self(); + + serverThread = new Thread(() -> { + try { + while (!java.lang.Thread.interrupted()) { + serverMailbox.receive(); + } + } catch (Throwable ex) { + } + }); + serverThread.start(); + + clientNode = new OtpNode("node-client-" + System.nanoTime() + "@localhost"); + clientMailbox = clientNode.createMbox(); + + data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 }); + } + + @TearDown(Trial) + public void tearDown () { + clientMailbox.close(); + clientNode.close(); + + serverMailbox.close(); + serverNode.close(); + + serverThread.interrupt(); + } + } +} diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java index 0c08909..b6cabde 100644 --- a/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java +++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java @@ -115,7 +115,10 @@ private static void startEpmd () { } executor = Executors.newSingleThreadExecutor(); - server = new ServerCommandExecutor(new CommonOptions(), new ServerCommandOptions()); + ServerCommandOptions options = new ServerCommandOptions(); + options.setChecks(true); + + server = new ServerCommandExecutor(new CommonOptions(), options); executor.execute(server::execute); } diff --git a/benchmark/src/main/resources/logback.xml b/benchmark/src/main/resources/logback.xml new file mode 100644 index 0000000..1e462a4 --- /dev/null +++ b/benchmark/src/main/resources/logback.xml @@ -0,0 +1,24 @@ + + + + + + + + + + diff --git a/encon-common/pom.xml b/encon-common/pom.xml index 7c603fb..e9aa521 100644 --- a/encon-common/pom.xml +++ b/encon-common/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-common diff --git a/encon-config/README.md b/encon-config/README.md index f7773d8..b1643d6 100644 --- a/encon-config/README.md +++ b/encon-config/README.md @@ -14,7 +14,7 @@ First of all, add config's dependency: io.appulse.encon encon-config - 1.6.2 + 1.6.3 ... @@ -23,7 +23,7 @@ First of all, add config's dependency: **Gradle**: ```groovy -compile 'io.appulse.encon:encon-config:1.6.2' +compile 'io.appulse.encon:encon-config:1.6.3' ``` ### File based configuration diff --git a/encon-config/pom.xml b/encon-config/pom.xml index 7681346..d7a6e2a 100644 --- a/encon-config/pom.xml +++ b/encon-config/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-config diff --git a/encon-databind/README.md b/encon-databind/README.md index 6612d95..3845a6e 100644 --- a/encon-databind/README.md +++ b/encon-databind/README.md @@ -14,12 +14,12 @@ First of all, add databind's dependency: io.appulse.encon encon - 1.6.2 + 1.6.3 io.appulse.encon encon-databind - 1.6.2 + 1.6.3 ... @@ -28,8 +28,8 @@ First of all, add databind's dependency: **Gradle**: ```groovy -compile 'io.appulse.encon:encon:1.6.2' -compile 'io.appulse.encon:encon-databind:1.6.2' +compile 'io.appulse.encon:encon:1.6.3' +compile 'io.appulse.encon:encon-databind:1.6.3' ``` Let's imagine, you have POJO like this: diff --git a/encon-databind/pom.xml b/encon-databind/pom.xml index 6389cea..e91a7e1 100644 --- a/encon-databind/pom.xml +++ b/encon-databind/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-databind diff --git a/encon-handler/README.md b/encon-handler/README.md index b300d3f..8703472 100644 --- a/encon-handler/README.md +++ b/encon-handler/README.md @@ -14,7 +14,7 @@ First of all, add dependency: io.appulse.encon encon-handler - 1.6.2 + 1.6.3 ... @@ -23,7 +23,7 @@ First of all, add dependency: **Gradle**: ```groovy -compile 'io.appulse.encon:encon-handler:1.6.2' +compile 'io.appulse.encon:encon-handler:1.6.3' ``` ### Basics diff --git a/encon-handler/pom.xml b/encon-handler/pom.xml index 5be1c5d..1d33027 100644 --- a/encon-handler/pom.xml +++ b/encon-handler/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-handler diff --git a/encon-spring/pom.xml b/encon-spring/pom.xml index 477a32a..d8abe1d 100644 --- a/encon-spring/pom.xml +++ b/encon-spring/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-spring diff --git a/encon-terms/pom.xml b/encon-terms/pom.xml index aaddeca..529f970 100644 --- a/encon-terms/pom.xml +++ b/encon-terms/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon-terms @@ -58,7 +58,7 @@ limitations under the License. io.netty netty-buffer - provided + diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java index 80568ee..07af9ce 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java @@ -85,7 +85,7 @@ public static ErlangAtom atom (@NonNull String value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (char value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** @@ -96,7 +96,7 @@ public static ErlangInteger number (char value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (byte value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** @@ -107,7 +107,7 @@ public static ErlangInteger number (byte value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (short value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** @@ -118,7 +118,7 @@ public static ErlangInteger number (short value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (int value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** @@ -129,7 +129,7 @@ public static ErlangInteger number (int value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (long value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** @@ -140,7 +140,7 @@ public static ErlangInteger number (long value) { * @return {@link ErlangInteger} new instance */ public static ErlangInteger number (@NonNull BigInteger value) { - return new ErlangInteger(value); + return ErlangInteger.cached(value); } /** diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java index 1ca8da3..e697902 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java @@ -95,7 +95,7 @@ public static T newInstance (@NonNull ByteBuf buffer) { case INTEGER: case SMALL_BIG: case LARGE_BIG: - return (T) new ErlangInteger(type, buffer); + return (T) ErlangInteger.cached(type, buffer); case FLOAT: case NEW_FLOAT: return (T) new ErlangFloat(type, buffer); diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java index f1aaa85..417cb62 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java @@ -26,18 +26,21 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; -import java.util.stream.IntStream; +import io.appulse.encon.common.LruCache; import io.appulse.encon.terms.ErlangTerm; import io.appulse.encon.terms.TermType; import io.appulse.encon.terms.exception.ErlangTermDecodeException; import io.appulse.encon.terms.exception.IllegalErlangTermTypeException; import io.netty.buffer.ByteBuf; +import io.netty.util.ByteProcessor; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; import lombok.ToString; import lombok.experimental.FieldDefaults; +import lombok.experimental.NonFinal; import lombok.val; /** @@ -53,35 +56,95 @@ public class ErlangInteger extends ErlangTerm { private static final long serialVersionUID = -1757584303003802030L; - private static final int MAX_SMALL_INTEGER; + private static final int MAX_SMALL_INTEGER = 255; - private static final int MAX_INTEGER; + private static final int MAX_INTEGER = (1 << 27) - 1; - private static final int MIN_INTEGER; + private static final int MIN_INTEGER = -(1 << 27) - 1; - private static final int MIN_CACHE; + private static final int MAX_SMALL_BIG_BYTES_LENGTH = 255; - private static final int MAX_CACHE; + private static final LruCache CACHE = new LruCache<>(1000); - private static final int MAX_SMALL_BIG_BYTES_LENGTH; + /** + * Creates cached {@link ErlangInteger} value. + * + * @param number integer value + * + * @return new or cached {@link ErlangInteger} object + */ + public static ErlangInteger cached (byte value) { + int hashCode = 31 + value; - private static final ErlangInteger[] CACHE; + ErlangInteger result = CACHE.get(hashCode); + if (result != null) { + return result; + } + result = new ErlangInteger(value); + CACHE.put(hashCode, result); + return result; + } - static { - MAX_SMALL_INTEGER = 255; + /** + * Creates cached {@link ErlangInteger} value. + * + * @param number integer value + * + * @return new or cached {@link ErlangInteger} object + */ + public static ErlangInteger cached (char value) { + int hashCode = 31 + (byte) (value >> 8); + hashCode = 31 * hashCode + (byte) value; - MAX_INTEGER = (1 << 27) - 1; - MIN_INTEGER = -(1 << 27) - 1; + ErlangInteger result = CACHE.get(hashCode); + if (result != null) { + return result; + } + result = new ErlangInteger(value); + CACHE.put(hashCode, result); + return result; + } - MIN_CACHE = -1; - MAX_CACHE = 256; + /** + * Creates cached {@link ErlangInteger} value. + * + * @param number integer value + * + * @return new or cached {@link ErlangInteger} object + */ + public static ErlangInteger cached (short value) { + int hashCode = 31 + (byte) (value >> 8); + hashCode = 31 * hashCode + (byte) value; - MAX_SMALL_BIG_BYTES_LENGTH = 255; + ErlangInteger result = CACHE.get(hashCode); + if (result != null) { + return result; + } + result = new ErlangInteger(value); + CACHE.put(hashCode, result); + return result; + } - CACHE = IntStream.range(MIN_CACHE, MAX_CACHE) - .boxed() - .map(ErlangInteger::new) - .toArray(ErlangInteger[]::new); + /** + * Creates cached {@link ErlangInteger} value. + * + * @param number integer value + * + * @return new or cached {@link ErlangInteger} object + */ + public static ErlangInteger cached (int value) { + int hashCode = 31 + (byte) (value >> 24); + hashCode = 31 * hashCode + (byte) (value >> 16); + hashCode = 31 * hashCode + (byte) (value >> 8); + hashCode = 31 * hashCode + (byte) value; + + ErlangInteger result = CACHE.get(hashCode); + if (result != null) { + return result; + } + result = new ErlangInteger(value); + CACHE.put(hashCode, result); + return result; } /** @@ -91,14 +154,81 @@ public class ErlangInteger extends ErlangTerm { * * @return new or cached {@link ErlangInteger} object */ - public static ErlangInteger from (int number) { - return number > MAX_CACHE || number < MIN_CACHE - ? new ErlangInteger(number) - : CACHE[number - MIN_CACHE]; + public static ErlangInteger cached (long value) { + int hashCode = 31 + (byte) (value >> 56); + hashCode = 31 * hashCode + (byte) (value >> 48); + hashCode = 31 * hashCode + (byte) (value >> 40); + hashCode = 31 * hashCode + (byte) (value >> 32); + hashCode = 31 * hashCode + (byte) (value >> 24); + hashCode = 31 * hashCode + (byte) (value >> 16); + hashCode = 31 * hashCode + (byte) (value >> 8); + hashCode = 31 * hashCode + (byte) value; + + ErlangInteger result = CACHE.get(hashCode); + if (result != null) { + return result; + } + result = new ErlangInteger(value); + CACHE.put(hashCode, result); + return result; + } + + /** + * Creates cached {@link ErlangInteger} value. + * + * @param number integer value + * + * @return new or cached {@link ErlangInteger} object + */ + public static ErlangInteger cached (BigInteger value) { + if (value.bitLength() < Long.BYTES) { + return cached(value.longValue()); + } + // int hashCode = value.signum() == -1 + // ? 32 + // : 31; + // return CACHE.computeIfAbsent(hashCode, key -> new ErlangInteger(value)); + return new ErlangInteger(value); + } + + public static ErlangInteger cached (TermType type, @NonNull ByteBuf buffer) { + int index = buffer.readerIndex(); + ByteArrayHashCode byteProcessor = new ByteArrayHashCode(); + + int length; + switch (type) { + case SMALL_INTEGER: + length = Byte.BYTES; + break; + case INTEGER: + length = Integer.BYTES; + break; + case SMALL_BIG: + length = buffer.readByte() + Byte.BYTES; + break; + case LARGE_BIG: + default: + length = buffer.readInt() + Byte.BYTES; + } + + buffer.forEachByte(buffer.readerIndex(), length, byteProcessor); + + return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> { + if (value == null) { + buffer.readerIndex(index); + return new ErlangInteger(type, buffer); + } else { + buffer.skipBytes(length); + return value; + } + }); } BigInteger value; + @NonFinal + byte[] cachedMagnitude; + /** * Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}. * @@ -297,16 +427,18 @@ protected void serialize (ByteBuf buffer) { break; case SMALL_BIG: case LARGE_BIG: - byte[] bytes = value.abs().toByteArray(); - int index = 0; - for (; index < bytes.length && bytes[index] == 0; index++) { - // skip leading zeros + if (cachedMagnitude == null) { + byte[] bytes = value.abs().toByteArray(); + int index = 0; + for (; index < bytes.length && bytes[index] == 0; index++) { + // skip leading zeros + } + + cachedMagnitude = Arrays.copyOfRange(bytes, index, bytes.length); + reverse(cachedMagnitude); } - byte[] magnitude = Arrays.copyOfRange(bytes, index, bytes.length); - reverse(magnitude); - - int length = magnitude.length; + int length = cachedMagnitude.length; if ((length & 0xFF) == length) { buffer.writeByte(length); // length } else { @@ -316,7 +448,7 @@ protected void serialize (ByteBuf buffer) { ? 1 : 0; buffer.writeByte(sign); - buffer.writeBytes(magnitude); + buffer.writeBytes(cachedMagnitude); break; default: throw new IllegalErlangTermTypeException(getClass(), getType()); @@ -347,4 +479,17 @@ private void reverse (byte[] data) { right--; } } + + @Getter + @FieldDefaults(level = PRIVATE) + private static class ByteArrayHashCode implements ByteProcessor { + + int hashCode = 1; + + @Override + public boolean process (byte value) throws Exception { + hashCode = 31 * hashCode + value; + return true; + } + } } diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java index a56c401..b250284 100644 --- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java +++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java @@ -102,7 +102,7 @@ protected void serialize (ByteBuf buffer) { case LIST: val elements = value.codePoints() .boxed() - .map(ErlangInteger::from) + .map(ErlangInteger::cached) .toArray(ErlangInteger[]::new); buffer.writerIndex(positionBefore); diff --git a/encon/README.md b/encon/README.md index 59471c5..26ec9aa 100644 --- a/encon/README.md +++ b/encon/README.md @@ -21,7 +21,7 @@ Adding encon's dependency to your `JVM` app: io.appulse.encon encon - 1.6.2 + 1.6.3 ... @@ -30,7 +30,7 @@ Adding encon's dependency to your `JVM` app: **Gradle**: ```groovy -compile 'io.appulse.encon:encon:1.6.2' +compile 'io.appulse.encon:encon:1.6.3' ``` ## Start the Node diff --git a/encon/pom.xml b/encon/pom.xml index aefbcc7..0abdb65 100644 --- a/encon/pom.xml +++ b/encon/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 encon diff --git a/encon/src/main/java/io/appulse/encon/ModuleClient.java b/encon/src/main/java/io/appulse/encon/ModuleClient.java index 298d132..a018606 100644 --- a/encon/src/main/java/io/appulse/encon/ModuleClient.java +++ b/encon/src/main/java/io/appulse/encon/ModuleClient.java @@ -16,7 +16,9 @@ package io.appulse.encon; +import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import static io.netty.channel.ChannelOption.TCP_NODELAY; import static java.util.concurrent.TimeUnit.SECONDS; @@ -109,6 +111,8 @@ private CompletableFuture createConnection (@NonNull RemoteNode remo .option(SO_KEEPALIVE, true) .option(TCP_NODELAY, true) .option(CONNECT_TIMEOUT_MILLIS, 5000) + .option(ALLOCATOR, moduleConnection.getAllocator()) + .option(SINGLE_EVENTEXECUTOR_PER_GROUP, true) .handler(HandshakeClientInitializer.builder() .node(node) .future(future) diff --git a/encon/src/main/java/io/appulse/encon/ModuleConnection.java b/encon/src/main/java/io/appulse/encon/ModuleConnection.java index 8616dcc..af92fa5 100644 --- a/encon/src/main/java/io/appulse/encon/ModuleConnection.java +++ b/encon/src/main/java/io/appulse/encon/ModuleConnection.java @@ -29,6 +29,8 @@ import io.appulse.encon.common.RemoteNode; import io.appulse.encon.connection.Connection; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -75,6 +77,9 @@ class ModuleConnection implements Closeable { @Getter Class serverChannelClass; + @Getter + ByteBufAllocator allocator; + Map> cache; ModuleConnection (@NonNull String prefix, int bossThreads, int workerThreads) { @@ -94,6 +99,7 @@ class ModuleConnection implements Closeable { clientChannelClass = NioSocketChannel.class; serverChannelClass = NioServerSocketChannel.class; } + allocator = new PooledByteBufAllocator(true); } @Override diff --git a/encon/src/main/java/io/appulse/encon/ModuleServer.java b/encon/src/main/java/io/appulse/encon/ModuleServer.java index e0c98ba..7c94115 100644 --- a/encon/src/main/java/io/appulse/encon/ModuleServer.java +++ b/encon/src/main/java/io/appulse/encon/ModuleServer.java @@ -17,8 +17,11 @@ package io.appulse.encon; import static io.netty.channel.ChannelOption.ALLOCATOR; +import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_REUSEADDR; import static io.netty.channel.ChannelOption.TCP_NODELAY; import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK; @@ -30,7 +33,6 @@ import io.appulse.encon.connection.handshake.HandshakeServerInitializer; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.handler.logging.LoggingHandler; import lombok.NonNull; @@ -87,11 +89,15 @@ private void start () { .build()) .option(SO_BACKLOG, 1024) .option(SO_REUSEADDR, true) + .option(CONNECT_TIMEOUT_MILLIS, 5000) + .option(ALLOCATOR, moduleConnection.getAllocator()) + .option(SINGLE_EVENTEXECUTOR_PER_GROUP, true) .childOption(SO_REUSEADDR, true) .childOption(SO_KEEPALIVE, true) .childOption(TCP_NODELAY, true) + .childOption(SO_RCVBUF, 128 * 1024) .childOption(WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 * 1024, 128 * 1024)) - .childOption(ALLOCATOR, new PooledByteBufAllocator(true)) + .childOption(ALLOCATOR, moduleConnection.getAllocator()) .bind(port); } } diff --git a/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java b/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java index 1e65cc4..1acf92c 100644 --- a/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java +++ b/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java @@ -99,7 +99,7 @@ public final ErlangTuple toTuple () { val elements = Stream.of(elements()) .collect(toCollection(LinkedList::new)); - elements.addFirst(ErlangInteger.from(getTag().getCode())); + elements.addFirst(ErlangInteger.cached(getTag().getCode())); return new ErlangTuple(elements); } diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java b/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java index 13a5b21..6fef69a 100644 --- a/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java +++ b/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java @@ -16,9 +16,14 @@ package io.appulse.encon.connection.regular; +import static io.appulse.encon.connection.regular.Message.PASS_THROUGH_TAG; +import static io.appulse.encon.connection.regular.Message.VERSION_TAG; import static lombok.AccessLevel.PRIVATE; +// import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump; +// import static io.netty.util.internal.StringUtil.NEWLINE; import java.io.Closeable; +import java.util.List; import java.util.function.Consumer; import io.appulse.encon.Node; @@ -31,13 +36,17 @@ import io.appulse.encon.connection.control.SendToRegisteredProcess; import io.appulse.encon.connection.control.Unlink; import io.appulse.encon.mailbox.Mailbox; +import io.appulse.encon.terms.ErlangTerm; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; import lombok.experimental.NonFinal; import lombok.extern.slf4j.Slf4j; @@ -45,13 +54,42 @@ /** * - * @since 1.0.0 + * @since 1.6.2 * @author Artem Labazin */ @Slf4j -@RequiredArgsConstructor +@Builder +@AllArgsConstructor @FieldDefaults(level = PRIVATE, makeFinal = true) -public final class ConnectionHandler extends ChannelInboundHandlerAdapter implements Closeable { +public final class ConnectionHandler extends ByteToMessageDecoder implements Closeable { + + private static final ByteBuf TICK_TOCK = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0 }); + + private static ErlangTerm readTerm (ByteBuf buffer) { + val versionByte = buffer.readUnsignedByte(); + if (versionByte != VERSION_TAG) { + throw new IllegalArgumentException("Wrong version byte. Expected 0x83 (131), but was: " + versionByte); + } + return ErlangTerm.newInstance(buffer); + } + + // private static String formatByteBuf (ChannelHandlerContext ctx, String eventName, ByteBuf msg) { + // String chStr = ctx.channel().toString(); + // int length = msg.readableBytes(); + // if (length == 0) { + // StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 4); + // buf.append(chStr).append(' ').append(eventName).append(": 0B"); + // return buf.toString(); + // } else { + // int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4; + // StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + 10 + 1 + 2 + rows * 80); + + // buf.append(chStr).append(' ').append(eventName).append(": ").append(length).append('B').append(NEWLINE); + // appendPrettyHexDump(buf, msg); + + // return buf.toString(); + // } + // } @NonNull Node node; @@ -96,23 +134,78 @@ public void channelInactive (ChannelHandlerContext context) throws Exception { public void send (Message message) { log.debug("Sending message\nto {}\n {}\n", remote, message); - if (!channel.isWritable()) { - log.error("Channel for {} is not writable. Remote node is {}", - channel.remoteAddress(), remote); - throw new IllegalArgumentException("Channel is not writable"); - } - channel.writeAndFlush(message); + + val out = channel.alloc().buffer(); + message.writeTo(out); + + val messageLength = channel.alloc() + .buffer(Integer.BYTES) + .writeInt(out.readableBytes()); + + val popa = channel.alloc().compositeBuffer(2) + .addComponents(true, messageLength, out); + + channel.writeAndFlush(popa); } @Override - public void channelRead (ChannelHandlerContext context, Object obj) throws Exception { - val message = (Message) obj; + protected void decode (ChannelHandlerContext context, ByteBuf buffer, List out) { + if (!buffer.isReadable(4)) { + // log.debug("not enough bytes #1: {}", buffer.readableBytes()); + return; + } + int index = buffer.readerIndex(); + + int length = buffer.readInt(); + // log.debug("message length is: {}", length); + if (length == 0) { + // log.debug("TICK-TOCK message detected, sending response"); + TICK_TOCK.retain(); + context.writeAndFlush(TICK_TOCK.duplicate()); + if (buffer.isReadable()) { + // log.debug("There is no more bytes in message, stop pipelining"); + return; + } + + if (!buffer.isReadable(Integer.BYTES)) { + // log.debug("not enough bytes #2: {}", buffer.readableBytes()); + return; + } + index = buffer.readerIndex(); + length = buffer.readInt(); + // log.debug("new message length is: {}", length); + } + + if (!buffer.isReadable(length)) { + buffer.readerIndex(index); + // log.debug("not enough bytes #3: {} vs {}", buffer.readableBytes(), length + 4); + return; + } + + // MessageDecoder + val passThrough = buffer.readByte(); + if (passThrough != PASS_THROUGH_TAG) { + buffer.readerIndex(index); + // log.error("\n{}", formatByteBuf(context, "POPA", buffer)); + throw new IllegalArgumentException("Wrong pass through marker. Expected 0x70 (112), but was: " + passThrough + + " at index: " + buffer.readerIndex()); + } + + val header = readTerm(buffer); + ControlMessage controlMessage = ControlMessage.parse(header); + + ErlangTerm body = null; + if (buffer.isReadable()) { + body = readTerm(buffer); + } + + // ConnectionHandler + val message = new Message(controlMessage, body); log.debug("Received message\nfrom {}\n {}\n", remote, message); - ControlMessage header = message.getHeader(); - val mailbox = findMailbox(header); + val mailbox = findMailbox(controlMessage); if (mailbox == null) { - log.warn("There is no mailbox for message\n {}\n", message); + log.warn("There is no mailbox for message\n {}\n {}", message, node.mailboxes().keySet()); } else { mailbox.deliver(message); } diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/Message.java b/encon/src/main/java/io/appulse/encon/connection/regular/Message.java index ac3b91c..b2909e8 100644 --- a/encon/src/main/java/io/appulse/encon/connection/regular/Message.java +++ b/encon/src/main/java/io/appulse/encon/connection/regular/Message.java @@ -29,6 +29,7 @@ import io.appulse.encon.terms.type.ErlangAtom; import io.appulse.encon.terms.type.ErlangPid; +import io.netty.buffer.ByteBuf; import lombok.NonNull; import lombok.Value; @@ -99,4 +100,15 @@ public static Message exit2 (@NonNull ErlangPid from, @NonNull ErlangPid to, @No ControlMessage header; ErlangTerm body; + + public void writeTo (ByteBuf buffer) { + buffer.writeByte(0x70); + buffer.writeByte(0x83); + header.writeTo(buffer); + + if (body != null) { + buffer.writeByte(0x83); + body.writeTo(buffer); + } + } } diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/MessageDecoder.java b/encon/src/main/java/io/appulse/encon/connection/regular/MessageDecoder.java deleted file mode 100644 index f1f4ef2..0000000 --- a/encon/src/main/java/io/appulse/encon/connection/regular/MessageDecoder.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.encon.connection.regular; - -import static io.appulse.encon.connection.regular.Message.PASS_THROUGH_TAG; -import static io.appulse.encon.connection.regular.Message.VERSION_TAG; - -import java.util.List; - -import io.appulse.encon.connection.control.ControlMessage; -import io.appulse.encon.terms.ErlangTerm; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -/** - * - * @since 1.0.0 - * @author Artem Labazin - */ -@Slf4j -@Sharable -public class MessageDecoder extends MessageToMessageDecoder { - - @Override - public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { - log.error("Error during channel connection with {}", - context.channel().remoteAddress(), cause); - - context.fireExceptionCaught(cause); - context.close(); - } - - @Override - protected void decode (ChannelHandlerContext context, ByteBuf msg, List out) throws Exception { - log.debug("decoding"); - - msg.skipBytes(Integer.BYTES); // skip size - - val passThrough = msg.readByte(); - if (passThrough != PASS_THROUGH_TAG) { - throw new IllegalArgumentException("Wrong pass through marker. Expected 0x70 (112), but was: " + passThrough); - } - - val header = readTerm(msg); - val controlMessage = ControlMessage.parse(header); - - ErlangTerm body = null; - if (msg.readerIndex() < msg.capacity()) { - body = readTerm(msg); - } - - out.add(new Message(controlMessage, body)); - } - - private ErlangTerm readTerm (ByteBuf buffer) { - val versionByte = buffer.readUnsignedByte(); - if (versionByte != VERSION_TAG) { - throw new IllegalArgumentException("Wrong version byte. Expected 0x83 (131), but was: " + versionByte); - } - return ErlangTerm.newInstance(buffer); - } -} diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/MessageEncoder.java b/encon/src/main/java/io/appulse/encon/connection/regular/MessageEncoder.java deleted file mode 100644 index 7d40bc1..0000000 --- a/encon/src/main/java/io/appulse/encon/connection/regular/MessageEncoder.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.encon.connection.regular; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -/** - * - * @since 1.0.0 - * @author Artem Labazin - */ -@Slf4j -@Sharable -public class MessageEncoder extends MessageToByteEncoder { - - public MessageEncoder () { - super(false); - } - - @Override - public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { - log.error("Error during channel connection with {}", - context.channel().remoteAddress(), cause); - - context.fireExceptionCaught(cause); - context.close(); - } - - @Override - protected void encode (ChannelHandlerContext context, Message message, ByteBuf out) throws Exception { - try { - out.writeByte(0x70); - out.writeByte(0x83); - message.getHeader().writeTo(out); - - val body = message.getBody(); - if (body != null) { - out.writeByte(0x83); - body.writeTo(out); - } - log.debug("Message was sent"); - } catch (Exception ex) { - log.error("Error during encoding message for {}\n {}\n", - context.channel().remoteAddress(), message, ex); - throw ex; - } - } -} diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/RegularPipeline.java b/encon/src/main/java/io/appulse/encon/connection/regular/RegularPipeline.java index e985b0f..3d42e36 100644 --- a/encon/src/main/java/io/appulse/encon/connection/regular/RegularPipeline.java +++ b/encon/src/main/java/io/appulse/encon/connection/regular/RegularPipeline.java @@ -17,7 +17,6 @@ package io.appulse.encon.connection.regular; import static io.netty.handler.logging.LogLevel.DEBUG; -import static java.lang.Integer.MAX_VALUE; import java.util.function.Consumer; @@ -25,38 +24,23 @@ import io.appulse.encon.common.RemoteNode; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelInboundHandler; -import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.logging.LoggingHandler; import lombok.NonNull; -import lombok.val; +import lombok.extern.slf4j.Slf4j; /** * * @since 1.0.0 * @author Artem Labazin */ +@Slf4j public final class RegularPipeline { private static final ChannelDuplexHandler LOGGING_HANDLER; - private static final LengthFieldPrepender LENGTH_FIELD_PREPENDER; - - private static final ChannelInboundHandler TICK_TOCK_HANDLER; - - private static final ChannelOutboundHandler MESSAGE_ENCODER; - - private static final ChannelInboundHandler MESSAGE_DECODER; - static { LOGGING_HANDLER = new LoggingHandler(DEBUG); - LENGTH_FIELD_PREPENDER = new LengthFieldPrepender(4, false); - TICK_TOCK_HANDLER = new TickTockHandler(); - MESSAGE_ENCODER = new MessageEncoder(); - MESSAGE_DECODER = new MessageDecoder(); } public static ConnectionHandler setup (@NonNull ChannelPipeline pipeline, @@ -64,16 +48,16 @@ public static ConnectionHandler setup (@NonNull ChannelPipeline pipeline, @NonNull RemoteNode remoteNode, @NonNull Consumer channelCloseAction ) { - val handler = new ConnectionHandler(node, remoteNode, channelCloseAction); - - pipeline - .addLast(LOGGING_HANDLER) - .addLast(TICK_TOCK_HANDLER) - .addLast(LENGTH_FIELD_PREPENDER) - .addLast(new LengthFieldBasedFrameDecoder(MAX_VALUE, 0, 4)) - .addLast(MESSAGE_ENCODER) - .addLast(MESSAGE_DECODER) - .addLast(handler); + ConnectionHandler handler = ConnectionHandler.builder() + .node(node) + .remote(remoteNode) + .channelCloseAction(channelCloseAction) + .build(); + + if (log.isDebugEnabled()) { + pipeline.addLast(LOGGING_HANDLER); + } + pipeline.addLast(handler); return handler; } diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/TickTockHandler.java b/encon/src/main/java/io/appulse/encon/connection/regular/TickTockHandler.java deleted file mode 100644 index 3689db1..0000000 --- a/encon/src/main/java/io/appulse/encon/connection/regular/TickTockHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.encon.connection.regular; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -/** - * - * @since 1.0.0 - * @author Artem Labazin - */ -@Slf4j -@Sharable -public class TickTockHandler extends ChannelInboundHandlerAdapter { - - private static final ByteBuf TICK_TOCK = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0 }); - - @Override - public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { - log.error("Error during channel connection with {}", - context.channel().remoteAddress(), cause); - - context.fireExceptionCaught(cause); - context.close(); - } - - @Override - public void channelRead (ChannelHandlerContext context, Object message) throws Exception { - val buffer = (ByteBuf) message; - - if (buffer.readableBytes() >= TICK_TOCK.maxCapacity() && buffer.getInt(0) == 0) { - log.debug("TICK-TOCK message detected, sending response"); - TICK_TOCK.retain(); - context.writeAndFlush(TICK_TOCK.duplicate()); - if (buffer.readableBytes() == TICK_TOCK.maxCapacity()) { - log.debug("There is no more bytes in message, stop pipelining"); - return; - } - buffer.readerIndex(TICK_TOCK.maxCapacity()); - } - - context.fireChannelRead(buffer); - } -} diff --git a/examples/custom-queue/pom.xml b/examples/custom-queue/pom.xml index 5c985b9..826be00 100644 --- a/examples/custom-queue/pom.xml +++ b/examples/custom-queue/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/databind/pom.xml b/examples/databind/pom.xml index 26cc99d..b999771 100644 --- a/examples/databind/pom.xml +++ b/examples/databind/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/echo-server-spring/pom.xml b/examples/echo-server-spring/pom.xml index f16cb12..667106b 100644 --- a/examples/echo-server-spring/pom.xml +++ b/examples/echo-server-spring/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 diff --git a/examples/echo-server/pom.xml b/examples/echo-server/pom.xml index 8256994..3ab7c77 100644 --- a/examples/echo-server/pom.xml +++ b/examples/echo-server/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/handler-advanced/pom.xml b/examples/handler-advanced/pom.xml index 4bde3cb..2e9bc54 100644 --- a/examples/handler-advanced/pom.xml +++ b/examples/handler-advanced/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/handler-basic/pom.xml b/examples/handler-basic/pom.xml index 54f4a1d..d7e4c5a 100644 --- a/examples/handler-basic/pom.xml +++ b/examples/handler-basic/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/load-config-spring/pom.xml b/examples/load-config-spring/pom.xml index ca2776f..ecd0ce3 100644 --- a/examples/load-config-spring/pom.xml +++ b/examples/load-config-spring/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 diff --git a/examples/load-config/pom.xml b/examples/load-config/pom.xml index 1f066e1..8149bd3 100644 --- a/examples/load-config/pom.xml +++ b/examples/load-config/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/examples/pom.xml b/examples/pom.xml index 2fd89db..36acaff 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 examples diff --git a/examples/simple/pom.xml b/examples/simple/pom.xml index ac3d50d..557715c 100644 --- a/examples/simple/pom.xml +++ b/examples/simple/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse.encon examples - 1.6.2 + 1.6.3 io.appulse.encon.examples diff --git a/pom.xml b/pom.xml index ab455b1..0f7c9b2 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ limitations under the License. io.appulse.encon encon-parent - 1.6.2 + 1.6.3 pom @@ -74,7 +74,7 @@ limitations under the License. https://github.com/appulse-projects/encon-java scm:git:https://github.com/appulse-projects/encon-java.git scm:git:https://github.com/appulse-projects/encon-java.git - 1.6.2 + 1.6.3 @@ -109,6 +109,11 @@ limitations under the License. Sokol Andrey sokolandrey1993@mail.ru + + isv + Stan Ignatev + i.v.stanislav@gmail.com + @@ -165,17 +170,17 @@ limitations under the License. io.netty netty-buffer - 4.1.28.Final + 4.1.29.Final io.netty netty-handler - 4.1.28.Final + 4.1.29.Final io.netty netty-transport-native-epoll - 4.1.28.Final + 4.1.29.Final linux-x86_64 @@ -201,12 +206,12 @@ limitations under the License. org.assertj assertj-core - 3.11.0 + 3.11.1 org.mockito mockito-core - 2.21.0 + 2.22.0 @@ -316,41 +321,6 @@ limitations under the License. - - - org.apache.maven.plugins - maven-deploy-plugin - 2.8.2 - - true - - - - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.8 - true - - ossrh - https://oss.sonatype.org/ - true - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.6 - - - sign-artifacts - verify - - sign - - - - @@ -406,6 +376,41 @@ limitations under the License. + + + org.apache.maven.plugins + maven-deploy-plugin + 2.8.2 + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.8 + true + + ossrh + https://oss.sonatype.org/ + true + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + +