Permalink
Browse files

Merge pull request #90 from jimfcarroll/master

Allow user to specify network interface name when configuring an NioReceiver
  • Loading branch information...
jimfcarroll committed Mar 5, 2018
2 parents bac3527 + 68a4120 commit f98d4840950ef89508aadc01c300f90c920d2a68
@@ -5,7 +5,7 @@
<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-framework.parent</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.9.1-SNAPSHOT</version>
</parent>
<artifactId>dempsy-framework.api</artifactId>
@@ -5,7 +5,7 @@
<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-framework.parent</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.9.1-SNAPSHOT</version>
</parent>
<artifactId>dempsy-framework.core</artifactId>
@@ -22,17 +22,16 @@
/**
* What address can a Sender use to send messages to this receiver. This is called PRIOR to start
*/
- public NodeAddress getAddress();
+ public NodeAddress getAddress(Infrastructure infra);
/**
* A receiver is started with a Listener and a threading model.
*/
public void start(Listener<?> listener, Infrastructure threadingModel) throws MessageTransportException;
/**
- * What is a unique Id for the transport that this {@link Receiver} is associated with. This information is used
- * by the TransportManager to look up a {@link SenderFactory} that's compatible with this {@link Receiver}. The default
- * behavior for this method is to provide the package name of the implementing class
+ * What is a unique Id for the transport that this {@link Receiver} is associated with. This information is used by the TransportManager to look up a {@link SenderFactory} that's compatible with this
+ * {@link Receiver}. The default behavior for this method is to provide the package name of the implementing class
*/
public default String transportTypeId() {
return this.getClass().getPackage().getName();
@@ -3,6 +3,7 @@
import java.net.InetAddress;
import java.util.function.Supplier;
+import net.dempsy.Infrastructure;
import net.dempsy.serialization.Serializer;
import net.dempsy.transport.Receiver;
@@ -51,7 +52,7 @@ public T setResolver(final TcpAddressResolver<A> resolver) {
}
@Override
- public abstract TcpAddress getAddress();
+ public abstract TcpAddress getAddress(Infrastructure infra);
public abstract AbstractTcpReceiver<A, T> setNumHandlers(int numHandlerThreads);
@@ -5,24 +5,34 @@
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
+import java.util.Vector;
import net.dempsy.DempsyException;
public class TcpUtils {
public static InetAddress getFirstNonLocalhostInetAddress() throws SocketException {
- final List<InetAddress> addrs = getAllInetAddress();
+ return getFirstNonLocalhostInetAddress(null);
+ }
+
+ public static InetAddress getFirstNonLocalhostInetAddress(final String interfaceName) throws SocketException {
+ final List<InetAddress> addrs = getAllInetAddress(interfaceName);
return addrs.stream()
.filter(i -> !i.isLoopbackAddress() && i instanceof Inet4Address)
.findFirst()
.orElseThrow(() -> new DempsyException("There are no non-local network interfaces among " + addrs));
}
public static List<InetAddress> getAllInetAddress() throws SocketException {
+ return getAllInetAddress(null);
+ }
+
+ public static List<InetAddress> getAllInetAddress(final String interfaceName) throws SocketException {
final List<InetAddress> ret = new ArrayList<>();
- final Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
+ final Enumeration<NetworkInterface> netInterfaces = getInterfaces(interfaceName);
while (netInterfaces.hasMoreElements()) {
final NetworkInterface networkInterface = netInterfaces.nextElement();
for (final Enumeration<InetAddress> loopInetAddress = networkInterface.getInetAddresses(); loopInetAddress.hasMoreElements();) {
@@ -33,4 +43,10 @@ public static InetAddress getFirstNonLocalhostInetAddress() throws SocketExcepti
return ret;
}
+ static Enumeration<NetworkInterface> getInterfaces(final String interfaceName) throws SocketException {
+ return interfaceName != null
+ ? new Vector<NetworkInterface>(Arrays.asList(NetworkInterface.getByName(interfaceName))).elements()
+ : NetworkInterface.getNetworkInterfaces();
+
+ }
}
@@ -0,0 +1,62 @@
+package net.dempsy.transport.tcp;
+
+import static net.dempsy.util.Functional.uncheck;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+public class TcpUtilsTest {
+
+ @Test
+ public void testGetFirstNonLocalhostInetAddress() throws Exception {
+ final InetAddress addr = TcpUtils.getFirstNonLocalhostInetAddress();
+ assertNotNull(addr);
+ assertFalse(addr.isLoopbackAddress());
+ }
+
+ @Test
+ public void testGetFirstNonLocalhostInetAddressNamed() throws Exception {
+ final List<NetworkInterface> ifs = Collections.list(TcpUtils.getInterfaces(null)).stream()
+ .filter(nif -> !uncheck(() -> nif.isLoopback()))
+ .collect(Collectors.toList());
+
+ final NetworkInterface nif;
+ final NetworkInterface notNif;
+
+ if (ifs.size() > 1) {
+ nif = ifs.get(1);
+ notNif = ifs.get(0);
+ } else if (ifs.size() == 1) {
+ nif = ifs.get(0);
+ notNif = null;
+ } else {
+ nif = null;
+ notNif = null;
+ }
+
+ if (nif != null) { // otherwise we can do no testing.
+ final String name = nif.getDisplayName();
+ final List<InetAddress> expectedAddrs = Collections.list(nif.getInetAddresses());
+ if (expectedAddrs.size() > 0) { // otherwise, we still can't really do anything without a lot of work
+ final InetAddress expected = expectedAddrs.get(0);
+ final InetAddress addr = TcpUtils.getFirstNonLocalhostInetAddress(name);
+ assertEquals(expected, addr);
+
+ if (notNif != null) {
+ final List<InetAddress> noneOfThese = Collections.list(notNif.getInetAddresses());
+ assertFalse(noneOfThese.contains(addr));
+ }
+ } else
+ System.out.println("Can't test TcpUtils.getFirstNonLocalhostInetAddress(name)");
+ } else
+ System.out.println("Can't test TcpUtils.getFirstNonLocalhostInetAddress(name)");
+ }
+}
@@ -5,7 +5,7 @@
<parent>
<groupId>net.dempsy</groupId>
<artifactId>dempsy-framework.parent</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.9.1-SNAPSHOT</version>
</parent>
<artifactId>dempsy-framework.impl</artifactId>
@@ -167,13 +167,13 @@ public NodeManager start() throws DempsyException {
// it we're all adaptor then don't bother to get the receiver.
if (containers.size() == 0) {
- // here there's no point in a reciever since there's nothing to recieve.
+ // here there's no point in a receiver since there's nothing to receive.
if (firstAdaptorClusterName.get() == null)
throw new IllegalStateException("There seems to be no clusters or adaptors defined for this node \"" + node.toString() + "\"");
} else {
receiver = (Receiver) node.getReceiver();
if (receiver != null) // otherwise we're all adaptor
- nodeAddress = receiver.getAddress();
+ nodeAddress = receiver.getAddress(this);
else if (firstAdaptorClusterName.get() == null)
throw new IllegalStateException("There seems to be no clusters or adaptors defined for this node \"" + node.toString() + "\"");
}
@@ -193,8 +193,10 @@ else if (firstAdaptorClusterName.get() == null)
nodeStatsCollector.setMessagesPendingGauge(() -> threading.getNumberLimitedPending());
- final NodeReceiver nodeReciever = receiver == null ? null : tr
- .track(new NodeReceiver(containers.stream().map(pc -> pc.container).collect(Collectors.toList()), threading, nodeStatsCollector));
+ final NodeReceiver nodeReciever = receiver == null ? null
+ : tr
+ .track(new NodeReceiver(containers.stream().map(pc -> pc.container).collect(Collectors.toList()), threading,
+ nodeStatsCollector));
final Map<ClusterId, ClusterInformation> messageTypesByClusterId = new HashMap<>();
containers.stream().map(pc -> pc.clusterDefinition).forEach(c -> {
@@ -45,7 +45,8 @@
private final AtomicBoolean isReady = new AtomicBoolean(false);
private final NodeStatsCollector statsCollector;
- public OutgoingDispatcher(final RoutingStrategyManager manager, final NodeAddress thisNode, final String thisNodeId, final NodeReceiver nodeReciever,
+ public OutgoingDispatcher(final RoutingStrategyManager manager, final NodeAddress thisNode, final String thisNodeId,
+ final NodeReceiver nodeReciever,
final TransportManager tmanager, final NodeStatsCollector statsCollector) {
this.manager = manager;
this.thisNode = thisNode;
@@ -171,7 +172,7 @@ public boolean execute() {
}
// see if node info is dupped.
- if (alreadySeen.contains(ni.nodeAddress)) {
+ if (alreadySeen.contains(ni)) {
LOGGER.warn("The node " + ni.nodeAddress + " seems to be registed more than once.");
continue;
}
@@ -146,7 +146,7 @@ public void close() {
}
@Override
- public NodeAddress getAddress() {
+ public NodeAddress getAddress(final Infrastructure infra) {
return this.address;
}
@@ -24,7 +24,7 @@ public PassthroughReceiver() {
}
@Override
- public NodeAddress getAddress() throws MessageTransportException {
+ public NodeAddress getAddress(final Infrastructure infra) throws MessageTransportException {
return destination;
}
@@ -46,7 +46,7 @@ public void start(final Infrastructure infra) {
}
synchronized void imDone(final PassthroughSender sender) {
- senders.remove(sender.reciever.getAddress());
+ senders.remove(sender.reciever.getAddress(null /* Infrastructure is ignored in PassthroughReceiver */ ));
}
}
@@ -39,6 +39,9 @@
public class NioReceiver<T> extends AbstractTcpReceiver<NioAddress, NioReceiver<T>> implements DisruptableRecevier {
private static Logger LOGGER = LoggerFactory.getLogger(NioReceiver.class);
+ public static final String CONFIG_KEY_RECEIVER_NETWORK_IF_NAME = "reciever_network_if";
+ public static final String DEFAULT_RECEIVER_NETWORK_IF_NAME = null;
+
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private NioAddress internal = null;
@@ -71,11 +74,27 @@ public void close() {
}
@Override
- public synchronized NioAddress getAddress() {
+ public synchronized NioAddress getAddress(final Infrastructure infra) {
if (internal == null) {
+ final String ifNameToGetAddrFrom = infra.getConfigValue(NioReceiver.class, CONFIG_KEY_RECEIVER_NETWORK_IF_NAME,
+ DEFAULT_RECEIVER_NETWORK_IF_NAME);
+
+ if (useLocalHost) {
+ if (ifNameToGetAddrFrom != null)
+ LOGGER.warn("Both \"useLocalHost\" as well as the property " + CONFIG_KEY_RECEIVER_NETWORK_IF_NAME + " for "
+ + NioReceiver.class.getPackage().getName() + ". The property will be ignored.");
+ if (addrSupplier != null)
+ LOGGER.warn("Both IP address supplier (" + addrSupplier.getClass().getName()
+ + ") as well as \"useLocalHost\" was set. The address supplier will be ignored.");
+ } else {
+ if (addrSupplier != null && ifNameToGetAddrFrom != null)
+ LOGGER.warn("Both IP Address supplier (" + addrSupplier.getClass().getName() + ") as well as the property "
+ + CONFIG_KEY_RECEIVER_NETWORK_IF_NAME + " for " + NioReceiver.class.getPackage().getName()
+ + ". The property will be ignored.");
+ }
try {
final InetAddress addr = useLocalHost ? Inet4Address.getLocalHost()
- : (addrSupplier == null ? TcpUtils.getFirstNonLocalhostInetAddress() : addrSupplier.get());
+ : (addrSupplier == null ? TcpUtils.getFirstNonLocalhostInetAddress(ifNameToGetAddrFrom) : addrSupplier.get());
binding = new Binding(addr, internalPort);
final InetSocketAddress inetSocketAddress = binding.bound;
internalPort = inetSocketAddress.getPort();
@@ -95,7 +114,7 @@ public void start(final Listener<?> listener, final Infrastructure infra) throws
throw new IllegalStateException("Cannot restart an " + NioReceiver.class.getSimpleName());
if (binding == null)
- getAddress(); // sets binding via side affect.
+ getAddress(infra); // sets binding via side affect.
// before starting the acceptor, make sure we have Readers created.
try {
@@ -257,8 +276,8 @@ private Client(final NioAddress thisNode, final Listener<T> listener, final Seri
/**
* Read the size
- * @return -1 if there aren't enough bytes read in to figure out the size. -2 if the
- * socket channel reached it's eof. Otherwise, the size actually read.
+ *
+ * @return -1 if there aren't enough bytes read in to figure out the size. -2 if the socket channel reached it's eof. Otherwise, the size actually read.
*/
private final int readSize(final SocketChannel channel, final ByteBuffer bb) throws IOException {
final int size;
@@ -51,7 +51,7 @@ public void testBlockingQueue() throws Exception {
final TestInfrastructure infra = new TestInfrastructure(new DefaultThreadingModel("BQTest-testBlockingQueue-"));
final TransportManager tranMan = chain(new TransportManager(), c -> c.start(infra));
SenderFactory sf = tranMan.getAssociatedInstance(transportTypeId);) {
- final Sender sender = sf.getSender(r.getAddress());
+ final Sender sender = sf.getSender(r.getAddress(infra));
r.start((final String msg) -> {
message.set(new String(msg));
return true;
@@ -75,7 +75,7 @@ public void testBlockingQueueOverflow() throws Throwable {
final Receiver r = new BlockingQueueReceiver(input);
final TransportManager tranMan = chain(new TransportManager(), c -> c.start(infra));
final SenderFactory sf = tranMan.getAssociatedInstance(transportTypeId);) {
- final Sender sender = sf.getSender(r.getAddress());
+ final Sender sender = sf.getSender(r.getAddress(infra));
final AtomicBoolean finallySent = new AtomicBoolean(false);
final AtomicLong receiveCount = new AtomicLong();
Oops, something went wrong.

0 comments on commit f98d484

Please sign in to comment.