Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-12959: Support loading python processors from NARs #8573

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.nifi.util;

import org.apache.nifi.properties.ApplicationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -36,9 +40,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.properties.ApplicationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The NiFiProperties class holds all properties which are needed for various
Expand Down
88 changes: 54 additions & 34 deletions nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -516,37 +516,6 @@ environment for each Processor implementation (not for each instance of a Proces
dependencies in that environment.


[[deploying]]
== Deploying a Developed Processor

Once a Processor has been developed, it can be made available in NiFi by copying the source of the Python extension to the `$NIFI_HOME/python/extensions` directory by default.
The actual directory to look for extensions can be configured in `nifi.properties` via properties that have the prefix `nifi.python.extensions.source.directory.`.
For example, by default, `nifi.python.extensions.source.directory.default` is set to `./python/extensions`. However, additional paths may be added by replacing `default`
in the property name with some other value.

Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor.
In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java`
with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`.
This will allow NiFi to automatically discover the Processor.

Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order
to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory.
For example, if the `WriteNumber.py` file contains a NiFi Processor and also depends on the `ProcessorUtil.py` module, the directory structure would look like this:
----
NIFI_HOME/
- python/
- extensions/
ProcessorA.py
ProcessorB.py
write-number/
__init__.py
ProcessorUtils.py
WriteNumber.py
----
By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access
to the `ProcessorUtils` module. Only `WriteNumber` will have access to it.


[[reloading]]
== Processor Reloading

Expand Down Expand Up @@ -623,15 +592,66 @@ Here, we accept any version of `pandas` (though the latest is preferred), and we
[[dependency-isolation]]
=== Dependency Isolation

On startup, NiFi will create a separate Python env (venv) for each Processor implementation and will use `pip` to install
the specified dependencies from PyPI only into the appropriate Python environment for that Processor. Therefore, dependencies of one
Processor are not made available to another Processor.
The first time that a user creates a NiFi Processor of a given type, NiFi will create a separate Python env (venv) for the Processor.
It will use `pip` to install the specified dependencies from PyPI only into the appropriate Python environment for that Processor.
Therefore, dependencies of one Processor are not made available to another Processor.

Beyond that, dependencies of one version of a Processor are not made available to other versions of the Processor. So, for example,
if we have two different versions of the same Processor made available, version `0.0.1` and version `0.0.2`, the dependencies that are
necessary for version `0.0.1` will not be made available to version `0.0.2` unless version `0.0.2` of the Processor also declares
those dependencies.

Some environments, however, cannot make use of `pip` for package management. In an air-gapped environment, for example, or in
environments with strict security policies in place, `pip` may not be available. In such a case, Python processors can be packaged
using the NiFi ARchive (NAR) format. This is a .zip file with the following specific layout, and uses a filename extension of `.nar`:

```
my-nar-bundle.nar
+-- META-INF/
+-- MANIFEST.MF
+-- NAR-INF/
+-- bundled-dependencies/
+-- dependency1
+-- dependency2
+-- ...
+-- dependencyN
MyProcessor.py
```


[[deploying]]
== Deploying a Developed Processor

Once a Processor has been developed, it can be made available in NiFi using one of two methods.
For Processors that have been packaged as a NAR file, the NAR file should be copied to NiFi's `lib/` directory or configured extensions directory.
For Processors that are not pre-packaged as a NAR, the Processor is deployed by copying the source of the Python extension to the `$NIFI_HOME/python/extensions` directory by default.
The actual directory to look for extensions can be configured in `nifi.properties` via properties that have the prefix `nifi.python.extensions.source.directory.`.
For example, by default, `nifi.python.extensions.source.directory.default` is set to `./python/extensions`. However, additional paths may be added by replacing `default`
in the property name with some other value.

Any `.py` file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor.
In order to be found, the Processor must have a valid parent (`FlowFileTransform` or `RecordTransform`) and must have an inner class named `Java`
with a `implements = ['org.apache.nifi.python.processor.FlowFileTransform']` or `implements = ['org.apache.nifi.python.processor.RecordFileTransform']`.
This will allow NiFi to automatically discover the Processor.

Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order
to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory.
For example, if the `WriteNumber.py` file contains a NiFi Processor and also depends on the `ProcessorUtil.py` module, the directory structure would look like this:
----
NIFI_HOME/
- python/
- extensions/
ProcessorA.py
ProcessorB.py
write-number/
__init__.py
ProcessorUtils.py
WriteNumber.py
----
By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access
to the `ProcessorUtils` module. Only `WriteNumber` will have access to it.



[[troubleshooting]]
== Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public List<BoundObjectCounts> getBoundObjectCounts() {
}

@Override
public void discoverExtensions() {
public void discoverExtensions(final boolean includeNarDirectories) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) {
delegate.discoverExtensions();
delegate.discoverExtensions(includeNarDirectories);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.CronSchedulingAgent;
import org.apache.nifi.controller.scheduling.LifecycleStateManager;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
Expand Down Expand Up @@ -216,8 +216,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -250,6 +248,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -900,10 +900,19 @@ private PythonBridge createPythonBridge(final NiFiProperties nifiProperties, fin
maxProcessesPerType = maxProcesses;
}

final List<File> narDirectories = new ArrayList<>();
for (final org.apache.nifi.bundle.Bundle bundle : extensionManager.getAllBundles()) {
final File workingDir = bundle.getBundleDetails().getWorkingDirectory();
if (workingDir.exists()) {
narDirectories.add(workingDir);
}
}

final PythonProcessConfig pythonProcessConfig = new PythonProcessConfig.Builder()
.pythonCommand(pythonCommand)
.pythonFrameworkDirectory(pythonFrameworkSourceDirectory)
.pythonExtensionsDirectories(pythonExtensionsDirectories)
.narDirectories(narDirectories)
.pythonWorkingDirectory(pythonWorkingDirectory)
.commsTimeout(commsTimeout == null ? null : Duration.ofMillis(FormatUtils.getTimeDuration(commsTimeout, TimeUnit.MILLISECONDS)))
.maxPythonProcesses(maxProcesses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ default void discoverExtensions(Set<Bundle> narBundles) {
void setPythonBridge(PythonBridge pythonBridge);

/**
* Discovers any Python based extensions using the given Python Bridge
* @param pythonBundle the system bundle
* Discovers any Python based extensions that exist in either the Python extensions directories or NAR bundles that have been expanded.
* @param pythonBundle the python bundle
*/
void discoverPythonExtensions(Bundle pythonBundle);

/**
* Discovers any new Python based extensions that have been added. This method will scan only the Python extension directories
* that have been configured and will not include scanning NAR bundles.
* @param pythonBundle the python bundle
*/
void discoverNewPythonExtensions(Bundle pythonBundle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
Expand Down Expand Up @@ -136,9 +135,7 @@ public StandardExtensionDiscoveringManager(final Collection<Class<? extends Conf

@Override
public Set<Bundle> getAllBundles() {
return classNameBundleLookup.values().stream()
.flatMap(List::stream)
.collect(Collectors.toSet());
return new HashSet<>(bundleCoordinateBundleLookup.values());
}

@Override
Expand Down Expand Up @@ -186,10 +183,20 @@ public void setPythonBridge(final PythonBridge pythonBridge) {

@Override
public void discoverPythonExtensions(final Bundle pythonBundle) {
discoverPythonExtensions(pythonBundle, true);
}

@Override
public void discoverNewPythonExtensions(final Bundle pythonBundle) {
logger.debug("Scanning to discover new Python extensions...");
discoverPythonExtensions(pythonBundle, false);
}

private void discoverPythonExtensions(final Bundle pythonBundle, final boolean includeNarBundles) {
logger.debug("Scanning to discover which Python extensions are available and importing any necessary dependencies. If new components are discovered, this may take a few minutes. " +
"See python logs for more details.");
final long start = System.currentTimeMillis();
pythonBridge.discoverExtensions();
pythonBridge.discoverExtensions(includeNarBundles);

bundleCoordinateBundleLookup.putIfAbsent(pythonBundle.getBundleDetails().getCoordinate(), pythonBundle);

Expand Down Expand Up @@ -269,11 +276,6 @@ private BundleDetails createBundleDetailsWithOverriddenVersion(final BundleDetai
.build();
}

@Override
public void discoverNewPythonExtensions(final Bundle pythonBundle) {
logger.debug("Scanning to discover new Python extensions...");
discoverPythonExtensions(pythonBundle);
}

/**
* Loads extensions from the specified bundle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,21 @@

public interface ProcessorCreationWorkflow {

/**
* @return <code>true</code> if the Processor has been packaged along with its dependencies, <code>false</code> otherwise
*/
boolean isPackagedWithDependencies();

/**
* Downloads any dependencies required by the Processor using <code>pip</code>.
* If the Processor is already packaged with its dependencies, this method does nothing.
*/
void downloadDependencies();

/**
* Creates the Processor on the Python side and returns an adapter for interacting with the Processor from the Java side.
* @return an adapter for interacting with the Python Processor
*/
PythonProcessorAdapter createProcessor();

}