From 01aa2aaddb382a19d4b9d0908030dc6b56010a74 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 13 Nov 2017 13:17:18 -0500 Subject: [PATCH 1/2] NIFI-4598: When we retrieve the 'controller' from a remote NiFi instance in order to determine which ports are available, cache those results for up to some configurable amount of time (default 30 secs) so that we don't constantly issue HTTP Requests to the remote nifi --- .../org/apache/nifi/util/NiFiProperties.java | 1 + .../client/AbstractSiteToSiteClient.java | 1 + .../CachingRemoteGroupContentsManager.java | 115 ++++++++++++++++++ .../client/NopRemoteGroupContentsManager.java | 32 +++++ .../nifi/remote/client/PeerSelector.java | 2 +- .../client/RemoteGroupContentsManager.java | 28 +++++ .../nifi/remote/client/SiteInfoProvider.java | 6 + .../nifi/remote/client/SiteToSiteClient.java | 23 ++++ .../remote/client/SiteToSiteClientConfig.java | 11 ++ .../nifi/remote/client/http/HttpClient.java | 2 + .../remote/util/SiteToSiteRestApiClient.java | 89 +++++++++++++- .../main/asciidoc/administration-guide.adoc | 3 + .../remote/StandardRemoteProcessGroup.java | 6 +- .../src/main/resources/conf/nifi.properties | 1 + 14 files changed, 314 insertions(+), 6 deletions(-) create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java create mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index d51dea44b2e0..beb222a49690 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -73,6 +73,7 @@ public abstract class NiFiProperties { public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure"; public static final String SITE_TO_SITE_HTTP_ENABLED = "nifi.remote.input.http.enabled"; public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = "nifi.remote.input.http.transaction.ttl"; + public static final String REMOTE_CONTENTS_CACHE_EXPIRATION = "nifi.remote.contents.cache.expiration"; public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory"; public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java index c6b98a05f9af..a35be863b380 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java @@ -32,6 +32,7 @@ public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) { siteInfoProvider.setSslContext(config.getSslContext()); siteInfoProvider.setConnectTimeoutMillis(commsTimeout); siteInfoProvider.setReadTimeoutMillis(commsTimeout); + siteInfoProvider.setCachedContentsExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); siteInfoProvider.setProxy(config.getHttpProxy()); siteInfoProvider.setLocalAddress(config.getLocalAddress()); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java new file mode 100644 index 000000000000..a72ae6d117d2 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.remote.client; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CachingRemoteGroupContentsManager implements RemoteGroupContentsManager { + private static transient final Logger logger = LoggerFactory.getLogger(CachingRemoteGroupContentsManager.class); + + private final transient ConcurrentMap contentsMap = new ConcurrentHashMap<>(); + private final transient long refreshMillis; + private volatile transient long lastPruneTimestamp = System.currentTimeMillis(); + + public CachingRemoteGroupContentsManager(final long refreshMillis) { + this.refreshMillis = refreshMillis; + } + + + @Override + public ControllerDTO getRemoteContents(final String url, final SiteToSiteRestApiClient client) throws IOException { + // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed + // from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. + if (System.currentTimeMillis() > lastPruneTimestamp + 4 * refreshMillis) { + prune(); + } + + final String internedUrl = url.intern(); + synchronized (internedUrl) { + final RemoteGroupContents groupContents = contentsMap.get(url); + + if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(refreshMillis)) { + logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", url); + + final ControllerDTO refreshedContents; + try { + refreshedContents = client.getController(url); + } catch (final Exception e) { + // we failed to refresh contents, but we don't want to constantly poll the remote instance, failing. + // So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp. + final ControllerDTO existingController = groupContents == null ? null : groupContents.getContents(); + final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController); + contentsMap.put(url, updatedContents); + throw e; + } + + logger.debug("Successfully retrieved contents for remote group at URL {}", url); + + final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents); + contentsMap.put(url, updatedContents); + return refreshedContents; + } + + logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. Will return the cached value.", url); + return groupContents.getContents(); + } + } + + private void prune() { + for (final Map.Entry entry : contentsMap.entrySet()) { + final String url = entry.getKey(); + final RemoteGroupContents contents = entry.getValue(); + + // If any entry in the map is more than 4 times as old as the refresh period, + // then we can go ahead and remove it from the map. We use 4 * refreshMillis + // just to ensure that we don't have any race condition with the above #getRemoteContents. + if (contents.isOlderThan(refreshMillis * 4)) { + contentsMap.remove(url, contents); + } + } + } + + + private static class RemoteGroupContents implements Serializable { + private final ControllerDTO contents; + private final long timestamp; + + public RemoteGroupContents(final ControllerDTO contents) { + this.contents = contents; + this.timestamp = System.currentTimeMillis(); + } + + public ControllerDTO getContents() { + return contents; + } + + public boolean isOlderThan(final long millis) { + final long millisSinceRefresh = System.currentTimeMillis() - timestamp; + return millisSinceRefresh > millis; + } + } +} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java new file mode 100644 index 000000000000..1cc239e1ae28 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.remote.client; + +import java.io.IOException; + +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; + +public class NopRemoteGroupContentsManager implements RemoteGroupContentsManager { + + @Override + public ControllerDTO getRemoteContents(final String url, final SiteToSiteRestApiClient client) throws IOException { + return client.getController(url); + } + +} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index a7bd0945f02e..a4439673d217 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -22,10 +22,10 @@ import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java new file mode 100644 index 000000000000..58ce553db841 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.remote.client; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; + +public interface RemoteGroupContentsManager extends Serializable { + ControllerDTO getRemoteContents(String url, SiteToSiteRestApiClient client) throws IOException; +} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java index 06b939dff1d6..f8151da7be2b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -59,6 +59,7 @@ public class SiteInfoProvider { private SSLContext sslContext; private int connectTimeoutMillis; private int readTimeoutMillis; + private long cachedContentExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private ControllerDTO refreshRemoteInfo() throws IOException { @@ -104,6 +105,7 @@ protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext apiClient.setConnectTimeoutMillis(connectTimeoutMillis); apiClient.setReadTimeoutMillis(readTimeoutMillis); apiClient.setLocalAddress(localAddress); + apiClient.setCacheExpirationMillis(cachedContentExpirationMillis); return apiClient; } @@ -272,6 +274,10 @@ public void setReadTimeoutMillis(int readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; } + public void setCachedContentsExpirationMillis(long expirationMillis) { + this.cachedContentExpirationMillis = expirationMillis; + } + public void setProxy(HttpProxy proxy) { this.proxy = proxy; } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index daff70d6625f..39dbda516e59 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -154,6 +154,7 @@ public static class Builder implements Serializable { private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private long contentsCacheExpirationMillis = TimeUnit.SECONDS.toMillis(30L); private SSLContext sslContext; private String keystoreFilename; private String keystorePass; @@ -184,6 +185,7 @@ public Builder fromConfig(final SiteToSiteClientConfig config) { this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.contentsCacheExpirationMillis = config.getCacheExpiration(TimeUnit.MILLISECONDS); this.sslContext = config.getSslContext(); this.keystoreFilename = config.getKeystoreFilename(); this.keystorePass = config.getKeystorePassword(); @@ -273,6 +275,19 @@ public Builder timeout(final long timeout, final TimeUnit unit) { return this; } + /** + * Specifies how long the contents of a remote NiFi instance should be cached before making + * another web request to the remote instance. + * + * @param expirationPeriod the amount of time that an entry in the cache should expire + * @param unit unit of time over which to interpret the given expirationPeriod + * @return the builder + */ + public Builder cacheExpiration(final long expirationPeriod, final TimeUnit unit) { + this.contentsCacheExpirationMillis = unit.toMillis(expirationPeriod); + return this; + } + /** * Specifies the amount of time that a connection can remain idle in the * connection pool before it is "expired" and shutdown. The default @@ -722,6 +737,7 @@ class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializ private final long timeoutNanos; private final long penalizationNanos; private final long idleExpirationNanos; + private final long contentsCacheExpirationMillis; private final SSLContext sslContext; private final String keystoreFilename; private final String keystorePass; @@ -746,6 +762,7 @@ private StandardSiteToSiteClientConfig() { this.timeoutNanos = 0; this.penalizationNanos = 0; this.idleExpirationNanos = 0; + this.contentsCacheExpirationMillis = 30000L; this.sslContext = null; this.keystoreFilename = null; this.keystorePass = null; @@ -773,6 +790,7 @@ private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { this.timeoutNanos = builder.timeoutNanos; this.penalizationNanos = builder.penalizationNanos; this.idleExpirationNanos = builder.idleExpirationNanos; + this.contentsCacheExpirationMillis = builder.contentsCacheExpirationMillis; this.sslContext = builder.sslContext; this.keystoreFilename = builder.keystoreFilename; this.keystorePass = builder.keystorePass; @@ -816,6 +834,11 @@ public long getTimeout(final TimeUnit timeUnit) { return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); } + @Override + public long getCacheExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(contentsCacheExpirationMillis, TimeUnit.MILLISECONDS); + } + @Override public long getIdleConnectionExpiration(final TimeUnit timeUnit) { return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 83e832847180..8da5e706b155 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -162,6 +162,17 @@ public interface SiteToSiteClientConfig extends Serializable { */ int getPreferredBatchCount(); + /** + * When the contents of a remote NiFi instance are fetched, that information is cached + * so that many calls that are made in a short period of time do not overwhelm the remote + * NiFi instance. This method will indicate the number of milliseconds that this information + * can be cached. + * + * @param unit the desired time unit + * @return the number of milliseconds that the contents of a remote NiFi instance will be cached + */ + long getCacheExpiration(TimeUnit unit); + /** * @return the EventReporter that is to be used by clients to report events */ diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index f0bdcf143e68..4213dac67a5d 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -106,6 +106,7 @@ public Set fetchRemotePeerStatuses(PeerDescription peerDescription) final int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); final Collection peers = apiClient.getPeers(); @@ -154,6 +155,7 @@ public Transaction createTransaction(final TransferDirection direction) throws H apiClient.setBaseUrl(peer.getUrl()); apiClient.setConnectTimeoutMillis(timeoutMillis); apiClient.setReadTimeoutMillis(timeoutMillis); + apiClient.setCacheExpirationMillis(config.getCacheExpiration(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(config.getLocalAddress()); apiClient.setCompress(config.isUseCompression()); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index f4de3713e9de..1670a532a758 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -76,8 +76,6 @@ import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.reporting.Severity; import org.apache.nifi.security.util.CertificateUtils; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.remote.PeerDTO; @@ -90,6 +88,9 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -110,8 +111,11 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -176,11 +180,14 @@ public class SiteToSiteRestApiClient implements Closeable { private int connectTimeoutMillis; private int readTimeoutMillis; + private long cacheExpirationMillis = 30000L; private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$"); private Future postResult; private CountDownLatch transferDataLatch = new CountDownLatch(1); + private static final ConcurrentMap contentsMap = new ConcurrentHashMap<>(); + private volatile long lastPruneTimestamp = System.currentTimeMillis(); public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) { this.sslContext = sslContext; @@ -268,7 +275,7 @@ private void setupClient() { final HttpClientBuilder clientBuilder = HttpClients.custom(); if (sslContext != null) { - clientBuilder.setSslcontext(sslContext); + clientBuilder.setSSLContext(sslContext); clientBuilder.addInterceptorFirst(new HttpsResponseInterceptor()); } @@ -368,10 +375,48 @@ public ControllerDTO getController(final Set clusterUrls) throws IOExcep } private ControllerDTO getController() throws IOException { + // first check cache and prune any old values. + // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed + // from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. + if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) { + pruneCache(); + } + + final String internedUrl = baseUrl.intern(); + synchronized (internedUrl) { + final RemoteGroupContents groupContents = contentsMap.get(internedUrl); + + if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(cacheExpirationMillis)) { + logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", internedUrl); + + final ControllerDTO refreshedContents; + try { + refreshedContents = fetchController(); + } catch (final Exception e) { + // we failed to refresh contents, but we don't want to constantly poll the remote instance, failing. + // So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp. + final ControllerDTO existingController = groupContents == null ? null : groupContents.getContents(); + final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController); + contentsMap.put(internedUrl, updatedContents); + throw e; + } + + logger.debug("Successfully retrieved contents for remote group at URL {}", internedUrl); + + final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents); + contentsMap.put(internedUrl, updatedContents); + return refreshedContents; + } + + logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. Will return the cached value.", internedUrl); + return groupContents.getContents(); + } + } + + private ControllerDTO fetchController() throws IOException { try { final HttpGet get = createGetControllerRequest(); return execute(get, ControllerEntity.class).getController(); - } catch (final HttpGetFailedException e) { if (RESPONSE_CODE_NOT_FOUND == e.getResponseCode()) { logger.debug("getController received NOT_FOUND, trying to access the old NiFi version resource url..."); @@ -382,6 +427,20 @@ private ControllerDTO getController() throws IOException { } } + private void pruneCache() { + for (final Map.Entry entry : contentsMap.entrySet()) { + final String url = entry.getKey(); + final RemoteGroupContents contents = entry.getValue(); + + // If any entry in the map is more than 4 times as old as the refresh period, + // then we can go ahead and remove it from the map. We use 4 * refreshMillis + // just to ensure that we don't have any race condition with the above #getRemoteContents. + if (contents.isOlderThan(TimeUnit.MINUTES.toMillis(5))) { + contentsMap.remove(url, contents); + } + } + } + private HttpGet createGetControllerRequest() { final HttpGet get = createGet("/site-to-site"); get.setHeader(HttpHeaders.PROTOCOL_VERSION, String.valueOf(transportProtocolVersionNegotiator.getVersion())); @@ -1211,6 +1270,10 @@ public void setReadTimeoutMillis(final int readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; } + public void setCacheExpirationMillis(final long expirationMillis) { + this.cacheExpirationMillis = expirationMillis; + } + public static String getFirstUrl(final String clusterUrlStr) { if (clusterUrlStr == null) { return null; @@ -1452,4 +1515,22 @@ public TransactionResultEntity commitTransferFlowFiles(final String transactionU } + private static class RemoteGroupContents { + private final ControllerDTO contents; + private final long timestamp; + + public RemoteGroupContents(final ControllerDTO contents) { + this.contents = contents; + this.timestamp = System.currentTimeMillis(); + } + + public ControllerDTO getContents() { + return contents; + } + + public boolean isOlderThan(final long millis) { + final long millisSinceRefresh = System.currentTimeMillis() - timestamp; + return millisSinceRefresh > millis; + } + } } diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index b0e68d5b605c..12a50a94641c 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2964,6 +2964,9 @@ Remote Process Groups can choose transport protocol from RAW and HTTP. Propertie Whether a Site-to-Site client uses HTTP or HTTPS is determined by `nifi.remote.input.secure`. If it is set to `true`, then requests are sent as HTTPS to `nifi.web.https.port`. If set to `false`, HTTP requests are sent to `nifi.web.http.port`. |nifi.remote.input.http.transaction.ttl|Specifies how long a transaction can stay alive on the server. By default, it is set to `30 secs`. + If a Site-to-Site client hasn’t proceeded to the next action after this period of time, the transaction is discarded from the remote NiFi instance. For example, when a client creates a transaction but doesn’t send or receive flow files, or when a client sends or receives flow files but doesn’t confirm that transaction. +|nifi.remote.contents.cache.expiration|Specifies how long NiFi should cache information about a remote NiFi instance when communicating via Site-to-Site. By default, NiFi will cache the + +responses from the remote system for `30 secs`. This allows NiFi to avoid constantly making HTTP requests to the remote system, which is particularly important when this instance of NiFi + +has many instances of Remote Process Groups. |==== === Web Properties diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 2b2e1fb974af..e7db254e5394 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -98,6 +98,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final ProcessScheduler scheduler; private final EventReporter eventReporter; private final NiFiProperties nifiProperties; + private final long remoteContentsCacheExpiration; private final AtomicReference name = new AtomicReference<>(); private final AtomicReference position = new AtomicReference<>(new Position(0D, 0D)); @@ -152,6 +153,9 @@ public StandardRemoteProcessGroup(final String id, final String targetUris, fina this.scheduler = flowController.getProcessScheduler(); this.authorizationIssue = "Establishing connection to " + targetUris; + final String expirationPeriod = nifiProperties.getProperty(NiFiProperties.REMOTE_CONTENTS_CACHE_EXPIRATION, "30 secs"); + remoteContentsCacheExpiration = FormatUtils.getTimeDuration(expirationPeriod, TimeUnit.MILLISECONDS); + final BulletinRepository bulletinRepository = flowController.getBulletinRepository(); eventReporter = new EventReporter() { private static final long serialVersionUID = 1L; @@ -947,7 +951,7 @@ private SiteToSiteRestApiClient getSiteToSiteRestApiClient() { apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setLocalAddress(getLocalAddress()); - + apiClient.setCacheExpirationMillis(remoteContentsCacheExpiration); return apiClient; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index f867ed3ae19e..624f2f1883f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -124,6 +124,7 @@ nifi.remote.input.secure=false nifi.remote.input.socket.port= nifi.remote.input.http.enabled=true nifi.remote.input.http.transaction.ttl=30 sec +nifi.remote.contents.cache.expiration=30 secs # web properties # nifi.web.war.directory=${nifi.web.war.directory} From 7a7642cd1fb33dc7769e5f9e6401549a032e6ada Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 13 Nov 2017 14:15:10 -0500 Subject: [PATCH 2/2] NIFI-4598: Deleted unused classes --- .../CachingRemoteGroupContentsManager.java | 115 ------------------ .../client/NopRemoteGroupContentsManager.java | 32 ----- .../client/RemoteGroupContentsManager.java | 28 ----- 3 files changed, 175 deletions(-) delete mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java delete mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java delete mode 100644 nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java deleted file mode 100644 index a72ae6d117d2..000000000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/CachingRemoteGroupContentsManager.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.remote.client; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CachingRemoteGroupContentsManager implements RemoteGroupContentsManager { - private static transient final Logger logger = LoggerFactory.getLogger(CachingRemoteGroupContentsManager.class); - - private final transient ConcurrentMap contentsMap = new ConcurrentHashMap<>(); - private final transient long refreshMillis; - private volatile transient long lastPruneTimestamp = System.currentTimeMillis(); - - public CachingRemoteGroupContentsManager(final long refreshMillis) { - this.refreshMillis = refreshMillis; - } - - - @Override - public ControllerDTO getRemoteContents(final String url, final SiteToSiteRestApiClient client) throws IOException { - // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed - // from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. - if (System.currentTimeMillis() > lastPruneTimestamp + 4 * refreshMillis) { - prune(); - } - - final String internedUrl = url.intern(); - synchronized (internedUrl) { - final RemoteGroupContents groupContents = contentsMap.get(url); - - if (groupContents == null || groupContents.getContents() == null || groupContents.isOlderThan(refreshMillis)) { - logger.debug("No Contents for remote group at URL {} or contents have expired; will refresh contents", url); - - final ControllerDTO refreshedContents; - try { - refreshedContents = client.getController(url); - } catch (final Exception e) { - // we failed to refresh contents, but we don't want to constantly poll the remote instance, failing. - // So we put the ControllerDTO back but use a new RemoteGroupContents so that we get a new timestamp. - final ControllerDTO existingController = groupContents == null ? null : groupContents.getContents(); - final RemoteGroupContents updatedContents = new RemoteGroupContents(existingController); - contentsMap.put(url, updatedContents); - throw e; - } - - logger.debug("Successfully retrieved contents for remote group at URL {}", url); - - final RemoteGroupContents updatedContents = new RemoteGroupContents(refreshedContents); - contentsMap.put(url, updatedContents); - return refreshedContents; - } - - logger.debug("Contents for remote group at URL {} have already been fetched and have not yet expired. Will return the cached value.", url); - return groupContents.getContents(); - } - } - - private void prune() { - for (final Map.Entry entry : contentsMap.entrySet()) { - final String url = entry.getKey(); - final RemoteGroupContents contents = entry.getValue(); - - // If any entry in the map is more than 4 times as old as the refresh period, - // then we can go ahead and remove it from the map. We use 4 * refreshMillis - // just to ensure that we don't have any race condition with the above #getRemoteContents. - if (contents.isOlderThan(refreshMillis * 4)) { - contentsMap.remove(url, contents); - } - } - } - - - private static class RemoteGroupContents implements Serializable { - private final ControllerDTO contents; - private final long timestamp; - - public RemoteGroupContents(final ControllerDTO contents) { - this.contents = contents; - this.timestamp = System.currentTimeMillis(); - } - - public ControllerDTO getContents() { - return contents; - } - - public boolean isOlderThan(final long millis) { - final long millisSinceRefresh = System.currentTimeMillis() - timestamp; - return millisSinceRefresh > millis; - } - } -} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java deleted file mode 100644 index 1cc239e1ae28..000000000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/NopRemoteGroupContentsManager.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.remote.client; - -import java.io.IOException; - -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.web.api.dto.ControllerDTO; - -public class NopRemoteGroupContentsManager implements RemoteGroupContentsManager { - - @Override - public ControllerDTO getRemoteContents(final String url, final SiteToSiteRestApiClient client) throws IOException { - return client.getController(url); - } - -} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java deleted file mode 100644 index 58ce553db841..000000000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/RemoteGroupContentsManager.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.remote.client; - -import java.io.IOException; -import java.io.Serializable; - -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.web.api.dto.ControllerDTO; - -public interface RemoteGroupContentsManager extends Serializable { - ControllerDTO getRemoteContents(String url, SiteToSiteRestApiClient client) throws IOException; -}