From 7916daf9e3fcdc5a396ae7815069c3d8bfa168d6 Mon Sep 17 00:00:00 2001
From: "Peter G. Horvath"
Date: Sat, 7 Jul 2018 10:48:30 +0200
Subject: [PATCH] NIFI-5318 Implement NiFi test harness: initial commit of
nifi-testharness
---
.../src/main/asciidoc/developer-guide.adoc | 26 +
.../NIFI_TESTHARNESS_README.txt | 3 +
nifi-testharness/pom.xml | 176 +++++++
.../SimpleNiFiFlowDefinitionEditor.java | 144 ++++++
.../nifi/testharness/TestNiFiInstance.java | 486 ++++++++++++++++++
.../api/FlowFileEditorCallback.java | 46 ++
.../nifi/testharness/util/FileUtils.java | 88 ++++
.../util/NiFiCoreLibClassLoader.java | 84 +++
.../nifi/testharness/util/XmlUtils.java | 56 ++
.../org/apache/nifi/testharness/util/Zip.java | 134 +++++
.../nifi/testharness/samples/Constants.java | 28 +
.../testharness/samples/NiFiFlowTest.java | 161 ++++++
.../testharness/samples/NiFiMockFlowTest.java | 127 +++++
.../nifi/testharness/samples/TestUtils.java | 47 ++
.../testharness/samples/mock/GetHTTPMock.java | 90 ++++
.../samples/mock/MockProcessor.java | 101 ++++
nifi-testharness/src/test/resources/flow.xml | 150 ++++++
.../src/test/resources/logback-test.xml | 15 +
.../test/resources/sample_technology_rss.xml | 28 +
pom.xml | 1 +
20 files changed, 1991 insertions(+)
create mode 100644 nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
create mode 100644 nifi-testharness/pom.xml
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
create mode 100644 nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
create mode 100644 nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
create mode 100644 nifi-testharness/src/test/resources/flow.xml
create mode 100644 nifi-testharness/src/test/resources/logback-test.xml
create mode 100644 nifi-testharness/src/test/resources/sample_technology_rss.xml
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 97ac234089ff..08b81e70297a 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -2296,6 +2296,32 @@ threads that should be used to run the Processor can
be set via the `setThreadCount(int)` method.
+=== Experimental NiFi Flow test harness
+
+NiFi now has an experimental feature for full end-to-end testing of flows. This allows us
+to take a NiFi flow, install it to a test NiFi instance, run it and make Java unit test
+like asserts regarding its behaviour.
+
+The class `org.apache.nifi.test.TestNiFiInstance` is a thin wrapper that allows us
+to manipulate a NiFi installation and deploy a flow with some adjustments
+to its configuration, including changing processor properties and replacing processor
+classes with mocks.
+
+In order to add the necessary classes to your project,
+you can use the Maven dependency:
+
+[source]
+----
+
+ org.apache.nifi
+ nifi-testharness
+ ${nifi version}
+
+----
+
+For further documentation, please consult the JavaDoc of
+`org.apache.nifi.test.TestNiFiInstance`. For samples, please take a look at
+link:https://github.com/apache/nifi/tree/master/nifi-testharness/src/test/java/org/apache/testharness/samples[samples on GitHub^].
diff --git a/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
new file mode 100644
index 000000000000..e2d4da0bb453
--- /dev/null
+++ b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
@@ -0,0 +1,3 @@
+This directory is used to mimic NiFi's own home directory: the JVM hosting the
+TestNiFiInstance has to be started here. Once started, TestNiFiInstance then
+creates symlinks to the actual NiFi installation directory.
\ No newline at end of file
diff --git a/nifi-testharness/pom.xml b/nifi-testharness/pom.xml
new file mode 100644
index 000000000000..165cfbf11922
--- /dev/null
+++ b/nifi-testharness/pom.xml
@@ -0,0 +1,176 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi
+ 1.8.0-SNAPSHOT
+
+ nifi-testharness
+ A test harness for running NiFi flow tests
+ pom
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt
+ src/test/resources/sample_technology_rss.xml
+ src/test/resources/logback-test.xml
+ src/test/resources/flow.xml
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ compile
+ testCompile
+
+
+
+
+ 1.8
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.20.1
+
+ 1
+ false
+ nifi_testharness_nifi_home
+
+
+
+
+
+
+
+
+
+ UTF-8
+ 1.7.25
+ 9.4.3.v20170317
+
+
+
+
+ org.apache.nifi
+ nifi-runtime
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-framework-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-framework-core
+ ${project.version}
+ provided
+
+
+
+
+ org.apache.nifi
+ nifi-bootstrap
+ ${project.version}
+ provided
+
+
+
+ org.apache.nifi
+ nifi-api
+ ${project.version}
+ provided
+
+
+
+ javax.servlet
+ javax.servlet-api
+ 3.1.0
+
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty.version}
+ provided
+
+
+
+
+
+ org.easymock
+ easymock
+ 3.4
+ test
+
+
+
+
+ org.easymock
+ easymockclassextension
+ 3.2
+ test
+
+
+
+
+
+ org.powermock
+ powermock-api-easymock
+ 1.7.1
+ test
+
+
+
+
+ org.powermock
+ powermock-module-testng
+ 1.7.1
+ test
+
+
+
+ org.testng
+ testng
+ 6.11
+ test
+
+
+
+
+
+
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
new file mode 100644
index 000000000000..233113418190
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import java.util.LinkedList;
+
+
+/**
+ *
+ * A facility to describe simple, common changes to a NiFi flow before it is installed to the test
+ * NiFi instance. Intended to be used by
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}
+ *
+ *
+ *
+ * The desired edits can be configured via the {@link Builder} object returned by the {@link #builder()}
+ * method. Once fully configured, the {@link Builder#build()} emits a {@code FlowFileEditorCallback}
+ * object that can be passed to
+ * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}.
+ *
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ * @see TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)
+ *
+ * @author Peter G. Horvath
+ */
+
+public final class SimpleNiFiFlowDefinitionEditor implements FlowFileEditorCallback {
+
+
+ private final LinkedList delegateActions;
+
+ private SimpleNiFiFlowDefinitionEditor(LinkedList delegateActions) {
+ this.delegateActions = delegateActions;
+ }
+
+ @Override
+ public Document edit(Document document) throws Exception {
+
+ for (FlowFileEditorCallback change : delegateActions) {
+ document = change.edit(document);
+ }
+
+ return document;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private Builder() {
+ // no external instance
+ }
+
+ XPath xpath = XPathFactory.newInstance().newXPath();
+ private final LinkedList actions = new LinkedList<>();
+
+ public Builder rawXmlChange(FlowFileEditorCallback flowFileEditorCallback) {
+ actions.addLast(flowFileEditorCallback);
+ return this;
+ }
+
+ public Builder setSingleProcessorProperty(String processorName, String propertyName, String newValue) {
+
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = '" + processorName
+ + "']/property[name/text() = '" + propertyName + "']/value";
+
+ Node propertyValueNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+
+ if (propertyValueNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +"' with property '"
+ + propertyName + "' not found: " + xpathString);
+ }
+
+ propertyValueNode.setTextContent(newValue);
+
+ return document;
+ });
+
+
+ }
+
+ public Builder setClassOfSingleProcessor(String processorName, Class> mockProcessor) {
+
+ return setClassOfSingleProcessor(processorName, mockProcessor.getName());
+ }
+
+ public Builder setClassOfSingleProcessor(String processorName, String newFullyQualifiedClassName) {
+
+ return rawXmlChange(document -> {
+ String xpathString = "//processor[name/text() = '" + processorName + "']/class";
+
+ Node classNameNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
+
+ if (classNameNode == null) {
+ throw new IllegalArgumentException("Reference to processor '"+ processorName +" not found: " +
+ xpathString);
+ }
+
+ classNameNode.setTextContent(newFullyQualifiedClassName);
+
+ return document;
+ });
+ }
+
+
+ public SimpleNiFiFlowDefinitionEditor build() {
+ return new SimpleNiFiFlowDefinitionEditor(actions);
+ }
+
+ }
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
new file mode 100644
index 000000000000..77c2ee819f90
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.apache.nifi.EmbeddedNiFi;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.apache.nifi.testharness.util.NiFiCoreLibClassLoader;
+import org.apache.nifi.testharness.util.XmlUtils;
+import org.apache.nifi.testharness.util.Zip;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+
+/**
+ *
+ * An API wrapper of a "test" NiFi instance to which a flow definition is installed for testing.
+ *
+ *
+ * Due to NiFi design restrictions, {@code TestNiFiInstance} has to take full command
+ * of the current working directory: it installs a full NiFi installation to there. To ensure
+ * this is desired, it will only run if the current directory is called
+ * "nifi_testharness_nifi_home". As such the JVM process has to be started inside a directory
+ * called "nifi_testharness_nifi_home" so that the following is true:
+ *
+ *
+ * new File(System.getProperty("user.dir")).getName().equals("nifi_testharness_nifi_home")
+ *
+ *
+ *
+ *
+ * Before {@code TestNiFiInstance} can be used, it has to be configured via its builder
+ * interface:
+ *
+ *
+ * {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi binary
+ * distribution ZIP file to be used.
+ *
+ *
+ * {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi flow
+ * to install.
+ *
+ *
+ * {@link Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} allows on-the-fly
+ * changes to be performed to the Flow file before it is actually installed.
+ *
+ * If the current working directory is called "nifi_testharness_nifi_home", the caller can
+ * {@link #install()} this {@code TestNiFiInstance}, which will
+ *
+ *
+ * (as a first cleanup step) erase all content of the current working directory.
+ * (NOTE: this potentially destructive operation is the reason why we have the
+ * "nifi_testharness_nifi_home" directory name guard in place!)
+ *
+ *
+ * Extracts the contents of the NiFi binary distribution ZIP file specified in
+ * the configuration to a to a temporary directory.
+ *
+ * Symlinks all files from the temporary directory to the current working
+ * directory, causing the to hold a fully functional
+ * NiFi installation.
+ *
+ *
+ * Installs the flow definition files(s) to the NiFi instance specified in
+ * the configuration.
+ *
+ *
+ *
+ *
+ *
+ *
+ * The caller then can proceed to {@link #start()}} this {@code TestNiFiInstance},
+ * which will bootstrap the NiFi engine, which in turn will pick up and start processing
+ * the flow definition supplied by the caller in the configuration.
+ *
+ *
+ *
+ * Once the previous step is done, the caller can perform asserts regarding the observed behaviour
+ * of the NiFi flow, just like one would do it with standard Java test cases.
+ *
+ *
+ *
+ * To perform a clean shutdown of the hosted NiFi instance, the caller is required to call
+ * {@link #stopAndCleanup()}, which will shut down NiFi and remove all temporary files, including
+ * the symlinks created int current working directory.
+ *
+ *
+ *
+ *
NOTES
+ *
+ *
+ * {@code TestNiFiInstance} is NOT thread safe: if more than one thread uses it,
+ * external synchronisation is required.
+ *
+ *
+ * Only one {@code TestNiFiInstance} can be started in the same "nifi_testharness_nifi_home"
+ * directory at the same time.
+ *
+ *
+ * Currently, due to NiFi limitations, one {@TestNiFiInstance} can be started per JVM process.
+ * If multiple test cases are required, launch a new JVM process per test case
+ * (in sequence, see the point above).
+ *
+ *
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ *
+ * @see TestNiFiInstance#builder()
+ *
+ * @author Peter G. Horvath
+ *
+ */
+public class TestNiFiInstance {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestNiFiInstance.class);
+
+
+ private EmbeddedNiFi testNiFi;
+
+ private final File nifiHomeDir;
+ private final File bootstrapLibDir;
+
+ private File nifiProperties;
+
+ private final File flowXmlGz;
+
+ private final File placeholderNiFiHomeDir;
+
+
+ private enum State {
+ STOPPED,
+ STOP_FAILED,
+ START_FAILED(STOPPED),
+ STARTED(STOPPED, STOP_FAILED),
+ INSTALLATION_FAILED(),
+ INSTALLED(STARTED, START_FAILED),
+ CREATED(INSTALLED, INSTALLATION_FAILED);
+
+
+ private final Set allowedTransitions;
+
+ State(State... allowedTransitions) {
+ this.allowedTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedTransitions)));
+ }
+
+ private State checkCanTransition(State newState) {
+ if (!this.allowedTransitions.contains(newState)) {
+ throw new IllegalStateException("Cannot transition from " + this + " to " + newState);
+ }
+
+ return newState;
+ }
+ }
+
+ private State currentState = State.CREATED;
+
+ private final File nifiBinaryZip;
+ private final File flowXml;
+ private final FlowFileEditorCallback editCallback;
+
+ private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallback editCallback) {
+ this.nifiBinaryZip = Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip");
+ this.flowXml = Objects.requireNonNull(flowXml, "flowXml");
+ this.editCallback = editCallback;
+
+ nifiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+
+ final File configDir = new File(nifiHomeDir, "conf");
+ final File libDir = new File(nifiHomeDir, "lib");
+
+ bootstrapLibDir = new File(libDir, "bootstrap");
+
+ nifiProperties = new File(configDir, "nifi.properties");
+
+ flowXmlGz = new File(configDir, "flow.xml.gz");
+
+ placeholderNiFiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
+ }
+
+ public void install() throws IOException {
+
+ currentState.checkCanTransition(State.INSTALLED);
+
+ File[] staleInstallations = placeholderNiFiHomeDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+ Arrays.stream(staleInstallations).forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+
+ Path tempDirectory = null;
+ try {
+ tempDirectory = Files.createTempDirectory("installable-flow");
+
+ File installableFlowFile = createInstallableFlowFile(tempDirectory);
+
+ LOGGER.info("Uncompressing NiFi archive {} to {} ...", nifiBinaryZip, placeholderNiFiHomeDir);
+
+ Zip.unzipFile(nifiBinaryZip, placeholderNiFiHomeDir, new Zip.StatusListenerAdapter() {
+ @Override
+ public void onUncompressDone(ZipEntry ze) {
+ LOGGER.debug("Uncompressed {}", ze.getName());
+ }
+ });
+
+ LOGGER.info("Uncompressing DONE");
+
+ File actualNiFiHomeDir = getActualNiFiHomeDir(placeholderNiFiHomeDir);
+
+ FileUtils.createSymlinks(placeholderNiFiHomeDir, actualNiFiHomeDir);
+
+ installFlowFile(installableFlowFile);
+ } catch (Exception e) {
+
+ currentState = State.INSTALLATION_FAILED;
+
+ throw new RuntimeException("Installation failed: " + e.getMessage(), e);
+
+ } finally {
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectoryRecursive(tempDirectory);
+ }
+ }
+
+ currentState = State.INSTALLED;
+ }
+
+ private File createInstallableFlowFile(Path tempDirectory) throws IOException {
+
+ File flowXmlFile = new File(tempDirectory.toFile(), "flow.xml");
+
+ if (editCallback == null) {
+ Files.copy(flowXml.toPath(), flowXmlFile.toPath());
+ } else {
+ XmlUtils.editXml(flowXml, flowXmlFile, editCallback);
+ }
+
+ return flowXmlFile;
+ }
+
+ private void installFlowFile(File fileToIncludeInGz) throws IOException {
+ Zip.gzipFile(fileToIncludeInGz, flowXmlGz);
+ }
+
+ public void start() {
+
+ currentState.checkCanTransition(State.STARTED);
+
+ try {
+ if (!bootstrapLibDir.exists()) {
+ throw new IllegalStateException("Not found: " + bootstrapLibDir);
+ }
+
+
+
+ System.setProperty("org.apache.jasper.compiler.disablejsr199", "true");
+ System.setProperty("java.security.egd", "file:/dev/urandom");
+ System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ System.setProperty("java.awt.headless", "true");
+ System.setProperty("java.protocol.handler.pkgs", "sun.net.www.protocol");
+
+ System.setProperty("nifi.properties.file.path", nifiProperties.getAbsolutePath());
+ System.setProperty("app", "NiFi");
+ System.setProperty("org.apache.nifi.bootstrap.config.log.dir", "./logs");
+
+ ClassLoader coreClassLoader = new NiFiCoreLibClassLoader(nifiHomeDir, ClassLoader.getSystemClassLoader());
+ Thread.currentThread().setContextClassLoader(coreClassLoader);
+
+
+
+ this.testNiFi = new EmbeddedNiFi(new String[0], coreClassLoader);
+
+ } catch (Exception ex) {
+
+ currentState = State.START_FAILED;
+
+ throw new RuntimeException("Startup failed", ex);
+
+ }
+
+ currentState = State.STARTED;
+
+
+ }
+
+
+ public void stopAndCleanup() {
+ currentState.checkCanTransition(State.STOPPED);
+
+ try {
+ testNiFi.shutdown();
+
+ removeNiFiFilesCreatedForTemporaryInstallation(placeholderNiFiHomeDir);
+
+ } catch (Exception e) {
+ currentState = State.STOP_FAILED;
+
+ throw new RuntimeException(e);
+ }
+
+ currentState = State.STOPPED;
+ }
+
+ private static File requireCurrentWorkingDirectoryIsCorrect() {
+
+ File currentWorkDir = new File(System.getProperty("user.dir"));
+ if (!currentWorkDir.getName().equals("nifi_testharness_nifi_home")) {
+
+ throw new IllegalStateException(
+ "The test's working directory has to be set to nifi_testharness_nifi_home, but was: " + currentWorkDir);
+ }
+ return currentWorkDir;
+ }
+
+ private static File getActualNiFiHomeDir(File currentDir) {
+ File[] files = currentDir.listFiles((dir, name) -> name.startsWith("nifi-"));
+
+ if (files.length == 0) {
+ throw new IllegalStateException(
+ "No \"nifi-*\" directory found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ if (files.length != 1) {
+ throw new IllegalStateException(
+ "Multiple \"nifi-*\" directories found in temporary NiFi home directory container: " + currentDir);
+ }
+
+ return files[0];
+ }
+
+ private static void removeNiFiFilesCreatedForTemporaryInstallation(File directoryToClear) {
+
+ if (directoryToClear != null) {
+ Arrays.stream(directoryToClear.listFiles())
+ .filter(file -> !"NIFI_TESTHARNESS_README.txt".equals(file.getName()))
+ .forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
+ }
+ }
+
+ private static void deleteFileOrDirectoryRecursively(File file) {
+ if (file.isDirectory()) {
+ FileUtils.deleteDirectoryRecursive(file);
+ } else {
+ file.delete();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "NiFi test instance(" + Integer.toHexString(hashCode())
+ + ") state: " + currentState + ", home: " + nifiHomeDir;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+
+ public static class Builder {
+
+ private boolean isDisposed = false;
+
+ private File nifiBinaryZip;
+ private File flowXml;
+ private FlowFileEditorCallback editCallback;
+
+ /**
+ * Sets the location of the NiFi binary distribution file, from which the test instance
+ * will be uncompressed and built.
+ *
+ * @param nifiBinaryZip
+ * the NiFi binary distribution file, from which the test instance will be built (never {@code null})
+ * @return {@code this} (for method chaining)
+ */
+ public Builder setNiFiBinaryDistributionZip(File nifiBinaryZip) {
+ if (!nifiBinaryZip.exists()) {
+ throw new IllegalArgumentException("File not found: " + nifiBinaryZip);
+ }
+
+ if (nifiBinaryZip.isDirectory()) {
+ throw new IllegalArgumentException("A ZIP file is expected to be specified, not a directory: "
+ + nifiBinaryZip);
+ }
+
+ this.nifiBinaryZip = nifiBinaryZip;
+ return this;
+ }
+
+ /**
+ * Sets the NiFi flow XML, which will be installed to the NiFi instance for testing.
+ *
+ * @param flowXml the NiFi flow file to install to the test instance for testing (never {@code null})
+ *
+ * @return {@code this} (for method chaining)
+ */
+ public Builder setFlowXmlToInstallForTesting(File flowXml) {
+ if (!flowXml.exists()) {
+ throw new IllegalArgumentException("File not found: " + flowXml);
+ }
+
+ this.flowXml = flowXml;
+ return this;
+ }
+
+ /**
+ *
+ * An optional callback to change the flow definition read from
+ * {@link #setFlowXmlToInstallForTesting(File)}, before it is actually installed for testing.
+ * (NOTE: The original file remains unchanged: changes are applied to a copy of it.)
+ *
+ *
+ * NOTE: {@link SimpleNiFiFlowDefinitionEditor} provides various common flow definition changes
+ * useful for testing.
+ *
+ *
+ * @param callback an optional callback to change the flow definition
+ *
+ * @return {@code this} (for method chaining)
+ *
+ * @see SimpleNiFiFlowDefinitionEditor
+ */
+ public Builder modifyFlowXmlBeforeInstalling(FlowFileEditorCallback callback) {
+ this.editCallback = callback;
+ return this;
+ }
+
+
+
+ public TestNiFiInstance build() {
+ if (isDisposed) {
+ throw new IllegalStateException("builder can only be used once");
+ }
+ isDisposed = true;
+
+ return new TestNiFiInstance(nifiBinaryZip, flowXml, editCallback);
+ }
+
+
+ }
+
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
new file mode 100644
index 000000000000..43ba1b530c95
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/api/FlowFileEditorCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.api;
+
+import org.w3c.dom.Document;
+
+/**
+ *
+ * An interface that allows programmatic access to the contents of a NiFi Flow XML,
+ * allowing changes to be performed before it
+ * is actually installed to the NiFi instance.
+ *
+ *
+ * CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!
+ * Efforts will be made to retain backwards API compatibility, but
+ * no guarantee is given.
+ *
+ *
+ */
+public interface FlowFileEditorCallback {
+
+ /**
+ *
+ * @param document the document to change document (never {@code null})
+ * @return the changed document (never {@code null})
+ * @throws Exception in case the editing fails
+ */
+ Document edit(Document document) throws Exception;
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
new file mode 100644
index 000000000000..e207f089cc8c
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/FileUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+
+public final class FileUtils {
+
+
+ private static final String MAC_DS_STORE_NAME = ".DS_Store";
+
+ private FileUtils() {
+ // no instances
+ }
+
+ public static void deleteDirectoryRecursive(Path directory) throws IOException {
+ Files.walkFileTree(directory, new SimpleFileVisitor() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ public static void deleteDirectoryRecursive(File dir) {
+ try {
+ deleteDirectoryRecursive(dir.toPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createLink(Path newLink, Path existingFile) {
+ try {
+ Files.createSymbolicLink(newLink, existingFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void createSymlinks(File newLinkDir, File existingDir) {
+ Arrays.stream(existingDir.list())
+ .filter(fileName -> !MAC_DS_STORE_NAME.equals(fileName))
+ .forEach(fileName -> {
+ Path newLink = Paths.get(newLinkDir.getAbsolutePath(), fileName);
+ Path existingFile = Paths.get(existingDir.getAbsolutePath(), fileName);
+
+ File symlinkFile = newLink.toFile();
+ if (symlinkFile.exists()) {
+ symlinkFile.delete();
+ }
+
+ createLink(newLink, existingFile);
+ });
+ }
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
new file mode 100644
index 000000000000..a3af3634e5d6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class NiFiCoreLibClassLoader extends URLClassLoader {
+
+
+ public NiFiCoreLibClassLoader(File nifiHomeDir, ClassLoader parent) {
+ super(getURls(nifiHomeDir), parent);
+ }
+
+ private static URL[] getURls(File nifiHomeDir) {
+
+ try {
+ File libDir = new File(nifiHomeDir, "lib");
+ File bootstrapLibDir = new File(libDir, "bootstrap");
+
+
+ List libs = Files.list(libDir.toPath())
+ .filter(NiFiCoreLibClassLoader::isJarOrNarFile)
+ .map(NiFiCoreLibClassLoader::toURL)
+ .collect(Collectors.toList());
+ List bootstrapLibs = Files.list(bootstrapLibDir.toPath())
+ .filter(NiFiCoreLibClassLoader::isJarOrNarFile)
+ .map(NiFiCoreLibClassLoader::toURL)
+ .collect(Collectors.toList());
+
+ LinkedList urls = new LinkedList<>();
+ urls.addAll(libs);
+ urls.addAll(bootstrapLibs);
+
+ return urls.toArray(new URL[urls.size()]);
+ } catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+
+
+ }
+
+ private static URL toURL(Path path) {
+ try {
+ return path.toUri().toURL();
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private static boolean isJarOrNarFile(Path path) {
+ String fullPathString = path.getFileName().toString();
+
+ return path.toFile().isFile() && fullPathString.endsWith(".jar");
+ }
+
+
+}
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
new file mode 100644
index 000000000000..4442e7a1071a
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/XmlUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+import org.apache.nifi.testharness.api.FlowFileEditorCallback;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.File;
+import java.io.FileInputStream;
+
+public final class XmlUtils {
+
+ public static void editXml(File inputFile, File outputFile, FlowFileEditorCallback editCallback) {
+
+ try(FileInputStream inputStream = new FileInputStream(inputFile)) {
+
+ DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
+ Document document = documentBuilder.parse(new InputSource(inputStream));
+
+ document = editCallback.edit(document);
+
+ // save the result
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ Transformer transformer = transformerFactory.newTransformer();
+ transformer.transform(new DOMSource(document), new StreamResult(outputFile));
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to change XML document: " + e.getMessage(), e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
new file mode 100644
index 000000000000..12ea4032c1f6
--- /dev/null
+++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.util;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public final class Zip {
+
+ private Zip() {
+ // no external instances allowed
+ }
+
+
+ public interface StatusListener {
+ void onUncompressStarted(ZipEntry ze);
+
+ void onUncompressDone(ZipEntry ze);
+ }
+
+ public static class StatusListenerAdapter implements StatusListener {
+
+ @Override
+ public void onUncompressStarted(ZipEntry ze) {
+
+ }
+
+ @Override
+ public void onUncompressDone(ZipEntry ze) {
+
+ }
+ }
+
+ private static final StatusListener NO_OP_STATUS_LISTENER = new StatusListenerAdapter();
+
+ public static void unzipFile(File zipFile, File targetDirectory) throws IOException {
+ unzipFile(zipFile, targetDirectory, NO_OP_STATUS_LISTENER);
+
+ }
+
+
+ public static void unzipFile(File zipFile, File targetDirectory,
+ StatusListener statusListener) throws IOException {
+
+ if (!targetDirectory.exists()) {
+ boolean mkdirs = targetDirectory.mkdirs();
+ if (!mkdirs) {
+ throw new IOException("Failed to create directory: " + targetDirectory);
+ }
+ }
+
+ try (ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile))) {
+
+ ZipEntry ze = zipInputStream.getNextEntry();
+
+ while (ze != null) {
+
+ if(ze.isDirectory()) {
+ ze = zipInputStream.getNextEntry();
+ continue;
+ }
+
+ statusListener.onUncompressStarted(ze);
+
+ String fileName = ze.getName();
+ File outputFile = new File(targetDirectory, fileName);
+
+
+ File parentDir = new File(outputFile.getParent());
+ if (!parentDir.exists()) {
+ boolean couldCreateParentDir = parentDir.mkdirs();
+ if (!couldCreateParentDir) {
+ throw new IllegalStateException("Could not create: " + parentDir);
+
+ }
+ }
+
+
+
+ Files.copy(zipInputStream, outputFile.toPath());
+
+ statusListener.onUncompressDone(ze);
+
+
+ ze = zipInputStream.getNextEntry();
+ }
+
+ zipInputStream.closeEntry();
+
+
+ }
+
+
+ }
+
+
+ public static void gzipFile(File inputFile, File gzipFile) throws IOException {
+
+ try (GZIPOutputStream gzos =
+ new GZIPOutputStream(new FileOutputStream(gzipFile))) {
+
+
+ Files.copy(inputFile.toPath(), gzos);
+
+ gzos.finish();
+ }
+ }
+
+
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
new file mode 100644
index 000000000000..05367c55e2f2
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/Constants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+public final class Constants {
+
+ static final File OUTPUT_DIR = new File("./NiFiTest/NiFiReadTest");
+ static final File NIFI_ZIP_DIR = new File("../../nifi-assembly/target");
+ static final File FLOW_XML_FILE = new File(NiFiMockFlowTest.class.getResource("/flow.xml").getFile());
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
new file mode 100644
index 000000000000..69d9909d2bc9
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by starting a mock HTTP server (using Jetty)
+ * and rewrite the URL in flow definition.
+ *
+ * NOTE: this test is disabled for now: see @Test(enabled = false)
+ */
+public class NiFiFlowTest {
+
+ private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+ .setSingleProcessorProperty("GetHTTP", "URL", "http://localhost:12345")
+ .build();
+
+ // used by mocked GetHTTP; serves test data
+ private static Server testJettyServer;
+
+ private final TestNiFiInstance testNiFiInstance;
+
+
+ public NiFiFlowTest() {
+
+ File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+ this.testNiFiInstance = TestNiFiInstance.builder()
+ .setNiFiBinaryDistributionZip(nifiZipFile)
+ .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+ .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+ .build();
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ NiFiFlowTest.testJettyServer = new Server(12345);
+
+
+ Handler handler = new TestHandler();
+ NiFiFlowTest.testJettyServer.setHandler(handler);
+ NiFiFlowTest.testJettyServer.start();
+ }
+
+
+ @BeforeMethod
+ public void bootstrapNiFi() throws Exception {
+
+ if (Constants.OUTPUT_DIR.exists()) {
+ FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+ }
+
+ testNiFiInstance.install();
+ testNiFiInstance.start();
+ }
+
+ // test disabled for now: apparenlt NiFi does not like being started
+ // multiple times in the same JVM
+ @Test
+ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+ // We deleted the output directory: our NiFi flow should create it
+
+ assertTrue(Constants.OUTPUT_DIR.exists(), "Output directory not found: " + Constants.OUTPUT_DIR);
+
+ File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+ assertTrue(outputFile.exists(), "Output file not found: " + outputFile);
+
+ List strings = Files.readAllLines(outputFile.toPath());
+
+ boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+
+ assertTrue(atLeastOneLineContainsBBC, "There was no line containing BBC");
+
+ boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+
+ assertTrue(atLeastOneLineContainsIPhone, "There was no line containing IPhone");
+
+ }
+
+ @AfterMethod
+ public void shutdownNiFi() throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+ testNiFiInstance.stopAndCleanup();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ NiFiFlowTest.testJettyServer.stop();
+ }
+
+
+ private static class TestHandler extends org.eclipse.jetty.server.handler.AbstractHandler {
+ @Override
+ public void handle(
+ String target,
+ Request baseRequest,
+ HttpServletRequest httpServletRequest,
+ HttpServletResponse response) throws IOException, ServletException {
+
+ response.setContentType("text/html;charset=utf-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ baseRequest.setHandled(true);
+
+ InputStream resource = TestHandler.class.getResourceAsStream("/sample_technology_rss.xml");
+ ServletOutputStream outputStream = response.getOutputStream();
+
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = resource.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, len);
+ }
+ }
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
new file mode 100644
index 000000000000..9f8965e56097
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples;
+
+
+import org.apache.nifi.testharness.TestNiFiInstance;
+import org.apache.nifi.testharness.samples.mock.GetHTTPMock;
+import org.apache.nifi.testharness.util.FileUtils;
+import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * This test demonstrates how to mock the source data by mocking the processor
+ * itself in the flow definition.
+ */
+public class NiFiMockFlowTest {
+
+ private static final InputStream DEMO_DATA_AS_STREAM =
+ NiFiFlowTest.class.getResourceAsStream("/sample_technology_rss.xml");
+
+
+ // We have a dedicated class. It has to be public static
+ // so that NiFi engine can instantiate it.
+ public static class MockedGetHTTP extends GetHTTPMock {
+
+ public MockedGetHTTP() {
+ super("text/xml; charset=utf-8", () -> DEMO_DATA_AS_STREAM);
+ }
+ }
+
+
+ private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
+ .setClassOfSingleProcessor("GetHTTP", MockedGetHTTP.class)
+ .build();
+
+
+ private final TestNiFiInstance testNiFiInstance;
+
+
+ public NiFiMockFlowTest() {
+ try {
+ if (!Constants.NIFI_ZIP_DIR.exists()) {
+ throw new IllegalStateException("NiFi distribution ZIP file not found at the expected location: "
+ + Constants.NIFI_ZIP_DIR.getCanonicalPath());
+ }
+
+ File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
+
+ this.testNiFiInstance = TestNiFiInstance.builder()
+ .setNiFiBinaryDistributionZip(nifiZipFile)
+ .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
+ .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
+ .build();
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeMethod
+ public void bootstrapNiFi() throws Exception {
+
+ if (Constants.OUTPUT_DIR.exists()) {
+ FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
+ }
+
+ testNiFiInstance.install();
+ testNiFiInstance.start();
+ }
+
+ @Test
+ public void testFlowCreatesFilesInCorrectLocation() throws IOException {
+
+ // We deleted the output directory: our NiFi flow should create it
+
+ assertTrue(Constants.OUTPUT_DIR.exists(), "Output directory not found: " + Constants.OUTPUT_DIR);
+
+ File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
+
+ assertTrue(outputFile.exists(), "Output file not found: " + outputFile);
+
+ List strings = Files.readAllLines(outputFile.toPath());
+
+ boolean atLeastOneLineContainsBBC = strings.stream().anyMatch(line -> line.toLowerCase().contains("bbc"));
+
+ assertTrue(atLeastOneLineContainsBBC, "There was no line containing BBC");
+
+ boolean atLeastOneLineContainsIPhone = strings.stream().anyMatch(line -> line.toLowerCase().contains("iphone"));
+
+ assertTrue(atLeastOneLineContainsIPhone, "There was no line containing IPhone");
+
+ }
+
+ @AfterMethod
+ public void shutdownNiFi() throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+ testNiFiInstance.stopAndCleanup();
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
new file mode 100644
index 000000000000..afc046dc15f9
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.nifi.testharness.samples;
+
+import java.io.File;
+
+final class TestUtils {
+
+ private TestUtils() {
+ // no instances allowed
+ }
+
+ static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
+
+ File[] files = binaryDistributionZipDir.listFiles((dir, name) ->
+ name.startsWith("nifi-") && name.endsWith("-bin.zip"));
+
+ if (files.length == 0) {
+ throw new IllegalStateException(
+ "No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir);
+ }
+
+ if (files.length != 1) {
+ throw new IllegalStateException(
+ "MultipleNiFi distribution ZIP files are found in: " + binaryDistributionZipDir);
+ }
+
+ return files[0];
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
new file mode 100644
index 000000000000..67055e497298
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/GetHTTPMock.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.testharness.samples.mock;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class GetHTTPMock extends MockProcessor {
+
+ private final String contentType;
+ private final Supplier inputStreamSupplier;
+
+ public GetHTTPMock(String contentType, Supplier inputStreamSupplier) {
+ super("org.apache.nifi.processors.standard.GetHTTP");
+
+ this.contentType = contentType;
+ this.inputStreamSupplier = inputStreamSupplier;
+ }
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All files are transferred to the success relationship")
+ .build();
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory processSessionFactory) {
+
+ final ComponentLog logger = getLogger();
+
+ final StopWatch stopWatch = new StopWatch(true);
+
+ final ProcessSession session = processSessionFactory.createSession();
+
+ final String url = context.getProperty("URL").evaluateAttributeExpressions().getValue();
+ final URI uri;
+ String source = url;
+ try {
+ uri = new URI(url);
+ source = uri.getHost();
+ } catch (final URISyntaxException swallow) {
+ // this won't happen as the url has already been validated
+ }
+
+ FlowFile flowFile = session.create();
+
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty("Filename").evaluateAttributeExpressions().getValue());
+ flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
+ flowFile = session.importFrom(inputStreamSupplier.get(), flowFile);
+
+ flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), contentType);
+
+ final long flowFileSize = flowFile.getSize();
+ stopWatch.stop();
+ session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final String dataRate = stopWatch.calculateDataRate(flowFileSize);
+ logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
+ session.commit();
+
+ }
+}
diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
new file mode 100644
index 000000000000..cd62b2d35bd3
--- /dev/null
+++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.testharness.samples.mock;
+
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public abstract class MockProcessor implements Processor {
+
+ private final Processor delegate;
+ private ComponentLog logger;
+
+ protected MockProcessor(String delegateClassName) {
+ try {
+
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ final Class> delegateClass = Class.forName(delegateClassName, true, contextClassLoader);
+
+ delegate = (Processor) delegateClass.newInstance();
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ throw new RuntimeException(e);
+ }
+
+
+ }
+
+ protected Processor getDelegate() {
+ return delegate;
+
+ }
+
+ protected final ComponentLog getLogger() {
+ return logger;
+ }
+
+ @Override
+ public void initialize(ProcessorInitializationContext processorInitializationContext) {
+ getDelegate().initialize(processorInitializationContext);
+ logger = processorInitializationContext.getLogger();
+ }
+
+ @Override
+ public Set getRelationships() {
+ return getDelegate().getRelationships();
+ }
+
+ @Override
+ public abstract void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory);
+
+ @Override
+ public Collection validate(ValidationContext validationContext) {
+ return getDelegate().validate(validationContext);
+ }
+
+ @Override
+ public PropertyDescriptor getPropertyDescriptor(String s) {
+ return getDelegate().getPropertyDescriptor(s);
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
+ getDelegate().onPropertyModified(propertyDescriptor, s, s1);
+ }
+
+ @Override
+ public List getPropertyDescriptors() {
+ return getDelegate().getPropertyDescriptors();
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getDelegate().getIdentifier();
+ }
+}
diff --git a/nifi-testharness/src/test/resources/flow.xml b/nifi-testharness/src/test/resources/flow.xml
new file mode 100644
index 000000000000..dca17b3cecec
--- /dev/null
+++ b/nifi-testharness/src/test/resources/flow.xml
@@ -0,0 +1,150 @@
+
+
+ 10
+ 5
+
+ b895989d-015e-1000-6ace-31c53901f9e7
+ NiFi Flow
+
+
+
+ bd4ba8b0-015e-1000-d3ee-28f5484892b4
+ GetHTTP
+
+
+
+ org.apache.nifi.processors.standard.GetHTTP
+
+ org.apache.nifi
+ nifi-standard-nar
+ 1.8.0
+
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ RUNNING
+ TIMER_DRIVEN
+ ALL
+ 0
+
+ URL
+ http://feeds.bbci.co.uk/news/technology/rss.xml?edition=uk#
+
+
+ Filename
+ bbc-world.rss.xml
+
+
+ SSL Context Service
+
+
+ Username
+
+
+ Password
+
+
+ Connection Timeout
+ 30 sec
+
+
+ Data Timeout
+ 30 sec
+
+
+ User Agent
+
+
+ Accept Content-Type
+
+
+ Follow Redirects
+ false
+
+
+ redirect-cookie-policy
+ default
+
+
+ Proxy Host
+
+
+ Proxy Port
+
+
+
+ bd4d98aa-015e-1000-72cb-f10194e94546
+ PutFile
+
+
+
+ org.apache.nifi.processors.standard.PutFile
+
+ org.apache.nifi
+ nifi-standard-nar
+ 1.8.0
+
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ RUNNING
+ TIMER_DRIVEN
+ ALL
+ 0
+
+ Directory
+ ./NiFiTest/NiFiReadTest
+
+
+ Conflict Resolution Strategy
+ ignore
+
+
+ Create Missing Directories
+ true
+
+
+ Maximum File Count
+
+
+ Last Modified Time
+
+
+ Permissions
+
+
+ Owner
+
+
+ Group
+
+ success
+ failure
+
+
+ bd4e54a4-015e-1000-3b0f-19d24756ce1c
+
+
+ 1
+ 0
+ bd4ba8b0-015e-1000-d3ee-28f5484892b4
+ b895989d-015e-1000-6ace-31c53901f9e7
+ PROCESSOR
+ bd4d98aa-015e-1000-72cb-f10194e94546
+ b895989d-015e-1000-6ace-31c53901f9e7
+ PROCESSOR
+ success
+ 10000
+ 1 GB
+ 0 sec
+
+
+
+
+
diff --git a/nifi-testharness/src/test/resources/logback-test.xml b/nifi-testharness/src/test/resources/logback-test.xml
new file mode 100644
index 000000000000..ab903af0702f
--- /dev/null
+++ b/nifi-testharness/src/test/resources/logback-test.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-testharness/src/test/resources/sample_technology_rss.xml b/nifi-testharness/src/test/resources/sample_technology_rss.xml
new file mode 100644
index 000000000000..74465657af39
--- /dev/null
+++ b/nifi-testharness/src/test/resources/sample_technology_rss.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+ http://www.bbc.co.uk/news/
+
+ http://news.bbcimg.co.uk/nol/shared/img/bbc_news_120x60.gif
+ BBC News - Technology
+ http://www.bbc.co.uk/news/
+
+ RSS for Node
+ Fri, 29 Sep 2017 07:38:08 GMT
+
+
+ 15
+
+
+
+ http://www.bbc.co.uk/news/technology-41412560
+ http://www.bbc.co.uk/news/technology-41412560
+ Wed, 27 Sep 2017 16:37:39 GMT
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b1f52503550f..76782936a69a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
nifi-mocknifi-nar-bundlesnifi-assembly
+ nifi-testharnessnifi-docsnifi-maven-archetypesnifi-external