MINIFICPP-1740 Add FetchFile processor#1262
Conversation
5cc8e67 to
35f738d
Compare
| return file_to_fetch_path; | ||
| } | ||
|
|
||
| void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const { |
There was a problem hiding this comment.
Consider making this a local template that forwards its arguments to the appropriate logging function, and use the formatting capabilities of the logger. This way, we avoid allocations of temporaries and use %s to interpolate the filenames.
| void FetchFile::logWithLevel(LogLevelOption log_level, const std::string& message) const { | |
| template<typename... Args> | |
| static void logWithLevel(LogLevelOption log_level, Args&&... args) const { |
and
logger_->log_trace(std::forward<Args>(args)...);
| auto generate_flow_file_processor = plan_->addProcessor("GenerateFlowFile", "GenerateFlowFile"); | ||
| plan_->setProperty(generate_flow_file_processor, org::apache::nifi::minifi::processors::GenerateFlowFile::FileSize.getName(), "0B"); | ||
| update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", core::Relationship("success", "description"), true); | ||
| plan_->setProperty(update_attribute_processor_, "absolute.path", input_dir_, true); | ||
| plan_->setProperty(update_attribute_processor_, "filename", input_file_name_, true); | ||
|
|
||
| fetch_file_processor_ = plan_->addProcessor("FetchFile", "FetchFile", core::Relationship("success", "description"), true); | ||
|
|
||
| auto success_putfile = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false); | ||
| plan_->addConnection(fetch_file_processor_, {"success", "d"}, success_putfile); | ||
| success_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); | ||
| plan_->setProperty(success_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), success_output_dir_); | ||
|
|
||
| auto failure_putfile = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false); | ||
| plan_->addConnection(fetch_file_processor_, {"failure", "d"}, failure_putfile); | ||
| failure_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"failure", "d"}}); | ||
| plan_->setProperty(failure_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), failure_output_dir_); | ||
|
|
||
| auto not_found_putfile = plan_->addProcessor("PutFile", "NotFoundPutFile", { {"success", "d"} }, false); | ||
| plan_->addConnection(fetch_file_processor_, {"not.found", "d"}, not_found_putfile); | ||
| not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"not.found", "d"}}); | ||
| plan_->setProperty(not_found_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), not_found_output_dir_); | ||
|
|
||
| auto permission_denied_putfile = plan_->addProcessor("PutFile", "PermissionDeniedPutFile", { {"success", "d"} }, false); | ||
| plan_->addConnection(fetch_file_processor_, {"permission.denied", "d"}, permission_denied_putfile); | ||
| not_found_putfile->setAutoTerminatedRelationships({{"success", "d"}, {"permission.denied", "d"}}); | ||
| plan_->setProperty(permission_denied_putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), permission_denied_output_dir_); |
There was a problem hiding this comment.
You could simplify this using SingleInputTestController. You can specify one input flow file, trigger the processor once, and observe each output relationship directly for flow files, eliminating the need for GenerateFlowFile, UpdateAttribute and each PutFile. Its signature looks like this:
std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>
trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
| flow_file->getAttribute("absolute.path", file_to_fetch_path); | ||
| std::string filename; | ||
| flow_file->getAttribute("filename", filename); | ||
| file_to_fetch_path += utils::file::FileUtils::get_separator() + filename; | ||
| return file_to_fetch_path; |
There was a problem hiding this comment.
Consider using std::filesystem::path for stronger typing.
| flow_file->getAttribute("absolute.path", file_to_fetch_path); | |
| std::string filename; | |
| flow_file->getAttribute("filename", filename); | |
| file_to_fetch_path += utils::file::FileUtils::get_separator() + filename; | |
| return file_to_fetch_path; | |
| flow_file->getAttribute("absolute.path", file_to_fetch_path); | |
| std::string filename; | |
| flow_file->getAttribute("filename", filename); | |
| return std::filesystem::path{file_to_fetch_path} / filename; |
| logger_->log_info("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path, generated_filename); | ||
| std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(generated_filename)); | ||
| } else if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) { | ||
| logger_->log_info("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path); |
There was a problem hiding this comment.
I would reduce these log levels (to debug, perhaps?) to avoid spamming the logs on high-traffic deployments when everything is working normally.
| #ifndef WIN32 | ||
| utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, 0644); | ||
| #endif |
There was a problem hiding this comment.
You can use std::filesystem::permissions for cross-platform coverage.
| return utils::file::FileUtils::exists(getMoveAbsolutePath(file_name)); | ||
| } | ||
|
|
||
| bool FetchFile::moveWouldFailWithDestinationconflict(const std::string& file_name) const { |
There was a problem hiding this comment.
Typo: the 'c' in "conflict" should be capitalized.
|
This was a mistake on my side, because there are still open threads. @lordgamez agreed to submit a followup with the changes, thanks. |
|
Addressed comments in #1288 |
FetchFile was added mostly following NiFi's implementation. One difference from NiFi's implementation is that the flow file is only transferred to failure because of a completion strategy failure if the strategy is explicitly set to fail (in the case of a move strategy conflict). NiFi's behavior is somewhat inconsistent here: it can also fail if the move target directory cannot be created or has permission problems, but the flow file is transferred to success on any other completion strategy failure.
https://issues.apache.org/jira/browse/MINIFICPP-1740
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.