NIFI-8380: Allow for an extensions.directory property to specify wher…#4950
NIFI-8380: Allow for an extensions.directory property to specify wher…#4950markap14 wants to merge 3 commits intoapache:mainfrom
Conversation
…e to place downloaded files. Also fixed an issue that was encountered, when a Source Processor is scheduled for Primary Node Only but more than 1 task is set. In that case, even though only a single task will should be scheduled, an Exception was getting thrown because @OnScheduled methods of Processors were still called. To avoid this, moved the initialization of the dataflow outside of the creation of the dataflow so that initialization can be triggered only when appropriate.
|
|
||
| final File file = new File((String) value); | ||
| if (!file.exists()) { | ||
| if (!file.exists() && !file.mkdirs()) { |
There was a problem hiding this comment.
Is this necessary to create the dir in the validation phase?
There was a problem hiding this comment.
No, you're right, that shouldn't be there. I put it there as a bit of an experiment, and forgot to remove it.
| @@ -24,8 +24,33 @@ | |||
| import java.util.Set; | |||
|
|
|||
| public interface StatelessDataflow { | |||
There was a problem hiding this comment.
It is not part of this PR, but wouldn't make sense to make it extend AutoClosable? and having close() instead of shutdown(), etc.
There was a problem hiding this comment.
I don't think we should extend AutoCloseable in this case. The StatelessDataflow is not really a "resource" and is generally not used in a such a way that you'd want to use a try-with-resources to create it for a short period of time and then cleanup. Rather, shutdown() is generally used when the application is shutdown. This involves terminating thread pools, cleaning up potentially many resources, etc., etc.
...undle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
Show resolved
Hide resolved
…d extensions directory to exist; removed auto-creation of directories in validation
urbandan
left a comment
There was a problem hiding this comment.
I think the download related changes should be mentioned in the commit message.
|
|
||
| final String extensionsDirectoryFilename = properties.getProperty(EXTENSIONS_DIRECTORY); | ||
| final File extensionsDirectory = extensionsDirectoryFilename == null ? narDirectory : new File(extensionsDirectoryFilename); | ||
| if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) { |
There was a problem hiding this comment.
I'm not sure if the mkdirs call is ok here: the validator will fail if the dir doesn't exist, but the actual connector/task init will pass if the dir can be created. Seems like the validator is stricter than it should be.
There was a problem hiding this comment.
The validation does not fail if the directory doesn't exist - that validation was removed.
…eads trying to download/unpack extensions, we properly synchronize the unpacking and unpack into the correct sub-directory under the working directory
| final long unpackStart = System.currentTimeMillis(); | ||
| final Predicate<BundleCoordinate> narFilter = coordinate -> true; | ||
| NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, null, narDirectories, false, false, false, narFilter); | ||
| NarUnpackLock.lock(); |
There was a problem hiding this comment.
Is this lock also protecting the part when the nars are getting downloaded? Can it cause issues if the download is not protected by it?
With this solution, do we avoid downloading the nars multiple times? (i.e. if one task is blocked on getting the lock, will it realize that the sibling task already downloaded and unpacked the necessary nars?)
There was a problem hiding this comment.
No, each process that needs the nar will download it for itself. They download using a temporary filename. Only one task will manage to rename its download to the 'final' name. Others will delete their temporary files. It's not efficient, but it was a simple/straight-forward solution that can be improved upon later.
|
|
||
| final String extensionsDirectoryFilename = properties.getProperty(EXTENSIONS_DIRECTORY); | ||
| final File extensionsDirectory = extensionsDirectoryFilename == null ? narDirectory : new File(extensionsDirectoryFilename); | ||
| if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) { |
| * (due to multiple classloders being used for the 'stateless nar'), we define a singleton Lock within the nifi-stateless-api module. | ||
| * This lock should always be obtained before attempting to unpack nars. | ||
| */ | ||
| public class NarUnpackLock { |
There was a problem hiding this comment.
As an improvement, this could be providing locks on a per-directory basis - so if we have multiple NiFi Connectors with separate nar directories, they can progress without blocking each other.
There was a problem hiding this comment.
Yes, we can certainly improve this in the future. Given that this happens only at startup and likely will only take hundreds of milliseconds, I'm not particularly worried about the performance impact initially. But we can certainly improve this process if we need to, going forward.
akatona84
left a comment
There was a problem hiding this comment.
Tested a bit on a cluster, seems to be working now!
|
Merged to main, thanks @markap14 @akatona84 @urbandan |
|
Awesome! Thanks @akatona84 and @urbandan for such thorough testing & review! And thanks @pvillard31 for reviewing & merging! |
…e to place downloaded files. Also fixed an issue that was encountered, when a Source Processor is scheduled for Primary Node Only but more than 1 task is set. In that case, even though only a single task will should be scheduled, an Exception was getting thrown because @OnScheduled methods of Processors were still called. To avoid this, moved the initialization of the dataflow outside of the creation of the dataflow so that initialization can be triggered only when appropriate. NIFI-8380: Removed requirement in validation for working directory and extensions directory to exist; removed auto-creation of directories in validation NIFI-8380: Fixed a few thrading bugs, so that if we have multiple threads trying to download/unpack extensions, we properly synchronize the unpacking and unpack into the correct sub-directory under the working directory Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes apache#4950.
…e to place downloaded files. Also fixed an issue that was encountered, when a Source Processor is scheduled for Primary Node Only but more than 1 task is set. In that case, even though only a single task will should be scheduled, an Exception was getting thrown because @OnScheduled methods of Processors were still called. To avoid this, moved the initialization of the dataflow outside of the creation of the dataflow so that initialization can be triggered only when appropriate.
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:
Description of PR
Enables X functionality; fixes bug NIFI-YYYY.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically
main)?Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not
squashor use--forcewhen pushing to allow for clean monitoring of changes.For code changes:
mvn -Pcontrib-check clean installat the rootnififolder?LICENSEfile, including the mainLICENSEfile undernifi-assembly?NOTICEfile, including the mainNOTICEfile found undernifi-assembly?.displayNamein addition to .name (programmatic access) for each of the new properties?For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.