From 90f7e13790e6a2f6b674c8de59cb9cf35a1a1833 Mon Sep 17 00:00:00 2001 From: nicholasmhughes Date: Fri, 20 Jul 2018 18:09:49 -0400 Subject: [PATCH 1/2] NIFI-5445 changing property for listen processors to allow binding to an IP address instead of an interface --- .../AbstractListenEventBatchingProcessor.java | 4 +-- .../listen/AbstractListenEventProcessor.java | 13 ++++++---- .../util/listen/ListenerProperties.java | 26 ++++++++++++------- .../processors/standard/ListenSyslog.java | 6 ++--- .../processors/standard/ListenTCPRecord.java | 6 ++--- 5 files changed, 32 insertions(+), 23 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java index c83fdae67e86..1543ed37220b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processor.util.listen; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; +import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -74,7 +74,7 @@ public abstract class AbstractListenEventBatchingProcessor exte @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); - descriptors.add(NETWORK_INTF_NAME); + descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index 20df8687d01a..ff6054275445 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.processor.util.listen; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; +import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -36,6 +36,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; @@ -131,7 +132,7 @@ public abstract class AbstractListenEventProcessor extends Abst @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); - descriptors.add(NETWORK_INTF_NAME); + descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE); @@ -180,13 +181,15 @@ public void onScheduled(final ProcessContext context) throws IOException { port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); - final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String nicIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); InetAddress nicIPAddress = null; if (!StringUtils.isEmpty(nicIPAddressStr)) { - NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); - nicIPAddress = netIF.getInetAddresses().nextElement(); + try { + nicIPAddress = InetAddress.getByName(nicIPAddressStr); + } catch (UnknownHostException e) { + } } // create the dispatcher and call open() to bind to the given port diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java index 128a9d9e3cda..9f5daf635112 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java @@ -25,6 +25,7 @@ import java.net.NetworkInterface; import java.net.SocketException; +import java.net.InetAddress; import java.util.Enumeration; import java.util.HashSet; import java.util.Set; @@ -34,28 +35,33 @@ */ public class ListenerProperties { - private static final Set interfaceSet = new HashSet<>(); + private static final Set ipSet = new HashSet<>(); static { try { final Enumeration interfaceEnum = NetworkInterface.getNetworkInterfaces(); while (interfaceEnum.hasMoreElements()) { final NetworkInterface ifc = interfaceEnum.nextElement(); - interfaceSet.add(ifc.getName()); + + final Enumeration ipEnum = ifc.getInetAddresses(); + while (ipEnum.hasMoreElements()) { + final InetAddress ip = ipEnum.nextElement(); + ipSet.add(ip.getHostAddress()); + } } } catch (SocketException e) { } } - public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() - .name("Local Network Interface") - .description("The name of a local network interface to be used to restrict listening to a specific LAN.") + public static final PropertyDescriptor LOCAL_IP_ADDRESS = new PropertyDescriptor.Builder() + .name("Local IP Address") + .description("The IP address of a local network interface to be used to restrict listening to a specific LAN.") .addValidator(new Validator() { @Override public ValidationResult validate(String subject, String input, ValidationContext context) { ValidationResult result = new ValidationResult.Builder() - .subject("Local Network Interface").valid(true).input(input).build(); - if (interfaceSet.contains(input.toLowerCase())) { + .subject("Local IP Address").valid(true).input(input).build(); + if (ipSet.contains(input.toLowerCase())) { return result; } @@ -67,16 +73,16 @@ public ValidationResult validate(String subject, String input, ValidationContext realValue = ae.evaluate(); } - if (interfaceSet.contains(realValue.toLowerCase())) { + if (ipSet.contains(realValue.toLowerCase())) { return result; } - message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString(); + message = realValue + " is not a valid IP address. Valid addresses are " + ipSet.toString(); } catch (IllegalArgumentException e) { message = "Not a valid AttributeExpression: " + e.getMessage(); } - result = new ValidationResult.Builder().subject("Local Network Interface") + result = new ValidationResult.Builder().subject("Local IP Address") .valid(false).input(input).explanation(message).build(); return result; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 0509e0c8f82d..400774d9563f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -76,7 +76,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; +import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -213,7 +213,7 @@ protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); descriptors.add(PROTOCOL); descriptors.add(PORT); - descriptors.add(NETWORK_INTF_NAME); + descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(CLIENT_AUTH); descriptors.add(RECV_BUFFER_SIZE); @@ -292,7 +292,7 @@ public void onScheduled(final ProcessContext context) throws IOException { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger(); final String protocol = context.getProperty(PROTOCOL).getValue(); - final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String nicIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); final String charSet = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index 738c3e2ede91..890d89d22c42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -74,7 +74,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; +import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -204,7 +204,7 @@ public class ListenTCPRecord extends AbstractProcessor { static final List PROPERTIES; static { final List props = new ArrayList<>(); - props.add(ListenerProperties.NETWORK_INTF_NAME); + props.add(ListenerProperties.LOCAL_IP_ADDRESS); props.add(PORT); props.add(MAX_SOCKET_BUFFER_SIZE); props.add(MAX_CONNECTIONS); @@ -268,7 +268,7 @@ public void onScheduled(final ProcessContext context) throws IOException { // if the Network Interface Property wasn't provided then a null InetAddress will indicate to bind to all interfaces final InetAddress nicAddress; - final String nicAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String nicAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); if (!StringUtils.isEmpty(nicAddressStr)) { NetworkInterface netIF = NetworkInterface.getByName(nicAddressStr); nicAddress = netIF.getInetAddresses().nextElement(); From a209bd6832f113cbed9f6933b45f4502f27d9146 Mon Sep 17 00:00:00 2001 From: nicholasmhughes Date: Mon, 17 Sep 2018 21:52:43 -0400 Subject: [PATCH 2/2] NIFI-5445 adding property for listen processors to allow binding to an IP address --- .../AbstractListenEventBatchingProcessor.java | 2 + .../listen/AbstractListenEventProcessor.java | 54 +++++++++++++++++-- .../util/listen/ListenerProperties.java | 40 ++++++++++++++ .../processors/standard/ListenSyslog.java | 11 +++- .../processors/standard/ListenTCPRecord.java | 11 +++- 5 files changed, 112 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java index 1543ed37220b..2b0399041851 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processor.util.listen; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -74,6 +75,7 @@ public abstract class AbstractListenEventBatchingProcessor exte @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index ff6054275445..6663551a9284 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -16,12 +16,15 @@ */ package org.apache.nifi.processor.util.listen; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -40,6 +43,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -132,6 +136,7 @@ public abstract class AbstractListenEventProcessor extends Abst @Override protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); @@ -175,19 +180,60 @@ public final List getSupportedPropertyDescriptors() { return descriptors; } + /** + * In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method. + */ + @Override + protected Collection customValidate(ValidationContext context) { + final Collection results = new ArrayList<>(); + + String netIntfNameStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + String nicIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + + if (!StringUtils.isEmpty(netIntfNameStr) && !StringUtils.isEmpty(nicIPAddressStr)) { + results.add(new ValidationResult.Builder() + .subject(NETWORK_INTF_NAME.getDisplayName()) + .explanation(NETWORK_INTF_NAME.getDisplayName() + " cannot be provided when selecting " + LOCAL_IP_ADDRESS.getDisplayName()) + .valid(false) + .build()); + results.add(new ValidationResult.Builder() + .subject(LOCAL_IP_ADDRESS.getDisplayName()) + .explanation(LOCAL_IP_ADDRESS.getDisplayName() + " cannot be provided when selecting " + NETWORK_INTF_NAME.getDisplayName()) + .valid(false) + .build()); + } + + customValidate(context, results); + return results; + } + + + /** + * Sub-classes can add custom validation by implementing this method. + * @param validationContext the validation context + * @param validationResults add custom validation result to this collection + */ + protected void customValidate(ValidationContext validationContext, Collection validationResults) { + + } + @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { charset = Charset.forName(context.getProperty(CHARSET).getValue()); port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); - final String nicIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); - + final String netIntfNameStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String lclIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + String nicIPAddressStr = new String(); InetAddress nicIPAddress = null; - if (!StringUtils.isEmpty(nicIPAddressStr)) { + if (!StringUtils.isEmpty(netIntfNameStr)) { + NetworkInterface netIF = NetworkInterface.getByName(netIntfNameStr); + nicIPAddress = netIF.getInetAddresses().nextElement(); + } else if (!StringUtils.isEmpty(lclIPAddressStr)) { try { - nicIPAddress = InetAddress.getByName(nicIPAddressStr); + nicIPAddress = InetAddress.getByName(lclIPAddressStr); } catch (UnknownHostException e) { } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java index 9f5daf635112..4d0c5f1fefd1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java @@ -35,6 +35,7 @@ */ public class ListenerProperties { + private static final Set interfaceSet = new HashSet<>(); private static final Set ipSet = new HashSet<>(); static { @@ -42,6 +43,7 @@ public class ListenerProperties { final Enumeration interfaceEnum = NetworkInterface.getNetworkInterfaces(); while (interfaceEnum.hasMoreElements()) { final NetworkInterface ifc = interfaceEnum.nextElement(); + interfaceSet.add(ifc.getName()); final Enumeration ipEnum = ifc.getInetAddresses(); while (ipEnum.hasMoreElements()) { @@ -53,6 +55,44 @@ public class ListenerProperties { } } + public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() + .name("Local Network Interface") + .description("The name of a local network interface to be used to restrict listening to a specific LAN.") + .addValidator(new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = new ValidationResult.Builder() + .subject("Local Network Interface").valid(true).input(input).build(); + if (interfaceSet.contains(input.toLowerCase())) { + return result; + } + + String message; + String realValue = input; + try { + if (context.isExpressionLanguagePresent(input)) { + AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input); + realValue = ae.evaluate(); + } + + if (interfaceSet.contains(realValue.toLowerCase())) { + return result; + } + + message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString(); + + } catch (IllegalArgumentException e) { + message = "Not a valid AttributeExpression: " + e.getMessage(); + } + result = new ValidationResult.Builder().subject("Local Network Interface") + .valid(false).input(input).explanation(message).build(); + + return result; + } + }) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + public static final PropertyDescriptor LOCAL_IP_ADDRESS = new PropertyDescriptor.Builder() .name("Local IP Address") .description("The IP address of a local network interface to be used to restrict listening to a specific LAN.") diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 400774d9563f..0058edf40a17 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -76,6 +76,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; @SupportsBatching @@ -213,6 +214,7 @@ protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); descriptors.add(PROTOCOL); descriptors.add(PORT); + descriptors.add(NETWORK_INTF_NAME); descriptors.add(LOCAL_IP_ADDRESS); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(CLIENT_AUTH); @@ -292,7 +294,14 @@ public void onScheduled(final ProcessContext context) throws IOException { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int maxMessageQueueSize = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger(); final String protocol = context.getProperty(PROTOCOL).getValue(); - final String nicIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + final String netIntfNameStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String lclIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + String nicIPAddressStr = new String(); + if (!StringUtils.isEmpty(netIntfNameStr)) { + nicIPAddressStr = netIntfNameStr; + } else { + nicIPAddressStr = lclIPAddressStr; + } final String charSet = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index 890d89d22c42..feecc5762047 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -74,6 +74,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; import static org.apache.nifi.processor.util.listen.ListenerProperties.LOCAL_IP_ADDRESS; @SupportsBatching @@ -204,6 +205,7 @@ public class ListenTCPRecord extends AbstractProcessor { static final List PROPERTIES; static { final List props = new ArrayList<>(); + props.add(ListenerProperties.NETWORK_INTF_NAME); props.add(ListenerProperties.LOCAL_IP_ADDRESS); props.add(PORT); props.add(MAX_SOCKET_BUFFER_SIZE); @@ -268,7 +270,14 @@ public void onScheduled(final ProcessContext context) throws IOException { // if the Network Interface Property wasn't provided then a null InetAddress will indicate to bind to all interfaces final InetAddress nicAddress; - final String nicAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + final String netIntfNameStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final String lclIPAddressStr = context.getProperty(LOCAL_IP_ADDRESS).evaluateAttributeExpressions().getValue(); + String nicAddressStr = new String(); + if (!StringUtils.isEmpty(netIntfNameStr)) { + nicAddressStr = netIntfNameStr; + } else { + nicAddressStr = lclIPAddressStr; + } if (!StringUtils.isEmpty(nicAddressStr)) { NetworkInterface netIF = NetworkInterface.getByName(nicAddressStr); nicAddress = netIF.getInetAddresses().nextElement();