From 0688e93b2107e76675167ede6e74b061ed2de1c4 Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Sat, 10 Nov 2018 22:22:01 +0100
Subject: [PATCH 1/6] NIFI-5318 Implement NiFi test harness: initial commit
---
nifi-testharness/.gitignore | 1 +
.../NIFI_TESTHARNESS_README.txt | 3 +
nifi-testharness/pom.xml | 234 +++++++
.../SimpleNiFiFlowDefinitionEditor.java | 202 ++++++
.../nifi/testharness/TestNiFiInstance.java | 616 ++++++++++++++++++
.../testharness/TestNiFiInstanceAware.java | 23 +
.../api/FlowFileEditorCallback.java | 46 ++
.../nifi/testharness/util/FileUtils.java | 88 +++
.../util/NiFiCoreLibClassLoader.java | 84 +++
.../nifi/testharness/util/XmlUtils.java | 67 ++
.../org/apache/nifi/testharness/util/Zip.java | 134 ++++
.../nifi/testharness/samples/Constants.java | 32 +
.../testharness/samples/NiFiFlowTest.java | 157 +++++
.../testharness/samples/NiFiMockFlowTest.java | 119 ++++
.../nifi/testharness/samples/TestUtils.java | 57 ++
.../testharness/samples/mock/GetHTTPMock.java | 90 +++
.../samples/mock/MockProcessor.java | 101 +++
nifi-testharness/src/test/resources/flow.xml | 154 +++++
.../src/test/resources/logback-test.xml | 15 +
.../test/resources/sample_technology_rss.xml | 28 +
pom.xml | 1 +
21 files changed, 2252 insertions(+)
create mode 100644 nifi-testharness/.gitignore
create mode 100644 nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
create mode 100644 nifi-testharness/pom.xml
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
create mode 100644 nifi-testharness/src/test/resources/flow.xml
create mode 100644 nifi-testharness/src/test/resources/logback-test.xml
create mode 100644 nifi-testharness/src/test/resources/sample_technology_rss.xml
diff --git a/nifi-testharness/.gitignore b/nifi-testharness/.gitignore
new file mode 100644
index 000000000000..17dff51989fa
--- /dev/null
+++ b/nifi-testharness/.gitignore
@@ -0,0 +1 @@
+nifi_testharness_nifi_home/*
\ No newline at end of file
diff --git a/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
new file mode 100644
index 000000000000..e2d4da0bb453
--- /dev/null
+++ b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
@@ -0,0 +1,3 @@
+This directory is used to mimic NiFi's own home directory: the JVM hosting the
+TestNiFiInstance has to be started here. Once started, TestNiFiInstance then
+creates symlinks to the actual NiFi installation directory.
\ No newline at end of file
diff --git a/nifi-testharness/pom.xml b/nifi-testharness/pom.xml
new file mode 100644
index 000000000000..f331209aaf55
--- /dev/null
+++ b/nifi-testharness/pom.xml
@@ -0,0 +1,234 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi
+ 1.9.0-SNAPSHOT
+
+ nifi-testharness
+ A test harness for running NiFi flow tests
+ jar
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
+ src/test/resources/sample_technology_rss.xml
+ src/test/resources/logback-test.xml
+ src/test/resources/flow.xml
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ compile
+ testCompile
+
+
+
+
+
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.20.1
+
+ 1
+ false
+ nifi_testharness_nifi_home
+
+
+
+
+
+
+
+
+ skip-testharness-tests
+
+ true
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ true
+
+
+ **/samples/*Test.class
+ **/samples/Test*.class
+ **/samples/*Spec.class
+
+ true
+ -Xmx1G
+ -Djava.net.preferIPv4Stack=true
+ ${maven.surefire.arguments}
+
+
+
+
+
+
+
+ run-testharness-tests
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ false
+ ${project.basedir}/nifi_testharness_nifi_home
+
+ true
+
+
+ **/*Test.class
+ **/Test*.class
+ **/*Spec.class
+
+ true
+ -Xmx1G
+ -Djava.net.preferIPv4Stack=true
+ ${maven.surefire.arguments}
+
+
+
+
+
+
+
+
+
+
+ UTF-8
+ 1.7.25
+ 9.4.3.v20170317
+
+
+
+
+ org.apache.nifi
+ nifi-runtime
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-assembly
+ ${project.version}
+ test
+ pom
+
+
+
+ org.apache.nifi
+ nifi-framework-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-framework-core
+ ${project.version}
+ provided
+
+
+
+
+ org.apache.nifi
+ nifi-bootstrap
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-api
+ ${project.version}
+ provided
+
+
+
+ javax.servlet
+ javax.servlet-api
+ 3.1.0
+
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty.version}
+ provided
+
+
+
+
+ org.easymock
+ easymock
+ 3.4
+ test
+
+
+
+
+ org.easymock
+ easymockclassextension
+ 3.2
+ test
+
+
+
+
+ org.powermock
+ powermock-api-easymock
+ 1.7.1
+ test
+
+
+
+
+ org.powermock
+ powermock-module-junit4
+ 1.7.1
+ test
+
+
+
+
+
+
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
new file mode 100644
index 000000000000..d4f297d4c96d
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import java.util.LinkedList;
+import java.util.Objects;
+
+
+/**
+ *
+ * A facility to describe simple, common changes to a NiFi flow before it is installed to the test
+ * NiFi instance. Intended to be used by
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}
+ *
+ *
+ *
+ * The desired edits can be configured via the {@link Builder} object returned by the {@link #builder()}
+ * method. Once fully configured, the {@link Builder#build()} emits a {@code FlowFileEditorCallback}
+ * object that can be passed to
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}.
+ *
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ * @see TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)
+ *
+ */
+
+public final class SimpleNiFiFlowDefinitionEditor implements FlowFileEditorCallback, TestNiFiInstanceAware {
+
+
+ private final LinkedList delegateActions;
+ private TestNiFiInstance testNiFiInstance;
+
+ private SimpleNiFiFlowDefinitionEditor(LinkedList delegateActions) {
+ this.delegateActions = delegateActions;
+ }
+
+ @Override
+ public Document edit(Document document) throws Exception {
+
+ for (FlowFileEditorCallback change : delegateActions) {
+ if (change instanceof TestNiFiInstanceAware) {
+ ((TestNiFiInstanceAware)change).setTestNiFiInstance(testNiFiInstance);
+ }
+
+ document = change.edit(document);
+ }
+
+ return document;
+ }
+
+ @Override
+ public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) {
+ this.testNiFiInstance = Objects.requireNonNull(
+ testNiFiInstance, "argument testNiFiInstance cannot be null");
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private Builder() {
+ // no external instance
+ }
+
+ private XPath xpath = XPathFactory.newInstance().newXPath();
+ private final LinkedList actions = new LinkedList<>();
+
+ public Builder rawXmlChange(FlowFileEditorCallback flowFileEditorCallback) {
+ actions.addLast(flowFileEditorCallback);
+ return this;
+ }
+
+ public Builder setSingleProcessorProperty(String processorName, String propertyName, String newValue) {
+
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = '" + processorName
+ + "']/property[name/text() = '" + propertyName + "']/value";
+
+ Node propertyValueNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+
+ if (propertyValueNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +"' with property '"
+ + propertyName + "' not found: " + xpathString);
+ }
+
+ propertyValueNode.setTextContent(newValue);
+
+ return document;
+ });
+
+
+ }
+
+ public Builder setClassOfSingleProcessor(String processorName, Class> mockProcessor) {
+
+ return setClassOfSingleProcessor(processorName, mockProcessor.getName());
+ }
+
+ public Builder setClassOfSingleProcessor(String processorName, String newFullyQualifiedClassName) {
+
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = '" + processorName + "']/class";
+
+ Node classNameNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+
+ if (classNameNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +" not found: " +
+ xpathString);
+ }
+
+ classNameNode.setTextContent(newFullyQualifiedClassName);
+
+ return document;
+ });
+ }
+
+ public Builder updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() {
+
+ return rawXmlChange(new UpdateFlowFileNiFiVersionFlowFileEditorCallback());
+ }
+
+
+ public SimpleNiFiFlowDefinitionEditor build() {
+ return new SimpleNiFiFlowDefinitionEditor(actions);
+ }
+
+ }
+
+
+ private static final class UpdateFlowFileNiFiVersionFlowFileEditorCallback
+ implements FlowFileEditorCallback, TestNiFiInstanceAware {
+
+ private TestNiFiInstance testNiFiInstance;
+
+ @Override
+ public Document edit(Document document) throws Exception {
+ String niFiVersion = getNiFiVersion();
+
+ XPath xpath = XPathFactory.newInstance().newXPath();
+
+ NodeList processorNodeVersionList = (NodeList)
+ xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version",
+ document, XPathConstants.NODESET);
+
+ final int length = processorNodeVersionList.getLength();
+ for (int i=0; i
+ * An API wrapper of a "test" NiFi instance to which a flow definition is installed for testing.
+ *
+ *
+ * Due to NiFi design restrictions, {@code TestNiFiInstance} has to take full command
+ * of the current working directory: it installs a full NiFi installation to there. To ensure
+ * this is desired, it will only run if the current directory is called
+ * "nifi_testharness_nifi_home". As such the JVM process has to be started inside a directory
+ * called "nifi_testharness_nifi_home" so that the following is true:
+ *
+ *
+ * new File(System.getProperty("user.dir")).getName().equals("nifi_testharness_nifi_home")
+ *
+ *
+ *
+ *
+ * Before {@code TestNiFiInstance} can be used, it has to be configured via its builder
+ * interface:
+ *
+ *
+ * {@link Builder#setNiFiBinaryDistributionZip(File)} specifies the location of the NiFi binary
+ * distribution ZIP file to be used.
+ *
+ *
+ * {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi flow
+ * to install.
+ *
+ *
+ * {@link Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} allows on-the-fly
+ * changes to be performed to the Flow file before it is actually installed.
+ *
+ * If the current working directory is called "nifi_testharness_nifi_home", the caller can
+ * {@link #install()} this {@code TestNiFiInstance}, which will
+ *
+ *
+ * (as a first cleanup step) erase all content of the current working directory.
+ * (NOTE: this potentially destructive operation is the reason why we have the
+ * "nifi_testharness_nifi_home" directory name guard in place!)
+ *
+ *
+ * Extracts the contents of the NiFi binary distribution ZIP file specified in
+ * the configuration to a to a temporary directory.
+ *
+ * Symlinks all files from the temporary directory to the current working
+ * directory, causing the directory to hold a fully functional
+ * NiFi installation.
+ *
+ *
+ * Installs the flow definition files(s) to the NiFi instance specified in
+ * the configuration.
+ *
+ *
+ *
+ *
+ *
+ *
+ * The caller then can proceed to {@link #start()} this {@code TestNiFiInstance},
+ * which will bootstrap the NiFi engine, which in turn will pick up and start processing
+ * the flow definition supplied by the caller in the configuration.
+ *
+ *
+ *
+ * Once the previous step is done, the caller can perform asserts regarding the observed behaviour
+ * of the NiFi flow, just like one would do it with standard Java test cases.
+ *
+ *
+ *
+ * To perform a clean shutdown of the hosted NiFi instance, the caller is required to call
+ * {@link #stopAndCleanup()}, which will shut down NiFi and remove all temporary files, including
+ * the symlinks created in the current working directory.
+ *
+ *
+ *
+ *
NOTES
+ *
+ *
+ * {@code TestNiFiInstance} is NOT thread safe: if more than one thread uses it,
+ * external synchronisation is required.
+ *
+ *
+ * Only one {@code TestNiFiInstance} can be started in the same "nifi_testharness_nifi_home"
+ * directory at the same time.
+ *
+ *
+ * Currently, due to NiFi limitations, one {@code TestNiFiInstance} can be started per JVM process.
+ * If multiple test cases are required, launch a new JVM process per test case
+ * (in sequence, see the point above): Maven/Surefire has built-in support for this.
+ *
+ *
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ *
+ * @see TestNiFiInstance#builder()
+ *
+ *
+ */
+public class TestNiFiInstance {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestNiFiInstance.class);
+
+
+ private EmbeddedNiFi testNiFi;
+
+ private final File nifiHomeDir;
+ private final File bootstrapLibDir;
+
+ private File nifiProperties;
+
+ private final File flowXmlGz;
+
+ private final File placeholderNiFiHomeDir;
+
+ private String nifiVersion;
+
+
+ private enum State {
+ STOPPED,
+ STOP_FAILED,
+ START_FAILED(STOPPED),
+ STARTED(STOPPED, STOP_FAILED),
+ INSTALLATION_FAILED(),
+ FLOW_INSTALLED(STARTED, START_FAILED),
+ INSTALLED(FLOW_INSTALLED, INSTALLATION_FAILED),
+ CREATED(INSTALLED, INSTALLATION_FAILED);
+
+
+ private final Set allowedTransitions;
+
+ State(State... allowedTransitions) {
+ this.allowedTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedTransitions)));
+ }
+
+ private void checkCanTransition(State newState) {
+ if (!this.allowedTransitions.contains(newState)) {
+ throw new IllegalStateException("Cannot transition from " + this + " to " + newState);
+ }
+ }
+ }
+
+ private State currentState = State.CREATED;
+
+ private final File nifiBinaryZip;
+ private final File flowXml;
+ private final FlowFileEditorCallback editCallback;
+
+ private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallback editCallback) {
+ this.nifiBinaryZip = Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip");
+ this.flowXml = Objects.requireNonNull(flowXml, "flowXml");
+ this.editCallback = editCallback;
+
+ nifiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+
+ final File configDir = new File(nifiHomeDir, "conf");
+ final File libDir = new File(nifiHomeDir, "lib");
+
+ bootstrapLibDir = new File(libDir, "bootstrap");
+
+ nifiProperties = new File(configDir, "nifi.properties");
+
+ flowXmlGz = new File(configDir, "flow.xml.gz");
+
+ placeholderNiFiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+ }
+
+ String getNifiVersion() {
+ switch (currentState) {
+ case INSTALLED:
+ case STOPPED:
+
+ return Objects.requireNonNull(nifiVersion, "nifiVersion is null");
+
+ default:
+ throw new IllegalStateException(
+ "NiFi version can only be retrieved after a successful installation, not in: "
+ + currentState);
+ }
+ }
+
+ public void install() throws IOException {
+
+ currentState.checkCanTransition(State.INSTALLED);
+
+ File[] staleInstallations = placeholderNiFiHomeDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+ if (staleInstallations != null) {
+ Arrays.stream(staleInstallations).forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+ }
+
+ Path tempDirectory = null;
+ try {
+ tempDirectory = Files.createTempDirectory("installable-flow");
+
+
+
+ LOGGER.info("Uncompressing NiFi archive {} to {} ...", nifiBinaryZip, placeholderNiFiHomeDir);
+
+ Zip.unzipFile(nifiBinaryZip, placeholderNiFiHomeDir, new Zip.StatusListenerAdapter() {
+ @Override
+ public void onUncompressDone(ZipEntry ze) {
+ LOGGER.debug("Uncompressed {}", ze.getName());
+ }
+ });
+
+ LOGGER.info("Uncompressing DONE");
+
+ File actualNiFiHomeDir = getActualNiFiHomeDir(placeholderNiFiHomeDir);
+
+ nifiVersion = getNiFiVersion(actualNiFiHomeDir);
+
+ currentState = State.INSTALLED;
+
+ File installableFlowFile = createInstallableFlowFile(tempDirectory);
+
+ validateNiFiVersionAgainstFlowVersion(nifiVersion, installableFlowFile);
+
+ FileUtils.createSymlinks(placeholderNiFiHomeDir, actualNiFiHomeDir);
+
+ installFlowFile(installableFlowFile);
+ } catch (Exception e) {
+
+ currentState = State.INSTALLATION_FAILED;
+
+ throw new RuntimeException("Installation failed: " + e.getMessage(), e);
+
+ } finally {
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectoryRecursive(tempDirectory);
+ }
+ }
+
+ currentState = State.FLOW_INSTALLED;
+ }
+
+ private File createInstallableFlowFile(Path tempDirectory) throws IOException {
+
+ File flowXmlFile = new File(tempDirectory.toFile(), "flow.xml");
+
+ if (editCallback == null) {
+ Files.copy(flowXml.toPath(), flowXmlFile.toPath());
+ } else {
+ if (editCallback instanceof TestNiFiInstanceAware) {
+ ((TestNiFiInstanceAware)editCallback).setTestNiFiInstance(this);
+ }
+
+ XmlUtils.editXml(flowXml, flowXmlFile, editCallback);
+ }
+
+ return flowXmlFile;
+ }
+
+ private void installFlowFile(File fileToIncludeInGz) throws IOException {
+ Zip.gzipFile(fileToIncludeInGz, flowXmlGz);
+ }
+
+ private static String getNiFiVersion(File nifiInstallDir) {
+
+ File libDir = new File(nifiInstallDir, "lib");
+ if (!libDir.exists()) {
+ throw new IllegalStateException(
+ "No \"lib\" directory found in NiFi home directory: " + nifiInstallDir);
+ }
+
+ File[] nifiApiJarLookupResults =
+ libDir.listFiles((dir, name) -> name.startsWith("nifi-api-") && name.endsWith(".jar"));
+
+ if (nifiApiJarLookupResults == null) {
+ // since we check the existence before, this can only be null in case of an I/O error
+ throw new IllegalStateException(
+ "I/O error listing NiFi lib directory: " + libDir);
+ }
+
+ if (nifiApiJarLookupResults.length == 0) {
+ throw new IllegalStateException(
+ "No \"\"nifi-api-*.jar\" file found in NiFi lib directory: " + libDir);
+ }
+
+ if (nifiApiJarLookupResults.length != 1) {
+ throw new IllegalStateException(
+ "Multiple \"nifi-api-*.jar\" files found in NiFi lib directory: " + libDir);
+ }
+
+ File nifiApiJar = nifiApiJarLookupResults[0];
+
+
+ return nifiApiJar.getName()
+ .replace("nifi-api-", "")
+ .replace(".jar", "");
+ }
+
+ private static void validateNiFiVersionAgainstFlowVersion(String nifiVersion, File flowFile) {
+
+ String flowFileVersion = extractFlowFileVersion(flowFile);
+
+ if (flowFileVersion != null
+ && !flowFileVersion.equalsIgnoreCase(nifiVersion)) {
+
+ // prevent user errors and fail fast in case we detect that the flow file
+ // was created by a different version of NiFi. This can prevent a lot of confusion!
+
+ throw new RuntimeException(String.format(
+ "The NiFi version referenced in the flow file ('%s') does not match the version of NiFi being used ('%s')",
+ flowFileVersion, nifiVersion));
+ }
+ }
+
+ private static String extractFlowFileVersion(File flowFile) {
+
+ Document flowDocument = XmlUtils.getFileAsDocument(flowFile);
+
+ XPath xpath = XPathFactory.newInstance().newXPath();
+
+ try {
+ NodeList processorNodeVersion = (NodeList)
+ xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version/text()",
+ flowDocument, XPathConstants.NODESET);
+
+ HashSet versionNumbers = new HashSet<>();
+
+ final int length = processorNodeVersion.getLength();
+ for (int i=0; i 1) {
+ throw new RuntimeException(
+ "Multiple NiFi versions found in Flow file, this is unexpected: " + versionNumbers);
+ }
+
+ return versionNumbers.iterator().next();
+
+ } catch (XPathExpressionException e) {
+ throw new RuntimeException("Failure extracting version information from flow file: " + flowFile, e);
+ }
+ }
+
+
+ public void start() {
+
+ currentState.checkCanTransition(State.STARTED);
+
+ try {
+ if (!bootstrapLibDir.exists()) {
+ throw new IllegalStateException("Not found: " + bootstrapLibDir);
+ }
+
+
+
+ System.setProperty("org.apache.jasper.compiler.disablejsr199", "true");
+ System.setProperty("java.security.egd", "file:/dev/urandom");
+ System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ System.setProperty("java.awt.headless", "true");
+ System.setProperty("java.protocol.handler.pkgs", "sun.net.www.protocol");
+
+ System.setProperty("nifi.properties.file.path", nifiProperties.getAbsolutePath());
+ System.setProperty("app", "NiFi");
+ System.setProperty("org.apache.nifi.bootstrap.config.log.dir", "./logs");
+
+ ClassLoader coreClassLoader = new NiFiCoreLibClassLoader(nifiHomeDir, ClassLoader.getSystemClassLoader());
+ Thread.currentThread().setContextClassLoader(coreClassLoader);
+
+
+
+ this.testNiFi = new EmbeddedNiFi(new String[0], coreClassLoader);
+
+ } catch (Exception ex) {
+
+ currentState = State.START_FAILED;
+
+ throw new RuntimeException("Startup failed", ex);
+
+ }
+
+ currentState = State.STARTED;
+
+
+ }
+
+
+ public void stopAndCleanup() {
+ currentState.checkCanTransition(State.STOPPED);
+
+ try {
+ testNiFi.shutdown();
+
+ removeNiFiFilesCreatedForTemporaryInstallation(placeholderNiFiHomeDir);
+
+ } catch (Exception e) {
+ currentState = State.STOP_FAILED;
+
+ throw new RuntimeException(e);
+ }
+
+ currentState = State.STOPPED;
+ }
+
+ private static File requireCurrentWorkingDirectoryIsCorrect() {
+
+ File currentWorkDir = new File(System.getProperty("user.dir"));
+ if (!currentWorkDir.getName().equals("nifi_testharness_nifi_home")) {
+
+ throw new IllegalStateException(
+ "The test's working directory has to be set to nifi_testharness_nifi_home, but was: " + currentWorkDir);
+ }
+ return currentWorkDir;
+ }
+
+ private static File getActualNiFiHomeDir(File currentDir) {
+ File[] files = currentDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+
+ if (files == null || files.length == 0) {
+ throw new IllegalStateException(
+ "No \"nifi-*\" directory found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ if (files.length != 1) {
+ throw new IllegalStateException(
+ "Multiple \"nifi-*\" directories found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ return files[0];
+ }
+
+ private static void removeNiFiFilesCreatedForTemporaryInstallation(File directoryToClear) {
+
+ if (directoryToClear != null) {
+ File[] directoryContents = directoryToClear.listFiles();
+ if (directoryContents != null) {
+ Arrays.stream(directoryContents)
+ .filter(file -> !"NIFI_TESTHARNESS_README.txt".equals(file.getName()))
+ .forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+ }
+ }
+ }
+
+ private static void deleteFileOrDirectoryRecursively(File file) {
+ if (file.isDirectory()) {
+ FileUtils.deleteDirectoryRecursive(file);
+ } else {
+ boolean deletedSuccessfully = file.delete();
+ if (!deletedSuccessfully) {
+ throw new RuntimeException("Could not delete: " + file);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "NiFi test instance(" + Integer.toHexString(hashCode())
+ + ") state: " + currentState + ", home: " + nifiHomeDir;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+
+ public static class Builder {
+
+ private boolean isDisposed = false;
+
+ private File nifiBinaryZip;
+ private File flowXml;
+ private FlowFileEditorCallback editCallback;
+
+ /**
+ * Sets the location of the NiFi binary distribution file, from which the test instance
+ * will be uncompressed and built.
+ *
+ * @param nifiBinaryZip
+ * the NiFi binary distribution file, from which the test instance will be built (never {@code null})
+ * @return {@code this} (for method chaining)
+ */
+ public Builder setNiFiBinaryDistributionZip(File nifiBinaryZip) {
+ if (!nifiBinaryZip.exists()) {
+ throw new IllegalArgumentException("File not found: " + nifiBinaryZip);
+ }
+
+ if (nifiBinaryZip.isDirectory()) {
+ throw new IllegalArgumentException("A ZIP file is expected to be specified, not a directory: "
+ + nifiBinaryZip);
+ }
+
+ this.nifiBinaryZip = nifiBinaryZip;
+ return this;
+ }
+
+ /**
+ * Sets the NiFi flow XML, which will be installed to the NiFi instance for testing.
+ *
+ * @param flowXml the NiFi flow file to install to the test instance for testing (never {@code null})
+ *
+ * @return {@code this} (for method chaining)
+ */
+ public Builder setFlowXmlToInstallForTesting(File flowXml) {
+ if (!flowXml.exists()) {
+ throw new IllegalArgumentException("File not found: " + flowXml);
+ }
+
+ this.flowXml = flowXml;
+ return this;
+ }
+
+ /**
+ *
+ * An optional callback to change the flow definition read from
+ * {@link #setFlowXmlToInstallForTesting(File)}, before it is actually installed for testing.
+ * (NOTE: The original file remains unchanged: changes are applied to a copy of it.)
+ *
+ *
+ * NOTE: {@link SimpleNiFiFlowDefinitionEditor} provides various common flow definition changes
+ * useful for testing.
+ *
+ *
+ * @param callback an optional callback to change the flow definition
+ *
+ * @return {@code this} (for method chaining)
+ *
+ * @see SimpleNiFiFlowDefinitionEditor
+ */
+ public Builder modifyFlowXmlBeforeInstalling(FlowFileEditorCallback callback) {
+ this.editCallback = callback;
+ return this;
+ }
+
+
+
+ public TestNiFiInstance build() {
+ if (isDisposed) {
+ throw new IllegalStateException("builder can only be used once");
+ }
+ isDisposed = true;
+
+ return new TestNiFiInstance(nifiBinaryZip, flowXml, editCallback);
+ }
+
+
+ }
+
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java
new file mode 100644
index 000000000000..31889edc26d6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness;
+
+public interface TestNiFiInstanceAware {
+ void setTestNiFiInstance(TestNiFiInstance testNiFiInstance);
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
new file mode 100644
index 000000000000..dabf36148c9c
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.api;
+
+import org.w3c.dom.Document;
+
+/**
+ *
+ * An interface that allows programmatic access to the contents of a NiFi Flow XML,
+ * allowing changes to be performed before it
+ * is actually installed to the NiFi instance.
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ */
+public interface FlowFileEditorCallback {
+
+ /**
+ *
+ * @param document the document to change (never {@code null})
+ * @return the changed document (never {@code null})
+ * @throws Exception in case the editing fails
+ */
+ Document edit(Document document) throws Exception;
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
new file mode 100644
index 000000000000..e207f089cc8c
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+
+public final class FileUtils {
+
+
+ private static final String MAC_DS_STORE_NAME = ".DS_Store";
+
+ private FileUtils() {
+ // no instances
+ }
+
+ public static void deleteDirectoryRecursive(Path directory) throws IOException {
+ Files.walkFileTree(directory, new SimpleFileVisitor() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ public static void deleteDirectoryRecursive(File dir) {
+ try {
+ deleteDirectoryRecursive(dir.toPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createLink(Path newLink, Path existingFile) {
+ try {
+ Files.createSymbolicLink(newLink, existingFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createSymlinks(File newLinkDir, File existingDir) {
+ Arrays.stream(existingDir.list())
+ .filter(fileName -> !MAC_DS_STORE_NAME.equals(fileName))
+ .forEach(fileName -> {
+ Path newLink = Paths.get(newLinkDir.getAbsolutePath(), fileName);
+ Path existingFile = Paths.get(existingDir.getAbsolutePath(), fileName);
+
+ File symlinkFile = newLink.toFile();
+ if (symlinkFile.exists()) {
+ symlinkFile.delete();
+ }
+
+ createLink(newLink, existingFile);
+ });
+ }
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
new file mode 100644
index 000000000000..a3af3634e5d6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class NiFiCoreLibClassLoader extends URLClassLoader {
+
+
+ public NiFiCoreLibClassLoader(File nifiHomeDir, ClassLoader parent) {
+ super(getURls(nifiHomeDir), parent);
+ }
+
+ private static URL[] getURls(File nifiHomeDir) {
+
+ try {
+ File libDir = new File(nifiHomeDir, "lib");
+ File bootstrapLibDir = new File(libDir, "bootstrap");
+
+
+ List libs = Files.list(libDir.toPath())
+ .filter(NiFiCoreLibClassLoader::isJarOrNarFile)
+ .map(NiFiCoreLibClassLoader::toURL)
+ .collect(Collectors.toList());
+ List bootstrapLibs = Files.list(bootstrapLibDir.toPath())
+ .filter(NiFiCoreLibClassLoader::isJarOrNarFile)
+ .map(NiFiCoreLibClassLoader::toURL)
+ .collect(Collectors.toList());
+
+ LinkedList urls = new LinkedList<>();
+ urls.addAll(libs);
+ urls.addAll(bootstrapLibs);
+
+ return urls.toArray(new URL[urls.size()]);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+
+
+ }
+
+ private static URL toURL(Path path) {
+ try {
+ return path.toUri().toURL();
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private static boolean isJarOrNarFile(Path path) {
+ String fullPathString = path.getFileName().toString();
+
+ return path.toFile().isFile() && fullPathString.endsWith(".jar");
+ }
+
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
new file mode 100644
index 000000000000..3e225e7df287
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.File;
+import java.io.FileInputStream;
+
+public final class XmlUtils {
+
+ public static void editXml(File inputFile, File outputFile, FlowFileEditorCallback editCallback) {
+
+ try {
+ Document document = getFileAsDocument(inputFile);
+
+ document = editCallback.edit(document);
+
+ // save the result
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ Transformer transformer = transformerFactory.newTransformer();
+ transformer.transform(new DOMSource(document), new StreamResult(outputFile));
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to change XML document: " + e.getMessage(), e);
+ }
+ }
+
+ public static Document getFileAsDocument(File xmlFile) {
+ try(FileInputStream inputStream = new FileInputStream(xmlFile)) {
+
+ DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
+
+ return documentBuilder.parse(new InputSource(inputStream));
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse XML file: " + xmlFile, e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
new file mode 100644
index 000000000000..12ea4032c1f6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public final class Zip {
+
+ private Zip() {
+ // no external instances allowed
+ }
+
+
+ public interface StatusListener {
+ void onUncompressStarted(ZipEntry ze);
+
+ void onUncompressDone(ZipEntry ze);
+ }
+
+ public static class StatusListenerAdapter implements StatusListener {
+
+ @Override
+ public void onUncompressStarted(ZipEntry ze) {
+
+ }
+
+ @Override
+ public void onUncompressDone(ZipEntry ze) {
+
+ }
+ }
+
+ private static final StatusListener NO_OP_STATUS_LISTENER = new StatusListenerAdapter();
+
+ public static void unzipFile(File zipFile, File targetDirectory) throws IOException {
+ unzipFile(zipFile, targetDirectory, NO_OP_STATUS_LISTENER);
+
+ }
+
+
+ public static void unzipFile(File zipFile, File targetDirectory,
+ StatusListener statusListener) throws IOException {
+
+ if (!targetDirectory.exists()) {
+ boolean mkdirs = targetDirectory.mkdirs();
+ if (!mkdirs) {
+ throw new IOException("Failed to create directory: " + targetDirectory);
+ }
+ }
+
+ try (ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile))) {
+
+ ZipEntry ze = zipInputStream.getNextEntry();
+
+ while (ze != null) {
+
+ if(ze.isDirectory()) {
+ ze = zipInputStream.getNextEntry();
+ continue;
+ }
+
+ statusListener.onUncompressStarted(ze);
+
+ String fileName = ze.getName();
+ File outputFile = new File(targetDirectory, fileName);
+
+
+ File parentDir = new File(outputFile.getParent());
+ if (!parentDir.exists()) {
+ boolean couldCreateParentDir = parentDir.mkdirs();
+ if (!couldCreateParentDir) {
+ throw new IllegalStateException("Could not create: " + parentDir);
+
+ }
+ }
+
+
+
+ Files.copy(zipInputStream, outputFile.toPath());
+
+ statusListener.onUncompressDone(ze);
+
+
+ ze = zipInputStream.getNextEntry();
+ }
+
+ zipInputStream.closeEntry();
+
+
+ }
+
+
+ }
+
+
+ public static void gzipFile(File inputFile, File gzipFile) throws IOException {
+
+ try (GZIPOutputStream gzos =
+ new GZIPOutputStream(new FileOutputStream(gzipFile))) {
+
+
+ Files.copy(inputFile.toPath(), gzos);
+
+ gzos.finish();
+ }
+ }
+
+
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
new file mode 100644
index 000000000000..83653f79f476
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+public final class Constants {
+
+ static final File OUTPUT_DIR = new File("./NiFiTest/NiFiReadTest");
+
+ // NOTE: you will have to have the NiFi distribution ZIP placed into this directory.
+ // Its version must be the same as the one referenced in the flow.xml, otherwise it will not work!
+ static final File NIFI_ZIP_DIR = new File("../../nifi-assembly/target");
+
+ static final File FLOW_XML_FILE = new File(NiFiMockFlowTest.class.getResource("/flow.xml").getFile());
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
new file mode 100644
index 000000000000..484b78c69dc6
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by starting a mock HTTP server (using Jetty)
+ * and rewriting the URL in flow definition.
+ */
+public class NiFiFlowTest {
+
+ private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+ .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
+ .setSingleProcessorProperty("GetHTTP", "URL", "http://localhost:12345")
+ .build();
+
+ // used by mocked GetHTTP; serves test data
+ private static Server testJettyServer;
+
+ private TestNiFiInstance testNiFiInstance;
+
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ NiFiFlowTest.testJettyServer = new Server(12345);
+
+
+ Handler handler = new TestHandler();
+ NiFiFlowTest.testJettyServer.setHandler(handler);
+ NiFiFlowTest.testJettyServer.start();
+ }
+
+
+ @Before
+ public void bootstrapNiFi() throws Exception {
+
+ if (Constants.OUTPUT_DIR.exists()) {
+ FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+ }
+
+ File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+ TestNiFiInstance testNiFi = TestNiFiInstance.builder()
+ .setNiFiBinaryDistributionZip(nifiZipFile)
+ .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+ .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+ .build();
+
+ testNiFi.install();
+ testNiFi.start();
+
+ // only assign testNiFi to the field in case it was started successfully
+ testNiFiInstance = testNiFi;
+ }
+
+ @Test
+ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+ // We deleted the output directory: our NiFi flow should create it
+
+ assertTrue( "Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
+
+ File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+ assertTrue("Output file not found: " + outputFile, outputFile.exists() );
+
+ List strings = Files.readAllLines(outputFile.toPath());
+
+ boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+
+ assertTrue("There was no line containing BBC", atLeastOneLineContainsBBC);
+
+ boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+
+ assertTrue("There was no line containing IPhone", atLeastOneLineContainsIPhone);
+
+ }
+
+ @After
+ public void shutdownNiFi() {
+
+ if (testNiFiInstance != null) {
+ testNiFiInstance.stopAndCleanup();
+ }
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ NiFiFlowTest.testJettyServer.stop();
+ }
+
+
+ private static class TestHandler extends org.eclipse.jetty.server.handler.AbstractHandler {
+ @Override
+ public void handle(
+ String target,
+ Request baseRequest,
+ HttpServletRequest httpServletRequest,
+ HttpServletResponse response) throws IOException, ServletException {
+
+ response.setContentType("text/html;charset=utf-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ baseRequest.setHandled(true);
+
+ InputStream resource = TestHandler.class.getResourceAsStream("/sample_technology_rss.xml");
+ ServletOutputStream outputStream = response.getOutputStream();
+
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = resource.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, len);
+ }
+ }
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
new file mode 100644
index 000000000000..20bfb736fbc8
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.samples.mock.GetHTTPMock;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by mocking the processor
+ * itself in the flow definition.
+ */
+public class NiFiMockFlowTest {
+
+ private static final InputStream DEMO_DATA_AS_STREAM =
+ NiFiFlowTest.class.getResourceAsStream("/sample_technology_rss.xml");
+
+
+ // We have a dedicated class. It has to be public static
+ // so that NiFi engine can instantiate it.
+ public static class MockedGetHTTP extends GetHTTPMock {
+
+ public MockedGetHTTP() {
+ super("text/xml; charset=utf-8", () -> DEMO_DATA_AS_STREAM);
+ }
+ }
+
+
+ private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+ .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
+ .setClassOfSingleProcessor("GetHTTP", MockedGetHTTP.class)
+ .build();
+
+
+ private TestNiFiInstance testNiFiInstance;
+
+ @Before
+ public void bootstrapNiFi() throws Exception {
+
+ if (Constants.OUTPUT_DIR.exists()) {
+ FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+ }
+
+ File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+ TestNiFiInstance testNiFi = TestNiFiInstance.builder()
+ .setNiFiBinaryDistributionZip(nifiZipFile)
+ .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+ .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+ .build();
+
+ testNiFi.install();
+ testNiFi.start();
+
+ // only assign testNiFi to the field in case it was started successfully
+ testNiFiInstance = testNiFi;
+ }
+
+ @Test
+ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+ // We deleted the output directory: our NiFi flow should create it
+
+ assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
+
+ File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+ assertTrue("Output file not found: " + outputFile, outputFile.exists());
+
+ List strings = Files.readAllLines(outputFile.toPath());
+
+ boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+
+ assertTrue("There was no line containing BBC", atLeastOneLineContainsBBC);
+
+ boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+
+ assertTrue("There was no line containing IPhone", atLeastOneLineContainsIPhone);
+
+ }
+
+ @After
+ public void shutdownNiFi() {
+
+ if (testNiFiInstance != null) {
+ testNiFiInstance.stopAndCleanup();
+ }
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
new file mode 100644
index 000000000000..affe66f53c68
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+final class TestUtils {
+
+ private TestUtils() {
+ // no instances allowed
+ }
+
+ static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
+
+ if (!binaryDistributionZipDir.exists()) {
+ throw new IllegalStateException("NiFi distribution ZIP file not found at the expected location: "
+ + binaryDistributionZipDir);
+ }
+
+ File[] files = binaryDistributionZipDir.listFiles((dir, name) ->
+ name.startsWith("nifi-") && name.endsWith("-bin.zip"));
+
+ if (files == null) {
+ throw new IllegalStateException(
+ "Not a directory or I/O error reading: " + binaryDistributionZipDir);
+ }
+
+ if (files.length == 0) {
+ throw new IllegalStateException(
+ "No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir);
+ }
+
+ if (files.length > 1) {
+ throw new IllegalStateException(
+ "Multiple NiFi distribution ZIP files are found in: " + binaryDistributionZipDir);
+ }
+
+ return files[0];
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
new file mode 100644
index 000000000000..67055e497298
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples.mock;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class GetHTTPMock extends MockProcessor {
+
+ private final String contentType;
+ private final Supplier inputStreamSupplier;
+
+ public GetHTTPMock(String contentType, Supplier inputStreamSupplier) {
+ super("org.apache.nifi.processors.standard.GetHTTP");
+
+ this.contentType = contentType;
+ this.inputStreamSupplier = inputStreamSupplier;
+ }
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All files are transferred to the success relationship")
+ .build();
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory processSessionFactory) {
+
+ final ComponentLog logger = getLogger();
+
+ final StopWatch stopWatch = new StopWatch(true);
+
+ final ProcessSession session = processSessionFactory.createSession();
+
+ final String url = context.getProperty("URL").evaluateAttributeExpressions().getValue();
+ final URI uri;
+ String source = url;
+ try {
+ uri = new URI(url);
+ source = uri.getHost();
+ } catch (final URISyntaxException swallow) {
+ // this won't happen as the url has already been validated
+ }
+
+ FlowFile flowFile = session.create();
+
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty("Filename").evaluateAttributeExpressions().getValue());
+ flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
+ flowFile = session.importFrom(inputStreamSupplier.get(), flowFile);
+
+ flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), contentType);
+
+ final long flowFileSize = flowFile.getSize();
+ stopWatch.stop();
+ session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final String dataRate = stopWatch.calculateDataRate(flowFileSize);
+ logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
+ session.commit();
+
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
new file mode 100644
index 000000000000..cd62b2d35bd3
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.testharness.samples.mock;
+
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public abstract class MockProcessor implements Processor {
+
+ private final Processor delegate;
+ private ComponentLog logger;
+
+ protected MockProcessor(String delegateClassName) {
+ try {
+
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ final Class> delegateClass = Class.forName(delegateClassName, true, contextClassLoader);
+
+ delegate = (Processor) delegateClass.newInstance();
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ throw new RuntimeException(e);
+ }
+
+
+ }
+
+ protected Processor getDelegate() {
+ return delegate;
+
+ }
+
+ protected final ComponentLog getLogger() {
+ return logger;
+ }
+
+ @Override
+ public void initialize(ProcessorInitializationContext processorInitializationContext) {
+ getDelegate().initialize(processorInitializationContext);
+ logger = processorInitializationContext.getLogger();
+ }
+
+ @Override
+ public Set getRelationships() {
+ return getDelegate().getRelationships();
+ }
+
+ @Override
+ public abstract void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory);
+
+ @Override
+ public Collection validate(ValidationContext validationContext) {
+ return getDelegate().validate(validationContext);
+ }
+
+ @Override
+ public PropertyDescriptor getPropertyDescriptor(String s) {
+ return getDelegate().getPropertyDescriptor(s);
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
+ getDelegate().onPropertyModified(propertyDescriptor, s, s1);
+ }
+
+ @Override
+ public List getPropertyDescriptors() {
+ return getDelegate().getPropertyDescriptors();
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getDelegate().getIdentifier();
+ }
+}
diff --git a/nifi-testharness/src/test/resources/flow.xml b/nifi-testharness/src/test/resources/flow.xml
new file mode 100644
index 000000000000..66a1cf289347
--- /dev/null
+++ b/nifi-testharness/src/test/resources/flow.xml
@@ -0,0 +1,154 @@
+
+
+ 10
+ 5
+
+
+ 92b74849-0166-1000-28d3-4da912e34551
+ NiFi Flow
+
+
+
+ 92b9139c-0166-1000-04d5-1184adc0977a
+ PutFile
+
+
+
+ org.apache.nifi.processors.standard.PutFile
+
+ org.apache.nifi
+ nifi-standard-nar
+ 1.7.1
+
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ RUNNING
+ TIMER_DRIVEN
+ ALL
+ 0
+
+ Directory
+ ./NiFiTest/NiFiReadTest
+
+
+ Conflict Resolution Strategy
+ ignore
+
+
+ Create Missing Directories
+ true
+
+
+ Maximum File Count
+
+
+ Last Modified Time
+
+
+ Permissions
+
+
+ Owner
+
+
+ Group
+
+ success
+ failure
+
+
+ 92b87553-0166-1000-527e-7ecdc888d91a
+ GetHTTP
+
+
+
+ org.apache.nifi.processors.standard.GetHTTP
+
+ org.apache.nifi
+ nifi-standard-nar
+ 1.7.1
+
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ RUNNING
+ TIMER_DRIVEN
+ ALL
+ 0
+
+ URL
+ http://feeds.bbci.co.uk/news/technology/rss.xml?edition=uk#
+
+
+ Filename
+ bbc-world.rss.xml
+
+
+ SSL Context Service
+
+
+ Username
+
+
+ Password
+
+
+ Connection Timeout
+ 30 sec
+
+
+ Data Timeout
+ 30 sec
+
+
+ User Agent
+
+
+ Accept Content-Type
+
+
+ Follow Redirects
+ false
+
+
+ redirect-cookie-policy
+ default
+
+
+ proxy-configuration-service
+
+
+ Proxy Host
+
+
+ Proxy Port
+
+
+
+ 92b9380b-0166-1000-981d-c9e319f135e3
+
+
+ 1
+ 0
+ 92b87553-0166-1000-527e-7ecdc888d91a
+ 92b74849-0166-1000-28d3-4da912e34551
+ PROCESSOR
+ 92b9139c-0166-1000-04d5-1184adc0977a
+ 92b74849-0166-1000-28d3-4da912e34551
+ PROCESSOR
+ success
+ 10000
+ 1 GB
+ 0 sec
+
+
+
+
+
diff --git a/nifi-testharness/src/test/resources/logback-test.xml b/nifi-testharness/src/test/resources/logback-test.xml
new file mode 100644
index 000000000000..ab903af0702f
--- /dev/null
+++ b/nifi-testharness/src/test/resources/logback-test.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-testharness/src/test/resources/sample_technology_rss.xml b/nifi-testharness/src/test/resources/sample_technology_rss.xml
new file mode 100644
index 000000000000..74465657af39
--- /dev/null
+++ b/nifi-testharness/src/test/resources/sample_technology_rss.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+ http://www.bbc.co.uk/news/
+
+ http://news.bbcimg.co.uk/nol/shared/img/bbc_news_120x60.gif
+ BBC News - Technology
+ http://www.bbc.co.uk/news/
+
+ RSS for Node
+ Fri, 29 Sep 2017 07:38:08 GMT
+
+
+ 15
+
+
+
+ http://www.bbc.co.uk/news/technology-41412560
+ http://www.bbc.co.uk/news/technology-41412560
+ Wed, 27 Sep 2017 16:37:39 GMT
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index cecfed27d226..649622edbc3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
nifi-mocknifi-nar-bundlesnifi-assembly
+ nifi-testharnessnifi-docsnifi-maven-archetypesnifi-external
From 20033de06a401b1e78a5ab9c62849eb412b842a5 Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Sat, 24 Nov 2018 17:21:30 +0100
Subject: [PATCH 2/6] NIFI-5318 Implement NiFi test harness: replaced original
sample feed payload with synthetic content
---
.../testharness/samples/NiFiFlowTest.java | 12 ++++----
.../testharness/samples/NiFiMockFlowTest.java | 8 +++---
.../test/resources/sample_technology_rss.xml | 28 ++++++++-----------
3 files changed, 22 insertions(+), 26 deletions(-)
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
index 484b78c69dc6..9e13db797863 100644
--- a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
@@ -100,21 +100,21 @@ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
// We deleted the output directory: our NiFi flow should create it
- assertTrue( "Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
+ assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
- assertTrue("Output file not found: " + outputFile, outputFile.exists() );
+ assertTrue("Output file not found: " + outputFile, outputFile.exists());
List strings = Files.readAllLines(outputFile.toPath());
- boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+ boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
- assertTrue("There was no line containing BBC", atLeastOneLineContainsBBC);
+ assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
- boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+ boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
- assertTrue("There was no line containing IPhone", atLeastOneLineContainsIPhone);
+ assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
index 20bfb736fbc8..8329087666c5 100644
--- a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
@@ -99,13 +99,13 @@ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
List strings = Files.readAllLines(outputFile.toPath());
- boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+ boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
- assertTrue("There was no line containing BBC", atLeastOneLineContainsBBC);
+ assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
- boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+ boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
- assertTrue("There was no line containing IPhone", atLeastOneLineContainsIPhone);
+ assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
}
diff --git a/nifi-testharness/src/test/resources/sample_technology_rss.xml b/nifi-testharness/src/test/resources/sample_technology_rss.xml
index 74465657af39..a95ba96442e2 100644
--- a/nifi-testharness/src/test/resources/sample_technology_rss.xml
+++ b/nifi-testharness/src/test/resources/sample_technology_rss.xml
@@ -3,26 +3,22 @@
-
-
- http://www.bbc.co.uk/news/
+ Sample Technology feed
+ Sample Technology feed
- http://news.bbcimg.co.uk/nol/shared/img/bbc_news_120x60.gif
- BBC News - Technology
- http://www.bbc.co.uk/news/
+ https://nifi.apache.org/assets/images/apache-nifi-logo.svg
+ NiFi sample
+ https://nifi.apache.org/
- RSS for Node
- Fri, 29 Sep 2017 07:38:08 GMT
-
-
+ en-gb15
-
-
- http://www.bbc.co.uk/news/technology-41412560
- http://www.bbc.co.uk/news/technology-41412560
- Wed, 27 Sep 2017 16:37:39 GMT
-
+ The latest NiFi version is out
+ The latest version of NiFi is released
+ https://nifi.apache.org/
+ https://nifi.apache.org/
+ Sat, 24 Sep 2018 17:10:10 GMT
+
\ No newline at end of file
From f82aec5c4ef28a8125170b1022faf268d14e7fce Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Wed, 2 Jan 2019 21:56:56 +0100
Subject: [PATCH 3/6] NIFI-5318 Implement NiFi test harness: fixed test harness
run crash issue; better reporting of paths
---
nifi-testharness/pom.xml | 1 -
.../org/apache/nifi/testharness/samples/TestUtils.java | 8 ++++----
2 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/nifi-testharness/pom.xml b/nifi-testharness/pom.xml
index f331209aaf55..569ce4877acc 100644
--- a/nifi-testharness/pom.xml
+++ b/nifi-testharness/pom.xml
@@ -135,7 +135,6 @@
UTF-81.7.25
- 9.4.3.v20170317
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
index affe66f53c68..7d0b633f05bb 100644
--- a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
@@ -31,7 +31,7 @@ static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
if (!binaryDistributionZipDir.exists()) {
throw new IllegalStateException("NiFi distribution ZIP file not found at the expected location: "
- + binaryDistributionZipDir);
+ + binaryDistributionZipDir.getAbsolutePath());
}
File[] files = binaryDistributionZipDir.listFiles((dir, name) ->
@@ -39,17 +39,17 @@ static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
if (files == null) {
throw new IllegalStateException(
- "Not a directory or I/O error reading: " + binaryDistributionZipDir);
+ "Not a directory or I/O error reading: " + binaryDistributionZipDir.getAbsolutePath());
}
if (files.length == 0) {
throw new IllegalStateException(
- "No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir);
+ "No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir.getAbsolutePath());
}
if (files.length > 1) {
throw new IllegalStateException(
- "Multiple NiFi distribution ZIP files are found in: " + binaryDistributionZipDir);
+ "Multiple NiFi distribution ZIP files are found in: " + binaryDistributionZipDir.getAbsolutePath());
}
return files[0];
From 395bf5baa7fc829cdf154ac599f5a8268fd5343c Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Sun, 13 Jan 2019 22:50:52 +0100
Subject: [PATCH 4/6] NIFI-5318 Implement NiFi test harness: added further
states where NiFi version can be queried
---
.../java/org/apache/nifi/testharness/TestNiFiInstance.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
index e821eb7063be..e8a1fc6f2d10 100644
--- a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
@@ -233,6 +233,10 @@ private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallbac
String getNifiVersion() {
switch (currentState) {
case INSTALLED:
+ case FLOW_INSTALLED:
+ case STARTED:
+ case START_FAILED:
+ case STOP_FAILED:
case STOPPED:
return Objects.requireNonNull(nifiVersion, "nifiVersion is null");
From b0cdc31e3c30a8aeb638af4e11c6b0971bb5b66c Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Thu, 17 Jan 2019 11:01:04 +0100
Subject: [PATCH 5/6] NIFI-5318 Implement NiFi test harness: fixed incorrect
class reference
---
.../org/apache/nifi/testharness/samples/NiFiMockFlowTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
index 8329087666c5..c5a7139dcdbe 100644
--- a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
@@ -43,7 +43,7 @@
public class NiFiMockFlowTest {
private static final InputStream DEMO_DATA_AS_STREAM =
- NiFiFlowTest.class.getResourceAsStream("/sample_technology_rss.xml");
+ NiFiMockFlowTest.class.getResourceAsStream("/sample_technology_rss.xml");
// We have a dedicated class. It has to be public static
From 09eacc149f656858590dbb1b0e72c80c19dd9a8b Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Thu, 17 Jan 2019 11:14:55 +0100
Subject: [PATCH 6/6] NIFI-5318 Implement NiFi test harness: added type
parameter bounding to setClassOfSingleProcessor to prevent configuring
obviously incorrect classes
---
.../nifi/testharness/SimpleNiFiFlowDefinitionEditor.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
index d4f297d4c96d..27e6477552ea 100644
--- a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
@@ -19,6 +19,7 @@
package org.apache.nifi.testharness;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.testharness.api.FlowFileEditorCallback;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@@ -124,7 +125,7 @@ public Builder setSingleProcessorProperty(String processorName, String propertyN
}
- public Builder setClassOfSingleProcessor(String processorName, Class> mockProcessor) {
+ public
Builder setClassOfSingleProcessor(String processorName, Class