diff --git a/CHANGELOG.md b/CHANGELOG.md
index 542ec23..e36e98d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Turn on checkstyle JavaDocs module.
- Add updates to the protocol, like new `ControlMessage`.
+## [1.6.3](https://github.com/appulse-projects/encon-java/releases/tag/1.6.3) - 2018-09-11
+
+### Changed
+
+- Removed multi-handlers `netty` pipeline, use single one instead;
+- Cache `atom` and `pid` values;
+- Improve benchmarks.
+
## [1.6.2](https://github.com/appulse-projects/encon-java/releases/tag/1.6.2) - 2018-09-06
### Added
diff --git a/README.md b/README.md
index a091d79..ebcded2 100644
--- a/README.md
+++ b/README.md
@@ -46,7 +46,7 @@ $> mvn clean compile
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
-[INFO] encon 1.6.2 ........................................ SUCCESS [ 1.210 s]
+[INFO] encon 1.6.3 ........................................ SUCCESS [ 1.210 s]
[INFO] encon-common ....................................... SUCCESS [ 25.693 s]
[INFO] encon-terms ........................................ SUCCESS [ 27.517 s]
[INFO] encon-config ....................................... SUCCESS [ 18.707 s]
@@ -64,7 +64,7 @@ $> mvn clean compile
[INFO] handler-advanced ................................... SUCCESS [ 11.289 s]
[INFO] load-config ........................................ SUCCESS [ 3.725 s]
[INFO] load-config-spring ................................. SUCCESS [ 6.420 s]
-[INFO] benchmark 1.6.2 .................................... SUCCESS [ 5.594 s]
+[INFO] benchmark 1.6.3 .................................... SUCCESS [ 5.594 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
diff --git a/benchmark/README.md b/benchmark/README.md
index 8a1f186..67c7610 100644
--- a/benchmark/README.md
+++ b/benchmark/README.md
@@ -22,9 +22,23 @@
| Benchmark | Mode | Cnt | Score | Error | Units |
|----------------------------------------------------------------------------------------------------------------|-------|-----|------------:|------------:|-------|
-| [EnconBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L103) | thrpt | 25 | 4611737.930 | ± 54595.578 | ops/s |
-| [EnconBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L177) | thrpt | 25 | 13969.833 | ± 164.199 | ops/s |
-| [EnconBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L167) | thrpt | 25 | 28028.764 | ± 262.720 | ops/s |
-| [JInterfaceBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L99) | thrpt | 25 | 4370604.847 | ± 15769.278 | ops/s |
-| [JInterfaceBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L175) | thrpt | 25 | 17196.521 | ± 113.159 | ops/s |
-| [JInterfaceBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L165) | thrpt | 25 | 34529.556 | ± 207.977 | ops/s |
+| [EnconBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L103) | thrpt | 25 | 4562837.232 | ± 48730.020 | ops/s |
+| [EnconBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L177) | thrpt | 25 | 13744.084 | ± 160.906 | ops/s |
+| [EnconBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java#L167) | thrpt | 25 | 27665.670 | ± 230.607 | ops/s |
+| [JInterfaceBenchmark.mailbox2mailbox](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L99) | thrpt | 25 | 4345167.985 | ± 22392.570 | ops/s |
+| [JInterfaceBenchmark.node2node](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L175) | thrpt | 25 | 13850.978 | ± 126.660 | ops/s |
+| [JInterfaceBenchmark.oneDirection](./src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java#L165) | thrpt | 25 | 27590.545 | ± 253.874 | ops/s |
+
+How to run the benchmarks:
+
+```bash
+$> mvn clean package \
+ -DskipTests \
+ -Dgpg.skip \
+ -Dfindbugs.skip=true \
+ -Dpmd.skip=true \
+ -Dcheckstyle.skip \
+ -Dmaven.test.skip=true \
+ -pl benchmark -am; and \
+ java -Xms1G -Xmx2G -jar benchmark/target/benchmarks.jar
+```
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 7de3aa8..3da57f9 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
benchmark
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java b/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java
deleted file mode 100644
index e03d194..0000000
--- a/benchmark/src/main/java/io/appulse/encon/benchmark/EnconBenchmark.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.appulse.encon.benchmark;
-
-import static io.appulse.encon.terms.Erlang.atom;
-import static io.appulse.encon.terms.Erlang.binary;
-import static io.appulse.encon.terms.Erlang.list;
-import static io.appulse.encon.terms.Erlang.number;
-import static io.appulse.encon.terms.Erlang.string;
-import static io.appulse.encon.terms.Erlang.tuple;
-import static java.lang.Boolean.TRUE;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.openjdk.jmh.annotations.Level.Trial;
-import static org.openjdk.jmh.annotations.Mode.Throughput;
-import static org.openjdk.jmh.annotations.Scope.Thread;
-
-import io.appulse.encon.Node;
-import io.appulse.encon.Nodes;
-import io.appulse.encon.config.NodeConfig;
-import io.appulse.encon.connection.regular.Message;
-import io.appulse.encon.mailbox.Mailbox;
-import io.appulse.encon.terms.ErlangTerm;
-import io.appulse.encon.terms.type.ErlangPid;
-
-import lombok.val;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-
-/**
- *
- * @since 1.6.0
- * @author Artem Labazin
- */
-public class EnconBenchmark {
-
- @State(Thread)
- public static class MailboxesState {
-
- @Setup(Trial)
- public void setup () {
- val config = NodeConfig.builder()
- .shortName(TRUE)
- .build();
-
- node = Nodes.singleNode("node-" + System.nanoTime(), config);
-
- mailbox1 = node.mailbox().build();
- pid1 = mailbox1.getPid();
-
- mailbox2 = node.mailbox().build();
- pid2 = mailbox2.getPid();
-
- message = tuple(
- atom("ok"),
- tuple(
- binary(new byte[] { 56, 16, 78 }),
- number(42),
- string("Hello world"),
- list(number(1), number(2), number(3))
- )
- );
- }
-
- @TearDown(Trial)
- public void tearDown () {
- node.close();
- }
-
- Node node;
-
- Mailbox mailbox1;
-
- ErlangPid pid1;
-
- Mailbox mailbox2;
-
- ErlangPid pid2;
-
- ErlangTerm message;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public Message mailbox2mailbox (MailboxesState state) {
- state.mailbox1.send(state.pid2, state.message);
-
- Message one = state.mailbox2.receive();
- state.mailbox2.send(state.pid1, one.getBody());
-
- Message two = state.mailbox1.receive();
- return two;
- }
-
- @State(Thread)
- public static class NodesState {
-
- @Setup(Trial)
- public void setup () {
- val config = NodeConfig.builder()
- .shortName(TRUE)
- .build();
-
- node1 = Nodes.singleNode("node-" + System.nanoTime(), config);
-
- mailbox1 = node1.mailbox().build();
- pid1 = mailbox1.getPid();
-
- node2 = Nodes.singleNode("node-" + System.nanoTime(), config);
-
- mailbox2 = node2.mailbox().build();
- pid2 = mailbox2.getPid();
-
- message = tuple(
- atom("ok"),
- tuple(
- binary(new byte[] { 56, 16, 78 }),
- number(42),
- string("Hello world"),
- list(number(1), number(2), number(3))
- )
- );
- }
-
- @TearDown(Trial)
- public void tearDown () {
- node1.close();
- node2.close();
- }
-
- Node node1;
-
- Mailbox mailbox1;
-
- ErlangPid pid1;
-
- Node node2;
-
- Mailbox mailbox2;
-
- ErlangPid pid2;
-
- ErlangTerm message;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public Message oneDirection (NodesState nodes) {
- nodes.mailbox1.send(nodes.pid2, nodes.message);
-
- Message request = nodes.mailbox2.receive();
- return request;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public Message node2node (NodesState nodes) {
- nodes.mailbox1.send(nodes.pid2, nodes.message);
-
- Message one = nodes.mailbox2.receive();
- nodes.mailbox2.send(nodes.pid1, one.getBody());
-
- Message two = nodes.mailbox1.receive();
- return two;
- }
-}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java
new file mode 100644
index 0000000..d510c21
--- /dev/null
+++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_Node2NodeBenchmarks.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appulse.encon.benchmark;
+
+import static io.appulse.encon.terms.Erlang.binary;
+import static java.lang.Boolean.TRUE;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Level.Trial;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import java.util.stream.IntStream;
+
+import io.appulse.encon.Node;
+import io.appulse.encon.Nodes;
+import io.appulse.encon.config.NodeConfig;
+import io.appulse.encon.config.ServerConfig;
+import io.appulse.encon.mailbox.Mailbox;
+import io.appulse.encon.terms.ErlangTerm;
+import io.appulse.encon.terms.type.ErlangPid;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.infra.ThreadParams;
+
+/**
+ *
+ * @since 1.6.0
+ * @author Artem Labazin
+ */
+@State(Benchmark)
+@OutputTimeUnit(SECONDS)
+@Warmup(iterations = 1)
+@BenchmarkMode(Throughput)
+@Measurement(iterations = 1)
+public class Encon_Node2NodeBenchmarks {
+
+ Node serverNode;
+
+ Mailbox serverMailbox;
+
+ ErlangPid serverMailboxPid;
+
+ Thread serverThread;
+
+ ErlangTerm data;
+
+ Node clientNode;
+
+ Mailbox[] clientMailboxes;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ serverNode = Nodes.singleNode("node-server-" + System.nanoTime(), NodeConfig.builder()
+ .shortName(TRUE)
+ .server(ServerConfig.builder()
+ .bossThreads(1)
+ .workerThreads(2)
+ .build()
+ )
+ .build()
+ );
+
+ serverMailbox = serverNode.mailbox().build();
+ serverMailboxPid = serverMailbox.getPid();
+ data = binary(new byte[] { 1, 2, 3, 4, 5 });
+
+ serverThread = new Thread(() -> {
+ try {
+ while (!java.lang.Thread.interrupted()) {
+ ErlangTerm payload = serverMailbox.receive().getBody();
+ serverMailbox.send(payload.asPid(), data);
+ }
+ } catch (Throwable ex) {
+ }
+ });
+ serverThread.start();
+
+ clientNode = Nodes.singleNode("node-client-" + System.nanoTime(), NodeConfig.builder().shortName(TRUE)
+ .server(ServerConfig.builder()
+ .bossThreads(1)
+ .workerThreads(8)
+ .build()
+ )
+ .build());
+
+ clientMailboxes = IntStream.range(0, 8)
+ .boxed()
+ .map(it -> clientNode.mailbox().build())
+ .toArray(Mailbox[]::new);
+ }
+
+ @TearDown(Trial)
+ public void tearDown () {
+ for (Mailbox mailbox : clientMailboxes) {
+ mailbox.close();
+ }
+ clientNode.close();
+
+ serverMailbox.close();
+ serverNode.close();
+
+ serverThread.interrupt();
+ }
+
+ @Threads(1)
+ @Benchmark
+ public void client_1 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ Mailbox mailbox = clientMailboxes[0];
+ mailbox.send(serverMailboxPid, mailbox.getPid());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(2)
+ @Benchmark
+ public void clients_2 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.getPid());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(4)
+ @Benchmark
+ public void clients_4 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.getPid());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(8)
+ @Benchmark
+ public void clients_8 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ Mailbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.getPid());
+ blackHole.consume(mailbox.receive());
+ }
+}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java
new file mode 100644
index 0000000..dd27743
--- /dev/null
+++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Encon_SimpleBenchmarks.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appulse.encon.benchmark;
+
+import static io.appulse.encon.terms.Erlang.binary;
+import static java.lang.Boolean.TRUE;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Level.Trial;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import io.appulse.encon.Node;
+import io.appulse.encon.Nodes;
+import io.appulse.encon.config.NodeConfig;
+import io.appulse.encon.config.ServerConfig;
+import io.appulse.encon.connection.regular.Message;
+import io.appulse.encon.mailbox.Mailbox;
+import io.appulse.encon.terms.ErlangTerm;
+import io.appulse.encon.terms.type.ErlangPid;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ *
+ * @since 1.6.0
+ * @author Artem Labazin
+ */
+@OutputTimeUnit(SECONDS)
+@Warmup(iterations = 1)
+@BenchmarkMode(Throughput)
+@Measurement(iterations = 1)
+public class Encon_SimpleBenchmarks {
+
+ @Benchmark
+ public void mailbox2mailboxAndBack (Mailbox2MailboxAndBackState state, Blackhole blackHole) {
+ state.mailbox1.send(state.pid2, state.data);
+ Message message = state.mailbox2.receive();
+ state.mailbox2.send(state.pid1, message.getBody());
+ blackHole.consume(state.mailbox1.receive());
+ }
+
+ @Benchmark
+ public void oneDirectionSend (OneDirectionSendState state, Blackhole blackHole) {
+ state.clientMailbox.send(state.serverMailboxPid, state.data);
+ blackHole.consume(state.serverMailbox.receive());
+ }
+
+ @State(Benchmark)
+ public static class Mailbox2MailboxAndBackState {
+
+ Node node1;
+
+ Mailbox mailbox1;
+
+ ErlangPid pid1;
+
+ Node node2;
+
+ Mailbox mailbox2;
+
+ ErlangPid pid2;
+
+ ErlangTerm data;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ NodeConfig config = NodeConfig.builder()
+ .shortName(TRUE)
+ .server(ServerConfig.builder()
+ .bossThreads(1)
+ .workerThreads(1)
+ .build()
+ )
+ .build();
+
+ node1 = Nodes.singleNode("node-1-" + System.nanoTime(), config);
+ mailbox1 = node1.mailbox().build();
+ pid1 = mailbox1.getPid();
+
+ node2 = Nodes.singleNode("node-2-" + System.nanoTime(), config);
+ mailbox2 = node2.mailbox().build();
+ pid2 = mailbox2.getPid();
+
+ data = binary(new byte[] { 1, 2, 3, 4, 5 });
+ }
+
+ @TearDown(Trial)
+ public void tearDown () {
+ mailbox1.close();
+ node1.close();
+
+ mailbox2.close();
+ node2.close();
+ }
+ }
+
+ @State(Benchmark)
+ public static class OneDirectionSendState {
+
+ Node serverNode;
+
+ Mailbox serverMailbox;
+
+ ErlangPid serverMailboxPid;
+
+ Thread serverThread;
+
+ Node clientNode;
+
+ Mailbox clientMailbox;
+
+ ErlangTerm data;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ NodeConfig config = NodeConfig.builder()
+ .shortName(TRUE)
+ .server(ServerConfig.builder()
+ .bossThreads(1)
+ .workerThreads(1)
+ .build()
+ )
+ .build();
+
+ serverNode = Nodes.singleNode("node-server-" + System.nanoTime(), config);
+ serverMailbox = serverNode.mailbox().build();
+ serverMailboxPid = serverMailbox.getPid();
+
+ serverThread = new Thread(() -> {
+ try {
+ while (!java.lang.Thread.interrupted()) {
+ serverMailbox.receive();
+ }
+ } catch (Throwable ex) {
+ }
+ });
+ serverThread.start();
+
+ clientNode = Nodes.singleNode("node-client-" + System.nanoTime(), config);
+ clientMailbox = clientNode.mailbox().build();
+
+ data = binary(new byte[] { 1, 2, 3, 4, 5 });
+ }
+
+ @TearDown(Trial)
+ public void tearDown () {
+ clientMailbox.close();
+ clientNode.close();
+
+ serverMailbox.close();
+ serverNode.close();
+
+ serverThread.interrupt();
+ }
+ }
+}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java
deleted file mode 100644
index 43a881f..0000000
--- a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterfaceBenchmark.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright 2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.appulse.encon.benchmark;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.openjdk.jmh.annotations.Level.Trial;
-import static org.openjdk.jmh.annotations.Mode.Throughput;
-import static org.openjdk.jmh.annotations.Scope.Thread;
-
-import com.ericsson.otp.erlang.OtpErlangAtom;
-import com.ericsson.otp.erlang.OtpErlangBinary;
-import com.ericsson.otp.erlang.OtpErlangInt;
-import com.ericsson.otp.erlang.OtpErlangList;
-import com.ericsson.otp.erlang.OtpErlangObject;
-import com.ericsson.otp.erlang.OtpErlangPid;
-import com.ericsson.otp.erlang.OtpErlangString;
-import com.ericsson.otp.erlang.OtpErlangTuple;
-import com.ericsson.otp.erlang.OtpMbox;
-import com.ericsson.otp.erlang.OtpNode;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-
-/**
- *
- * @since 1.6.0
- * @author Artem Labazin
- */
-public class JInterfaceBenchmark {
-
- @State(Thread)
- public static class MailboxesState {
-
- @Setup(Trial)
- public void setup () throws Exception {
- node = new OtpNode("node-" + System.nanoTime() + "@localhost");
-
- mailbox1 = node.createMbox();
- pid1 = mailbox1.self();
-
- mailbox2 = node.createMbox();
- pid2 = mailbox2.self();
-
- message = new OtpErlangTuple(new OtpErlangObject[] {
- new OtpErlangAtom("ok"),
- new OtpErlangTuple(new OtpErlangObject[] {
- new OtpErlangBinary(new byte[] { 56, 16, 78 }),
- new OtpErlangInt(42),
- new OtpErlangString("Hello world"),
- new OtpErlangList(new OtpErlangObject[] {
- new OtpErlangInt(1),
- new OtpErlangInt(2),
- new OtpErlangInt(3)
- })
- })
- });
- }
-
- @TearDown(Trial)
- public void tearDown () {
- mailbox1.close();
- mailbox2.close();
- node.close();
- }
-
- OtpNode node;
-
- OtpMbox mailbox1;
-
- OtpErlangPid pid1;
-
- OtpMbox mailbox2;
-
- OtpErlangPid pid2;
-
- OtpErlangObject message;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public OtpErlangObject mailbox2mailbox (MailboxesState state) throws Exception {
- state.mailbox1.send(state.pid2, state.message);
-
- OtpErlangObject one = state.mailbox2.receive();
- state.mailbox2.send(state.pid1, one);
-
- OtpErlangObject two = state.mailbox1.receive();
- return two;
- }
-
- @State(Thread)
- public static class NodesState {
-
- @Setup(Trial)
- public void setup () throws Exception {
- node1 = new OtpNode("node-" + System.nanoTime() + "@localhost");
-
- mailbox1 = node1.createMbox();
- pid1 = mailbox1.self();
-
- node2 = new OtpNode("node-" + System.nanoTime() + "@localhost");
-
- mailbox2 = node2.createMbox();
- pid2 = mailbox2.self();
-
- message = new OtpErlangTuple(new OtpErlangObject[] {
- new OtpErlangAtom("ok"),
- new OtpErlangTuple(new OtpErlangObject[] {
- new OtpErlangBinary(new byte[] { 56, 16, 78 }),
- new OtpErlangInt(42),
- new OtpErlangString("Hello world"),
- new OtpErlangList(new OtpErlangObject[] {
- new OtpErlangInt(1),
- new OtpErlangInt(2),
- new OtpErlangInt(3)
- })
- })
- });
- }
-
- @TearDown(Trial)
- public void tearDown () {
- mailbox1.close();
- node1.close();
- mailbox2.close();
- node2.close();
- }
-
- OtpNode node1;
-
- OtpMbox mailbox1;
-
- OtpErlangPid pid1;
-
- OtpNode node2;
-
- OtpMbox mailbox2;
-
- OtpErlangPid pid2;
-
- OtpErlangObject message;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public OtpErlangObject oneDirection (NodesState nodes) throws Exception {
- nodes.mailbox1.send(nodes.pid2, nodes.message);
-
- OtpErlangObject request = nodes.mailbox2.receive();
- return request;
- }
-
- @Benchmark
- @BenchmarkMode(Throughput)
- @OutputTimeUnit(SECONDS)
- public OtpErlangObject node2node (NodesState state) throws Exception {
- state.mailbox1.send(state.pid2, state.message);
-
- OtpErlangObject one = state.mailbox2.receive();
- state.mailbox2.send(state.pid1, one);
-
- OtpErlangObject two = state.mailbox1.receive();
- return two;
- }
-}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java
new file mode 100644
index 0000000..8cee0be
--- /dev/null
+++ b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_Node2NodeBenchmarks.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appulse.encon.benchmark;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Level.Trial;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import java.util.stream.IntStream;
+
+import com.ericsson.otp.erlang.OtpErlangBinary;
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangPid;
+import com.ericsson.otp.erlang.OtpMbox;
+import com.ericsson.otp.erlang.OtpNode;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.infra.ThreadParams;
+
+/**
+ *
+ * @since 1.6.0
+ * @author Artem Labazin
+ */
+@State(Benchmark)
+@OutputTimeUnit(SECONDS)
+@Warmup(iterations = 1)
+@BenchmarkMode(Throughput)
+@Measurement(iterations = 1)
+public class JInterface_Node2NodeBenchmarks {
+
+ OtpNode serverNode;
+
+ OtpMbox serverMailbox;
+
+ OtpErlangPid serverMailboxPid;
+
+ Thread serverThread;
+
+ OtpErlangObject data;
+
+ OtpNode clientNode;
+
+ OtpMbox[] clientMailboxes;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ serverNode = new OtpNode("node-server-" + System.nanoTime() + "@localhost");
+ serverMailbox = serverNode.createMbox();
+ serverMailboxPid = serverMailbox.self();
+ data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 });
+
+ serverThread = new Thread(() -> {
+ try {
+ while (!java.lang.Thread.currentThread().isInterrupted()) {
+ OtpErlangObject message = serverMailbox.receive();
+ serverMailbox.send((OtpErlangPid) message, data);
+ }
+ } catch (Throwable ex) {
+ }
+ });
+ serverThread.start();
+
+ clientNode = new OtpNode("node-client-" + System.nanoTime() + "@localhost");
+ clientMailboxes = IntStream.range(0, 8)
+ .boxed()
+ .map(it -> clientNode.createMbox())
+ .toArray(OtpMbox[]::new);
+ }
+
+ @TearDown(Trial)
+ public void tearDown () throws Exception {
+ for (OtpMbox mailbox : clientMailboxes) {
+ mailbox.close();
+ }
+ clientNode.close();
+
+ serverMailbox.close();
+ serverNode.close();
+
+ serverThread.interrupt();
+ }
+
+ @Threads(1)
+ @Benchmark
+ public void client_1 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ OtpMbox mailbox = clientMailboxes[0];
+ mailbox.send(serverMailboxPid, mailbox.self());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(2)
+ @Benchmark
+ public void clients_2 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.self());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(4)
+ @Benchmark
+ public void clients_4 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.self());
+ blackHole.consume(mailbox.receive());
+ }
+
+ @Threads(8)
+ @Benchmark
+ public void clients_8 (ThreadParams thredParams, Blackhole blackHole) throws Exception {
+ OtpMbox mailbox = clientMailboxes[thredParams.getThreadIndex()];
+ mailbox.send(serverMailboxPid, mailbox.self());
+ blackHole.consume(mailbox.receive());
+ }
+}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java
new file mode 100644
index 0000000..dac0a09
--- /dev/null
+++ b/benchmark/src/main/java/io/appulse/encon/benchmark/JInterface_SimpleBenchmarks.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appulse.encon.benchmark;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Level.Trial;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+import static org.openjdk.jmh.annotations.Scope.Benchmark;
+
+import com.ericsson.otp.erlang.OtpErlangBinary;
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangPid;
+import com.ericsson.otp.erlang.OtpMbox;
+import com.ericsson.otp.erlang.OtpNode;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ *
+ * @since 1.6.0
+ * @author Artem Labazin
+ */
+@OutputTimeUnit(SECONDS)
+@Warmup(iterations = 1)
+@BenchmarkMode(Throughput)
+@Measurement(iterations = 1)
+public class JInterface_SimpleBenchmarks {
+
+ @Benchmark
+ public void mailbox2mailboxAndBack (Mailbox2MailboxAndBackState state, Blackhole blackHole) throws Exception {
+ state.mailbox1.send(state.pid2, state.data);
+ OtpErlangObject message = state.mailbox2.receive();
+ state.mailbox2.send(state.pid1, message);
+ blackHole.consume(state.mailbox1.receive());
+ }
+
+ @Benchmark
+ public void oneDirectionSend (OneDirectionSendState state, Blackhole blackHole) throws Exception {
+ state.clientMailbox.send(state.serverMailboxPid, state.data);
+ blackHole.consume(state.serverMailbox.receive());
+ }
+
+ @State(Benchmark)
+ public static class Mailbox2MailboxAndBackState {
+
+ OtpNode node1;
+
+ OtpMbox mailbox1;
+
+ OtpErlangPid pid1;
+
+ OtpNode node2;
+
+ OtpMbox mailbox2;
+
+ OtpErlangPid pid2;
+
+ OtpErlangObject data;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ node1 = new OtpNode("node-1-" + System.nanoTime() + "@localhost");
+ mailbox1 = node1.createMbox();
+ pid1 = mailbox1.self();
+
+ node2 = new OtpNode("node-2-" + System.nanoTime() + "@localhost");
+ mailbox2 = node2.createMbox();
+ pid2 = mailbox2.self();
+
+ data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 });
+ }
+
+ @TearDown(Trial)
+ public void tearDown () {
+ mailbox1.close();
+ node1.close();
+
+ mailbox2.close();
+ node2.close();
+ }
+ }
+
+ @State(Benchmark)
+ public static class OneDirectionSendState {
+
+ OtpNode serverNode;
+
+ OtpMbox serverMailbox;
+
+ OtpErlangPid serverMailboxPid;
+
+ Thread serverThread;
+
+ OtpNode clientNode;
+
+ OtpMbox clientMailbox;
+
+ OtpErlangObject data;
+
+ @Setup(Trial)
+ public void setup () throws Exception {
+ serverNode = new OtpNode("node-server-" + System.nanoTime() + "@localhost");
+ serverMailbox = serverNode.createMbox();
+ serverMailboxPid = serverMailbox.self();
+
+ serverThread = new Thread(() -> {
+ try {
+ while (!java.lang.Thread.interrupted()) {
+ serverMailbox.receive();
+ }
+ } catch (Throwable ex) {
+ }
+ });
+ serverThread.start();
+
+ clientNode = new OtpNode("node-client-" + System.nanoTime() + "@localhost");
+ clientMailbox = clientNode.createMbox();
+
+ data = new OtpErlangBinary(new byte[] { 1, 2, 3, 4, 5 });
+ }
+
+ @TearDown(Trial)
+ public void tearDown () {
+ clientMailbox.close();
+ clientNode.close();
+
+ serverMailbox.close();
+ serverNode.close();
+
+ serverThread.interrupt();
+ }
+ }
+}
diff --git a/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java b/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java
index 0c08909..b6cabde 100644
--- a/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java
+++ b/benchmark/src/main/java/io/appulse/encon/benchmark/Main.java
@@ -115,7 +115,10 @@ private static void startEpmd () {
}
executor = Executors.newSingleThreadExecutor();
- server = new ServerCommandExecutor(new CommonOptions(), new ServerCommandOptions());
+ ServerCommandOptions options = new ServerCommandOptions();
+ options.setChecks(true);
+
+ server = new ServerCommandExecutor(new CommonOptions(), options);
executor.execute(server::execute);
}
diff --git a/benchmark/src/main/resources/logback.xml b/benchmark/src/main/resources/logback.xml
new file mode 100644
index 0000000..1e462a4
--- /dev/null
+++ b/benchmark/src/main/resources/logback.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/encon-common/pom.xml b/encon-common/pom.xml
index 7c603fb..e9aa521 100644
--- a/encon-common/pom.xml
+++ b/encon-common/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-common
diff --git a/encon-common/src/main/java/io/appulse/encon/common/LruCache.java b/encon-common/src/main/java/io/appulse/encon/common/LruCache.java
new file mode 100644
index 0000000..e718862
--- /dev/null
+++ b/encon-common/src/main/java/io/appulse/encon/common/LruCache.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.appulse.encon.common;
+
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+
+/**
+ *
+ * @since 1.6.2
+ * @author Artem Labazin
+ */
+public class LruCache extends LinkedHashMap {
+
+ private static final long serialVersionUID = -1100634446524987320L;
+
+ private final int maxSize;
+
+ public LruCache (int maxSize) {
+ super(maxSize + 1, 1.0F, true);
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ protected boolean removeEldestEntry (Entry eldest) {
+ return size() > maxSize;
+ }
+}
diff --git a/encon-config/README.md b/encon-config/README.md
index f7773d8..b1643d6 100644
--- a/encon-config/README.md
+++ b/encon-config/README.md
@@ -14,7 +14,7 @@ First of all, add config's dependency:
io.appulse.encon
encon-config
- 1.6.2
+ 1.6.3
...
@@ -23,7 +23,7 @@ First of all, add config's dependency:
**Gradle**:
```groovy
-compile 'io.appulse.encon:encon-config:1.6.2'
+compile 'io.appulse.encon:encon-config:1.6.3'
```
### File based configuration
diff --git a/encon-config/pom.xml b/encon-config/pom.xml
index 7681346..d7a6e2a 100644
--- a/encon-config/pom.xml
+++ b/encon-config/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-config
diff --git a/encon-databind/README.md b/encon-databind/README.md
index 6612d95..3845a6e 100644
--- a/encon-databind/README.md
+++ b/encon-databind/README.md
@@ -14,12 +14,12 @@ First of all, add databind's dependency:
io.appulse.encon
encon
- 1.6.2
+ 1.6.3
io.appulse.encon
encon-databind
- 1.6.2
+ 1.6.3
...
@@ -28,8 +28,8 @@ First of all, add databind's dependency:
**Gradle**:
```groovy
-compile 'io.appulse.encon:encon:1.6.2'
-compile 'io.appulse.encon:encon-databind:1.6.2'
+compile 'io.appulse.encon:encon:1.6.3'
+compile 'io.appulse.encon:encon-databind:1.6.3'
```
Let's imagine, you have POJO like this:
diff --git a/encon-databind/pom.xml b/encon-databind/pom.xml
index 6389cea..e91a7e1 100644
--- a/encon-databind/pom.xml
+++ b/encon-databind/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-databind
diff --git a/encon-handler/README.md b/encon-handler/README.md
index b300d3f..8703472 100644
--- a/encon-handler/README.md
+++ b/encon-handler/README.md
@@ -14,7 +14,7 @@ First of all, add dependency:
io.appulse.encon
encon-handler
- 1.6.2
+ 1.6.3
...
@@ -23,7 +23,7 @@ First of all, add dependency:
**Gradle**:
```groovy
-compile 'io.appulse.encon:encon-handler:1.6.2'
+compile 'io.appulse.encon:encon-handler:1.6.3'
```
### Basics
diff --git a/encon-handler/pom.xml b/encon-handler/pom.xml
index 5be1c5d..1d33027 100644
--- a/encon-handler/pom.xml
+++ b/encon-handler/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-handler
diff --git a/encon-spring/pom.xml b/encon-spring/pom.xml
index 477a32a..d8abe1d 100644
--- a/encon-spring/pom.xml
+++ b/encon-spring/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-spring
diff --git a/encon-terms/pom.xml b/encon-terms/pom.xml
index aaddeca..529f970 100644
--- a/encon-terms/pom.xml
+++ b/encon-terms/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon-terms
@@ -58,7 +58,7 @@ limitations under the License.
io.netty
netty-buffer
- provided
+
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java
index 6ccc046..07af9ce 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/Erlang.java
@@ -53,7 +53,7 @@ public final class Erlang {
/**
* Cached enpty (0-size string) {@link ErlangAtom} instance.
*/
- public static final ErlangAtom EMPTY_ATOM = new ErlangAtom("");
+ public static final ErlangAtom EMPTY_ATOM = ErlangAtom.cached("");
/**
* Creates new {@link ErlangAtom} instance from {@code boolean} (true/false value) .
@@ -63,7 +63,7 @@ public final class Erlang {
* @return {@link ErlangAtom} new instance
*/
public static ErlangAtom atom (boolean value) {
- return new ErlangAtom(value);
+ return ErlangAtom.cached(value);
}
/**
@@ -74,7 +74,7 @@ public static ErlangAtom atom (boolean value) {
* @return {@link ErlangAtom} new instance
*/
public static ErlangAtom atom (@NonNull String value) {
- return new ErlangAtom(value);
+ return ErlangAtom.cached(value);
}
/**
@@ -85,7 +85,7 @@ public static ErlangAtom atom (@NonNull String value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (char value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
@@ -96,7 +96,7 @@ public static ErlangInteger number (char value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (byte value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
@@ -107,7 +107,7 @@ public static ErlangInteger number (byte value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (short value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
@@ -118,7 +118,7 @@ public static ErlangInteger number (short value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (int value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
@@ -129,7 +129,7 @@ public static ErlangInteger number (int value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (long value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
@@ -140,7 +140,7 @@ public static ErlangInteger number (long value) {
* @return {@link ErlangInteger} new instance
*/
public static ErlangInteger number (@NonNull BigInteger value) {
- return new ErlangInteger(value);
+ return ErlangInteger.cached(value);
}
/**
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java
index 38d64cb..e697902 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/ErlangTerm.java
@@ -95,7 +95,7 @@ public static T newInstance (@NonNull ByteBuf buffer) {
case INTEGER:
case SMALL_BIG:
case LARGE_BIG:
- return (T) new ErlangInteger(type, buffer);
+ return (T) ErlangInteger.cached(type, buffer);
case FLOAT:
case NEW_FLOAT:
return (T) new ErlangFloat(type, buffer);
@@ -108,7 +108,7 @@ public static T newInstance (@NonNull ByteBuf buffer) {
return (T) new ErlangPort(type, buffer);
case PID:
case NEW_PID:
- return (T) new ErlangPid(type, buffer);
+ return (T) ErlangPid.cached(type, buffer);
case SMALL_TUPLE:
case LARGE_TUPLE:
return (T) new ErlangTuple(type, buffer);
@@ -133,7 +133,7 @@ public static T newInstance (@NonNull ByteBuf buffer) {
case SMALL_ATOM_UTF8:
case ATOM:
case SMALL_ATOM:
- return (T) new ErlangAtom(type, buffer);
+ return (T) ErlangAtom.cached(type, buffer);
default:
val message = String.format("Unknown term type %s (%d)", type.name(), typeByte);
throw new ErlangTermDecodeException(message);
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java b/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java
index feed999..85b4c73 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/TermType.java
@@ -16,8 +16,13 @@
package io.appulse.encon.terms;
+import static java.util.stream.Collectors.toMap;
import static lombok.AccessLevel.PRIVATE;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
import io.appulse.encon.terms.type.ErlangAtom;
import io.appulse.encon.terms.type.ErlangBinary;
import io.appulse.encon.terms.type.ErlangBitString;
@@ -36,7 +41,6 @@
import lombok.Getter;
import lombok.experimental.FieldDefaults;
-import lombok.val;
/**
* Enumeration of all available term types.
@@ -945,6 +949,13 @@ public enum TermType {
*/
UNKNOWN(-1);
+ private static final Map VALUES;
+
+ static {
+ VALUES = Stream.of(TermType.values())
+ .collect(toMap(it -> (int) it.getCode(), Function.identity()));
+ }
+
@Getter
byte code;
@@ -967,12 +978,7 @@ public enum TermType {
*
* @return parsed {@link TermType} instance
*/
- public static TermType of (byte code) {
- for (val type : values()) {
- if (type.getCode() == code) {
- return type;
- }
- }
- return UNKNOWN;
+ public static TermType of (int code) {
+ return VALUES.getOrDefault(code, UNKNOWN);
}
}
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java b/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java
index 77bcad1..001a946 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/term/CollectionTerm.java
@@ -28,8 +28,8 @@
import java.util.Map;
import java.util.Optional;
+import io.appulse.encon.terms.Erlang;
import io.appulse.encon.terms.ErlangTerm;
-import io.appulse.encon.terms.type.ErlangAtom;
import io.appulse.encon.terms.type.ErlangList;
import io.appulse.encon.terms.type.ErlangMap;
import io.appulse.encon.terms.type.ErlangTuple;
@@ -212,7 +212,7 @@ default ErlangTerm getUnsafe (int index) {
* key term. {@link Optional#empty} otherwise.
*/
default Optional getByAtom (@NonNull String fieldName) {
- val term = new ErlangAtom(fieldName);
+ val term = Erlang.atom(fieldName);
return get(term);
}
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java
index 7517984..e8138b7 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangAtom.java
@@ -21,21 +21,25 @@
import static io.appulse.encon.terms.TermType.SMALL_ATOM_UTF8;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Locale.ENGLISH;
import static lombok.AccessLevel.PRIVATE;
import java.nio.charset.Charset;
+import java.util.Arrays;
+import io.appulse.encon.common.LruCache;
import io.appulse.encon.terms.ErlangTerm;
import io.appulse.encon.terms.TermType;
import io.appulse.encon.terms.exception.IllegalErlangTermTypeException;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ByteProcessor;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.FieldDefaults;
import lombok.experimental.NonFinal;
-import lombok.val;
/**
* An atom is a literal, a constant with name. An atom is to be enclosed in
@@ -53,44 +57,56 @@ public class ErlangAtom extends ErlangTerm {
private static final long serialVersionUID = -2748345367418129439L;
- private static final byte[] TRUE_BYTES = Boolean.TRUE.toString().getBytes(UTF_8);
-
- private static final byte[] FALSE_BYTES = Boolean.FALSE.toString().getBytes(UTF_8);
-
private static final int MAX_ATOM_CODE_POINTS_LENGTH = 255;
private static final int MAX_SMALL_ATOM_BYTES_LENGTH = 255;
- @NonFinal
- String value;
+ private static final LruCache CACHE = new LruCache<>(1000);
- byte[] bytes;
+ private static final ErlangAtom ATOM_TRUE = cached(Boolean.TRUE.toString().toLowerCase(ENGLISH));
- transient Charset charset;
+ private static final ErlangAtom ATOM_FALSE = cached(Boolean.FALSE.toString().toLowerCase(ENGLISH));
+
+ public static ErlangAtom cached (boolean value) {
+ return value
+ ? ATOM_TRUE
+ : ATOM_FALSE;
+ }
+
+ public static ErlangAtom cached (String value) {
+ Charset charset = UTF_8;
+ byte[] bytes = value.getBytes(charset);
+ int hashCode = Arrays.hashCode(bytes);
+ return CACHE.computeIfAbsent(hashCode, key -> new ErlangAtom(value, charset, bytes));
+ }
- /**
- * Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}.
- *
- * @param type object's type
- *
- * @param buffer byte buffer
- */
@SuppressWarnings("deprecation")
- public ErlangAtom (TermType type, @NonNull ByteBuf buffer) {
- super(type);
+ public static ErlangAtom cached (TermType type, ByteBuf buffer) {
+ ByteArrayHashCode byteProcessor = new ByteArrayHashCode();
- val length = type == SMALL_ATOM || type == SMALL_ATOM_UTF8
+ int length = type == SMALL_ATOM || type == SMALL_ATOM_UTF8
? buffer.readUnsignedByte()
: buffer.readUnsignedShort();
- charset = type == SMALL_ATOM_UTF8 || type == ATOM_UTF8
- ? UTF_8
- : ISO_8859_1;
+ buffer.forEachByte(buffer.readerIndex(), length, byteProcessor);
- bytes = new byte[length];
- buffer.readBytes(bytes);
+ return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> {
+ if (value == null) {
+ return new ErlangAtom(type, buffer, length);
+ } else {
+ buffer.skipBytes(length);
+ return value;
+ }
+ });
}
+ @NonFinal
+ String value;
+
+ byte[] bytes;
+
+ transient Charset charset;
+
/**
* Constructs Erlang's atom object with specific {@link String} value.
*
@@ -124,8 +140,47 @@ public ErlangAtom (boolean value) {
charset = UTF_8;
this.value = Boolean.toString(value);
bytes = value
- ? TRUE_BYTES
- : FALSE_BYTES;
+ ? Boolean.TRUE.toString().getBytes(charset)
+ : Boolean.FALSE.toString().getBytes(charset);
+ }
+
+ /**
+ * Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}.
+ *
+ * @param type object's type
+ *
+ * @param buffer byte buffer
+ *
+ * @param length amount of useful bytes
+ */
+ private ErlangAtom (TermType type, ByteBuf buffer, int length) {
+ super(type);
+
+ charset = type == SMALL_ATOM_UTF8 || type == ATOM_UTF8
+ ? UTF_8
+ : ISO_8859_1;
+
+ bytes = new byte[length];
+ buffer.readBytes(bytes);
+ }
+
+ @SuppressWarnings("PMD.ArrayIsStoredDirectly")
+ private ErlangAtom (String value, Charset charset, byte[] bytes) {
+ super();
+
+ this.value = value.codePointCount(0, value.length()) <= MAX_ATOM_CODE_POINTS_LENGTH
+ ? value
+ // Throwing an exception would be better I think, but truncation
+ // seems to be the way it has been done in other parts of OTP...
+ : new String(value.codePoints().toArray(), 0, MAX_ATOM_CODE_POINTS_LENGTH);
+
+ this.charset = charset;
+ this.bytes = bytes;
+ if (bytes.length > MAX_SMALL_ATOM_BYTES_LENGTH) {
+ setType(ATOM_UTF8);
+ } else {
+ setType(SMALL_ATOM_UTF8);
+ }
}
@Override
@@ -175,4 +230,17 @@ protected void serialize (ByteBuf buffer) {
}
buffer.writeBytes(bytes);
}
+
+ @Getter
+ @FieldDefaults(level = PRIVATE)
+ private static class ByteArrayHashCode implements ByteProcessor {
+
+ int hashCode = 1;
+
+ @Override
+ public boolean process (byte value) throws Exception {
+ hashCode = 31 * hashCode + value;
+ return true;
+ }
+ }
}
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java
index f1aaa85..417cb62 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangInteger.java
@@ -26,18 +26,21 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
-import java.util.stream.IntStream;
+import io.appulse.encon.common.LruCache;
import io.appulse.encon.terms.ErlangTerm;
import io.appulse.encon.terms.TermType;
import io.appulse.encon.terms.exception.ErlangTermDecodeException;
import io.appulse.encon.terms.exception.IllegalErlangTermTypeException;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ByteProcessor;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.FieldDefaults;
+import lombok.experimental.NonFinal;
import lombok.val;
/**
@@ -53,35 +56,95 @@ public class ErlangInteger extends ErlangTerm {
private static final long serialVersionUID = -1757584303003802030L;
- private static final int MAX_SMALL_INTEGER;
+ private static final int MAX_SMALL_INTEGER = 255;
- private static final int MAX_INTEGER;
+ private static final int MAX_INTEGER = (1 << 27) - 1;
- private static final int MIN_INTEGER;
+ private static final int MIN_INTEGER = -(1 << 27) - 1;
- private static final int MIN_CACHE;
+ private static final int MAX_SMALL_BIG_BYTES_LENGTH = 255;
- private static final int MAX_CACHE;
+ private static final LruCache CACHE = new LruCache<>(1000);
- private static final int MAX_SMALL_BIG_BYTES_LENGTH;
+ /**
+ * Creates cached {@link ErlangInteger} value.
+ *
+ * @param number integer value
+ *
+ * @return new or cached {@link ErlangInteger} object
+ */
+ public static ErlangInteger cached (byte value) {
+ int hashCode = 31 + value;
- private static final ErlangInteger[] CACHE;
+ ErlangInteger result = CACHE.get(hashCode);
+ if (result != null) {
+ return result;
+ }
+ result = new ErlangInteger(value);
+ CACHE.put(hashCode, result);
+ return result;
+ }
- static {
- MAX_SMALL_INTEGER = 255;
+ /**
+ * Creates cached {@link ErlangInteger} value.
+ *
+ * @param number integer value
+ *
+ * @return new or cached {@link ErlangInteger} object
+ */
+ public static ErlangInteger cached (char value) {
+ int hashCode = 31 + (byte) (value >> 8);
+ hashCode = 31 * hashCode + (byte) value;
- MAX_INTEGER = (1 << 27) - 1;
- MIN_INTEGER = -(1 << 27) - 1;
+ ErlangInteger result = CACHE.get(hashCode);
+ if (result != null) {
+ return result;
+ }
+ result = new ErlangInteger(value);
+ CACHE.put(hashCode, result);
+ return result;
+ }
- MIN_CACHE = -1;
- MAX_CACHE = 256;
+ /**
+ * Creates cached {@link ErlangInteger} value.
+ *
+ * @param number integer value
+ *
+ * @return new or cached {@link ErlangInteger} object
+ */
+ public static ErlangInteger cached (short value) {
+ int hashCode = 31 + (byte) (value >> 8);
+ hashCode = 31 * hashCode + (byte) value;
- MAX_SMALL_BIG_BYTES_LENGTH = 255;
+ ErlangInteger result = CACHE.get(hashCode);
+ if (result != null) {
+ return result;
+ }
+ result = new ErlangInteger(value);
+ CACHE.put(hashCode, result);
+ return result;
+ }
- CACHE = IntStream.range(MIN_CACHE, MAX_CACHE)
- .boxed()
- .map(ErlangInteger::new)
- .toArray(ErlangInteger[]::new);
+ /**
+ * Creates cached {@link ErlangInteger} value.
+ *
+ * @param number integer value
+ *
+ * @return new or cached {@link ErlangInteger} object
+ */
+ public static ErlangInteger cached (int value) {
+ int hashCode = 31 + (byte) (value >> 24);
+ hashCode = 31 * hashCode + (byte) (value >> 16);
+ hashCode = 31 * hashCode + (byte) (value >> 8);
+ hashCode = 31 * hashCode + (byte) value;
+
+ ErlangInteger result = CACHE.get(hashCode);
+ if (result != null) {
+ return result;
+ }
+ result = new ErlangInteger(value);
+ CACHE.put(hashCode, result);
+ return result;
}
/**
@@ -91,14 +154,81 @@ public class ErlangInteger extends ErlangTerm {
*
* @return new or cached {@link ErlangInteger} object
*/
- public static ErlangInteger from (int number) {
- return number > MAX_CACHE || number < MIN_CACHE
- ? new ErlangInteger(number)
- : CACHE[number - MIN_CACHE];
+ public static ErlangInteger cached (long value) {
+ int hashCode = 31 + (byte) (value >> 56);
+ hashCode = 31 * hashCode + (byte) (value >> 48);
+ hashCode = 31 * hashCode + (byte) (value >> 40);
+ hashCode = 31 * hashCode + (byte) (value >> 32);
+ hashCode = 31 * hashCode + (byte) (value >> 24);
+ hashCode = 31 * hashCode + (byte) (value >> 16);
+ hashCode = 31 * hashCode + (byte) (value >> 8);
+ hashCode = 31 * hashCode + (byte) value;
+
+ ErlangInteger result = CACHE.get(hashCode);
+ if (result != null) {
+ return result;
+ }
+ result = new ErlangInteger(value);
+ CACHE.put(hashCode, result);
+ return result;
+ }
+
+ /**
+ * Creates cached {@link ErlangInteger} value.
+ *
+ * @param number integer value
+ *
+ * @return new or cached {@link ErlangInteger} object
+ */
+ public static ErlangInteger cached (BigInteger value) {
+ if (value.bitLength() < Long.BYTES) {
+ return cached(value.longValue());
+ }
+ // int hashCode = value.signum() == -1
+ // ? 32
+ // : 31;
+ // return CACHE.computeIfAbsent(hashCode, key -> new ErlangInteger(value));
+ return new ErlangInteger(value);
+ }
+
+ public static ErlangInteger cached (TermType type, @NonNull ByteBuf buffer) {
+ int index = buffer.readerIndex();
+ ByteArrayHashCode byteProcessor = new ByteArrayHashCode();
+
+ int length;
+ switch (type) {
+ case SMALL_INTEGER:
+ length = Byte.BYTES;
+ break;
+ case INTEGER:
+ length = Integer.BYTES;
+ break;
+ case SMALL_BIG:
+ length = buffer.readByte() + Byte.BYTES;
+ break;
+ case LARGE_BIG:
+ default:
+ length = buffer.readInt() + Byte.BYTES;
+ }
+
+ buffer.forEachByte(buffer.readerIndex(), length, byteProcessor);
+
+ return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> {
+ if (value == null) {
+ buffer.readerIndex(index);
+ return new ErlangInteger(type, buffer);
+ } else {
+ buffer.skipBytes(length);
+ return value;
+ }
+ });
}
BigInteger value;
+ @NonFinal
+ byte[] cachedMagnitude;
+
/**
* Constructs Erlang's term object with specific {@link TermType} from {@link ByteBuf}.
*
@@ -297,16 +427,18 @@ protected void serialize (ByteBuf buffer) {
break;
case SMALL_BIG:
case LARGE_BIG:
- byte[] bytes = value.abs().toByteArray();
- int index = 0;
- for (; index < bytes.length && bytes[index] == 0; index++) {
- // skip leading zeros
+ if (cachedMagnitude == null) {
+ byte[] bytes = value.abs().toByteArray();
+ int index = 0;
+ for (; index < bytes.length && bytes[index] == 0; index++) {
+ // skip leading zeros
+ }
+
+ cachedMagnitude = Arrays.copyOfRange(bytes, index, bytes.length);
+ reverse(cachedMagnitude);
}
- byte[] magnitude = Arrays.copyOfRange(bytes, index, bytes.length);
- reverse(magnitude);
-
- int length = magnitude.length;
+ int length = cachedMagnitude.length;
if ((length & 0xFF) == length) {
buffer.writeByte(length); // length
} else {
@@ -316,7 +448,7 @@ protected void serialize (ByteBuf buffer) {
? 1
: 0;
buffer.writeByte(sign);
- buffer.writeBytes(magnitude);
+ buffer.writeBytes(cachedMagnitude);
break;
default:
throw new IllegalErlangTermTypeException(getClass(), getType());
@@ -347,4 +479,17 @@ private void reverse (byte[] data) {
right--;
}
}
+
+ @Getter
+ @FieldDefaults(level = PRIVATE)
+ private static class ByteArrayHashCode implements ByteProcessor {
+
+ int hashCode = 1;
+
+ @Override
+ public boolean process (byte value) throws Exception {
+ hashCode = 31 * hashCode + value;
+ return true;
+ }
+ }
}
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java
index 9f6da5e..cf2abd9 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPid.java
@@ -17,15 +17,20 @@
package io.appulse.encon.terms.type;
import static io.appulse.encon.terms.TermType.PID;
+import static io.appulse.encon.terms.TermType.SMALL_ATOM;
+import static io.appulse.encon.terms.TermType.SMALL_ATOM_UTF8;
import static java.util.Optional.ofNullable;
import static lombok.AccessLevel.PRIVATE;
+import io.appulse.encon.common.LruCache;
import io.appulse.encon.common.NodeDescriptor;
+import io.appulse.encon.terms.Erlang;
import io.appulse.encon.terms.ErlangTerm;
import io.appulse.encon.terms.TermType;
import io.appulse.encon.terms.exception.IllegalErlangTermTypeException;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ByteProcessor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -46,6 +51,34 @@ public class ErlangPid extends ErlangTerm {
private static final long serialVersionUID = 7083159089429831665L;
+ private static final LruCache CACHE = new LruCache<>(1000);
+
+ @SuppressWarnings("deprecation")
+ public static ErlangPid cached (TermType type, ByteBuf buffer) {
+ int index = buffer.readerIndex();
+ byte nodeNameType = buffer.readByte();
+ int nodeNameLength = nodeNameType == SMALL_ATOM.getCode() || nodeNameType == SMALL_ATOM_UTF8.getCode()
+ ? buffer.readUnsignedByte()
+ : buffer.readUnsignedShort();
+
+ int length = type == PID
+ ? nodeNameLength + 9
+ : nodeNameLength + 12;
+
+ ByteArrayHashCode byteProcessor = new ByteArrayHashCode();
+ buffer.forEachByte(buffer.readerIndex(), length, byteProcessor);
+
+ return CACHE.compute(byteProcessor.getHashCode(), (key, value) -> {
+ if (value == null) {
+ buffer.readerIndex(index);
+ return new ErlangPid(type, buffer);
+ } else {
+ buffer.skipBytes(length);
+ return value;
+ }
+ });
+ }
+
@NonFinal
NodeDescriptor descriptor;
@@ -87,7 +120,7 @@ public ErlangPid (TermType type, @NonNull ByteBuf buffer) {
@Builder
private ErlangPid (TermType type, @NonNull String node, int id, int serial, int creation) {
super(ofNullable(type).orElse(PID));
- this.node = new ErlangAtom(node);
+ this.node = Erlang.atom(node);
if (getType() == PID) {
this.id = id & 0x7FFF; // 15 bits
@@ -150,4 +183,17 @@ protected void serialize (ByteBuf buffer) {
throw new IllegalErlangTermTypeException(getClass(), getType());
}
}
+
+ @Getter
+ @FieldDefaults(level = PRIVATE)
+ private static class ByteArrayHashCode implements ByteProcessor {
+
+ int hashCode = 1;
+
+ @Override
+ public boolean process (byte value) throws Exception {
+ hashCode = 31 * hashCode + value;
+ return true;
+ }
+ }
}
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java
index 2ff68a0..056e7b0 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangPort.java
@@ -21,6 +21,7 @@
import static lombok.AccessLevel.PRIVATE;
import io.appulse.encon.common.NodeDescriptor;
+import io.appulse.encon.terms.Erlang;
import io.appulse.encon.terms.ErlangTerm;
import io.appulse.encon.terms.TermType;
import io.appulse.encon.terms.exception.IllegalErlangTermTypeException;
@@ -94,7 +95,7 @@ public ErlangPort (TermType type, @NonNull ByteBuf buffer) {
@Builder
private ErlangPort (TermType type, @NonNull String node, int id, int creation) {
super(ofNullable(type).orElse(PORT));
- this.node = new ErlangAtom(node);
+ this.node = Erlang.atom(node);
if (getType() == PORT) {
this.id = id & 0xFFFFFFF; // 28 bits
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java
index ddd5c63..646c671 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangReference.java
@@ -23,6 +23,7 @@
import java.util.stream.LongStream;
import io.appulse.encon.common.NodeDescriptor;
+import io.appulse.encon.terms.Erlang;
import io.appulse.encon.terms.ErlangTerm;
import io.appulse.encon.terms.TermType;
import io.appulse.encon.terms.exception.IllegalErlangTermTypeException;
@@ -108,7 +109,7 @@ public ErlangReference (TermType type, @NonNull ByteBuf buffer) {
private ErlangReference (TermType type, @NonNull String node, long id, long[] ids, int creation) {
super(ofNullable(type).orElse(NEW_REFERENCE));
- this.node = new ErlangAtom(node);
+ this.node = Erlang.atom(node);
this.ids = ofNullable(ids)
.map(it -> it.clone())
.map(it -> {
diff --git a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java
index a56c401..b250284 100644
--- a/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java
+++ b/encon-terms/src/main/java/io/appulse/encon/terms/type/ErlangString.java
@@ -102,7 +102,7 @@ protected void serialize (ByteBuf buffer) {
case LIST:
val elements = value.codePoints()
.boxed()
- .map(ErlangInteger::from)
+ .map(ErlangInteger::cached)
.toArray(ErlangInteger[]::new);
buffer.writerIndex(positionBefore);
diff --git a/encon/README.md b/encon/README.md
index 59471c5..26ec9aa 100644
--- a/encon/README.md
+++ b/encon/README.md
@@ -21,7 +21,7 @@ Adding encon's dependency to your `JVM` app:
io.appulse.encon
encon
- 1.6.2
+ 1.6.3
...
@@ -30,7 +30,7 @@ Adding encon's dependency to your `JVM` app:
**Gradle**:
```groovy
-compile 'io.appulse.encon:encon:1.6.2'
+compile 'io.appulse.encon:encon:1.6.3'
```
## Start the Node
diff --git a/encon/pom.xml b/encon/pom.xml
index aefbcc7..0abdb65 100644
--- a/encon/pom.xml
+++ b/encon/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
io.appulse.encon
encon-parent
- 1.6.2
+ 1.6.3
encon
diff --git a/encon/src/main/java/io/appulse/encon/ModuleClient.java b/encon/src/main/java/io/appulse/encon/ModuleClient.java
index 298d132..a018606 100644
--- a/encon/src/main/java/io/appulse/encon/ModuleClient.java
+++ b/encon/src/main/java/io/appulse/encon/ModuleClient.java
@@ -16,7 +16,9 @@
package io.appulse.encon;
+import static io.netty.channel.ChannelOption.ALLOCATOR;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import static io.netty.channel.ChannelOption.TCP_NODELAY;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -109,6 +111,8 @@ private CompletableFuture createConnection (@NonNull RemoteNode remo
.option(SO_KEEPALIVE, true)
.option(TCP_NODELAY, true)
.option(CONNECT_TIMEOUT_MILLIS, 5000)
+ .option(ALLOCATOR, moduleConnection.getAllocator())
+ .option(SINGLE_EVENTEXECUTOR_PER_GROUP, true)
.handler(HandshakeClientInitializer.builder()
.node(node)
.future(future)
diff --git a/encon/src/main/java/io/appulse/encon/ModuleConnection.java b/encon/src/main/java/io/appulse/encon/ModuleConnection.java
index 8616dcc..af92fa5 100644
--- a/encon/src/main/java/io/appulse/encon/ModuleConnection.java
+++ b/encon/src/main/java/io/appulse/encon/ModuleConnection.java
@@ -29,6 +29,8 @@
import io.appulse.encon.common.RemoteNode;
import io.appulse.encon.connection.Connection;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -75,6 +77,9 @@ class ModuleConnection implements Closeable {
@Getter
Class extends ServerChannel> serverChannelClass;
+ @Getter
+ ByteBufAllocator allocator;
+
Map> cache;
ModuleConnection (@NonNull String prefix, int bossThreads, int workerThreads) {
@@ -94,6 +99,7 @@ class ModuleConnection implements Closeable {
clientChannelClass = NioSocketChannel.class;
serverChannelClass = NioServerSocketChannel.class;
}
+ allocator = new PooledByteBufAllocator(true);
}
@Override
diff --git a/encon/src/main/java/io/appulse/encon/ModuleServer.java b/encon/src/main/java/io/appulse/encon/ModuleServer.java
index e0c98ba..7c94115 100644
--- a/encon/src/main/java/io/appulse/encon/ModuleServer.java
+++ b/encon/src/main/java/io/appulse/encon/ModuleServer.java
@@ -17,8 +17,11 @@
package io.appulse.encon;
import static io.netty.channel.ChannelOption.ALLOCATOR;
+import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
import static io.netty.channel.ChannelOption.TCP_NODELAY;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
@@ -30,7 +33,6 @@
import io.appulse.encon.connection.handshake.HandshakeServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.logging.LoggingHandler;
import lombok.NonNull;
@@ -87,11 +89,15 @@ private void start () {
.build())
.option(SO_BACKLOG, 1024)
.option(SO_REUSEADDR, true)
+ .option(CONNECT_TIMEOUT_MILLIS, 5000)
+ .option(ALLOCATOR, moduleConnection.getAllocator())
+ .option(SINGLE_EVENTEXECUTOR_PER_GROUP, true)
.childOption(SO_REUSEADDR, true)
.childOption(SO_KEEPALIVE, true)
.childOption(TCP_NODELAY, true)
+ .childOption(SO_RCVBUF, 128 * 1024)
.childOption(WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 * 1024, 128 * 1024))
- .childOption(ALLOCATOR, new PooledByteBufAllocator(true))
+ .childOption(ALLOCATOR, moduleConnection.getAllocator())
.bind(port);
}
}
diff --git a/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java b/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java
index 1e65cc4..1acf92c 100644
--- a/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java
+++ b/encon/src/main/java/io/appulse/encon/connection/control/ControlMessage.java
@@ -99,7 +99,7 @@ public final ErlangTuple toTuple () {
val elements = Stream.of(elements())
.collect(toCollection(LinkedList::new));
- elements.addFirst(ErlangInteger.from(getTag().getCode()));
+ elements.addFirst(ErlangInteger.cached(getTag().getCode()));
return new ErlangTuple(elements);
}
diff --git a/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java b/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java
index 13a5b21..6fef69a 100644
--- a/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java
+++ b/encon/src/main/java/io/appulse/encon/connection/regular/ConnectionHandler.java
@@ -16,9 +16,14 @@
package io.appulse.encon.connection.regular;
+import static io.appulse.encon.connection.regular.Message.PASS_THROUGH_TAG;
+import static io.appulse.encon.connection.regular.Message.VERSION_TAG;
import static lombok.AccessLevel.PRIVATE;
+// import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
+// import static io.netty.util.internal.StringUtil.NEWLINE;
import java.io.Closeable;
+import java.util.List;
import java.util.function.Consumer;
import io.appulse.encon.Node;
@@ -31,13 +36,17 @@
import io.appulse.encon.connection.control.SendToRegisteredProcess;
import io.appulse.encon.connection.control.Unlink;
import io.appulse.encon.mailbox.Mailbox;
+import io.appulse.encon.terms.ErlangTerm;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.experimental.NonFinal;
import lombok.extern.slf4j.Slf4j;
@@ -45,13 +54,42 @@
/**
*
- * @since 1.0.0
+ * @since 1.6.2
* @author Artem Labazin
*/
@Slf4j
-@RequiredArgsConstructor
+@Builder
+@AllArgsConstructor
@FieldDefaults(level = PRIVATE, makeFinal = true)
-public final class ConnectionHandler extends ChannelInboundHandlerAdapter implements Closeable {
+public final class ConnectionHandler extends ByteToMessageDecoder implements Closeable {
+
+ private static final ByteBuf TICK_TOCK = Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0 });
+
+ private static ErlangTerm readTerm (ByteBuf buffer) {
+ val versionByte = buffer.readUnsignedByte();
+ if (versionByte != VERSION_TAG) {
+ throw new IllegalArgumentException("Wrong version byte. Expected 0x83 (131), but was: " + versionByte);
+ }
+ return ErlangTerm.newInstance(buffer);
+ }
+
+ // private static String formatByteBuf (ChannelHandlerContext ctx, String eventName, ByteBuf msg) {
+ // String chStr = ctx.channel().toString();
+ // int length = msg.readableBytes();
+ // if (length == 0) {
+ // StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 4);
+ // buf.append(chStr).append(' ').append(eventName).append(": 0B");
+ // return buf.toString();
+ // } else {
+ // int rows = length / 16 + (length % 15 == 0? 0 : 1) + 4;
+ // StringBuilder buf = new StringBuilder(chStr.length() + 1 + eventName.length() + 2 + 10 + 1 + 2 + rows * 80);
+
+ // buf.append(chStr).append(' ').append(eventName).append(": ").append(length).append('B').append(NEWLINE);
+ // appendPrettyHexDump(buf, msg);
+
+ // return buf.toString();
+ // }
+ // }
@NonNull
Node node;
@@ -96,23 +134,78 @@ public void channelInactive (ChannelHandlerContext context) throws Exception {
public void send (Message message) {
log.debug("Sending message\nto {}\n {}\n",
remote, message);
- if (!channel.isWritable()) {
- log.error("Channel for {} is not writable. Remote node is {}",
- channel.remoteAddress(), remote);
- throw new IllegalArgumentException("Channel is not writable");
- }
- channel.writeAndFlush(message);
+
+ val out = channel.alloc().buffer();
+ message.writeTo(out);
+
+ val messageLength = channel.alloc()
+ .buffer(Integer.BYTES)
+ .writeInt(out.readableBytes());
+
+ val popa = channel.alloc().compositeBuffer(2)
+ .addComponents(true, messageLength, out);
+
+ channel.writeAndFlush(popa);
}
@Override
- public void channelRead (ChannelHandlerContext context, Object obj) throws Exception {
- val message = (Message) obj;
+ protected void decode (ChannelHandlerContext context, ByteBuf buffer, List