ret = executor.submit(task);
+ return ret;
+ } else
+ return executor.submit(() -> r.call());
- final Future ret = executor.submit(task);
- return ret;
}
@Override
diff --git a/dempsy-framework.core/src/main/java/net/dempsy/threading/ThreadingModel.java b/dempsy-framework.core/src/main/java/net/dempsy/threading/ThreadingModel.java
index 764c1485..e41b4985 100644
--- a/dempsy-framework.core/src/main/java/net/dempsy/threading/ThreadingModel.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/threading/ThreadingModel.java
@@ -56,11 +56,6 @@ public default Thread newThread(final Runnable runnable, final String name) {
return new Thread(runnable, name);
}
- /**
- * How many pending tasks are there.
- */
- public int getNumberPending();
-
/**
* How many pending limited tasks are there
*/
diff --git a/dempsy-framework.core/src/main/java/net/dempsy/transport/Listener.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/Listener.java
index 6a23de9c..31c08e6b 100644
--- a/dempsy-framework.core/src/main/java/net/dempsy/transport/Listener.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/Listener.java
@@ -16,6 +16,8 @@
package net.dempsy.transport;
+import java.util.function.Supplier;
+
/**
*
* This is the core abstraction for receiving messages. The client side of a transport implementation (called an "Adaptor") needs to be wired to a MessageTransportListener
@@ -34,6 +36,10 @@ public interface Listener extends AutoCloseable {
*/
public boolean onMessage(T message) throws MessageTransportException;
+ public default boolean onMessage(final Supplier supplier) {
+ return onMessage(supplier.get());
+ }
+
@Override
public default void close() {}
diff --git a/dempsy-framework.core/src/main/java/net/dempsy/transport/Receiver.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/Receiver.java
index e3453ff6..77604215 100644
--- a/dempsy-framework.core/src/main/java/net/dempsy/transport/Receiver.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/Receiver.java
@@ -16,7 +16,7 @@
package net.dempsy.transport;
-import net.dempsy.threading.ThreadingModel;
+import net.dempsy.Infrastructure;
public interface Receiver extends AutoCloseable {
/**
@@ -27,7 +27,7 @@ public interface Receiver extends AutoCloseable {
/**
* A receiver is started with a Listener and a threading model.
*/
- public void start(Listener> listener, ThreadingModel threadingModel) throws MessageTransportException;
+ public void start(Listener> listener, Infrastructure threadingModel) throws MessageTransportException;
/**
* What is a unique Id for the transport that this {@link Receiver} is associated with. This information is used
diff --git a/dempsy-framework.core/src/main/java/net/dempsy/transport/RoutedMessage.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/RoutedMessage.java
new file mode 100644
index 00000000..a00f4765
--- /dev/null
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/RoutedMessage.java
@@ -0,0 +1,23 @@
+package net.dempsy.transport;
+
+import java.io.Serializable;
+
+public class RoutedMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public final int[] containers;
+ public final Object key;
+ public final Object message;
+
+ @SuppressWarnings("unused")
+ private RoutedMessage() {
+ containers = null;
+ key = null;
+ message = null;
+ }
+
+ public RoutedMessage(final int[] containers, final Object key, final Object message) {
+ this.containers = containers;
+ this.key = key;
+ this.message = message;
+ }
+}
\ No newline at end of file
diff --git a/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/AbstractTcpReceiver.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/AbstractTcpReceiver.java
new file mode 100644
index 00000000..f33dd1b7
--- /dev/null
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/AbstractTcpReceiver.java
@@ -0,0 +1,58 @@
+package net.dempsy.transport.tcp;
+
+import java.net.InetAddress;
+import java.util.function.Supplier;
+
+import net.dempsy.serialization.Serializer;
+import net.dempsy.transport.Receiver;
+
+public abstract class AbstractTcpReceiver> implements Receiver {
+ public final static int DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024;
+ protected final Serializer serializer;
+
+ protected int internalPort;
+ protected boolean useLocalHost = false;
+ protected TcpAddressResolver resolver = a -> a;
+ protected final String serId;
+ protected int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE_BYTES;
+ protected Supplier addrSupplier = null;
+
+ public AbstractTcpReceiver(final Serializer serializer, final int port) {
+ this.internalPort = port;
+ this.serializer = serializer;
+ this.serId = serializer.getClass().getPackage().getName();
+ }
+
+ public AbstractTcpReceiver(final Serializer serializer) {
+ this(serializer, -1);
+ }
+
+ @SuppressWarnings("unchecked")
+ public T setUseLocalHost(final boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
+ return (T) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T setAddressSupplier(final Supplier addrSupplier) {
+ this.addrSupplier = addrSupplier;
+ return (T) this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T setResolver(final TcpAddressResolver resolver) {
+ this.resolver = resolver;
+ return (T) this;
+ }
+
+ public AbstractTcpReceiver setMaxMessageSize(final int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ return this;
+ }
+
+ @Override
+ public abstract TcpAddress getAddress();
+
+ public abstract AbstractTcpReceiver setNumHandlers(int numHandlerThreads);
+
+}
diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddress.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddress.java
similarity index 83%
rename from dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddress.java
rename to dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddress.java
index 5e39ef36..d3686d44 100644
--- a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddress.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddress.java
@@ -4,7 +4,7 @@
import net.dempsy.transport.NodeAddress;
-public class TcpAddress implements NodeAddress {
+public abstract class TcpAddress implements NodeAddress {
private static final long serialVersionUID = 1L;
@@ -12,20 +12,22 @@ public class TcpAddress implements NodeAddress {
public final InetAddress inetAddress;
public final int port;
public final String serializerId;
+ public final int recvBufferSize;
- @SuppressWarnings("unused")
- private TcpAddress() {
+ protected TcpAddress() {
guid = null;
inetAddress = null;
port = -1;
serializerId = null;
+ recvBufferSize = -1;
}
- public TcpAddress(final InetAddress inetAddress, final int port, final String serializerId) {
+ public TcpAddress(final InetAddress inetAddress, final int port, final String serializerId, final int recvBufferSize) {
this.inetAddress = inetAddress;
this.port = port;
this.guid = inetAddress.getHostAddress() + ":" + port;
this.serializerId = serializerId;
+ this.recvBufferSize = recvBufferSize;
}
@Override
diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java
similarity index 69%
rename from dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java
rename to dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java
index 97c21732..96c799ec 100644
--- a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpAddressResolver.java
@@ -8,7 +8,7 @@
* map internal ports bound to, to external ports.
*/
@FunctionalInterface
-public interface TcpAddressResolver {
+public interface TcpAddressResolver {
- public TcpAddress getExternalAddresses(final TcpAddress addr) throws DempsyException;
+ public T getExternalAddresses(final T addr) throws DempsyException;
}
diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpUtils.java b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpUtils.java
similarity index 59%
rename from dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpUtils.java
rename to dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpUtils.java
index 23851614..26da1a93 100644
--- a/dempsy-framework.impl/src/main/java/net/dempsy/transport/tcp/TcpUtils.java
+++ b/dempsy-framework.core/src/main/java/net/dempsy/transport/tcp/TcpUtils.java
@@ -4,24 +4,33 @@
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import net.dempsy.DempsyException;
public class TcpUtils {
public static InetAddress getFirstNonLocalhostInetAddress() throws SocketException {
+ final List addrs = getAllInetAddress();
+ return addrs.stream()
+ .filter(i -> !i.isLoopbackAddress() && i instanceof Inet4Address)
+ .findFirst()
+ .orElseThrow(() -> new DempsyException("There are no non-local network interfaces among " + addrs));
+ }
+
+ public static List getAllInetAddress() throws SocketException {
+ final List ret = new ArrayList<>();
final Enumeration netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
final NetworkInterface networkInterface = netInterfaces.nextElement();
for (final Enumeration loopInetAddress = networkInterface.getInetAddresses(); loopInetAddress.hasMoreElements();) {
final InetAddress tempInetAddress = loopInetAddress.nextElement();
- if (!tempInetAddress.isLoopbackAddress() && tempInetAddress instanceof Inet4Address)
- return tempInetAddress;
+ ret.add(tempInetAddress);
}
}
- throw new DempsyException("There are no non-local network interfaces among " + Collections.list(netInterfaces));
+ return ret;
}
}
diff --git a/dempsy-framework.impl/pom.xml b/dempsy-framework.impl/pom.xml
index 4f787866..72f99e2c 100644
--- a/dempsy-framework.impl/pom.xml
+++ b/dempsy-framework.impl/pom.xml
@@ -16,10 +16,6 @@
net.dempsy
dempsy-framework.core
-
- io.netty
- netty-all
-
io.dropwizard.metrics
diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/NodeManager.java b/dempsy-framework.impl/src/main/java/net/dempsy/NodeManager.java
index 8306d8c0..adc224f1 100644
--- a/dempsy-framework.impl/src/main/java/net/dempsy/NodeManager.java
+++ b/dempsy-framework.impl/src/main/java/net/dempsy/NodeManager.java
@@ -154,8 +154,7 @@ else if (firstAdaptorClusterName.get() == null)
LOGGER.warn("The node at " + nodeId + " contains no message processors but has a Reciever set. The receiver will never be started.");
threading = tr.track(new DefaultThreadingModel("NodeThreadPool-" + nodeId + "-"))
- .setCoresFactor(1.0).setAdditionalThreads(1).setHardShutdown(true)
- .setMaxNumberOfQueuedLimitedTasks(10000).start();
+ .configure(node.getConfiguration()).start();
final NodeReceiver nodeReciever = receiver == null ? null : tr
.track(new NodeReceiver(containers.stream().map(pc -> pc.container).collect(Collectors.toList()), threading, nodeStatsCollector));
@@ -191,7 +190,7 @@ public boolean execute() {
if (LOGGER.isDebugEnabled())
LOGGER.info(logmessage, e);
else
- LOGGER.info(logmessage);
+ LOGGER.info(logmessage, e);
}
return false;
}
@@ -216,25 +215,27 @@ public String toString() {
this.rsManager = tr.start(new RoutingStrategyManager(), this);
// create the router but don't start it yet.
- this.router = new Router(rsManager, nodeAddress, nodeReciever, tManager, nodeStatsCollector);
+ this.router = new Router(rsManager, nodeAddress, nodeId, nodeReciever, tManager, nodeStatsCollector);
// set up containers
containers.forEach(pc -> pc.container.setDispatcher(router));
- // start containers
- containers.forEach(pc -> tr.start(pc.container, this));
-
- // set up adaptors
- adaptors.values().forEach(a -> a.setDispatcher(router));
-
// IB routing strategy
final int numContainers = containers.size();
for (int i = 0; i < numContainers; i++) {
final PerContainer c = containers.get(i);
c.inboundStrategy.setContainerDetails(c.clusterDefinition.getClusterId(), new ContainerAddress(nodeAddress, i), c.container);
- tr.start(c.inboundStrategy, this);
}
+ // start containers after setting inbound
+ containers.forEach(pc -> tr.start(pc.container.setInbound(pc.inboundStrategy), this));
+
+ // set up adaptors
+ adaptors.values().forEach(a -> a.setDispatcher(router));
+
+ // start IB routing strategy
+ containers.forEach(pc -> tr.start(pc.inboundStrategy, this));
+
// start router
tr.start(this.router, this);
@@ -253,7 +254,7 @@ public boolean execute() {
startAdaptorAfterRouterIsRunning.process();
if (receiver != null)
- tr.track(receiver).start(nodeReciever, threading);
+ tr.track(receiver).start(nodeReciever, this);
tr.track(session); // close this session when we're done.
// =====================================
@@ -279,7 +280,7 @@ public boolean isReady() {
}
@Override
- public void close() throws Exception {
+ public void close() {
stop();
}
@@ -351,6 +352,11 @@ public String getNodeId() {
return nodeId;
}
+ @Override
+ public ThreadingModel getThreadingModel() {
+ return threading;
+ }
+
// Testing accessors
// ==============================================================================
diff --git a/dempsy-framework.impl/src/main/java/net/dempsy/NodeReceiver.java b/dempsy-framework.impl/src/main/java/net/dempsy/NodeReceiver.java
index e99b3fbb..4e55d1dc 100644
--- a/dempsy-framework.impl/src/main/java/net/dempsy/NodeReceiver.java
+++ b/dempsy-framework.impl/src/main/java/net/dempsy/NodeReceiver.java
@@ -2,14 +2,15 @@
import java.util.Arrays;
import java.util.List;
+import java.util.function.Supplier;
-import net.dempsy.Router.RoutedMessage;
import net.dempsy.container.Container;
import net.dempsy.messages.KeyedMessage;
import net.dempsy.monitoring.NodeStatsCollector;
import net.dempsy.threading.ThreadingModel;
import net.dempsy.transport.Listener;
import net.dempsy.transport.MessageTransportException;
+import net.dempsy.transport.RoutedMessage;
public class NodeReceiver implements Listener {
@@ -25,18 +26,35 @@ public NodeReceiver(final List nodeContainers, final ThreadingModel t
@Override
public boolean onMessage(final RoutedMessage message) throws MessageTransportException {
- // TODO: consider if blocking should be configurable by cluster? node? etc.
statsCollector.messageReceived(message);
feedbackLoop(message);
return true;
}
+ @Override
+ public boolean onMessage(final Supplier supplier) {
+ statsCollector.messageReceived(supplier);
+ threadModel.submitLimited(new ThreadingModel.Rejectable
-
+
+
net.dempsy
dempsy-commons-bom
- 2.1.1
+ ${dempsy-commons-bom.version}
pom
import
@@ -102,11 +98,6 @@
slf4j-api
1.6.4
-
- io.netty
- netty-all
- 4.1.9.Final
-
org.quartz-scheduler
quartz