From 7d7401add4699cd7d543ce18d11d7c9d6ab6e20c Mon Sep 17 00:00:00 2001 From: joewitt Date: Mon, 15 Aug 2016 20:18:47 -0700 Subject: [PATCH 1/4] NIFI-2574 Changed NiFiProperties to avoid static initializer and updated all references to it. --- ...ontrollerServiceInitializationContext.java | 3 +- .../apache/nifi/kerberos/KerberosContext.java | 51 ++++ .../ProcessorInitializationContext.java | 11 +- .../ReportingInitializationContext.java | 3 +- nifi-commons/nifi-hadoop-utils/pom.xml | 4 - .../nifi/hadoop/KerberosProperties.java | 36 +-- .../nifi/hadoop/TestKerberosProperties.java | 12 +- .../org/apache/nifi/util/NiFiProperties.java | 236 +++++++++------- .../apache/nifi/util/NiFiPropertiesTest.java | 52 +--- ...ontrollerServiceInitializationContext.java | 16 ++ .../MockProcessorInitializationContext.java | 16 ++ .../MockReportingInitializationContext.java | 16 ++ ...ontrollerServiceInitializationContext.java | 15 + .../MockProcessorInitializationContext.java | 16 ++ .../MockReportingInitializationContext.java | 16 ++ .../nifi/documentation/DocGeneratorTest.java | 35 ++- .../authorization/FileAuthorizerTest.java | 2 +- .../util/IdentityMappingUtil.java | 2 +- ...dardClusterCoordinationProtocolSender.java | 8 +- .../heartbeat/AbstractHeartbeatMonitor.java | 39 ++- .../ClusterProtocolHeartbeatMonitor.java | 40 ++- .../http/StandardHttpResponseMerger.java | 34 ++- .../StatusHistoryEndpointMerger.java | 18 +- .../ThreadPoolRequestReplicator.java | 12 +- .../node/CuratorNodeProtocolSender.java | 18 +- .../node/NodeClusterCoordinator.java | 100 +++---- ...hreadPoolRequestReplicatorFactoryBean.java | 18 +- .../TestAbstractHeartbeatMonitor.java | 14 +- .../StandardHttpResponseMergerSpec.groovy | 7 +- .../StatusHistoryEndpointMergerSpec.groovy | 5 +- .../TestThreadPoolRequestReplicator.java | 32 +-- .../node/TestNodeClusterCoordinator.java | 24 +- .../nifi/cluster/integration/Cluster.java | 10 +- .../apache/nifi/cluster/integration/Node.java | 27 +- .../nifi/remote/protocol/ServerProtocol.java | 12 +- .../nifi/connectable/StandardConnection.java | 19 +- .../controller/FileSystemSwapManager.java | 53 ++-- .../nifi/controller/FlowController.java | 80 +++--- .../nifi/controller/StandardFlowService.java | 103 +++---- .../controller/StandardFlowSynchronizer.java | 61 ++-- .../controller/StandardProcessorNode.java | 10 +- .../cluster/ClusterProtocolHeartbeater.java | 20 +- .../cluster/ZooKeeperClientConfig.java | 15 +- .../CuratorLeaderElectionManager.java | 31 +-- ...tandardReportingInitializationContext.java | 25 +- .../repository/FileSystemRepository.java | 86 +++--- .../repository/VolatileContentRepository.java | 44 ++- .../WriteAheadFlowFileRepository.java | 50 +++- .../scheduling/StandardProcessScheduler.java | 29 +- .../TimerDrivenSchedulingAgent.java | 11 +- ...ontrollerServiceInitializationContext.java | 24 +- .../StandardControllerServiceProvider.java | 7 +- .../VolatileComponentStatusRepository.java | 11 +- .../apache/nifi/encrypt/StringEncryptor.java | 21 +- .../StandardXMLFlowConfigurationDAO.java | 8 +- ...tandardProcessorInitializationContext.java | 24 +- .../remote/StandardRemoteProcessGroup.java | 78 +++--- .../StandardFlowSynchronizerSpec.groovy | 8 +- .../controller/StandardFlowServiceTest.java | 2 +- .../controller/TestFileSystemSwapManager.java | 10 +- .../nifi/controller/TestFlowController.java | 26 +- .../controller/TestStandardProcessorNode.java | 5 +- .../repository/TestFileSystemRepository.java | 101 +++---- .../TestStandardProcessSession.java | 260 +++++++++--------- .../TestVolatileContentRepository.java | 33 +-- .../TestWriteAheadFlowFileRepository.java | 30 +- .../scheduling/TestProcessorLifecycle.java | 73 +++-- .../TestStandardProcessScheduler.java | 64 +++-- ...StandardControllerServiceProviderTest.java | 14 +- ...TestStandardControllerServiceProvider.java | 40 +-- .../zookeeper/TestZooKeeperStateProvider.java | 17 +- .../TestContinuallyRunProcessorTask.java | 5 +- .../nifi/nar/NarThreadContextClassLoader.java | 31 ++- .../org/apache/nifi/nar/NarUnpackerTest.java | 58 ++-- .../src/main/java/org/apache/nifi/NiFi.java | 2 +- .../security/util/SslServerSocketFactory.java | 4 +- .../security/util/SslSocketFactory.java | 4 +- .../nifi/remote/HttpRemoteSiteListener.java | 30 +- .../nifi/remote/RemoteResourceFactory.java | 4 +- .../nifi/remote/SocketRemoteSiteListener.java | 19 +- .../nifi/remote/StandardRemoteGroupPort.java | 52 ++-- .../AbstractFlowFileServerProtocol.java | 9 +- .../remote/protocol/FlowFileTransaction.java | 2 +- .../remote/protocol/HandshakeProperties.java | 1 - .../StandardHttpFlowFileServerProtocol.java | 19 +- .../socket/SocketFlowFileServerProtocol.java | 25 +- .../remote/TestHttpRemoteSiteListener.java | 11 +- .../remote/TestStandardRemoteGroupPort.java | 10 +- .../io/socket/TestSocketChannelStreams.java | 2 +- .../io/socket/ssl/TestSSLSocketChannel.java | 12 +- .../http/TestHttpFlowFileServerProtocol.java | 10 +- .../nifi/web/server/JettyServerTest.java | 33 +-- .../nifi/web/api/DataTransferResource.java | 130 +++++---- .../nifi/web/api/SiteToSiteResource.java | 43 ++- .../accesscontrol/AccessControlHelper.java | 10 +- .../accesscontrol/ITAccessTokenEndpoint.java | 5 +- .../nifi/integration/util/NiFiTestServer.java | 19 +- .../web/api/TestDataTransferResource.java | 18 +- .../nifi/web/api/TestSiteToSiteResource.java | 4 +- .../OcspCertificateValidatorGroovyTest.groovy | 12 +- .../NiFiAuthenticationProviderTest.java | 2 +- .../hadoop/AbstractHadoopProcessor.java | 9 +- .../processors/hadoop/AbstractHadoopTest.java | 5 +- .../nifi/processors/hadoop/GetHDFSTest.java | 2 +- .../nifi/processors/hadoop/PutHDFSTest.java | 2 +- .../hadoop/TestCreateHadoopSequenceFile.java | 2 +- .../nifi/processors/hadoop/TestFetchHDFS.java | 2 +- .../nifi/processors/hadoop/TestListHDFS.java | 2 +- .../hadoop/inotify/TestGetHDFSEvents.java | 2 +- .../nifi/dbcp/hive/HiveConnectionPool.java | 29 +- .../processors/hive/PutHiveStreaming.java | 6 +- .../processors/hive/TestPutHiveStreaming.java | 16 +- .../PersistentProvenanceRepository.java | 218 +++++++++------ .../VolatileProvenanceRepository.java | 45 +-- .../TestVolatileProvenanceRepository.java | 7 +- .../script/InvokeScriptedProcessor.java | 115 +++++--- .../nifi/controller/MonitorDiskUsage.java | 62 ++--- .../nifi/controller/MonitorMemoryTest.java | 25 +- .../nifi/hbase/HBase_1_1_2_ClientService.java | 10 +- .../hbase/TestHBase_1_1_2_ClientService.java | 9 +- 120 files changed, 2034 insertions(+), 1573 deletions(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java index 3486621c7633..f6ac9e768c9a 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java @@ -17,9 +17,10 @@ package org.apache.nifi.controller; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; -public interface ControllerServiceInitializationContext { +public interface ControllerServiceInitializationContext extends KerberosContext { /** * @return the identifier associated with the {@link ControllerService} with diff --git a/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java b/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java new file mode 100644 index 000000000000..8f7ba192d90c --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/kerberos/KerberosContext.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kerberos; + +import java.io.File; + +public interface KerberosContext { + + /** + * The Kerberos service principal used by NiFi to communicate with the KDC + * in order to obtain tickets on behalf of NiFi. Typically of the form + * NIFI/fully.qualified.domain@REALM. + * + * @return the principal, or null if this NiFi instance is not configured + * with a NiFi Kerberos service principal + */ + public String getKerberosServicePrincipal(); + + /** + * The File instance for the Kerberos service keytab. The service principal + * and service keytab will be used to communicate with the KDC to obtain + * tickets on behalf of NiFi. + * + * @return the File instance of the service keytab, or null if this NiFi + * instance is not configured with a NiFi Kerberos service keytab + */ + public File getKerberosServiceKeytab(); + + /** + * The Kerberos configuration file (typically krb5.conf) that will be used + * by this JVM during all Kerberos operations. + * + * @return the File instance for the Kerberos configuration file, or null if + * this NiFi instance is not configured with a Kerberos configuration file + */ + public File getKerberosConfigurationFile(); +} diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java index 726b3fa6b321..df1819376a85 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java @@ -18,16 +18,17 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; /** *

* The ProcessorInitializationContext provides - * {@link org.apache.nifi.processor.Processor Processor}s access to objects that may be of - * use throughout the life of the Processor. + * {@link org.apache.nifi.processor.Processor Processor}s access to objects that + * may be of use throughout the life of the Processor. *

*/ -public interface ProcessorInitializationContext { +public interface ProcessorInitializationContext extends KerberosContext { /** * @return the unique identifier for this processor @@ -47,7 +48,9 @@ public interface ProcessorInitializationContext { ControllerServiceLookup getControllerServiceLookup(); /** - * @return the {@link NodeTypeProvider} which can be used to detect the node type of this NiFi instance. + * @return the {@link NodeTypeProvider} which can be used to detect the node + * type of this NiFi instance. */ NodeTypeProvider getNodeTypeProvider(); + } diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java index d014b26023a3..df64e03493a7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -26,7 +27,7 @@ * A ReportingConfiguration provides configuration information to a * ReportingTask at the time of initialization */ -public interface ReportingInitializationContext { +public interface ReportingInitializationContext extends KerberosContext { /** * @return the identifier for this ReportingTask diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml b/nifi-commons/nifi-hadoop-utils/pom.xml index 5f184008a006..8de849e99bf5 100644 --- a/nifi-commons/nifi-hadoop-utils/pom.xml +++ b/nifi-commons/nifi-hadoop-utils/pom.xml @@ -34,10 +34,6 @@ org.apache.nifi nifi-processor-utils - - org.apache.nifi - nifi-properties - org.apache.hadoop hadoop-common diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java index 5e8fb7dd889f..c7743f4ef299 100644 --- a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -23,20 +23,20 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.StringUtils; import java.io.File; import java.util.ArrayList; import java.util.List; /** - * All processors and controller services that need properties for Kerberos Principal and Keytab - * should obtain them through this class by calling: + * All processors and controller services that need properties for Kerberos + * Principal and Keytab should obtain them through this class by calling: * - * KerberosProperties props = KerberosProperties.create(NiFiProperties.getInstance()) + * KerberosProperties props = + * KerberosProperties.create(NiFiProperties.getInstance()) * - * The properties can be accessed from the resulting KerberosProperties instance. + * The properties can be accessed from the resulting KerberosProperties + * instance. */ public class KerberosProperties { @@ -45,7 +45,14 @@ public class KerberosProperties { private final PropertyDescriptor kerberosPrincipal; private final PropertyDescriptor kerberosKeytab; - private KerberosProperties(final File kerberosConfigFile) { + /** + * Instantiate a KerberosProperties object but keep in mind it is + * effectively a singleton because the krb5.conf file needs to be set as a + * system property which this constructor will take care of. + * + * @param kerberosConfigFile file of krb5.conf + */ + public KerberosProperties(final File kerberosConfigFile) { this.kerberosConfigFile = kerberosConfigFile; if (this.kerberosConfigFile != null) { @@ -91,13 +98,6 @@ public ValidationResult validate(String subject, String input, ValidationContext .build(); } - public static KerberosProperties create(final NiFiProperties niFiProperties) { - if (niFiProperties == null) { - throw new IllegalArgumentException("NiFiProperties can not be null"); - } - return new KerberosProperties(niFiProperties.getKerberosConfigurationFile()); - } - public File getKerberosConfigFile() { return kerberosConfigFile; } @@ -120,7 +120,8 @@ public static List validatePrincipalAndKeytab(final String sub // if security is enabled then the keytab and principal are required final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(config); - if (isSecurityEnabled && StringUtils.isBlank(principal)) { + final boolean blankPrincipal = (principal == null || principal.isEmpty()); + if (isSecurityEnabled && blankPrincipal) { results.add(new ValidationResult.Builder() .valid(false) .subject(subject) @@ -128,7 +129,8 @@ public static List validatePrincipalAndKeytab(final String sub .build()); } - if (isSecurityEnabled && StringUtils.isBlank(keytab)) { + final boolean blankKeytab = (keytab == null || keytab.isEmpty()); + if (isSecurityEnabled && blankKeytab) { results.add(new ValidationResult.Builder() .valid(false) .subject(subject) @@ -136,7 +138,7 @@ public static List validatePrincipalAndKeytab(final String sub .build()); } - if (!isSecurityEnabled && (!StringUtils.isBlank(principal) || !StringUtils.isBlank(keytab))) { + if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) { logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored"); } diff --git a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java index 131fe656e903..8cd1ea1d3561 100644 --- a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java +++ b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java @@ -19,7 +19,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -27,17 +26,13 @@ import java.io.File; import java.util.List; -import static org.mockito.Mockito.when; - public class TestKerberosProperties { @Test public void testWithKerberosConfigFile() { final File file = new File("src/test/resources/krb5.conf"); - final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); - when(niFiProperties.getKerberosConfigurationFile()).thenReturn(file); - final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + final KerberosProperties kerberosProperties = new KerberosProperties(file); Assert.assertNotNull(kerberosProperties); Assert.assertNotNull(kerberosProperties.getKerberosConfigFile()); @@ -51,10 +46,9 @@ public void testWithKerberosConfigFile() { @Test public void testWithoutKerberosConfigFile() { - final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); - when(niFiProperties.getKerberosConfigurationFile()).thenReturn(null); + final File file = new File("src/test/resources/krb5.conf"); - final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + final KerberosProperties kerberosProperties = new KerberosProperties(null); Assert.assertNotNull(kerberosProperties); Assert.assertNull(kerberosProperties.getKerberosConfigFile()); diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index bbb3998a626e..d381f740a883 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -29,17 +26,22 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; - -public class NiFiProperties extends Properties { - - private static final long serialVersionUID = 2119177359005492702L; - - private static final Logger LOG = LoggerFactory.getLogger(NiFiProperties.class); - private static NiFiProperties instance = null; +import java.util.Set; + +/** + * The NiFiProperties class holds all properties which are needed for various + * values to be available at runtime. It is strongly tied to the startup + * properties needed and is often refer to as the 'nifi.properties' file. The + * properties contains keys and values. Great care should be taken in leveraging + * this class or passing it along. It's use should be refactored and minimized + * over time. + */ +public abstract class NiFiProperties { // core properties public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; @@ -174,7 +176,6 @@ public class NiFiProperties extends Properties { public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; - // kerberos properties public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; public static final String KERBEROS_SERVICE_PRINCIPAL = "nifi.kerberos.service.principal"; @@ -243,68 +244,20 @@ public class NiFiProperties extends Properties { // Kerberos defaults public static final String DEFAULT_KERBEROS_AUTHENTICATION_EXPIRATION = "12 hours"; - private NiFiProperties() { - super(); - } - - public NiFiProperties copy() { - final NiFiProperties copy = new NiFiProperties(); - copy.putAll(this); - return copy; - } + /** + * Retrieves the property value for the given property key + * + * @param key the key of property value to lookup. + * @return value of property at given key or null if not found + */ + public abstract String getProperty(String key); /** - * Factory method to create an instance of the {@link NiFiProperties}. This - * method employs a standard singleton pattern by caching the instance if it - * was already obtained + * Retrieves all known property keys. * - * @return instance of {@link NiFiProperties} + * @return all known property keys. */ - public static synchronized NiFiProperties getInstance() { - // NOTE: unit tests can set instance to null (with reflection) to effectively create a new singleton. - // changing the below as a check for whether the instance was initialized will break those - // unit tests. - if (null == instance) { - final NiFiProperties suspectInstance = new NiFiProperties(); - final String nfPropertiesFilePath = System - .getProperty(NiFiProperties.PROPERTIES_FILE_PATH); - if (null == nfPropertiesFilePath || nfPropertiesFilePath.trim().length() == 0) { - throw new RuntimeException("Requires a system property called \'" - + NiFiProperties.PROPERTIES_FILE_PATH - + "\' and this is not set or has no value"); - } - final File propertiesFile = new File(nfPropertiesFilePath); - if (!propertiesFile.exists()) { - throw new RuntimeException("Properties file doesn't exist \'" - + propertiesFile.getAbsolutePath() + "\'"); - } - if (!propertiesFile.canRead()) { - throw new RuntimeException("Properties file exists but cannot be read \'" - + propertiesFile.getAbsolutePath() + "\'"); - } - InputStream inStream = null; - try { - inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); - suspectInstance.load(inStream); - } catch (final Exception ex) { - LOG.error("Cannot load properties file due to " + ex.getLocalizedMessage()); - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } finally { - if (null != inStream) { - try { - inStream.close(); - } catch (final Exception ex) { - /** - * do nothing * - */ - } - } - } - instance = suspectInstance; - } - return instance; - } + public abstract Set getPropertyKeys(); // getters for core properties // public File getFlowConfigurationFile() { @@ -395,7 +348,8 @@ public String getAdministrativeYieldDuration() { } /** - * The host name that will be given out to clients to connect to the Remote Input Port. + * The host name that will be given out to clients to connect to the Remote + * Input Port. * * @return the remote input host name or null if not configured */ @@ -443,7 +397,9 @@ public Boolean isSiteToSiteHttpEnabled() { /** * The HTTP or HTTPS Web API port for a Remote Input Port. - * @return the remote input port for HTTP(S) communication, or null if HTTP(S) Site-to-Site is not enabled + * + * @return the remote input port for HTTP(S) communication, or null if + * HTTP(S) Site-to-Site is not enabled */ public Integer getRemoteInputHttpPort() { if (!isSiteToSiteHttpEnabled()) { @@ -479,7 +435,8 @@ public String getFlowServiceWriteDelay() { } /** - * Returns whether the processors should be started automatically when the application loads. + * Returns whether the processors should be started automatically when the + * application loads. * * @return Whether to auto start the processors or not */ @@ -490,7 +447,8 @@ public boolean getAutoResumeState() { } /** - * Returns the number of partitions that should be used for the FlowFile Repository + * Returns the number of partitions that should be used for the FlowFile + * Repository * * @return the number of partitions */ @@ -501,7 +459,8 @@ public int getFlowFileRepositoryPartitions() { } /** - * Returns the number of milliseconds between FlowFileRepository checkpointing + * Returns the number of milliseconds between FlowFileRepository + * checkpointing * * @return the number of milliseconds between checkpoint events */ @@ -608,7 +567,7 @@ public List getNarLibraryDirectories() { List narLibraryPaths = new ArrayList<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a nar library path if (StringUtils.startsWith(propertyName, NAR_LIBRARY_DIRECTORY_PREFIX) || NAR_LIBRARY_DIRECTORY.equals(propertyName)) { @@ -684,7 +643,6 @@ public File getPersistentStateDirectory() { return file; } - // getters for cluster node properties // public boolean isNode() { return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); @@ -719,7 +677,6 @@ public int getClusterNodeProtocolThreads() { } } - public boolean isClustered() { return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE)); } @@ -779,7 +736,8 @@ public String getKerberosAuthenticationExpiration() { } /** - * Returns true if the Kerberos service principal and keytab location properties are populated. + * Returns true if the Kerberos service principal and keytab location + * properties are populated. * * @return true if Kerberos service support is enabled */ @@ -788,12 +746,14 @@ public boolean isKerberosServiceSupportEnabled() { } /** - * Returns true if client certificates are required for REST API. Determined if the following conditions are all true: + * Returns true if client certificates are required for REST API. Determined + * if the following conditions are all true: * - * - login identity provider is not populated - * - Kerberos service support is not enabled + * - login identity provider is not populated - Kerberos service support is + * not enabled * - * @return true if client certificates are required for access to the REST API + * @return true if client certificates are required for access to the REST + * API */ public boolean isClientAuthRequiredForRestApi() { return StringUtils.isBlank(getProperty(NiFiProperties.SECURITY_USER_LOGIN_IDENTITY_PROVIDER)) && !isKerberosServiceSupportEnabled(); @@ -839,7 +799,8 @@ public InetSocketAddress getNodeApiAddress() { } /** - * Returns the database repository path. It simply returns the value configured. No directories will be created as a result of this operation. + * Returns the database repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. * * @return database repository path * @throws InvalidPathException If the configured path is invalid @@ -849,7 +810,8 @@ public Path getDatabaseRepositoryPath() { } /** - * Returns the flow file repository path. It simply returns the value configured. No directories will be created as a result of this operation. + * Returns the flow file repository path. It simply returns the value + * configured. No directories will be created as a result of this operation. * * @return database repository path * @throws InvalidPathException If the configured path is invalid @@ -859,8 +821,10 @@ public Path getFlowFileRepositoryPath() { } /** - * Returns the content repository paths. This method returns a mapping of file repository name to file repository paths. It simply returns the values configured. No directories will be created as - * a result of this operation. + * Returns the content repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. * * @return file repositories paths * @throws InvalidPathException If any of the configured paths are invalid @@ -869,7 +833,7 @@ public Map getContentRepositoryPaths() { final Map contentRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { // get the repository key @@ -884,8 +848,10 @@ public Map getContentRepositoryPaths() { } /** - * Returns the provenance repository paths. This method returns a mapping of file repository name to file repository paths. It simply returns the values configured. No directories will be created - * as a result of this operation. + * Returns the provenance repository paths. This method returns a mapping of + * file repository name to file repository paths. It simply returns the + * values configured. No directories will be created as a result of this + * operation. * * @return the name and paths of all provenance repository locations */ @@ -893,7 +859,7 @@ public Map getProvenanceRepositoryPaths() { final Map provenanceRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (String propertyName : getPropertyKeys()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { // get the repository key @@ -919,17 +885,9 @@ public String getMaxAppendableClaimSize() { return getProperty(MAX_APPENDABLE_CLAIM_SIZE); } - @Override public String getProperty(final String key, final String defaultValue) { - final String value = super.getProperty(key, defaultValue); - if (value == null) { - return null; - } - - if (value.trim().isEmpty()) { - return defaultValue; - } - return value; + final String value = getProperty(key); + return (value == null || value.trim().isEmpty()) ? defaultValue : value; } public String getBoredYieldDuration() { @@ -973,7 +931,7 @@ public String getFlowConfigurationArchiveMaxStorage() { return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE); } - public String getVariableRegistryProperties(){ + public String getVariableRegistryProperties() { return getProperty(VARIABLE_REGISTRY_PROPERTIES); } @@ -981,18 +939,88 @@ public Path[] getVariableRegistryPropertiesPaths() { final List vrPropertiesPaths = new ArrayList<>(); final String vrPropertiesFiles = getVariableRegistryProperties(); - if(!StringUtils.isEmpty(vrPropertiesFiles)) { + if (!StringUtils.isEmpty(vrPropertiesFiles)) { final List vrPropertiesFileList = Arrays.asList(vrPropertiesFiles.split(",")); - for(String propertiesFile : vrPropertiesFileList){ + for (String propertiesFile : vrPropertiesFileList) { vrPropertiesPaths.add(Paths.get(propertiesFile)); } - return vrPropertiesPaths.toArray( new Path[vrPropertiesPaths.size()]); + return vrPropertiesPaths.toArray(new Path[vrPropertiesPaths.size()]); } else { return new Path[]{}; } } + public int size() { + return getPropertyKeys().size(); + } + + /** + * Creates an instance of NiFiProperties. This should likely not be called + * by any classes outside of the NiFi framework but can be useful by the + * framework for default property loading behavior or helpful in tests + * needing to create specific instances of NiFiProperties. If properties + * file specified cannot be found/read a runtime exception will be thrown. + * If one is not specified no properties will be loaded by default. + * + * @param propertiesFilePath if provided properties will be loaded from + * given file; else will be loaded from System property. Can be null. + * @param additionalProperties allows overriding of properties with the + * supplied values. these will be applied after loading from any properties + * file. Can be null or empty. + * @return NiFiProperties + */ + public static NiFiProperties createBasicNiFiProperties(final String propertiesFilePath, final Map additionalProperties) { + final Map addProps = (additionalProperties == null) ? Collections.EMPTY_MAP : additionalProperties; + final Properties properties = new Properties(); + final String nfPropertiesFilePath = (propertiesFilePath == null) + ? System.getProperty(NiFiProperties.PROPERTIES_FILE_PATH) + : propertiesFilePath; + if (nfPropertiesFilePath != null) { + final File propertiesFile = new File(nfPropertiesFilePath.trim()); + if (!propertiesFile.exists()) { + throw new RuntimeException("Properties file doesn't exist \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + if (!propertiesFile.canRead()) { + throw new RuntimeException("Properties file exists but cannot be read \'" + + propertiesFile.getAbsolutePath() + "\'"); + } + InputStream inStream = null; + try { + inStream = new BufferedInputStream(new FileInputStream(propertiesFile)); + properties.load(inStream); + } catch (final Exception ex) { + throw new RuntimeException("Cannot load properties file due to " + + ex.getLocalizedMessage(), ex); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (final Exception ex) { + /** + * do nothing * + */ + } + } + } + } + addProps.entrySet().stream().forEach((entry) -> { + properties.setProperty(entry.getKey(), entry.getValue()); + }); + return new NiFiProperties() { + @Override + public String getProperty(String key) { + return properties.getProperty(key); + } + + @Override + public Set getPropertyKeys() { + return properties.stringPropertyNames(); + } + }; + } + } diff --git a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java index 96a618e10ebc..3547b92822d1 100644 --- a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java +++ b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/NiFiPropertiesTest.java @@ -19,10 +19,7 @@ import org.junit.Assert; import org.junit.Test; -import java.io.BufferedInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; import java.net.URISyntaxException; import java.nio.file.Path; import java.util.HashSet; @@ -36,7 +33,7 @@ public class NiFiPropertiesTest { @Test public void testProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.properties"); assertEquals("UI Banner Text", properties.getBannerText()); @@ -58,7 +55,7 @@ public void testProperties() { @Test public void testMissingProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.missing.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.missing.properties"); List directories = properties.getNarLibraryDirectories(); @@ -72,7 +69,7 @@ public void testMissingProperties() { @Test public void testBlankProperties() { - NiFiProperties properties = loadSpecifiedProperties("/NiFiProperties/conf/nifi.blank.properties"); + NiFiProperties properties = loadNiFiProperties("/NiFiProperties/conf/nifi.blank.properties"); List directories = properties.getNarLibraryDirectories(); @@ -83,45 +80,14 @@ public void testBlankProperties() { } - private NiFiProperties loadSpecifiedProperties(String propertiesFile) { - - String filePath; + private NiFiProperties loadNiFiProperties(final String propsPath) { + String realPath = null; try { - filePath = NiFiPropertiesTest.class.getResource(propertiesFile).toURI().getPath(); - } catch (URISyntaxException ex) { - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } - - System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath); - - NiFiProperties properties = NiFiProperties.getInstance(); - - // clear out existing properties - for (String prop : properties.stringPropertyNames()) { - properties.remove(prop); + realPath = NiFiPropertiesTest.class.getResource(propsPath).toURI().getPath(); + } catch (final URISyntaxException ex) { + throw new RuntimeException(ex); } - - InputStream inStream = null; - try { - inStream = new BufferedInputStream(new FileInputStream(filePath)); - properties.load(inStream); - } catch (final Exception ex) { - throw new RuntimeException("Cannot load properties file due to " - + ex.getLocalizedMessage(), ex); - } finally { - if (null != inStream) { - try { - inStream.close(); - } catch (final Exception ex) { - /** - * do nothing * - */ - } - } - } - - return properties; + return NiFiProperties.createBasicNiFiProperties(realPath, null); } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index f8f4f5995d40..ffa3546e897c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -68,4 +69,19 @@ public ComponentLog getLogger() { public StateManager getStateManager() { return stateManager; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java index 132869c4094c..e44e7311deb6 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import java.util.Set; import java.util.UUID; @@ -86,4 +87,19 @@ public boolean isControllerServiceEnabling(final String serviceIdentifier) { public NodeTypeProvider getNodeTypeProvider() { return context; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java index 0aea00afadec..454b74293656 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util; +import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -86,4 +87,19 @@ public SchedulingStrategy getSchedulingStrategy() { public ComponentLog getLogger() { return logger; } + + @Override + public String getKerberosServicePrincipal() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosServiceKeytab() { + return null; //this needs to be wired in. + } + + @Override + public File getKerberosConfigurationFile() { + return null; //this needs to be wired in. + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java index abbde61191c1..195934b4cd44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; @@ -49,4 +50,18 @@ public StateManager getStateManager() { return null; } + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java index 3935124be76d..d86ea4c9bd24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; @@ -48,4 +49,19 @@ public ControllerServiceLookup getControllerServiceLookup() { public NodeTypeProvider getNodeTypeProvider() { return new MockNodeTypeProvider(); } + + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java index ebf59d61dfe6..49d7933a5c64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import java.io.File; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; @@ -64,4 +65,19 @@ public SchedulingStrategy getSchedulingStrategy() { public ComponentLog getLogger() { return new MockComponentLogger(); } + + @Override + public String getKerberosServicePrincipal() { + return null; + } + + @Override + public File getKerberosServiceKeytab() { + return null; + } + + @Override + public File getKerberosConfigurationFile() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java index afc3d50db3c3..b7c4db93c48f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/DocGeneratorTest.java @@ -21,6 +21,8 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Properties; +import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.nifi.nar.ExtensionManager; @@ -38,8 +40,9 @@ public void testProcessorLoadsNarResources() throws IOException, ClassNotFoundEx TemporaryFolder temporaryFolder = new TemporaryFolder(); temporaryFolder.create(); - NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties"); - properties.setProperty(NiFiProperties.COMPONENT_DOCS_DIRECTORY, temporaryFolder.getRoot().getAbsolutePath()); + NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties", + NiFiProperties.COMPONENT_DOCS_DIRECTORY, + temporaryFolder.getRoot().getAbsolutePath()); NarUnpacker.unpackNars(properties); @@ -60,22 +63,16 @@ public void testProcessorLoadsNarResources() throws IOException, ClassNotFoundEx Assert.assertTrue(generatedHtml.contains("resources")); } - private NiFiProperties loadSpecifiedProperties(String propertiesFile) { + private NiFiProperties loadSpecifiedProperties(final String propertiesFile, final String key, final String value) { String file = DocGeneratorTest.class.getResource(propertiesFile).getFile(); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, file); - NiFiProperties properties = NiFiProperties.getInstance(); - - // clear out existing properties - for (String prop : properties.stringPropertyNames()) { - properties.remove(prop); - } - + final Properties props = new Properties(); InputStream inStream = null; try { inStream = new BufferedInputStream(new FileInputStream(file)); - properties.load(inStream); + props.load(inStream); } catch (final Exception ex) { throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex); @@ -91,6 +88,20 @@ private NiFiProperties loadSpecifiedProperties(String propertiesFile) { } } - return properties; + if (key != null && value != null) { + props.setProperty(key, value); + } + + return new NiFiProperties() { + @Override + public String getProperty(String key) { + return props.getProperty(key); + } + + @Override + public Set getPropertyKeys() { + return props.stringPropertyNames(); + } + }; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java index 565e55a559eb..fcedfc82c428 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java @@ -1468,7 +1468,7 @@ private static boolean deleteFile(final File file) { private NiFiProperties getNiFiProperties(final Properties properties) { final NiFiProperties nifiProperties = Mockito.mock(NiFiProperties.class); - when(nifiProperties.stringPropertyNames()).thenReturn(properties.stringPropertyNames()); + when(nifiProperties.getPropertyKeys()).thenReturn(properties.stringPropertyNames()); when(nifiProperties.getProperty(anyString())).then(new Answer() { @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java index 2485db59f859..f7087ac28448 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java @@ -43,7 +43,7 @@ public static List getIdentityMappings(final NiFiProperties pro final List mappings = new ArrayList<>(); // go through each property - for (String propertyName : properties.stringPropertyNames()) { + for (String propertyName : properties.getPropertyKeys()) { if (StringUtils.startsWith(propertyName, NiFiProperties.SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX)) { final String key = StringUtils.substringAfter(propertyName, NiFiProperties.SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX); final String identityPattern = properties.getProperty(propertyName); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java index b9ff0829f0a6..167ddec93284 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java @@ -48,7 +48,6 @@ import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,7 @@ * */ public class StandardClusterCoordinationProtocolSender implements ClusterCoordinationProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class); private final ProtocolContext protocolContext; @@ -70,10 +70,6 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin private final int maxThreadsPerRequest; private int handshakeTimeoutSeconds; - public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext) { - this(socketConfiguration, protocolContext, NiFiProperties.getInstance().getClusterNodeProtocolThreads()); - } - public StandardClusterCoordinationProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext, final int maxThreadsPerRequest) { if (socketConfiguration == null) { throw new IllegalArgumentException("Socket configuration may not be null."); @@ -90,7 +86,6 @@ public StandardClusterCoordinationProtocolSender(final SocketConfiguration socke public void setBulletinRepository(final BulletinRepository bulletinRepository) { } - /** * Requests a node to reconnect to the cluster. The configured value for * handshake timeout is applied to the socket before making the request. @@ -158,7 +153,6 @@ public void disconnect(final DisconnectMessage msg) throws ProtocolException { } } - private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout if (handshakeTimeoutSeconds >= 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index c216ed32c29a..d2db59e31139 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.coordination.ClusterCoordinator; @@ -31,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.Properties; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -45,11 +43,10 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { private volatile ScheduledFuture future; private volatile boolean stopped = true; - - public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) { + public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final NiFiProperties nifiProperties) { this.clusterCoordinator = clusterCoordinator; - final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, - NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); + final String heartbeatInterval = nifiProperties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, + NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL); this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS); } @@ -118,8 +115,8 @@ protected long getHeartbeatInterval(final TimeUnit timeUnit) { } /** - * Fetches all of the latest heartbeats and updates the Cluster Coordinator as appropriate, - * based on the heartbeats received. + * Fetches all of the latest heartbeats and updates the Cluster Coordinator + * as appropriate, based on the heartbeats received. * * Visible for testing. */ @@ -145,7 +142,7 @@ protected synchronized void monitorHeartbeats() { processHeartbeat(heartbeat); } catch (final Exception e) { clusterCoordinator.reportEvent(null, Severity.ERROR, - "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); + "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e); logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); logger.error("", e); } @@ -162,7 +159,7 @@ protected synchronized void monitorHeartbeats() { final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp()); clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, - "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); + "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); try { removeHeartbeat(heartbeat.getNodeIdentifier()); @@ -201,8 +198,8 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) { if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) { // Cluster Coordinator believes that node is connected, but node does not believe so. clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster," - + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() - + "). Marking as Disconnected and requesting that Node reconnect to cluster"); + + "though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() + + "). Marking as Disconnected and requesting that Node reconnect to cluster"); clusterCoordinator.requestNodeConnect(nodeId, null); return; } @@ -220,7 +217,7 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) { case NOT_YET_CONNECTED: case STARTUP_FAILURE: { clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously " - + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); + + "disconnected due to " + disconnectionCode + ". Issuing reconnection request."); clusterCoordinator.requestNodeConnect(nodeId, null); break; @@ -260,23 +257,25 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) { clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles()); } - /** - * @return the most recent heartbeat information for each node in the cluster + * @return the most recent heartbeat information for each node in the + * cluster */ protected abstract Map getLatestHeartbeats(); /** - * This method does nothing in the abstract class but is meant for subclasses to - * override in order to provide functionality when the monitor is started. + * This method does nothing in the abstract class but is meant for + * subclasses to override in order to provide functionality when the monitor + * is started. */ protected void onStart() { } /** - * This method does nothing in the abstract class but is meant for subclasses to - * override in order to provide functionality when the monitor is stopped. + * This method does nothing in the abstract class but is meant for + * subclasses to override in order to provide functionality when the monitor + * is stopped. */ protected void onStop() { } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index d2d81d1da46e..95b2045e4ec4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.heartbeat; import java.nio.charset.StandardCharsets; @@ -22,7 +21,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -54,10 +52,12 @@ import org.slf4j.LoggerFactory; /** - * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and then relies on the NiFi Cluster - * Protocol to receive heartbeat messages from nodes in the cluster. + * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and + * then relies on the NiFi Cluster Protocol to receive heartbeat messages from + * nodes in the cluster. */ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler { + protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class); private static final String COORDINATOR_ZNODE_NAME = "coordinator"; @@ -81,30 +81,29 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } } - - public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final Properties properties) { - super(clusterCoordinator, properties); + public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final ProtocolListener protocolListener, final NiFiProperties nifiProperties) { + super(clusterCoordinator, nifiProperties); protocolListener.addHandler(this); - this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); + this.zkClientConfig = ZooKeeperClientConfig.createConfig(nifiProperties); this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); - String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); + String hostname = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); if (hostname == null || hostname.trim().isEmpty()) { hostname = "localhost"; } - final String port = properties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT); + final String port = nifiProperties.getProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT); if (port == null || port.trim().isEmpty()) { throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the '" - + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is not set"); + + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is not set"); } try { Integer.parseInt(port); } catch (final NumberFormatException nfe) { throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the '" - + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number."); + + NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number."); } heartbeatAddress = hostname + ":" + port; @@ -114,12 +113,12 @@ public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinat public void onStart() { final RetryPolicy retryPolicy = new RetryForever(5000); curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkClientConfig.getConnectString()) - .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); + .connectString(zkClientConfig.getConnectString()) + .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); curatorClient.start(); // We don't know what the heartbeats look like for the nodes, since we were just elected to monitoring @@ -130,7 +129,7 @@ public void onStart() { heartbeatMessages.clear(); for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); + clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); heartbeatMessages.put(nodeId, heartbeat); } @@ -199,7 +198,6 @@ protected Set getClusterNodeIds() { return new HashSet<>(clusterNodeIds.values()); } - @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { if (msg.getType() != MessageType.HEARTBEAT) { @@ -220,7 +218,7 @@ public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolExceptio final long systemStartTime = payload.getSystemStartTime(); final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); + connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat); logger.debug("Received new heartbeat from {}", nodeId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java index 784eb2f7a8e1..7b1d8f696798 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.http; import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; @@ -71,13 +70,25 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; public class StandardHttpResponseMerger implements HttpResponseMerger { + private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class); - private static final List endpointMergers = new ArrayList<>(); - static { + private final List endpointMergers = new ArrayList<>(); + + public StandardHttpResponseMerger(final NiFiProperties nifiProperties) { + final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } endpointMergers.add(new ControllerStatusEndpointMerger()); endpointMergers.add(new ControllerBulletinsEndpointMerger()); endpointMergers.add(new GroupStatusEndpointMerger()); @@ -108,7 +119,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new ListFlowFilesEndpointMerger()); endpointMergers.add(new ComponentStateEndpointMerger()); endpointMergers.add(new BulletinBoardEndpointMerger()); - endpointMergers.add(new StatusHistoryEndpointMerger()); + endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis)); endpointMergers.add(new SystemDiagnosticsEndpointMerger()); endpointMergers.add(new CountersEndpointMerger()); endpointMergers.add(new FlowMerger()); @@ -122,9 +133,6 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { endpointMergers.add(new FunnelsEndpointMerger()); } - public StandardHttpResponseMerger() { - } - @Override public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set nodeResponses) { if (nodeResponses.size() == 1) { @@ -170,7 +178,6 @@ public NodeResponse mergeResponses(final URI uri, final String httpMethod, final return response; } - @Override public Set getProblematicNodeResponses(final Set allResponses) { // Check if there are any 2xx responses @@ -190,7 +197,7 @@ public boolean isResponseInterpreted(final URI uri, final String httpMethod) { return getEndpointResponseMerger(uri, httpMethod) != null; } - private static EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { + private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null); } @@ -198,13 +205,12 @@ private boolean hasSuccessfulResponse(final Set allResponses) { return allResponses.stream().anyMatch(p -> p.is2xx()); } - private void drainResponses(final Set responses, final NodeResponse exclude) { responses.stream() - .parallel() // parallelize the draining of the responses, since we have multiple streams to consume - .filter(response -> response != exclude) // don't include the explicitly excluded node - .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content - .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out + .parallel() // parallelize the draining of the responses, since we have multiple streams to consume + .filter(response -> response != exclude) // don't include the explicitly excluded node + .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content + .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out } private void drainResponse(final NodeResponse response) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 507c1fb9ad56..ddd8759fa114 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.http.endpoints; import java.net.URI; @@ -26,7 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; @@ -40,14 +38,13 @@ import org.apache.nifi.controller.status.history.StandardStatusSnapshot; import org.apache.nifi.controller.status.history.StatusHistoryUtil; import org.apache.nifi.controller.status.history.StatusSnapshot; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; import org.apache.nifi.web.api.entity.StatusHistoryEntity; public class StatusHistoryEndpointMerger implements EndpointResponseMerger { + public static final Pattern PROCESSOR_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/processors/[a-f0-9\\-]{36}/status/history"); public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history"); public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history"); @@ -55,17 +52,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { private final long componentStatusSnapshotMillis; - - public StatusHistoryEndpointMerger() { - final NiFiProperties properties = NiFiProperties.getInstance(); - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - componentStatusSnapshotMillis = snapshotMillis; + public StatusHistoryEndpointMerger(final long componentStatusSnapshotMillis) { + this.componentStatusSnapshotMillis = componentStatusSnapshotMillis; } private Map> getMetricDescriptors(final URI uri) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 07cefbbaf7c8..bd4e277804b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -75,6 +75,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.nifi.util.NiFiProperties; public class ThreadPoolRequestReplicator implements RequestReplicator { @@ -107,10 +108,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, - final RequestCompletionCallback callback, final EventReporter eventReporter) { - this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter); + final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { + this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); } /** @@ -123,9 +125,11 @@ public ThreadPoolRequestReplicator(final int numThreads, final Client client, fi * @param readTimeout the read timeout specified in milliseconds * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. + * @param nifiProperties properties */ public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator, - final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, final EventReporter eventReporter) { + final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, + final EventReporter eventReporter, final NiFiProperties nifiProperties) { if (numThreads <= 0) { throw new IllegalArgumentException("The number of threads must be greater than zero."); } else if (client == null) { @@ -136,7 +140,7 @@ public ThreadPoolRequestReplicator(final int numThreads, final Client client, fi this.clusterCoordinator = clusterCoordinator; this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); - this.responseMerger = new StandardHttpResponseMerger(); + this.responseMerger = new StandardHttpResponseMerger(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java index daa3e5c8121b..18474615a556 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java @@ -14,13 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.node; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -import java.util.Properties; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -33,6 +31,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -40,19 +39,20 @@ import org.slf4j.LoggerFactory; /** - * Uses Apache Curator to determine the address of the current cluster coordinator + * Uses Apache Curator to determine the address of the current cluster + * coordinator */ public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(CuratorNodeProtocolSender.class); private final String coordinatorPath; private final ZooKeeperClientConfig zkConfig; private InetSocketAddress coordinatorAddress; - - public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext protocolContext, final Properties properties) { + public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext protocolContext, final NiFiProperties nifiProperties) { super(socketConfig, protocolContext); - zkConfig = ZooKeeperClientConfig.createConfig(properties); + zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator"); } @@ -64,7 +64,7 @@ protected synchronized InetSocketAddress getServiceAddress() throws IOException final RetryPolicy retryPolicy = new RetryNTimes(0, 0); final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); curatorClient.start(); try { @@ -85,7 +85,7 @@ public void process(final WatchedEvent event) { final String[] splits = address.split(":"); if (splits.length != 2) { final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " - + "that address is %s, but this is not in the expected format of :", address); + + "that address is %s, but this is not in the expected format of :", address); logger.error(message); throw new ProtocolException(message); } @@ -101,7 +101,7 @@ public void process(final WatchedEvent event) { } } catch (final NumberFormatException nfe) { final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " - + "that address is %s, but the port is not a valid port number", address); + + "that address is %s, but the port is not a valid port number", address); logger.error(message); throw new ProtocolException(message); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index ac39dc58b07c..70338c1e37de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.node; import java.io.IOException; @@ -25,7 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -72,6 +70,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.revision.RevisionManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -80,6 +79,7 @@ import org.slf4j.LoggerFactory; public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback { + private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class); private static final String EVENT_CATEGORY = "Clustering"; @@ -97,6 +97,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final CuratorFramework curatorClient; private final String nodesPathPrefix; private final String coordinatorPath; + private final NiFiProperties nifiProperties; private volatile FlowService flowService; private volatile boolean connected; @@ -107,18 +108,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final ConcurrentMap> nodeEvents = new ConcurrentHashMap<>(); public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, - final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final Properties nifiProperties) { + final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties) { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; this.firewall = firewall; this.revisionManager = revisionManager; + this.nifiProperties = nifiProperties; final RetryPolicy retryPolicy = new RetryNTimes(10, 500); final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); curatorClient.start(); nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); @@ -226,12 +228,15 @@ public void resetNodeStatuses(final Map st } /** - * Attempts to update the nodeStatuses map by changing the value for the given node id from the current status to the new status, as in - * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the difference that this method can handle a null value - * for currentStatus + * Attempts to update the nodeStatuses map by changing the value for the + * given node id from the current status to the new status, as in + * ConcurrentMap.replace(nodeId, currentStatus, newStatus) but with the + * difference that this method can handle a null value for + * currentStatus * * @param nodeId the node id - * @param currentStatus the current status, or null if there is no value currently + * @param currentStatus the current status, or null if there is + * no value currently * @param newStatus the new status to set * @return true if the map was updated, false otherwise */ @@ -296,7 +301,6 @@ public void finishNodeConnection(final NodeIdentifier nodeId) { updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId))); } - @Override public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); @@ -363,7 +367,6 @@ private NodeConnectionState getConnectionState(final NodeIdentifier nodeId) { return status == null ? null : status.getState(); } - @Override public Map> getConnectionStates() { final Map> connectionStates = new HashMap<>(); @@ -521,18 +524,18 @@ public Set getNodeIdentifiers(final NodeConnectionState... state } return nodeStatuses.entrySet().stream() - .filter(entry -> statesOfInterest.contains(entry.getValue().getState())) - .map(entry -> entry.getKey()) - .collect(Collectors.toSet()); + .filter(entry -> statesOfInterest.contains(entry.getValue().getState())) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); } @Override public NodeIdentifier getPrimaryNode() { return nodeStatuses.values().stream() - .filter(status -> status.getRoles().contains(ClusterRoles.PRIMARY_NODE)) - .findFirst() - .map(status -> status.getNodeIdentifier()) - .orElse(null); + .filter(status -> status.getRoles().contains(ClusterRoles.PRIMARY_NODE)) + .findFirst() + .map(status -> status.getNodeIdentifier()) + .orElse(null); } @Override @@ -582,13 +585,13 @@ private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError final Set connectedNodeIds = getNodeIdentifiers(); final NodeIdentifier electedNodeId = connectedNodeIds.stream() - .filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort) - .findFirst() - .orElse(null); + .filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort) + .findFirst() + .orElse(null); if (electedNodeId == null && warnOnError) { logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}," - + "but there is no node with this address. Will attempt to communicate with node to determine its information", electedNodeAddress); + + "but there is no node with this address. Will attempt to communicate with node to determine its information", electedNodeAddress); try { final NodeConnectionStatus connectionStatus = senderListener.requestNodeConnectionStatus(electedNodeHostname, electedNodePort); @@ -606,7 +609,7 @@ private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError } } catch (final Exception e) { logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. " - + "Attempted to determine the node's information but failed to retrieve its information due to {}", electedNodeAddress, e.toString()); + + "Attempted to determine the node's information but failed to retrieve its information due to {}", electedNodeAddress, e.toString()); if (logger.isDebugEnabled()) { logger.warn("", e); @@ -656,8 +659,9 @@ private void addNodeEvent(final NodeIdentifier nodeId, final Severity severity, } /** - * Updates the status of the node with the given ID to the given status and returns true - * if successful, false if no node exists with the given ID + * Updates the status of the node with the given ID to the given status and + * returns true if successful, false if no node + * exists with the given ID * * @param status the new status of the node */ @@ -705,7 +709,8 @@ void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { * Notifies other nodes that the status of a node changed * * @param updatedStatus the updated status for a node in the cluster - * @param notifyAllNodes if true will notify all nodes. If false, will notify only the cluster coordinator + * @param notifyAllNodes if true will notify all nodes. If + * false, will notify only the cluster coordinator */ void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes, final boolean waitForCoordinator) { // If this node is the active cluster coordinator, then we are going to replicate to all nodes. @@ -770,7 +775,7 @@ public void run() { Thread.sleep(100L); } catch (final InterruptedException ie) { logger.info("Could not send Reconnection request to {} because thread was " - + "interrupted before FlowService was made available", request.getNodeId()); + + "interrupted before FlowService was made available", request.getNodeId()); Thread.currentThread().interrupt(); return; } @@ -797,7 +802,7 @@ public void run() { } catch (final Exception e) { logger.warn("Problem encountered issuing reconnection request to node " + request.getNodeId(), e); eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Problem encountered issuing reconnection request to node " - + request.getNodeId() + " due to: " + e); + + request.getNodeId() + " due to: " + e); } try { @@ -810,7 +815,7 @@ public void run() { // We failed to reconnect too many times. We must now mark node as disconnected. if (NodeConnectionState.CONNECTING == getConnectionState(request.getNodeId())) { requestNodeDisconnect(request.getNodeId(), DisconnectionCode.UNABLE_TO_COMMUNICATE, - "Attempted to request that node reconnect to cluster but could not communicate with node"); + "Attempted to request that node reconnect to cluster but could not communicate with node"); } } }, "Reconnect " + request.getNodeId()); @@ -944,10 +949,10 @@ private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) { } else { // there is a node with that ID and it's a different node resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(), - proposedIdentifier.getSocketAddress(), proposedIdentifier.getSocketPort(), proposedIdentifier.getSiteToSiteAddress(), - proposedIdentifier.getSiteToSitePort(), proposedIdentifier.getSiteToSiteHttpApiPort(), proposedIdentifier.isSiteToSiteSecure()); + proposedIdentifier.getSocketAddress(), proposedIdentifier.getSocketPort(), proposedIdentifier.getSiteToSiteAddress(), + proposedIdentifier.getSiteToSitePort(), proposedIdentifier.getSiteToSiteHttpApiPort(), proposedIdentifier.isSiteToSiteSecure()); logger.debug("A node already exists with ID {}. Proposed Node Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is {}", - proposedIdentifier.getId(), proposedIdentifier, getNodeIdentifier(proposedIdentifier.getId()), resolvedNodeId); + proposedIdentifier.getId(), proposedIdentifier, getNodeIdentifier(proposedIdentifier.getId()), resolvedNodeId); } return resolvedNodeId; @@ -989,7 +994,7 @@ private ConnectionResponse createConnectionResponse(final ConnectionRequest requ dataFlow = flowService.createDataFlow(); } catch (final IOException ioe) { logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to " - + resolvedNodeIdentifier + ". Will tell node to try again later", ioe); + + resolvedNodeIdentifier + ". Will tell node to try again later", ioe); } } @@ -998,37 +1003,37 @@ private ConnectionResponse createConnectionResponse(final ConnectionRequest requ // the flow management service a chance to retrieve a current flow final int tryAgainSeconds = 5; addNodeEvent(resolvedNodeIdentifier, Severity.WARNING, "Connection requested from node, but manager was unable to obtain current flow. " - + "Instructing node to try again in " + tryAgainSeconds + " seconds."); + + "Instructing node to try again in " + tryAgainSeconds + " seconds."); // return try later response return new ConnectionResponse(tryAgainSeconds); } return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, instanceId, new ArrayList<>(nodeStatuses.values()), - revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); + revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); } private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), - nodeId.getSocketAddress(), nodeId.getSocketPort(), - nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), - nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn); + nodeId.getSocketAddress(), nodeId.getSocketPort(), + nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), + nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn); } @Override public boolean canHandle(final ProtocolMessage msg) { return MessageType.CONNECTION_REQUEST == msg.getType() || MessageType.NODE_STATUS_CHANGE == msg.getType() - || MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType(); + || MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType(); } private boolean isMutableRequest(final String method) { return "DELETE".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method); } - /** - * Callback that is called after an HTTP Request has been replicated to nodes in the cluster. - * This allows us to disconnect nodes that did not complete the request, if applicable. + * Callback that is called after an HTTP Request has been replicated to + * nodes in the cluster. This allows us to disconnect nodes that did not + * complete the request, if applicable. */ @Override public void afterRequest(final String uriPath, final String method, final Set nodeResponses) { @@ -1047,7 +1052,7 @@ public void afterRequest(final String uriPath, final String method, final Set problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses); // all nodes failed @@ -1055,7 +1060,7 @@ public void afterRequest(final String uriPath, final String method, final Set problematicNodeResponses, final String uriPath) { if (COUNTER_URI_PATTERN.matcher(uriPath).matches()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java index fc0eaf2658c7..31c3b1d43309 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java @@ -33,24 +33,24 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean, ApplicationContextAware { private ApplicationContext applicationContext; - private NiFiProperties properties; + private NiFiProperties nifiProperties; private ThreadPoolRequestReplicator replicator = null; @Override public ThreadPoolRequestReplicator getObject() throws Exception { - if (replicator == null && properties.isNode()) { + if (replicator == null && nifiProperties.isNode()) { final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class); final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class); final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class); - final int numThreads = properties.getClusterNodeProtocolThreads(); - final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties)); - final String connectionTimeout = properties.getClusterNodeConnectionTimeout(); - final String readTimeout = properties.getClusterNodeReadTimeout(); + final int numThreads = nifiProperties.getClusterNodeProtocolThreads(); + final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties)); + final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout(); + final String readTimeout = nifiProperties.getClusterNodeReadTimeout(); replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator, - connectionTimeout, readTimeout, requestCompletionCallback, eventReporter); + connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties); } return replicator; @@ -71,8 +71,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws this.applicationContext = applicationContext; } - public void setProperties(NiFiProperties properties) { - this.properties = properties; + public void setProperties(final NiFiProperties properties) { + this.nifiProperties = properties; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 5086dc004dfc..2307538e4395 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -27,7 +27,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -52,6 +51,7 @@ public class TestAbstractHeartbeatMonitor { @Before public void setup() throws Exception { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, null, false); } @@ -179,10 +179,10 @@ private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coor return monitor; } - private Properties createProperties() { - final Properties properties = new Properties(); - properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms"); - return properties; + private NiFiProperties createProperties() { + final Map addProps = new HashMap<>(); + addProps.put(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms"); + return NiFiProperties.createBasicNiFiProperties(null, addProps); } private static class ClusterCoordinatorAdapter implements ClusterCoordinator { @@ -328,8 +328,8 @@ private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonit private Map heartbeats = new HashMap<>(); private final Object mutex = new Object(); - public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) { - super(clusterCoordinator, properties); + public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties nifiProperties) { + super(clusterCoordinator, nifiProperties); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy index bd9b265527d3..03aa08ad0d60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy @@ -46,7 +46,8 @@ import spock.lang.Unroll class StandardHttpResponseMergerSpec extends Specification { def setup() { - System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties" + def propFile = StandardHttpResponseMergerSpec.class.getResource("/conf/nifi.properties").getFile() + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile } def cleanup() { @@ -55,7 +56,7 @@ class StandardHttpResponseMergerSpec extends Specification { def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() { given: - def responseMerger = new StandardHttpResponseMerger() + def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null)) def requestUri = new URI('http://server/resource') def requestId = UUID.randomUUID().toString() def Map> mockToRequestEntity = [:] @@ -93,7 +94,7 @@ class StandardHttpResponseMergerSpec extends Specification { mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); and: "setup of the data to be used in the test" - def responseMerger = new StandardHttpResponseMerger() + def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null)) def requestUri = new URI("http://server/$requestUriPart") def requestId = UUID.randomUUID().toString() def Map mockToRequestEntity = [:] diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy index 69dd82a9a501..350269d7f614 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy @@ -32,7 +32,8 @@ import spock.lang.Unroll class StatusHistoryEndpointMergerSpec extends Specification { def setup() { - System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties" + def propFile = StatusHistoryEndpointMergerSpec.class.getResource("/conf/nifi.properties").getFile() + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile } def cleanup() { @@ -48,7 +49,7 @@ class StatusHistoryEndpointMergerSpec extends Specification { mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); and: "setup of the data to be used in the test" - def merger = new StatusHistoryEndpointMerger() + def merger = new StatusHistoryEndpointMerger(2) def requestUri = new URI("http://server/$requestUriPart") def requestId = UUID.randomUUID().toString() def Map mockToRequestEntity = [:] diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 5eac84616caa..ebefce2eb7c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.http.replication; import static org.junit.Assert.assertEquals; @@ -75,8 +74,9 @@ public static void setupClass() { } /** - * If we replicate a request, whenever we obtain the merged response from the AsyncClusterResponse object, - * the response should no longer be available and should be cleared from internal state. This test is to + * If we replicate a request, whenever we obtain the merged response from + * the AsyncClusterResponse object, the response should no longer be + * available and should be cleared from internal state. This test is to * verify that this behavior occurs. */ @Test @@ -105,7 +105,6 @@ public void testResponseRemovedWhenCompletedAndFetched() { }); } - @Test(timeout = 15000) public void testLongWaitForResponse() { withReplicator(replicator -> { @@ -132,7 +131,7 @@ public void testLongWaitForResponse() { assertTrue(response.isComplete()); assertNotNull(response.getMergedResponse()); assertNull(replicator.getClusterResponse(response.getRequestIdentifier())); - } , Status.OK, 1000, new ClientHandlerException(new SocketTimeoutException())); + }, Status.OK, 1000, new ClientHandlerException(new SocketTimeoutException())); } @Test(timeout = 15000) @@ -153,10 +152,9 @@ public void testCompleteOnError() { final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS)); - } , null, 0L, new IllegalArgumentException("Exception created for unit test")); + }, null, 0L, new IllegalArgumentException("Exception created for unit test")); } - @Test(timeout = 15000) public void testMultipleRequestWithTwoPhaseCommit() { final Set nodeIds = new HashSet<>(); @@ -167,7 +165,8 @@ public void testMultipleRequestWithTwoPhaseCommit() { Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet())); final AtomicInteger requestCount = new AtomicInteger(0); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { + final ThreadPoolRequestReplicator replicator + = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. @@ -191,7 +190,7 @@ protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilde try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); clusterResponse.awaitMergedResponse(); // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not. @@ -233,7 +232,8 @@ public void testMutableRequestRequiresAllNodesConnected() throws URISyntaxExcept nodeMap.put(NodeConnectionState.CONNECTING, otherState); Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { + final ThreadPoolRequestReplicator replicator + = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean verify) { @@ -278,7 +278,6 @@ public AsyncClusterResponse replicate(Set nodeIds, String method } } - @Test(timeout = 15000) public void testOneNodeRejectsTwoPhaseCommit() { final Set nodeIds = new HashSet<>(); @@ -287,7 +286,8 @@ public void testOneNodeRejectsTwoPhaseCommit() { final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { + final ThreadPoolRequestReplicator replicator + = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. @@ -309,7 +309,7 @@ protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilde try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); clusterResponse.awaitMergedResponse(); Assert.fail("Expected to get an IllegalClusterStateException but did not"); @@ -322,15 +322,14 @@ protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilde } } - - private void withReplicator(final WithReplicator function) { withReplicator(function, ClientResponse.Status.OK, 0L, null); } private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) { final ClusterCoordinator coordinator = createClusterCoordinator(); - final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { + final ThreadPoolRequestReplicator replicator + = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { if (delayMillis > 0L) { @@ -362,6 +361,7 @@ protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilde } private interface WithReplicator { + void withReplicator(ThreadPoolRequestReplicator replicator) throws Exception; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 2f034b34b81b..aaa9dca3290b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.cluster.coordination.node; import static org.junit.Assert.assertEquals; @@ -27,9 +26,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -47,6 +46,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.events.EventReporter; import org.apache.nifi.services.FlowService; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.revision.RevisionManager; import org.junit.Assert; import org.junit.Before; @@ -56,18 +56,21 @@ import org.mockito.stubbing.Answer; public class TestNodeClusterCoordinator { + private NodeClusterCoordinator coordinator; private ClusterCoordinationProtocolSenderListener senderListener; private List nodeStatuses; - private Properties createProperties() { - final Properties props = new Properties(); - props.put("nifi.zookeeper.connect.string", "localhost:2181"); - return props; + private NiFiProperties createProperties() { + final Map addProps = new HashMap<>(); + addProps.put("nifi.zookeeper.connect.string", "localhost:2181"); + return NiFiProperties.createBasicNiFiProperties(null, addProps); } @Before public void setup() throws IOException { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); + senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); nodeStatuses = Collections.synchronizedList(new ArrayList<>()); @@ -113,7 +116,7 @@ public void testConnectionResponseIndicatesAllNodes() throws IOException { assertNotNull(statuses); assertEquals(6, statuses.size()); final Map statusMap = statuses.stream().collect( - Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)); + Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)); assertEquals(DisconnectionCode.LACK_OF_HEARTBEAT, statusMap.get(createNodeId(1)).getDisconnectCode()); assertEquals(NodeConnectionState.DISCONNECTING, statusMap.get(createNodeId(2)).getState()); @@ -258,7 +261,6 @@ public void testStatusChangesReplicated() throws InterruptedException, IOExcepti assertEquals("Unit Test", statusChange.getDisconnectReason()); } - @Test public void testGetConnectionStates() throws IOException { // Add a disconnected node @@ -316,7 +318,6 @@ public void testGetNodeIdentifiers() throws IOException { assertTrue(disconnectedIds.contains(createNodeId(1))); } - @Test(timeout = 5000) public void testRequestNodeDisconnect() throws InterruptedException { // Add a connected node @@ -341,7 +342,6 @@ public void testRequestNodeDisconnect() throws InterruptedException { assertEquals(NodeConnectionState.DISCONNECTED, status.getState()); } - @Test(timeout = 5000) public void testCannotDisconnectLastNode() throws InterruptedException { // Add a connected node @@ -369,7 +369,6 @@ public void testCannotDisconnectLastNode() throws InterruptedException { coordinator.requestNodeDisconnect(nodeId2, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); } - @Test(timeout = 5000) public void testUpdateNodeStatusOutOfOrder() throws InterruptedException { // Add a connected node @@ -386,7 +385,7 @@ public void testUpdateNodeStatusOutOfOrder() throws InterruptedException { nodeStatuses.clear(); final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED, - DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null); + DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null); final NodeStatusChangeMessage msg = new NodeStatusChangeMessage(); msg.setNodeId(nodeId1); msg.setNodeConnectionStatus(oldStatus); @@ -452,7 +451,6 @@ public void testUpdateNodeRoles() throws InterruptedException { assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles()); } - @Test public void testProposedIdentifierResolvedIfConflict() { final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index dbd8c004113b..58096250e533 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -18,7 +18,9 @@ package org.apache.nifi.cluster.integration; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -91,11 +93,11 @@ public Set getNodes() { public Node createNode() { - NiFiProperties.getInstance().setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); - NiFiProperties.getInstance().setProperty(NiFiProperties.CLUSTER_IS_NODE, "true"); + final Map addProps = new HashMap<>(); + addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); + addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true"); - final NiFiProperties properties = NiFiProperties.getInstance().copy(); - final Node node = new Node(properties); + final Node node = new Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps)); node.start(); nodes.add(node); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index 5bfe83c6383b..899f3122a69b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -21,6 +21,7 @@ import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; @@ -90,10 +91,26 @@ public Node(final NiFiProperties properties) { public Node(final NodeIdentifier nodeId, final NiFiProperties properties) { this.nodeId = nodeId; - this.nodeProperties = properties; + this.nodeProperties = new NiFiProperties() { + @Override + public String getProperty(String key) { + if(key.equals(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT)){ + return String.valueOf(nodeId.getSocketPort()); + }else if(key.equals(NiFiProperties.WEB_HTTP_PORT)){ + return String.valueOf(nodeId.getApiPort()); + }else { + return properties.getProperty(key); + } + } - nodeProperties.setProperty(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT, String.valueOf(nodeId.getSocketPort())); - nodeProperties.setProperty(NiFiProperties.WEB_HTTP_PORT, String.valueOf(nodeId.getApiPort())); + @Override + public Set getPropertyKeys() { + final Set keys = new HashSet<>(properties.getPropertyKeys()); + keys.add(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT); + keys.add(NiFiProperties.WEB_HTTP_PORT); + return keys; + } + }; revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections. emptyList()); @@ -110,7 +127,7 @@ public synchronized void start() { final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor(); flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties, - null, null, StringEncryptor.createEncryptor(), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY); + null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator, heartbeatMonitor, VariableRegistry.EMPTY_REGISTRY); try { flowController.initializeFlow(); @@ -123,7 +140,7 @@ public synchronized void start() { flowController.getStateManagerProvider().getStateManager("Cluster Node Configuration").setState(Collections.singletonMap("Node UUID", nodeId.getId()), Scope.LOCAL); flowService = StandardFlowService.createClusteredInstance(flowController, nodeProperties, senderListener, clusterCoordinator, - StringEncryptor.createEncryptor(), revisionManager, Mockito.mock(Authorizer.class)); + StringEncryptor.createEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class)); flowService.start(); flowService.load(null); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 4f860016c5dd..43660688c722 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -134,10 +134,20 @@ public interface ServerProtocol extends VersionedRemoteResource { * * @param peer peer * @param clusterNodeInfo the cluster information + * @param remoteInputHost the remote input host + * @param remoteInputPort the remote input port + * @param remoteInputHttpPort the remote input http port + * @param isSiteToSiteSecure whether site to site is secure * * @throws java.io.IOException ioe */ - void sendPeerList(Peer peer, Optional clusterNodeInfo) throws IOException; + void sendPeerList( + Peer peer, + Optional clusterNodeInfo, + String remoteInputHost, + int remoteInputPort, + int remoteInputHttpPort, + boolean isSiteToSiteSecure) throws IOException; void shutdown(Peer peer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 5ad9a3c482c4..2a0f0dec102e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -40,7 +40,6 @@ import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRepository; -import org.apache.nifi.util.NiFiProperties; import java.util.ArrayList; import java.util.Collection; @@ -55,7 +54,9 @@ import java.util.stream.Collectors; /** - * Models a connection between connectable components. A connection may contain one or more relationships that map the source component to the destination component. + * Models a connection between connectable components. A connection may contain + * one or more relationships that map the source component to the destination + * component. */ public final class StandardConnection implements Connection { @@ -82,7 +83,7 @@ private StandardConnection(final Builder builder) { relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); + scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -307,8 +308,10 @@ public String toString() { } /** - * Gives this Connection ownership of the given FlowFile and allows the Connection to hold on to the FlowFile but NOT provide the FlowFile to consumers. This allows us to ensure that the - * Connection is not deleted during the middle of a Session commit. + * Gives this Connection ownership of the given FlowFile and allows the + * Connection to hold on to the FlowFile but NOT provide the FlowFile to + * consumers. This allows us to ensure that the Connection is not deleted + * during the middle of a Session commit. * * @param flowFile to add */ @@ -338,6 +341,7 @@ public static class Builder { private FlowFileRepository flowFileRepository; private ProvenanceEventRepository provenanceRepository; private ResourceClaimManager resourceClaimManager; + private int queueSwapThreshold; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -409,6 +413,11 @@ public Builder resourceClaimManager(final ResourceClaimManager resourceClaimMana return this; } + public Builder queueSwapThreshold(final int queueSwapThreshold) { + this.queueSwapThreshold = queueSwapThreshold; + return this; + } + public StandardConnection build() { if (source == null) { throw new IllegalStateException("Cannot build a Connection without a Source"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index a4c267c1db34..a61d7fe96704 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -63,7 +63,8 @@ /** *

- * An implementation of the {@link FlowFileSwapManager} that swaps FlowFiles to/from local disk + * An implementation of the {@link FlowFileSwapManager} that swaps FlowFiles + * to/from local disk *

*/ public class FileSystemSwapManager implements FlowFileSwapManager { @@ -83,9 +84,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private EventReporter eventReporter; private ResourceClaimManager claimManager; + /** + * Default no args constructor for service loading only. + */ public FileSystemSwapManager() { - final NiFiProperties properties = NiFiProperties.getInstance(); - final Path flowFileRepoPath = properties.getFlowFileRepositoryPath(); + storageDirectory = null; + } + + public FileSystemSwapManager(final NiFiProperties nifiProperties) { + final Path flowFileRepoPath = nifiProperties.getFlowFileRepositoryPath(); this.storageDirectory = flowFileRepoPath.resolve("swap").toFile(); if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { @@ -93,7 +100,6 @@ public FileSystemSwapManager() { } } - @Override public synchronized void initialize(final SwapManagerInitializationContext initializationContext) { this.claimManager = initializationContext.getResourceClaimManager(); @@ -129,7 +135,6 @@ public String swapOut(final List toSwap, final FlowFileQueue flo return swapLocation; } - @Override public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { final File swapFile = new File(swapLocation); @@ -152,15 +157,14 @@ public SwapContents peek(final String swapLocation, final FlowFileQueue flowFile final SwapContents swapContents; try (final InputStream fis = new FileInputStream(swapFile); - final InputStream bis = new BufferedInputStream(fis); - final DataInputStream in = new DataInputStream(bis)) { + final InputStream bis = new BufferedInputStream(fis); + final DataInputStream in = new DataInputStream(bis)) { swapContents = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager); } return swapContents; } - @Override public void purge() { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @@ -177,7 +181,6 @@ public boolean accept(final File dir, final String name) { } } - @Override public List recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @@ -217,13 +220,13 @@ public boolean accept(final File dir, final String name) { // Read the queue identifier from the swap file to check if the swap file is for this queue try (final InputStream fis = new FileInputStream(swapFile); - final InputStream bufferedIn = new BufferedInputStream(fis); - final DataInputStream in = new DataInputStream(bufferedIn)) { + final InputStream bufferedIn = new BufferedInputStream(fis); + final DataInputStream in = new DataInputStream(bufferedIn)) { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; + + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); throw new IOException(errMsg); @@ -246,13 +249,13 @@ public SwapSummary getSwapSummary(final String swapLocation) throws IOException // read record from disk via the swap file try (final InputStream fis = new FileInputStream(swapFile); - final InputStream bufferedIn = new BufferedInputStream(fis); - final DataInputStream in = new DataInputStream(bufferedIn)) { + final InputStream bufferedIn = new BufferedInputStream(fis); + final DataInputStream in = new DataInputStream(bufferedIn)) { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; + + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); throw new IOException(errMsg); @@ -348,7 +351,7 @@ public static int serializeFlowFiles(final List toSwap, final Fl out.flush(); } - logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[] {toSwap.size(), queue, swapLocation}); + logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation}); return toSwap.size(); } @@ -376,13 +379,13 @@ static SwapContents deserializeFlowFiles(final DataInputStream in, final String final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); + + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); } final String connectionId = in.readUTF(); // Connection ID if (!connectionId.equals(queue.getIdentifier())) { - throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation + - " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier()); + throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation + + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier()); } int numRecords = 0; @@ -396,8 +399,8 @@ static SwapContents deserializeFlowFiles(final DataInputStream in, final String } } catch (final EOFException eof) { final QueueSize queueSize = new QueueSize(numRecords, contentSize); - final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections. emptyList()); - final SwapContents partialContents = new StandardSwapContents(summary, Collections. emptyList()); + final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList()); + final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList()); throw new IncompleteSwapFileException(swapLocation, partialContents); } @@ -406,7 +409,7 @@ static SwapContents deserializeFlowFiles(final DataInputStream in, final String } private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId, - final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException { + final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException { final List flowFiles = new ArrayList<>(queueSize.getObjectCount()); final List resourceClaims = new ArrayList<>(queueSize.getObjectCount()); Long maxId = maxRecordId; @@ -432,7 +435,7 @@ private static SwapContents deserializeFlowFiles(final DataInputStream in, final if (serializationVersion > 1) { // Lineage information was added in version 2 - if(serializationVersion < 10){ + if (serializationVersion < 10) { final int numLineageIdentifiers = in.readInt(); for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) { in.readUTF(); //skip each identifier @@ -590,7 +593,6 @@ private static void fillBuffer(final InputStream in, final byte[] buffer, final } } - private void error(final String error) { logger.error(error); if (eventReporter != null) { @@ -605,9 +607,8 @@ private void warn(final String warning) { } } - - private static class SwapFileComparator implements Comparator { + @Override public int compare(final String o1, final String o2) { if (o1 == o2) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 77c3dd7b5f01..abb4b78bc2b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -275,7 +275,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final SnippetManager snippetManager; private final long gracefulShutdownSeconds; private final ExtensionManager extensionManager; - private final NiFiProperties properties; + private final NiFiProperties nifiProperties; private final SSLContext sslContext; private final Set externalSiteListeners = new HashSet<>(); private final AtomicReference counterRepositoryRef; @@ -420,7 +420,7 @@ public static FlowController createClusteredInstance( private FlowController( final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, + final NiFiProperties nifiProperties, final Authorizer authorizer, final AuditService auditService, final StringEncryptor encryptor, @@ -435,16 +435,16 @@ private FlowController( maxEventDrivenThreads = new AtomicInteger(5); this.encryptor = encryptor; - this.properties = properties; + this.nifiProperties = nifiProperties; this.heartbeatMonitor = heartbeatMonitor; - sslContext = SslContextFactory.createSslContext(properties, false); + sslContext = SslContextFactory.createSslContext(nifiProperties, false); extensionManager = new ExtensionManager(); this.clusterCoordinator = clusterCoordinator; timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); - final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager); + final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, resourceClaimManager); flowFileRepository = flowFileRepo; flowFileEventRepository = flowFileEventRepo; counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository()); @@ -453,25 +453,25 @@ private FlowController( this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry; try { - this.provenanceRepository = createProvenanceRepository(properties); + this.provenanceRepository = createProvenanceRepository(nifiProperties); this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this); } catch (final Exception e) { throw new RuntimeException("Unable to create Provenance Repository", e); } try { - this.contentRepository = createContentRepository(properties); + this.contentRepository = createContentRepository(nifiProperties); } catch (final Exception e) { throw new RuntimeException("Unable to create Content Repository", e); } try { - this.stateManagerProvider = StandardStateManagerProvider.create(properties, this.variableRegistry); + this.stateManagerProvider = StandardStateManagerProvider.create(nifiProperties, this.variableRegistry); } catch (final IOException e) { throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); @@ -479,7 +479,7 @@ private FlowController( eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); @@ -490,7 +490,7 @@ private FlowController( this.authorizer = authorizer; this.auditService = auditService; - final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); + final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); long shutdownSecs; try { shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal); @@ -502,27 +502,26 @@ private FlowController( } gracefulShutdownSeconds = shutdownSecs; - remoteInputSocketPort = properties.getRemoteInputPort(); - remoteInputHttpPort = properties.getRemoteInputHttpPort(); - isSiteToSiteSecure = properties.isSiteToSiteSecure(); + remoteInputSocketPort = nifiProperties.getRemoteInputPort(); + remoteInputHttpPort = nifiProperties.getRemoteInputHttpPort(); + isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure(); if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); } this.configuredForClustering = configuredForClustering; - this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); + this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); this.snippetManager = new SnippetManager(); - final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, - properties, encryptor, this, this.variableRegistry); + nifiProperties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); rootGroupRef.set(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); - controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry); + controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties); if (remoteInputSocketPort == null) { LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); @@ -534,13 +533,13 @@ private FlowController( RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null; - externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant)); + externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nifiProperties, nodeInformant)); } if (remoteInputHttpPort == null) { LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true"); } else { - externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); + externalSiteListeners.add(HttpRemoteSiteListener.getInstance(nifiProperties)); } for (final RemoteSiteListener listener : externalSiteListeners) { @@ -548,7 +547,7 @@ private FlowController( } // Determine frequency for obtaining component status snapshots - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); long snapshotMillis; try { snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); @@ -557,9 +556,9 @@ private FlowController( } // Initialize the Embedded ZooKeeper server, if applicable - if (properties.isStartEmbeddedZooKeeper() && configuredForClustering) { + if (nifiProperties.isStartEmbeddedZooKeeper() && configuredForClustering) { try { - zooKeeperStateServer = ZooKeeperStateServer.create(properties); + zooKeeperStateServer = ZooKeeperStateServer.create(nifiProperties); zooKeeperStateServer.start(); } catch (final IOException | ConfigException e) { throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e); @@ -580,8 +579,8 @@ public void run() { heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); if (configuredForClustering) { - leaderElectionManager = new CuratorLeaderElectionManager(4, properties); - heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); + leaderElectionManager = new CuratorLeaderElectionManager(4, nifiProperties); + heartbeater = new ClusterProtocolHeartbeater(protocolSender, nifiProperties); // Check if there is already a cluster coordinator elected. If not, go ahead // and register for coordinator role. If there is already one elected, do not register until @@ -624,7 +623,7 @@ private static FlowFileRepository createFlowFileRepository(final NiFiProperties } try { - final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class); + final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class, properties); synchronized (created) { created.initialize(contentClaimManager); } @@ -641,7 +640,7 @@ private static FlowFileSwapManager createSwapManager(final NiFiProperties proper } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class, properties); } catch (final Exception e) { throw new RuntimeException(e); } @@ -820,7 +819,7 @@ private ContentRepository createContentRepository(final NiFiProperties propertie } try { - final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); + final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class, properties); synchronized (contentRepo) { contentRepo.initialize(resourceClaimManager); } @@ -838,21 +837,21 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceRepository.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceRepository.class, properties); } catch (final Exception e) { throw new RuntimeException(e); } } private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); + final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class, nifiProperties); } catch (final Exception e) { throw new RuntimeException(e); } @@ -883,7 +882,7 @@ public Connection createConnection(final String id, final String name, final Con } // Create and initialize a FlowFileSwapManager for this connection - final FlowFileSwapManager swapManager = createSwapManager(properties); + final FlowFileSwapManager swapManager = createSwapManager(nifiProperties); final EventReporter eventReporter = createEventReporter(getBulletinRepository()); try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -984,7 +983,7 @@ public Port createLocalOutputPort(String id, String name) { * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor, this, variableRegistry); + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, variableRegistry); } /** @@ -1040,11 +1039,11 @@ public ProcessorNode createProcessor(final String type, String id, final boolean final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ProcessorNode procNode; if (creationSuccessful) { - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties); } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); @@ -1087,7 +1086,7 @@ private Processor instantiateProcessor(final String type, final String identifie final Class processorClass = rawClass.asSubclass(Processor.class); processor = processorClass.newInstance(); final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor); - final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this); + final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this, nifiProperties); processor.initialize(ctx); LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger); @@ -1191,7 +1190,7 @@ public Port createRemoteOutputPort(String id, String name) { * @throws IllegalArgumentException if uri is not a valid URI. */ public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { - return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); + return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext, nifiProperties); } public ProcessGroup getRootGroup() { @@ -2845,7 +2844,7 @@ public ReportingTaskNode createReportingTask(final String type, final String id, if (firstTimeAdded) { final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); + SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this, nifiProperties); try { task.initialize(config); @@ -3305,7 +3304,6 @@ public boolean isConfiguredForClustering() { return configuredForClustering; } - private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override @@ -3320,7 +3318,6 @@ public synchronized void onLeaderRelinquish() { // call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor // then will check the zookeeper znode to check if it is the cluster coordinator before kicking any nodes out of the // cluster. - if (clusterCoordinator != null) { clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR); } @@ -3357,7 +3354,8 @@ public void onLeaderRelinquish() { * either connected or trying to connect to the cluster. * * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager + * @param clusterInstanceId if clustered is true, indicates the InstanceID + * of the Cluster Manager */ public void setClustered(final boolean clustered, final String clusterInstanceId) { writeLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 091e59ca096c..6ad997677877 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -135,6 +135,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { */ private NodeIdentifier nodeId; + private final NiFiProperties nifiProperties; + // guardedBy rwLock private boolean firstControllerInitialization = true; @@ -142,44 +144,45 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class); public static StandardFlowService createStandaloneInstance( - final FlowController controller, - final NiFiProperties properties, - final StringEncryptor encryptor, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { + final FlowController controller, + final NiFiProperties nifiProperties, + final StringEncryptor encryptor, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { - return new StandardFlowService(controller, properties, null, encryptor, false, null, revisionManager, authorizer); + return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer); } public static StandardFlowService createClusteredInstance( - final FlowController controller, - final NiFiProperties properties, - final NodeProtocolSenderListener senderListener, - final ClusterCoordinator coordinator, - final StringEncryptor encryptor, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { - - return new StandardFlowService(controller, properties, senderListener, encryptor, true, coordinator, revisionManager, authorizer); + final FlowController controller, + final NiFiProperties nifiProperties, + final NodeProtocolSenderListener senderListener, + final ClusterCoordinator coordinator, + final StringEncryptor encryptor, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { + + return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer); } private StandardFlowService( - final FlowController controller, - final NiFiProperties properties, - final NodeProtocolSenderListener senderListener, - final StringEncryptor encryptor, - final boolean configuredForClustering, - final ClusterCoordinator clusterCoordinator, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { - + final FlowController controller, + final NiFiProperties nifiProperties, + final NodeProtocolSenderListener senderListener, + final StringEncryptor encryptor, + final boolean configuredForClustering, + final ClusterCoordinator clusterCoordinator, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { + + this.nifiProperties = nifiProperties; this.controller = controller; - flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); + flowXml = Paths.get(nifiProperties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); - gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); - autoResumeState = properties.getAutoResumeState(); + gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); + autoResumeState = nifiProperties.getAutoResumeState(); - dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor); + dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties); this.clusterCoordinator = clusterCoordinator; if (clusterCoordinator != null) { clusterCoordinator.setFlowService(this); @@ -193,8 +196,8 @@ private StandardFlowService( this.senderListener = senderListener; senderListener.addHandler(this); - final InetSocketAddress nodeApiAddress = properties.getNodeApiAddress(); - final InetSocketAddress nodeSocketAddress = properties.getClusterNodeProtocolAddress(); + final InetSocketAddress nodeApiAddress = nifiProperties.getNodeApiAddress(); + final InetSocketAddress nodeSocketAddress = nifiProperties.getClusterNodeProtocolAddress(); String nodeUuid = null; final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG); @@ -208,10 +211,10 @@ private StandardFlowService( // use a random UUID as the proposed node identifier this.nodeId = new NodeIdentifier(nodeUuid, - nodeApiAddress.getHostName(), nodeApiAddress.getPort(), - nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), - properties.getRemoteInputHost(), properties.getRemoteInputPort(), - properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); + nodeApiAddress.getHostName(), nodeApiAddress.getPort(), + nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), + nifiProperties.getRemoteInputHost(), nifiProperties.getRemoteInputPort(), + nifiProperties.getRemoteInputHttpPort(), nifiProperties.isSiteToSiteSecure()); } else { this.configuredForClustering = false; @@ -244,7 +247,7 @@ public void saveFlowChanges(final OutputStream outStream) throws IOException { public void overwriteFlow(final InputStream is) throws IOException { writeLock.lock(); try (final OutputStream output = Files.newOutputStream(flowXml, StandardOpenOption.WRITE, StandardOpenOption.CREATE); - final OutputStream gzipOut = new GZIPOutputStream(output);) { + final OutputStream gzipOut = new GZIPOutputStream(output);) { FileUtils.copy(is, gzipOut); } finally { writeLock.unlock(); @@ -253,7 +256,7 @@ public void overwriteFlow(final InputStream is) throws IOException { @Override public void saveFlowChanges(final TimeUnit delayUnit, final long delay) { - final boolean archiveEnabled = NiFiProperties.getInstance().isFlowConfigurationArchiveEnabled(); + final boolean archiveEnabled = nifiProperties.isFlowConfigurationArchiveEnabled(); saveFlowChanges(delayUnit, delay, archiveEnabled); } @@ -584,12 +587,12 @@ private void handleReconnectionRequest(final ReconnectionRequestMessage request) // reconnect final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), - request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); + request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); loadFromConnectionResponse(connectionResponse); clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() - .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); + .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); controller.resumeHeartbeats(); // we are now connected, so resume sending heartbeats. logger.info("Node reconnected."); @@ -637,7 +640,7 @@ private void disconnect(final String explanation) { // write lock must already be acquired private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow) - throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { logger.trace("Loading flow from bytes"); // resolve the given flow (null means load flow from disk) @@ -695,16 +698,15 @@ private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmpty } /** - * In NiFi 0.x, templates were stored in a templates directory as separate files. They are - * now stored in the flow itself. If there already are templates in that directory, though, - * we want to restore them. + * In NiFi 0.x, templates were stored in a templates directory as separate + * files. They are now stored in the flow itself. If there already are + * templates in that directory, though, we want to restore them. * * @return the templates found in the templates directory * @throws IOException if unable to read from the file system */ public List