From ce94a60bc69ceaa59b2c90c256eb5098e3969828 Mon Sep 17 00:00:00 2001 From: patricker Date: Mon, 6 Feb 2017 20:49:28 -0700 Subject: [PATCH] NIFI-3446 --- .../org/apache/nifi/util/NiFiProperties.java | 1 + .../nifi/controller/FlowController.java | 31 ++++++++++++++++ .../nifi/controller/StandardFlowService.java | 26 +++----------- .../persistence/FlowConfigurationDAO.java | 23 ++++++++++++ .../StandardXMLFlowConfigurationDAO.java | 36 +++++++++++++++---- ...ache.nifi.persistence.FlowConfigurationDAO | 15 ++++++++ .../nifi-framework/nifi-resources/pom.xml | 1 + .../src/main/resources/conf/nifi.properties | 1 + 8 files changed, 106 insertions(+), 28 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.persistence.FlowConfigurationDAO diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 933d62d66842..a466992f593c 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -48,6 +48,7 @@ public abstract class NiFiProperties { // core properties public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path"; + public static final String FLOW_CONFIGURATION_IMPLEMENTATION = "nifi.flow.configuration.implementation"; public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file"; public static final String FLOW_CONFIGURATION_ARCHIVE_ENABLED = "nifi.flow.configuration.archive.enabled"; public static final String FLOW_CONFIGURATION_ARCHIVE_DIR = "nifi.flow.configuration.archive.dir"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 7fd85b96409e..8a5cf9ebc97f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -145,6 +145,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarThreadContextClassLoader; +import org.apache.nifi.persistence.FlowConfigurationDAO; import org.apache.nifi.processor.GhostProcessor; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.ProcessorInitializationContext; @@ -239,6 +240,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { // default repository implementations + public static final String DEFAULT_FLOW_CONFIGURATION_IMPLEMENTATION = "org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO"; public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository"; public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository"; @@ -267,6 +269,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final FlowFileEventRepository flowFileEventRepository; private final ProvenanceRepository provenanceRepository; private final BulletinRepository bulletinRepository; + private final FlowConfigurationDAO flowConfigurationDAO; private final StandardProcessScheduler processScheduler; private final SnippetManager snippetManager; private final long gracefulShutdownSeconds; @@ -471,6 +474,12 @@ private FlowController( throw new RuntimeException(e); } + try{ + this.flowConfigurationDAO = createFlowConfigurationDAO(nifiProperties); + } catch (final Exception e) { + throw new RuntimeException("Unable to create Flow Configuration DAO", e); + } + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); @@ -825,6 +834,24 @@ public void onFlowInitialized(final boolean startDelayedComponents) { } } + private FlowConfigurationDAO createFlowConfigurationDAO(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final String implementationClassName = properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_IMPLEMENTATION, DEFAULT_FLOW_CONFIGURATION_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Flow Configuration DAO because the NiFi Properties is missing the following property: " + + NiFiProperties.FLOW_CONFIGURATION_IMPLEMENTATION); + } + + try { + final FlowConfigurationDAO flowConfiguration = NarThreadContextClassLoader.createInstance(implementationClassName, FlowConfigurationDAO.class, properties); + synchronized (flowConfiguration) { + flowConfiguration.initialize(encryptor, properties); + } + return flowConfiguration; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -1174,6 +1201,10 @@ public SnippetManager getSnippetManager() { return snippetManager; } + public FlowConfigurationDAO getFlowConfigurationDAO() { + return flowConfigurationDAO; + } + public StateManagerProvider getStateManagerProvider() { return stateManagerProvider; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 2d35a63a1a6e..622b77b842d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -25,10 +25,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; @@ -43,9 +40,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; @@ -82,13 +76,11 @@ import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.persistence.FlowConfigurationDAO; -import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO; import org.apache.nifi.persistence.TemplateDeserializer; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.revision.RevisionManager; import org.slf4j.Logger; @@ -103,7 +95,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private static final String NODE_UUID = "Node UUID"; private final FlowController controller; - private final Path flowXml; private final FlowConfigurationDAO dao; private final int gracefulShutdownSeconds; private final boolean autoResumeState; @@ -177,12 +168,11 @@ private StandardFlowService( this.nifiProperties = nifiProperties; this.controller = controller; - flowXml = Paths.get(nifiProperties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); autoResumeState = nifiProperties.getAutoResumeState(); - dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties); + dao = controller.getFlowConfigurationDAO(); this.clusterCoordinator = clusterCoordinator; if (clusterCoordinator != null) { clusterCoordinator.setFlowService(this); @@ -246,9 +236,8 @@ public void saveFlowChanges(final OutputStream outStream) throws IOException { @Override public void overwriteFlow(final InputStream is) throws IOException { writeLock.lock(); - try (final OutputStream output = Files.newOutputStream(flowXml, StandardOpenOption.WRITE, StandardOpenOption.CREATE); - final OutputStream gzipOut = new GZIPOutputStream(output);) { - FileUtils.copy(is, gzipOut); + try{ + dao.overwriteFlow(is); } finally { writeLock.unlock(); } @@ -909,14 +898,7 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw public void copyCurrentFlow(final OutputStream os) throws IOException { readLock.lock(); try { - if (!Files.exists(flowXml) || Files.size(flowXml) == 0) { - return; - } - - try (final InputStream in = Files.newInputStream(flowXml, StandardOpenOption.READ); - final InputStream gzipIn = new GZIPInputStream(in)) { - FileUtils.copy(gzipIn, os); - } + dao.copyCurrentFlow(os); } finally { readLock.unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index 08658b19e940..8b87efe228ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -25,11 +25,21 @@ import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSynchronizationException; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.util.NiFiProperties; /** * Interface to define service methods for FlowController configuration. */ public interface FlowConfigurationDAO { + /** + * Performs any initialization needed. This should be called only by the + * framework. + * @param encryptor protected property encryptor/decryptor + * @param nifiProperties collection of NiFi properties + * @throws IOException thrown if Flow Configuration fails to initialize + */ + void initialize(StringEncryptor encryptor, NiFiProperties nifiProperties) throws IOException; /** * @return true if a file containing the flow is present, false otherwise @@ -110,4 +120,17 @@ void load(FlowController controller, DataFlow dataFlow) */ void save(FlowController flow, boolean archive) throws IOException; + /** + * Overwrites the existing Flow Configuration using the one provided in the input stream + * @param is to use to overwrite the existing flow + * @throws IOException if flow fails to be overwritten + */ + void overwriteFlow(final InputStream is) throws IOException; + + /** + * Copies the current Flow Configuration to the output stream + * @param os to copy the current flow too + * @throws IOException thrown if Flow fails to copy to output stream + */ + void copyCurrentFlow(final OutputStream os) throws IOException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 5eceb522e87c..cc9dc134dfee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -42,15 +43,19 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationDAO { - private final Path flowXmlPath; - private final StringEncryptor encryptor; - private final FlowConfigurationArchiveManager archiveManager; - private final NiFiProperties nifiProperties; + private Path flowXmlPath; + private StringEncryptor encryptor; + private FlowConfigurationArchiveManager archiveManager; + private NiFiProperties nifiProperties; + private Path flowXml; private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class); - public StandardXMLFlowConfigurationDAO(final Path flowXml, final StringEncryptor encryptor, final NiFiProperties nifiProperties) throws IOException { + @Override + public void initialize(final StringEncryptor encryptor, final NiFiProperties nifiProperties) throws IOException { this.nifiProperties = nifiProperties; + + flowXml = Paths.get(nifiProperties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); final File flowXmlFile = flowXml.toFile(); if (!flowXmlFile.exists()) { // createDirectories would throw an exception if the directory exists but is a symbolic link @@ -118,7 +123,7 @@ public void load(final OutputStream os, final boolean compressed) throws IOExcep @Override public synchronized void save(final InputStream is) throws IOException { try (final OutputStream outStream = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE); - final OutputStream gzipOut = new GZIPOutputStream(outStream)) { + final OutputStream gzipOut = new GZIPOutputStream(outStream)) { FileUtils.copy(is, gzipOut); } } @@ -181,4 +186,23 @@ public synchronized void save(final FlowController controller, final boolean arc } } + @Override + public void overwriteFlow(InputStream is) throws IOException { + try (final OutputStream output = Files.newOutputStream(flowXml, StandardOpenOption.WRITE, StandardOpenOption.CREATE); + final OutputStream gzipOut = new GZIPOutputStream(output)) { + FileUtils.copy(is, gzipOut); + } + } + + @Override + public void copyCurrentFlow(OutputStream os) throws IOException { + if (!Files.exists(flowXml) || Files.size(flowXml) == 0) { + return; + } + + try (final InputStream in = Files.newInputStream(flowXml, StandardOpenOption.READ); + final InputStream gzipIn = new GZIPInputStream(in)) { + FileUtils.copy(gzipIn, os); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.persistence.FlowConfigurationDAO b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.persistence.FlowConfigurationDAO new file mode 100644 index 000000000000..5a485669faf9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.persistence.FlowConfigurationDAO @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 9baa3eac2b23..11969b21c7bd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -37,6 +37,7 @@ 10 millis ./conf/flow.xml.gz + org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO true ./conf/archive/ 30 days diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index a768af47b91b..3af4eb2b6906 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -16,6 +16,7 @@ # Core Properties # nifi.version=${nifi.version} nifi.flow.configuration.file=${nifi.flow.configuration.file} +nifi.flow.configuration.implementation=${nifi.flow.configuration.implementation} nifi.flow.configuration.archive.enabled=${nifi.flow.configuration.archive.enabled} nifi.flow.configuration.archive.dir=${nifi.flow.configuration.archive.dir} nifi.flow.configuration.archive.max.time=${nifi.flow.configuration.archive.max.time}