From 09c5b22c6aeb726f6d8cbb8945c4e1126bc27b68 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 1 Mar 2017 13:30:04 -0500 Subject: [PATCH 1/5] NIFI-3541: Add local network interface capability to site-to-site client and remote group and ports --- .../nifi/remote/client/SiteToSiteClient.java | 40 +++++- .../remote/client/SiteToSiteClientConfig.java | 7 + .../remote/util/SiteToSiteRestApiClient.java | 109 +++++++++------- .../apache/nifi/controller/AbstractPort.java | 32 ++--- .../nifi/groups/RemoteProcessGroup.java | 24 ++++ .../apache/nifi/remote/RemoteGroupPort.java | 1 + .../remote/StandardRemoteProcessGroup.java | 122 ++++++++++++++---- .../nifi/remote/StandardRemoteGroupPort.java | 27 +++- .../remote/TestStandardRemoteGroupPort.java | 1 + .../apache/nifi/web/api/dto/DtoFactory.java | 66 ++++++---- 10 files changed, 303 insertions(+), 126 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 3d7baccf2cd9..daff70d6625f 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.net.InetAddress; import java.security.KeyStore; import java.security.SecureRandom; import java.util.LinkedHashSet; @@ -168,6 +169,7 @@ public static class Builder implements Serializable { private int batchCount; private long batchSize; private long batchNanos; + private InetAddress localAddress; private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW; private HttpProxy httpProxy; @@ -198,6 +200,7 @@ public Builder fromConfig(final SiteToSiteClientConfig config) { this.batchCount = config.getPreferredBatchCount(); this.batchSize = config.getPreferredBatchSize(); this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + this.localAddress = config.getLocalAddress(); this.httpProxy = config.getHttpProxy(); return this; @@ -223,12 +226,31 @@ public Builder url(final String url) { } /** - *

Specifies the URLs of the remote NiFi instance.

- *

If this URL points to a NiFi node in a NiFi cluster, data transfer to and from - * nodes will be automatically load balanced across the different nodes.

+ *

+ * Specifies the local address to use when communicating with the remote NiFi instance. + *

+ * + * @param localAddress the local address to use, or null to use anyLocal address. + * @return the builder + */ + public Builder localAddress(final InetAddress localAddress) { + this.localAddress = localAddress; + return this; + } + + /** + *

+ * Specifies the URLs of the remote NiFi instance. + *

+ *

+ * If this URL points to a NiFi node in a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes. + *

* - *

Multiple urls provide better connectivity with a NiFi cluster, able to connect - * to the target cluster at long as one of the specified urls is accessible.

+ *

+ * Multiple urls provide better connectivity with a NiFi cluster, able to connect + * to the target cluster at long as one of the specified urls is accessible. + *

* * @param urls urls of remote instance * @return the builder @@ -717,6 +739,7 @@ class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializ private final long batchSize; private final long batchNanos; private final HttpProxy httpProxy; + private final InetAddress localAddress; // some serialization frameworks require a default constructor private StandardSiteToSiteClientConfig() { @@ -740,6 +763,7 @@ private StandardSiteToSiteClientConfig() { this.batchNanos = 0; this.transportProtocol = null; this.httpProxy = null; + this.localAddress = null; } private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { @@ -766,6 +790,7 @@ private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { this.batchNanos = builder.batchNanos; this.transportProtocol = builder.getTransportProtocol(); this.httpProxy = builder.getHttpProxy(); + this.localAddress = builder.localAddress; } @Override @@ -931,5 +956,10 @@ public SiteToSiteTransportProtocol getTransportProtocol() { public HttpProxy getHttpProxy() { return httpProxy; } + + @Override + public InetAddress getLocalAddress() { + return localAddress; + } } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 5bdeee45f09b..83e832847180 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.Serializable; +import java.net.InetAddress; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable { * for backward compatibility for implementations that does not expect multiple URLs. * {@link #getUrls()} should be used instead then should support multiple URLs when making requests. */ + @Deprecated String getUrl(); /** @@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable { */ HttpProxy getHttpProxy(); + /** + * @return the InetAddress to bind to for the local address when creating a socket, or + * {@code null} to bind to the {@code anyLocal} address. + */ + InetAddress getLocalAddress(); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 89da6a0f9d84..e6777b0eb258 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -16,6 +16,55 @@ */ package org.apache.nifi.remote.util; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; + import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; @@ -87,53 +136,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.security.cert.Certificate; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Predicate; -import java.util.regex.Pattern; - -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; - public class SiteToSiteRestApiClient implements Closeable { private static final String EVENT_CATEGORY = "Site-to-Site"; @@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable { private CloseableHttpAsyncClient httpAsyncClient; private boolean compress = false; + private InetAddress localAddress = null; private long requestExpirationMillis = 0; private int serverTransactionTtl = 0; private int batchCount = 0; @@ -239,6 +242,10 @@ private void setupRequestConfig() { .setConnectTimeout(connectTimeoutMillis) .setSocketTimeout(readTimeoutMillis); + if (localAddress != null) { + requestConfigBuilder.setLocalAddress(localAddress); + } + if (proxy != null) { requestConfigBuilder.setProxy(proxy.getHttpHost()); } @@ -916,6 +923,8 @@ private void startExtendingTtl(final String transactionUrl, final Closeable stre extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator; extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis; extendingApiClient.readTimeoutMillis = this.readTimeoutMillis; + extendingApiClient.localAddress = this.localAddress; + final int extendFrequency = serverTransactionTtl / 2; ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { @@ -1197,10 +1206,12 @@ public void setBaseUrl(final String baseUrl) { public void setConnectTimeoutMillis(final int connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; + setupRequestConfig(); } public void setReadTimeoutMillis(final int readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; + setupRequestConfig(); } public static String getFirstUrl(final String clusterUrlStr) { @@ -1336,6 +1347,10 @@ private void setBaseUrl(final String scheme, final String host, final int port, public void setCompress(final boolean compress) { this.compress = compress; } + + public void setLocalAddress(final InetAddress localAddress) { + this.localAddress = localAddress; + } public void setRequestExpirationMillis(final long requestExpirationMillis) { if (requestExpirationMillis < 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 1177dad1af86..4d061b872f30 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -16,6 +16,22 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.nifi.authorization.Resource; @@ -36,22 +52,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; - public abstract class AbstractPort implements Port { public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 64e2ca098904..cb1e6c874857 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -17,18 +17,22 @@ package org.apache.nifi.groups; import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import java.net.InetAddress; +import java.util.Collection; import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable { + @Override String getIdentifier(); String getTargetUri(); @@ -154,6 +158,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable */ String getAuthorizationIssue(); + /** + * Validates the current configuration, returning ValidationResults for any + * invalid configuration parameter. + * + * @return Collection of validation result objects for any invalid findings + * only. If the collection is empty then the component is valid. Guaranteed + * non-null + */ + Collection validate(); + /** * @return the {@link EventReporter} that can be used to report any notable * events @@ -180,6 +194,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable void setProxyPassword(String proxyPassword); + void setNetworkInterface(String interfaceName); + + String getNetworkInterface(); + + /** + * Returns the InetAddress that the will this instance will bind to when communicating with a + * remote NiFi instance, or null if no specific address has been specified + */ + InetAddress getLocalAddress(); + /** * Initiates a task in the remote process group to re-initialize, as a * result of clustering changes diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index 8cad10343418..f8f4b20922cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -33,6 +33,7 @@ public RemoteGroupPort(String id, String name, ProcessGroup processGroup, Connec public abstract TransferDirection getTransferDirection(); + @Override public abstract boolean isUseCompression(); public abstract void setUseCompression(boolean useCompression); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 855dab754932..67c8f11839f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -18,14 +18,15 @@ import static java.util.Objects.requireNonNull; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.UniformInterfaceException; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Date; +import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -40,12 +41,15 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; + import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; @@ -74,6 +78,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.UniformInterfaceException; + /** * Represents the Root Process Group of a remote NiFi Instance. Holds * information about that remote instance, as well as {@link IncomingPort}s and @@ -99,7 +108,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final AtomicReference comments = new AtomicReference<>(); private final AtomicReference processGroup; private final AtomicBoolean transmitting = new AtomicBoolean(false); - private final FlowController flowController; private final SSLContext sslContext; private volatile String communicationsTimeout = "30 sec"; @@ -111,6 +119,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private volatile String proxyUser; private volatile String proxyPassword; + private String networkInterfaceName; + private InetAddress localAddress; + private ValidationResult nicValidationResult; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); @@ -135,7 +148,6 @@ public StandardRemoteProcessGroup(final String id, final String targetUris, fina final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) { this.nifiProperties = nifiProperties; this.id = requireNonNull(id); - this.flowController = requireNonNull(flowController); this.targetUris = targetUris; this.targetId = null; @@ -354,6 +366,11 @@ public String getAuthorizationIssue() { return authorizationIssue; } + @Override + public Collection validate() { + return (nicValidationResult == null) ? Collections.emptyList() : Collections.singletonList(nicValidationResult); + } + public int getInputPortCount() { readLock.lock(); try { @@ -606,7 +623,7 @@ private void addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) { } final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), - this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties); + this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties); outputPorts.put(descriptor.getId(), port); if (descriptor.getConcurrentlySchedulableTaskCount() != null) { @@ -672,7 +689,7 @@ private void addInputPort(final RemoteProcessGroupPortDescriptor descriptor) { } final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), this, - TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties); + TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties); if (descriptor.getConcurrentlySchedulableTaskCount() != null) { port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount()); @@ -741,15 +758,6 @@ private void setCounts(final ProcessGroupCounts counts) { } } - private ProcessGroup getRootGroup() { - return getRootGroup(getProcessGroup()); - } - - private ProcessGroup getRootGroup(final ProcessGroup context) { - final ProcessGroup parent = context.getParent(); - return parent == null ? context : getRootGroup(parent); - } - @Override public Date getLastRefreshTime() { readLock.lock(); @@ -856,10 +864,75 @@ public void refreshFlowContents() throws CommunicationsException { } } + @Override + public String getNetworkInterface() { + readLock.lock(); + try { + return networkInterfaceName; + } finally { + readLock.unlock(); + } + } + + @Override + public void setNetworkInterface(final String interfaceName) { + writeLock.lock(); + try { + this.networkInterfaceName = interfaceName; + + try { + final Enumeration inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses(); + + if (inetAddresses.hasMoreElements()) { + this.localAddress = inetAddresses.nextElement(); + this.nicValidationResult = null; + } else { + this.localAddress = null; + this.nicValidationResult = new ValidationResult.Builder() + .input(interfaceName) + .subject("Network Interface Name") + .valid(false) + .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName) + .build(); + } + } catch (final Exception e) { + this.localAddress = null; + this.nicValidationResult = new ValidationResult.Builder() + .input(interfaceName) + .subject("Network Interface Name") + .valid(false) + .explanation("Could not obtain Network Interface with name " + interfaceName) + .build(); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public InetAddress getLocalAddress() { + readLock.lock(); + try { + if (nicValidationResult != null && !nicValidationResult.isValid()) { + return null; + } + + return localAddress; + } finally { + readLock.unlock(); + } + } + private SiteToSiteRestApiClient getSiteToSiteRestApiClient() { SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter()); apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + + final InetAddress localAddress = getLocalAddress(); + if (localAddress != null) { + apiClient.setLocalAddress(localAddress); + } + return apiClient; } @@ -886,17 +959,6 @@ private Set convertRemotePort(final Set result.isValid()); + + return groupValid; } @Override @@ -444,6 +457,14 @@ public void verifyCanStart() { if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) { throw new IllegalStateException("Port " + getName() + " has no incoming connections"); } + + final Optional resultOption = remoteGroup.validate().stream() + .filter(result -> !result.isValid()) + .findFirst(); + + if (resultOption.isPresent()) { + throw new IllegalStateException("Remote Process Group is not valid: " + resultOption.get().toString()); + } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 2d48515e0d47..31cd1549c549 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -110,6 +110,7 @@ private void setupMock(final SiteToSiteTransportProtocol protocol, connectableType = null; break; } + port = spy(new StandardRemoteGroupPort(ID, NAME, processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null))); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 8049c1252093..113d491241f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,32 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.nifi.action.Action; import org.apache.nifi.action.component.details.ComponentDetails; import org.apache.nifi.action.component.details.ExtensionDetails; @@ -157,32 +183,6 @@ import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1550,7 +1550,19 @@ public RemoteProcessGroupDTO createRemoteProcessGroupDto(final RemoteProcessGrou } if (group.getAuthorizationIssue() != null) { - dto.setAuthorizationIssues(Arrays.asList(group.getAuthorizationIssue())); + final List authIssues = new ArrayList<>(); + final String authIssue = group.getAuthorizationIssue(); + if (authIssue != null) { + authIssues.add(authIssue); + } + + final Collection validationResults = group.validate(); + validationResults.stream() + .filter(result -> !result.isValid()) + .map(result -> result.toString()) + .forEach(str -> authIssues.add(str)); + + dto.setAuthorizationIssues(authIssues); } dto.setActiveRemoteInputPortCount(activeRemoteInputPortCount); From eb2ccd9f3925d04bec1198df230f2f0e7d20d97e Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Wed, 1 Mar 2017 13:51:34 -0500 Subject: [PATCH 2/5] NIFI-3541: - Allowing the user to specify the network interface to send/receive data for a Remote Process Group. Signed-off-by: Mark Payne --- .../web/api/dto/RemoteProcessGroupDTO.java | 21 ++++++++++++ .../RemoteProcessGroupEntityMerger.java | 3 ++ .../apache/nifi/web/api/dto/DtoFactory.java | 2 ++ .../impl/StandardRemoteProcessGroupDAO.java | 5 +++ .../new-remote-process-group-dialog.jsp | 34 +++++++++++++------ .../remote-process-group-configuration.jsp | 34 +++++++++++++------ .../canvas/remote-process-group-details.jsp | 34 +++++++++++++------ .../src/main/webapp/css/dialog.css | 4 --- .../remote-process-group-configuration.css | 13 ++----- .../nf-ng-remote-process-group-component.js | 5 +++ .../nf-remote-process-group-configuration.js | 2 ++ .../canvas/nf-remote-process-group-details.js | 2 ++ .../js/nf/canvas/nf-remote-process-group.js | 29 ++++++++++++---- 13 files changed, 133 insertions(+), 55 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java index df01b82e1a9c..ee64b69f526c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java @@ -39,12 +39,14 @@ public class RemoteProcessGroupDTO extends ComponentDTO { private String communicationsTimeout; private String yieldDuration; private String transportProtocol; + private String localNetworkInterface; private String proxyHost; private Integer proxyPort; private String proxyUser; private String proxyPassword; private Collection authorizationIssues; + private Collection validationErrors; private Boolean transmitting; private Integer inputPortCount; @@ -349,6 +351,25 @@ public void setTransportProtocol(String transportProtocol) { this.transportProtocol = transportProtocol; } + @ApiModelProperty("The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier.") + public String getLocalNetworkInterface() { + return localNetworkInterface; + } + + public void setLocalNetworkInterface(String localNetworkInterface) { + this.localNetworkInterface = localNetworkInterface; + } + + @ApiModelProperty( + "The validation errors for the remote process group. These validation errors represent the problems with the remote process group that must be resolved before it can transmit." + ) + public Collection getValidationErrors() { + return validationErrors; + } + + public void setValidationErrors(Collection validationErrors) { + this.validationErrors = validationErrors; + } public String getProxyHost() { return proxyHost; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java index 3209e025c1b8..69437deedacd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -75,6 +75,7 @@ private static void mergeDtos(final RemoteProcessGroupDTO clientDto, final Map> authorizationErrorMap = new HashMap<>(); + final Map> validationErrorMap = new HashMap<>(); Boolean mergedIsTargetSecure = null; final Set mergedInputPorts = new HashSet<>(); final Set mergedOutputPorts = new HashSet<>(); @@ -88,6 +89,7 @@ private static void mergeDtos(final RemoteProcessGroupDTO clientDto, final Map
-
- Transport Protocol -
+
+
+ Transport Protocol +
+
+
+
+
-
-
+
+
+ Local Network Interface +
+
+
+ +
+
-
+
HTTP Proxy server hostname
@@ -44,7 +56,7 @@
-
+
HTTP Proxy server port
@@ -56,7 +68,7 @@
-
+
HTTP Proxy user
@@ -65,7 +77,7 @@
-
+
HTTP Proxy password
@@ -77,7 +89,7 @@
-
+
Communications timeout
@@ -86,7 +98,7 @@
-
+
Yield duration
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp index c8af6d3ed1d4..36ecd48b1f18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-configuration.jsp @@ -36,16 +36,28 @@
-
- Transport Protocol -
+
+
+ Transport Protocol +
+
+
+
+
-
-
+
+
+ Local Network Interface +
+
+
+ +
+
-
+
HTTP Proxy server hostname
@@ -54,7 +66,7 @@
-
+
HTTP Proxy server port
@@ -66,7 +78,7 @@
-
+
HTTP Proxy user
@@ -75,7 +87,7 @@
-
+
HTTP Proxy password
@@ -87,7 +99,7 @@
-
+
Communications timeout
@@ -96,7 +108,7 @@
-
+
Yield duration
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp index 6f7f99271078..3388a681bdd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-details.jsp @@ -36,16 +36,28 @@
-
- Transport Protocol -
+
+
+ Transport Protocol +
+
+
+
+
-
-
+
+
+ Local Network Interface +
+
+
+ +
+
-
+
HTTP Proxy server hostname
@@ -54,7 +66,7 @@
-
+
HTTP Proxy server port
@@ -66,7 +78,7 @@
-
+
HTTP Proxy user
@@ -75,7 +87,7 @@
-
+
HTTP Proxy password
@@ -87,7 +99,7 @@
-
+
Communications timeout
@@ -96,7 +108,7 @@
-
+
Yield duration
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css index 40a520f013f7..4aaa25360c62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/dialog.css @@ -18,10 +18,6 @@ Specific dialog settings. */ -#new-remote-process-group-transport-protocol-combo { - width: 160px; -} - #fill-color-dialog { display: none; width: 240px; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css index cc235b64ddc8..58dcfcd005b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css @@ -49,22 +49,13 @@ display: none; } -#remote-process-group-transport-protocol-combo { - width: 160px; -} - -div.remote-process-group-timeout-setting, div.remote-process-group-yield-duration-setting, -div.remote-process-group-proxy-host-setting, div.remote-process-group-proxy-port-setting, -div.remote-process-group-proxy-user-setting, div.remote-process-group-proxy-password-setting { +div.remote-process-group-setting-left, div.remote-process-group-setting-right { float: left; width: 49%; } -div.remote-process-group-yield-duration-setting, -div.remote-process-group-proxy-port-setting, -div.remote-process-group-proxy-password-setting { +div.remote-process-group-setting-right { margin-left: 2%; - width: 49%; } /* remote process group port configuration */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js index 5e6e116a0088..08f1b1a216da 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js @@ -78,6 +78,7 @@ 'communicationsTimeout': $('#new-remote-process-group-timeout').val(), 'yieldDuration': $('#new-remote-process-group-yield-duration').val(), 'transportProtocol': $('#new-remote-process-group-transport-protocol-combo').combo('getSelectedOption').value, + 'localNetworkInterface': $('#new-remote-process-group-local-network-interface').val(), 'proxyHost': $('#new-remote-process-group-proxy-host').val(), 'proxyPort': $('#new-remote-process-group-proxy-port').val(), 'proxyUser': $('#new-remote-process-group-proxy-user').val(), @@ -155,6 +156,7 @@ init: function () { var defaultTimeout = "30 sec"; var defaultYieldDuration = "10 sec"; + // configure the new remote process group dialog this.getElement().modal({ scrollableContentStyle: 'scrollable', @@ -167,6 +169,7 @@ $('#new-remote-process-group-transport-protocol-combo').combo('setSelectedOption', { value: 'RAW' }); + $('#new-remote-process-group-local-network-interface').val(''); $('#new-remote-process-group-proxy-host').val(''); $('#new-remote-process-group-proxy-port').val(''); $('#new-remote-process-group-proxy-user').val(''); @@ -174,9 +177,11 @@ } } }); + // set default values $('#new-remote-process-group-timeout').val(defaultTimeout); $('#new-remote-process-group-yield-duration').val(defaultYieldDuration); + // initialize the transport protocol combo $('#new-remote-process-group-transport-protocol-combo').combo({ options: [{ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js index 06cf866af5bc..c616bc5571d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js @@ -150,6 +150,7 @@ $('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', { value: 'RAW' }); + $('#remote-process-group-local-network-interface').val(''); $('#remote-process-group-proxy-host').val(''); $('#remote-process-group-proxy-port').val(''); $('#remote-process-group-proxy-user').val(''); @@ -191,6 +192,7 @@ $('#remote-process-group-proxy-port').val(selectionData.component.proxyPort); $('#remote-process-group-proxy-user').val(selectionData.component.proxyUser); $('#remote-process-group-proxy-password').val(selectionData.component.proxyPassword); + $('#remote-process-group-local-network-interface').val(selectionData.component.localNetworkInterface); // select the appropriate transport-protocol $('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js index 7c6d66702495..6c613982d411 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js @@ -65,6 +65,7 @@ nfCommon.clearField('read-only-remote-process-group-timeout'); nfCommon.clearField('read-only-remote-process-group-yield-duration'); nfCommon.clearField('read-only-remote-process-group-transport-protocol'); + nfCommon.clearField('read-only-remote-process-group-local-network-interface'); nfCommon.clearField('read-only-remote-process-group-proxy-host'); nfCommon.clearField('read-only-remote-process-group-proxy-port'); nfCommon.clearField('read-only-remote-process-group-proxy-user'); @@ -91,6 +92,7 @@ nfCommon.populateField('read-only-remote-process-group-timeout', selectionData.component.communicationsTimeout); nfCommon.populateField('read-only-remote-process-group-yield-duration', selectionData.component.yieldDuration); nfCommon.populateField('read-only-remote-process-group-transport-protocol', selectionData.component.transportProtocol); + nfCommon.populateField('read-only-remote-process-group-local-network-interface', selectionData.component.localNetworkInterface); nfCommon.populateField('read-only-remote-process-group-proxy-host', selectionData.component.proxyHost); nfCommon.populateField('read-only-remote-process-group-proxy-port', selectionData.component.proxyPort); nfCommon.populateField('read-only-remote-process-group-proxy-user', selectionData.component.proxyUser); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js index 9842c71d335d..b91afd1e952d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js @@ -667,6 +667,21 @@ }); }; + var hasIssues = function (d) { + return !nfCommon.isEmpty(d.component.authorizationIssues) || !nfCommon.isEmpty(d.component.validationErrors); + }; + + var getIssues = function (d) { + var issues = []; + if (!nfCommon.isEmpty(d.component.authorizationIssues)) { + issues = issues.concat(d.component.authorizationIssues); + } + if (!nfCommon.isEmpty(d.component.validationErrors)) { + issues = issues.concat(d.component.validationErrors); + } + return issues; + }; + /** * Updates the process group status. * @@ -723,7 +738,7 @@ .text(function (d) { var icon = ''; if (d.permissions.canRead) { - if (!nfCommon.isEmpty(d.component.authorizationIssues)) { + if (hasIssues(d)) { icon = '\uf071'; } else if (d.component.transmitting === true) { icon = '\uf140'; @@ -736,7 +751,7 @@ .attr('font-family', function (d) { var family = ''; if (d.permissions.canRead) { - if (!nfCommon.isEmpty(d.component.authorizationIssues) || d.component.transmitting) { + if (hasIssues(d) || d.component.transmitting) { family = 'FontAwesome'; } else { family = 'flowfont'; @@ -745,20 +760,20 @@ return family; }) .classed('invalid', function (d) { - return d.permissions.canRead && !nfCommon.isEmpty(d.component.authorizationIssues); + return d.permissions.canRead && hasIssues(d); }) .classed('transmitting', function (d) { - return d.permissions.canRead && nfCommon.isEmpty(d.component.authorizationIssues) && d.component.transmitting === true; + return d.permissions.canRead && !hasIssues(d) && d.component.transmitting === true; }) .classed('not-transmitting', function (d) { - return d.permissions.canRead && nfCommon.isEmpty(d.component.authorizationIssues) && d.component.transmitting === false; + return d.permissions.canRead && !hasIssues(d) && d.component.transmitting === false; }) .each(function (d) { // get the tip var tip = d3.select('#authorization-issues-' + d.id); // if there are validation errors generate a tooltip - if (d.permissions.canRead && !nfCommon.isEmpty(d.component.authorizationIssues)) { + if (d.permissions.canRead && hasIssues(d)) { // create the tip if necessary if (tip.empty()) { tip = d3.select('#remote-process-group-tooltips').append('div') @@ -770,7 +785,7 @@ // update the tip tip.html(function () { - var list = nfCommon.formatUnorderedList(d.component.authorizationIssues); + var list = nfCommon.formatUnorderedList(getIssues(d)); if (list === null || list.length === 0) { return ''; } else { From dbfe0598f3d205fd0e5751b4012efa1ddd27b9e9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 1 Mar 2017 15:15:31 -0500 Subject: [PATCH 3/5] NIFI-3541: Updated serializer / synchronizer / fingerprintfactory to account for local network interface --- .../io/socket/ssl/SSLSocketChannel.java | 7 +++- .../client/socket/EndpointConnectionPool.java | 14 ++++++-- .../remote/client/socket/SocketClient.java | 2 +- .../RemoteProcessGroupEntityMerger.java | 2 +- .../nifi/remote/protocol/ServerProtocol.java | 4 +-- .../controller/StandardFlowSynchronizer.java | 7 ++++ .../serialization/FlowFromDOMFactory.java | 1 + .../serialization/StandardFlowSerializer.java | 5 ++- .../nifi/fingerprint/FingerprintFactory.java | 1 + .../remote/StandardRemoteProcessGroup.java | 35 ++++++++++--------- .../src/main/resources/FlowConfiguration.xsd | 1 + .../StandardHttpFlowFileServerProtocol.java | 2 +- .../socket/SocketFlowFileServerProtocol.java | 4 +-- .../apache/nifi/web/api/dto/DtoFactory.java | 25 +++++++------ .../impl/StandardRemoteProcessGroupDAO.java | 23 +++++++----- .../nf-remote-process-group-configuration.js | 3 +- 16 files changed, 86 insertions(+), 50 deletions(-) diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 7a09f5f1c605..7e5b303260cd 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -18,6 +18,7 @@ import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; @@ -68,9 +69,13 @@ public class SSLSocketChannel implements Closeable { private boolean closed = false; private volatile boolean interrupted = false; - public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException { + public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final InetAddress localAddress, final boolean client) throws IOException { this.socketAddress = new InetSocketAddress(hostname, port); this.channel = SocketChannel.open(); + if (localAddress != null) { + final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0); + this.channel.bind(localSocketAddress); + } this.hostname = hostname; this.port = port; this.engine = sslContext.createSSLEngine(); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 6f08f73f9808..926e4b4d759e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -23,7 +23,9 @@ import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.nio.channels.SocketChannel; import java.security.cert.CertificateException; @@ -87,9 +89,11 @@ public class EndpointConnectionPool implements PeerStatusProvider { private final SiteInfoProvider siteInfoProvider; private final PeerSelector peerSelector; + private final InetAddress localAddress; public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, - final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) { + final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider, + final InetAddress localAddress) { Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); this.remoteDestination = remoteDestination; @@ -97,6 +101,7 @@ public EndpointConnectionPool(final RemoteDestination remoteDestination, final i this.eventReporter = eventReporter; this.commsTimeout = commsTimeoutMillis; this.idleExpirationMillis = idleExpirationMillis; + this.localAddress = localAddress; this.siteInfoProvider = siteInfoProvider; @@ -440,7 +445,7 @@ private CommunicationsSession establishSiteToSiteConnection(final String hostnam + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); } - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, localAddress, true); socketChannel.connect(); commsSession = new SSLSocketChannelCommunicationsSession(socketChannel); @@ -452,6 +457,11 @@ private CommunicationsSession establishSiteToSiteConnection(final String hostnam } } else { final SocketChannel socketChannel = SocketChannel.open(); + if (localAddress != null) { + final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0); + socketChannel.socket().bind(localSocketAddress); + } + socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout); socketChannel.socket().setSoTimeout(commsTimeout); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 1d3cce7d48ab..ba6839ce4366 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -52,7 +52,7 @@ public SocketClient(final SiteToSiteClientConfig config) { commsTimeout, (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(), - siteInfoProvider + siteInfoProvider, config.getLocalAddress() ); this.compress = config.isUseCompression(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java index 69437deedacd..a426d939e1fc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -33,7 +33,6 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); for (Map.Entry entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final RemoteProcessGroupEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); @@ -47,6 +46,7 @@ public void merge(RemoteProcessGroupEntity clientEntity, Map entityMap) { final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); final Map dtoMap = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 43660688c722..69ae396c8d4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -145,8 +145,8 @@ void sendPeerList( Peer peer, Optional clusterNodeInfo, String remoteInputHost, - int remoteInputPort, - int remoteInputHttpPort, + Integer remoteInputPort, + Integer remoteInputHttpPort, boolean isSiteToSiteSecure) throws IOException; void shutdown(Peer peer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index d8654756a582..db9e68b821a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -45,6 +45,7 @@ import javax.xml.validation.SchemaFactory; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.cluster.protocol.DataFlow; @@ -1126,6 +1127,12 @@ private ProcessGroup addProcessGroup(final FlowController controller, final Proc remoteGroup.setProxyPassword(remoteGroupDto.getProxyPassword()); } + if (StringUtils.isBlank(remoteGroupDto.getLocalNetworkInterface())) { + remoteGroup.setNetworkInterface(null); + } else { + remoteGroup.setNetworkInterface(remoteGroupDto.getLocalNetworkInterface()); + } + final Set inputPorts = new HashSet<>(); for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 6c39f167cbcc..f8d38bc81546 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -262,6 +262,7 @@ public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element, dto.setProxyHost(getString(element, "proxyHost")); dto.setProxyPort(getOptionalInt(element, "proxyPort")); dto.setProxyUser(getString(element, "proxyUser")); + dto.setLocalNetworkInterface(getString(element, "networkInterface")); final String rawPassword = getString(element, "proxyPassword"); final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index b8936ba1c3e2..f6e3d2b2881c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -69,7 +69,7 @@ */ public class StandardFlowSerializer implements FlowSerializer { - private static final String MAX_ENCODING_VERSION = "1.0"; + private static final String MAX_ENCODING_VERSION = "1.1"; private final StringEncryptor encryptor; @@ -261,6 +261,9 @@ private void addRemoteProcessGroup(final Element parentElement, final RemoteProc final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; addTextElement(element, "proxyPassword", value); } + if (remoteRef.getNetworkInterface() != null) { + addTextElement(element, "networkInterface", remoteRef.getNetworkInterface()); + } for (final RemoteGroupPort port : remoteRef.getInputPorts()) { if (port.hasIncomingConnection()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 3634850b3b31..3679b9819f72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -679,6 +679,7 @@ private StringBuilder addLabelFingerprint(final StringBuilder builder, final Lab private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "id")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "url")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "networkInterface")); final NodeList inputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "inputPort"); final NodeList outputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "outputPort"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 67c8f11839f2..286b2dcef167 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -879,30 +879,33 @@ public void setNetworkInterface(final String interfaceName) { writeLock.lock(); try { this.networkInterfaceName = interfaceName; + if (interfaceName == null) { + this.nicValidationResult = null; + } else { + try { + final Enumeration inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses(); - try { - final Enumeration inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses(); - - if (inetAddresses.hasMoreElements()) { - this.localAddress = inetAddresses.nextElement(); - this.nicValidationResult = null; - } else { + if (inetAddresses.hasMoreElements()) { + this.localAddress = inetAddresses.nextElement(); + this.nicValidationResult = null; + } else { + this.localAddress = null; + this.nicValidationResult = new ValidationResult.Builder() + .input(interfaceName) + .subject("Network Interface Name") + .valid(false) + .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName) + .build(); + } + } catch (final Exception e) { this.localAddress = null; this.nicValidationResult = new ValidationResult.Builder() .input(interfaceName) .subject("Network Interface Name") .valid(false) - .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName) + .explanation("Could not obtain Network Interface with name " + interfaceName) .build(); } - } catch (final Exception e) { - this.localAddress = null; - this.nicValidationResult = new ValidationResult.Builder() - .input(interfaceName) - .subject("Network Interface Name") - .valid(false) - .explanation("Could not obtain Network Interface with name " + interfaceName) - .build(); } } finally { writeLock.unlock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 4607320270dd..02a9ca51d598 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -205,6 +205,7 @@ +