From 7b369aabcb615c099f324e030d65aea9decfc165 Mon Sep 17 00:00:00 2001 From: Caleb Johnson Date: Thu, 26 Oct 2017 17:22:57 +0000 Subject: [PATCH] MINIFICPP-268 Implement ManipulateArchive processor --- CMakeLists.txt | 2 +- README.md | 1 + extensions/libarchive/ArchiveLoader.h | 4 + extensions/libarchive/ArchiveTests.h | 64 ++++ extensions/libarchive/ManipulateArchive.cpp | 237 ++++++++++++ extensions/libarchive/ManipulateArchive.h | 97 +++++ libminifi/test/archive-tests/CMakeLists.txt | 2 +- .../test/archive-tests/FocusArchiveTests.cpp | 152 +------- .../archive-tests/ManipulateArchiveTests.cpp | 343 ++++++++++++++++++ .../test/archive-tests/util/ArchiveTests.cpp | 202 +++++++++++ 10 files changed, 952 insertions(+), 152 deletions(-) create mode 100644 extensions/libarchive/ArchiveTests.h create mode 100644 extensions/libarchive/ManipulateArchive.cpp create mode 100644 extensions/libarchive/ManipulateArchive.h create mode 100644 libminifi/test/archive-tests/ManipulateArchiveTests.cpp create mode 100644 libminifi/test/archive-tests/util/ArchiveTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 57636fcba1..d005a5e325 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -134,7 +134,7 @@ if (NOT LibArchive_FOUND OR BUILD_LIBARCHIVE) endif() option(DISABLE_LIBARCHIVE "Disables the lib archive extensions." OFF) if (NOT DISABLE_LIBARCHIVE) - createExtension(ARCHIVE-EXTENSIONS "ARCHIVE EXTENSIONS" "This Enables libarchive functionality including MergeContent, CompressContent, and (Un)FocusArchiveEntry" "extensions/libarchive" "${TEST_DIR}/archive-tests" BUILD_TP "thirdparty/libarchive-3.3.2") + createExtension(ARCHIVE-EXTENSIONS "ARCHIVE EXTENSIONS" "This Enables libarchive functionality including MergeContent, CompressContent, (Un)FocusArchiveEntry and ManipulateArchive." "extensions/libarchive" "${TEST_DIR}/archive-tests" BUILD_TP "thirdparty/libarchive-3.3.2") endif() option(ENABLE_GPS "Enables the GPS extension." OFF) diff --git a/README.md b/README.md index b4ff57e204..0f2a210fd0 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a * CompressContent * FocusArchive * UnfocusArchive + * ManipulateArchive * Provenance events generation is supported and are persisted using RocksDB. ## System Requirements diff --git a/extensions/libarchive/ArchiveLoader.h b/extensions/libarchive/ArchiveLoader.h index 2b7c9347a4..fcd4ee9d57 100644 --- a/extensions/libarchive/ArchiveLoader.h +++ b/extensions/libarchive/ArchiveLoader.h @@ -22,6 +22,7 @@ #include "CompressContent.h" #include "FocusArchiveEntry.h" #include "UnfocusArchiveEntry.h" +#include "ManipulateArchive.h" #include "core/ClassLoader.h" class __attribute__((visibility("default"))) ArchiveFactory : public core::ObjectFactory { @@ -51,6 +52,7 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec class_names.push_back("CompressContent"); class_names.push_back("FocusArchiveEntry"); class_names.push_back("UnfocusArchiveEntry"); + class_names.push_back("ManipulateArchive"); return class_names; } @@ -63,6 +65,8 @@ class __attribute__((visibility("default"))) ArchiveFactory : public core::Objec return std::unique_ptr(new core::DefautObjectFactory()); } else if (utils::StringUtils::equalsIgnoreCase(class_name,"UnfocusArchiveEntry")) { return std::unique_ptr(new core::DefautObjectFactory()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name,"ManipulateArchive")) { + return std::unique_ptr(new core::DefautObjectFactory()); } else { return nullptr; } diff --git a/extensions/libarchive/ArchiveTests.h b/extensions/libarchive/ArchiveTests.h new file mode 100644 index 0000000000..72b52deb53 --- /dev/null +++ b/extensions/libarchive/ArchiveTests.h @@ -0,0 +1,64 @@ +/** + * @file ArchiveTests.h + * Archive test declarations + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ARCHIVE_TESTS_H +#define ARCHIVE_TESTS_H + +#include +#include +#include + +#include +#include + +typedef struct { + const char* content; + std::string name; + mode_t type; + mode_t perms; + uid_t uid; + gid_t gid; + time_t mtime; + uint32_t mtime_nsec; + size_t size; +} TestArchiveEntry; + + +typedef std::map TAE_MAP_T; +typedef std::vector FN_VEC_T; + +typedef struct { + TAE_MAP_T map; + FN_VEC_T order; +} OrderedTestArchive; + +TAE_MAP_T build_test_archive_map(int, const char**, const char**); + +FN_VEC_T build_test_archive_order(int, const char**); + +OrderedTestArchive build_ordered_test_archive(int, const char**, const char**); + +void build_test_archive(std::string, TAE_MAP_T entries, FN_VEC_T order = FN_VEC_T()); +void build_test_archive(std::string, OrderedTestArchive); + +bool check_archive_contents(std::string, TAE_MAP_T entries, bool check_attributes=true, FN_VEC_T order = FN_VEC_T()); +bool check_archive_contents(std::string, OrderedTestArchive, bool check_attributes=true); + +#endif \ No newline at end of file diff --git a/extensions/libarchive/ManipulateArchive.cpp b/extensions/libarchive/ManipulateArchive.cpp new file mode 100644 index 0000000000..c5c979fdf5 --- /dev/null +++ b/extensions/libarchive/ManipulateArchive.cpp @@ -0,0 +1,237 @@ +/** + * @file ManipulateArchive.cpp + * ManipulateArchive class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "ManipulateArchive.h" +#include "Exception.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/FlowFile.h" +#include "utils/file/FileManager.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ManipulateArchive::Operation("Operation", "Operation to perform on the archive (touch, remove, copy, move).", ""); +core::Property ManipulateArchive::Target("Target", "An existing entry within the archive to perform the operation on.", ""); +core::Property ManipulateArchive::Destination("Destination", "Destination for operations (touch, move or copy) which result in new entries.", ""); +core::Property ManipulateArchive::Before("Before", "For operations which result in new entries, places the new entry before the entry specified by this property.", ""); +core::Property ManipulateArchive::After("After", "For operations which result in new entries, places the new entry after the entry specified by this property.", ""); +core::Relationship ManipulateArchive::Success("success", "FlowFiles will be transferred to the success relationship if the operation succeeds."); +core::Relationship ManipulateArchive::Failure("failure", "FlowFiles will be transferred to the failure relationship if the operation fails."); + +char const* ManipulateArchive::OPERATION_REMOVE = "remove"; +char const* ManipulateArchive::OPERATION_COPY = "copy"; +char const* ManipulateArchive::OPERATION_MOVE = "move"; +char const* ManipulateArchive::OPERATION_TOUCH = "touch"; + +void ManipulateArchive::initialize() { + //! Set the supported properties + std::set properties; + properties.insert(Operation); + properties.insert(Target); + properties.insert(Destination); + properties.insert(Before); + properties.insert(After); + setSupportedProperties(properties); + + //! Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void ManipulateArchive::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + context->getProperty(Operation.getName(), operation_); + bool invalid = false; + std::transform(operation_.begin(), operation_.end(), operation_.begin(), ::tolower); + + bool op_create = operation_ == OPERATION_COPY || + operation_ == OPERATION_MOVE || + operation_ == OPERATION_TOUCH; + + // Operation must be one of copy, move, touch or remove + if(!op_create && (operation_ != OPERATION_REMOVE)) { + logger_->log_error("Invalid operation %s for ManipulateArchive.", operation_); + invalid = true; + } + + + context->getProperty(Target.getName(), targetEntry_); + context->getProperty(Destination.getName(), destination_); + context->getProperty(Before.getName(), before_); + context->getProperty(After.getName(), after_); + + // All operations which create new entries require a set destination + if(op_create == destination_.empty()) { + logger_->log_error("ManipulateArchive requires a destination for %s.", operation_); + invalid = true; + } + + // The only operation that doesn't require an existing target is touch + if((operation_ == OPERATION_TOUCH) != targetEntry_.empty()) { + logger_->log_error("ManipulateArchive requires a target for %s.", operation_); + invalid = true; + } + + // Users may specify one or none of before or after, but never both. + if(before_.size() && after_.size()) { + logger_->log_error("ManipulateArchive: cannot specify both before and after."); + invalid = true; + } + + if(invalid) { + throw Exception(GENERAL_EXCEPTION, "Invalid ManipulateArchive configuration"); + } +} + +void ManipulateArchive::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr flowFile = session->get(); + + if (!flowFile) { + return; + } + + FocusArchiveEntry::ArchiveMetadata archiveMetadata; + fileutils::FileManager file_man; + + FocusArchiveEntry::ReadCallback readCallback(this, &file_man, &archiveMetadata); + session->read(flowFile, &readCallback); + + auto entries_end = archiveMetadata.entryMetadata.end(); + + auto target_position = scanArchiveEntries(archiveMetadata, targetEntry_); + + if (target_position == entries_end && operation_ != OPERATION_TOUCH) { + logger_->log_warn("ManipulateArchive could not find entry %s to %s!", + targetEntry_, operation_); + session->transfer(flowFile, Failure); + return; + } else { + logger_->log_info("ManipulateArchive found %s for %s.", + targetEntry_, operation_); + } + + if (!destination_.empty()) { + auto dest_position = scanArchiveEntries(archiveMetadata, destination_); + if (dest_position != entries_end) { + logger_->log_warn("ManipulateArchive cannot perform %s to existing destination_ %s!", + operation_, destination_); + session->transfer(flowFile, Failure); + return; + } + } + + std::list::iterator position; + + // Small speedup for when neither before or after are provided or needed + if ((before_.empty() && after_.empty()) || operation_ == OPERATION_REMOVE) { + position = entries_end; + } else { + std::string positionEntry = after_.empty() ? before_ : after_; + position = scanArchiveEntries(archiveMetadata, positionEntry); + + if (position == entries_end) + logger_->log_warn("ManipulateArchive could not find entry %s to " + "perform %s %s; appending to end of archive...", + positionEntry, operation_, + after_.empty() ? "before" : "after"); + + else + logger_->log_info("ManipulateArchive found entry %s to %s %s.", + positionEntry, operation_, + after_.empty() ? "before" : "after"); + + if (!after_.empty() && position != entries_end) + position++; + } + + if (operation_ == OPERATION_REMOVE) { + std::remove((*target_position).tmpFileName.c_str()); + target_position = archiveMetadata.entryMetadata.erase(target_position); + } else if (operation_ == OPERATION_COPY) { + FocusArchiveEntry::ArchiveEntryMetadata copy = *target_position; + + // Copy tmp file + const auto origTmpFileName = copy.tmpFileName; + const auto newTmpFileName = file_man.unique_file(false); + copy.tmpFileName = newTmpFileName; + std::ifstream src(origTmpFileName, std::ios::binary); + std::ofstream dst(newTmpFileName, std::ios::binary); + dst << src.rdbuf(); + copy.entryName = destination_; + + archiveMetadata.entryMetadata.insert(position, copy); + } else if (operation_ == OPERATION_MOVE) { + FocusArchiveEntry::ArchiveEntryMetadata moveEntry = *target_position; + target_position = archiveMetadata.entryMetadata.erase(target_position); + moveEntry.entryName = destination_; + archiveMetadata.entryMetadata.insert(position, moveEntry); + } else if (operation_ == OPERATION_TOUCH) { + FocusArchiveEntry::ArchiveEntryMetadata touchEntry; + touchEntry.entryName = destination_; + touchEntry.entryType = AE_IFREG; + touchEntry.entrySize = 0; + touchEntry.entryMTime = time(nullptr); + touchEntry.entryMTimeNsec = 0; + touchEntry.entryUID = 0; + touchEntry.entryGID = 0; + touchEntry.entryPerm = 0777; + + archiveMetadata.entryMetadata.insert(position, touchEntry); + } + + UnfocusArchiveEntry::WriteCallback writeCallback(&archiveMetadata); + session->write(flowFile, &writeCallback); + + session->transfer(flowFile, Success); +} + +std::list::iterator ManipulateArchive::scanArchiveEntries( + FocusArchiveEntry::ArchiveMetadata& archiveMetadata, + const std::string& target) { + auto targetTest = [&](const FocusArchiveEntry::ArchiveEntryMetadata& entry) -> bool { + return entry.entryName == target; + }; + + return std::find_if(archiveMetadata.entryMetadata.begin(), + archiveMetadata.entryMetadata.end(), + targetTest); +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/extensions/libarchive/ManipulateArchive.h b/extensions/libarchive/ManipulateArchive.h new file mode 100644 index 0000000000..455044c4ec --- /dev/null +++ b/extensions/libarchive/ManipulateArchive.h @@ -0,0 +1,97 @@ +/** + * @file ManipulateArchive.h + * ManipulateArchive class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_ +#define LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_ + +#include +#include + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" + +#include "FocusArchiveEntry.h" +#include "UnfocusArchiveEntry.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +using logging::Logger; + +//! ManipulateArchive Class +class ManipulateArchive : public core::Processor { +public: + //! Constructor + /*! + * Create a new processor + */ + ManipulateArchive(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { + } + //! Destructor + virtual ~ManipulateArchive() {} + + //! Processor Name + static constexpr char const* ProcessorName = "ManipulateArchive"; + + //! Supported operations + static char const* OPERATION_REMOVE; + static char const* OPERATION_COPY; + static char const* OPERATION_MOVE; + static char const* OPERATION_TOUCH; + + //! Supported Properties + static core::Property Operation; + static core::Property Target; + static core::Property Destination; + static core::Property Before; + static core::Property After; + //! Supported Relationships + static core::Relationship Success; + static core::Relationship Failure; + + //! OnTrigger method, implemented by NiFi ManipulateArchive + void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + //! Initialize, over write by NiFi ManipulateArchive + void initialize(void); + +protected: + +private: + //! Logger + std::shared_ptr logger_; + std::string before_, after_, operation_, destination_, targetEntry_; + std::list::iterator scanArchiveEntries(FocusArchiveEntry::ArchiveMetadata&, const std::string&); +}; + +REGISTER_RESOURCE(ManipulateArchive); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif // LIBMINIFI_INCLUDE_PROCESSORS_MANIPULATEARCHIVE_H_ \ No newline at end of file diff --git a/libminifi/test/archive-tests/CMakeLists.txt b/libminifi/test/archive-tests/CMakeLists.txt index 340291de7f..1514a1e803 100644 --- a/libminifi/test/archive-tests/CMakeLists.txt +++ b/libminifi/test/archive-tests/CMakeLists.txt @@ -22,7 +22,7 @@ file(GLOB ARCHIVE_INTEGRATION_TESTS "*.cpp") SET(EXTENSIONS_TEST_COUNT 0) FOREACH(testfile ${ARCHIVE_INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}") + add_executable("${testfilename}" "${testfile}" "${TEST_DIR}/archive-tests/util/ArchiveTests.cpp") target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/libarchive") target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/libarchive-3.3.2/libarchive") createTests("${testfilename}") diff --git a/libminifi/test/archive-tests/FocusArchiveTests.cpp b/libminifi/test/archive-tests/FocusArchiveTests.cpp index 1a68d206fb..87fab1bf4e 100644 --- a/libminifi/test/archive-tests/FocusArchiveTests.cpp +++ b/libminifi/test/archive-tests/FocusArchiveTests.cpp @@ -26,6 +26,7 @@ #include #include "../TestBase.h" +#include "ArchiveTests.h" #include "processors/GetFile.h" #include "processors/PutFile.h" #include "FocusArchiveEntry.h" @@ -34,20 +35,6 @@ #include #include -typedef struct { - const char* content; - std::string name; - mode_t type; - mode_t perms; - uid_t uid; - gid_t gid; - time_t mtime; - uint32_t mtime_nsec; - size_t size; -} TestArchiveEntry; - -typedef std::map TAE_MAP_T; - const char TEST_ARCHIVE_NAME[] = "focus_test_archive.tar"; const int NUM_FILES = 2; const char* FILE_NAMES[NUM_FILES] = {"file1", "file2"}; @@ -56,141 +43,6 @@ const char* FILE_CONTENT[NUM_FILES] = {"Test file 1\n", "Test file 2\n"}; const char* FOCUSED_FILE = FILE_NAMES[0]; const char* FOCUSED_CONTENT = FILE_CONTENT[0]; -TAE_MAP_T build_test_archive_map() { - TAE_MAP_T test_entries; - - for (int i = 0; i < NUM_FILES; i++) { - std::string name {FILE_NAMES[i]}; - TestArchiveEntry entry; - - entry.name = name; - entry.content = FILE_CONTENT[i]; - entry.size = strlen(FILE_CONTENT[i]); - entry.type = AE_IFREG; - entry.perms = 0765; - entry.uid = 12; entry.gid = 34; - entry.mtime = time(nullptr); - entry.mtime_nsec = 0; - - test_entries[name] = entry; - } - - return test_entries; -} - -void build_test_archive(std::string path, TAE_MAP_T entries) { - std::cout << "Creating " << path << std::endl; - archive * test_archive = archive_write_new(); - - archive_write_set_format_ustar(test_archive); - archive_write_open_filename(test_archive, path.c_str()); - struct archive_entry* entry = archive_entry_new(); - - for (auto &kvp : entries) { - std::string name = kvp.first; - TestArchiveEntry test_entry = kvp.second; - - std::cout << "Adding entry: " << name << std::endl; - - archive_entry_set_filetype(entry, test_entry.type); - archive_entry_set_pathname(entry, test_entry.name.c_str()); - archive_entry_set_size(entry, test_entry.size); - archive_entry_set_perm(entry, test_entry.perms); - archive_entry_set_uid(entry, test_entry.uid); - archive_entry_set_gid(entry, test_entry.gid); - archive_entry_set_mtime(entry, test_entry.mtime, test_entry.mtime_nsec); - - archive_write_header(test_archive, entry); - archive_write_data(test_archive, test_entry.content, test_entry.size); - - archive_entry_clear(entry); - } - - archive_entry_free(entry); - archive_write_close(test_archive); -} - -bool check_archive_contents(std::string path, TAE_MAP_T entries) { - std::set read_names; - std::set extra_names; - bool ok = true; - struct archive *a = archive_read_new(); - struct archive_entry *entry; - - archive_read_support_format_all(a); - archive_read_support_filter_all(a); - - int r = archive_read_open_filename(a, path.c_str(), 16384); - - if (r != ARCHIVE_OK) { - std::cout << "Unable to open archive " << path << " for checking!" << std::endl; - return false; - } - - while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { - std::string name {archive_entry_pathname(entry)}; - auto it = entries.find(name); - if (it == entries.end()) { - extra_names.insert(name); - } else { - read_names.insert(name); - TestArchiveEntry test_entry = it->second; - size_t size = archive_entry_size(entry); - - std::cout << "Checking archive entry: " << name << std::endl; - - REQUIRE(size == test_entry.size); - - if (size > 0) { - int rlen, nlen = 0; - const char* buf[size]; - bool read_ok = true; - - for (;;) { - rlen = archive_read_data(a, buf, size); - nlen += rlen; - if (rlen == 0) break; - if (rlen < 0) { - std::cout << "FAIL: Negative size read?" << std::endl; - read_ok = false; - break; - } - } - - if (read_ok) { - REQUIRE(nlen == size); - REQUIRE(memcmp(buf, test_entry.content, size) == 0); - } - } - - REQUIRE(archive_entry_filetype(entry) == test_entry.type); - REQUIRE(archive_entry_uid(entry) == test_entry.uid); - REQUIRE(archive_entry_gid(entry) == test_entry.gid); - REQUIRE(archive_entry_perm(entry) == test_entry.perms); - REQUIRE(archive_entry_mtime(entry) == test_entry.mtime); - } - } - - archive_read_close(a); - archive_read_free(a); - - if (!extra_names.empty()) { - ok = false; - std::cout << "Extra files found: "; - for (std::string filename : extra_names) std::cout << filename << " "; - std::cout << std::endl; - } - - REQUIRE(extra_names.empty()); - - std::set test_file_entries; - std::transform(entries.begin(), entries.end(), - std::inserter(test_file_entries, test_file_entries.end()), - [](std::pair p){ return p.first; }); - REQUIRE(read_names == test_file_entries); - - return ok; -} TEST_CASE("Test Creation of FocusArchiveEntry", "[getfileCreate]") { TestController testController; @@ -252,7 +104,7 @@ TEST_CASE("FocusArchive", "[testFocusArchive]") { ss1 << dir1 << "/" << TEST_ARCHIVE_NAME; std::string archive_path_1 = ss1.str(); - TAE_MAP_T test_archive_map = build_test_archive_map(); + TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); build_test_archive(archive_path_1, test_archive_map); REQUIRE(check_archive_contents(archive_path_1, test_archive_map)); diff --git a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp new file mode 100644 index 0000000000..e8491f072a --- /dev/null +++ b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp @@ -0,0 +1,343 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "ArchiveTests.h" +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "ManipulateArchive.h" + +const char TEST_ARCHIVE_NAME[] = "manipulate_test_archive.tar"; +const int NUM_FILES = 3; +const char* FILE_NAMES[NUM_FILES] = {"first", "middle", "last"}; +const char* FILE_CONTENT[NUM_FILES] = {"Test file 1\n", "Test file 2\n", "Test file 3\n"}; + +const char* MODIFY_SRC = FILE_NAMES[0]; +const char* ORDER_ANCHOR = FILE_NAMES[1]; +const char* MODIFY_DEST = "modified"; + +typedef std::map PROP_MAP_T; + +bool run_archive_test(OrderedTestArchive input_archive, OrderedTestArchive output_archive, PROP_MAP_T properties, bool check_attributes = true) { + TestController testController; + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + + std::shared_ptr plan = testController.createPlan(); + std::shared_ptr repo = std::make_shared(); + + char dir1[] = "/tmp/gt.XXXXXX"; + char dir2[] = "/tmp/gt.XXXXXX"; + + REQUIRE(testController.createTempDirectory(dir1) != nullptr); + std::shared_ptr getfile = plan->addProcessor("GetFile", "getfileCreate2"); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir1); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile.getName(), "true"); + + std::shared_ptr maprocessor = plan->addProcessor("ManipulateArchive", "testManipulateArchive", core::Relationship("success", "description"), true); + + for (auto kv : properties) { + plan->setProperty(maprocessor, kv.first, kv.second); + } + + REQUIRE(testController.createTempDirectory(dir2) != nullptr); + std::shared_ptr putfile2 = plan->addProcessor("PutFile", "PutFile2", core::Relationship("success", "description"), true); + plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), dir2); + plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), + org::apache::nifi::minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE); + + std::stringstream ss1; + ss1 << dir1 << "/" << TEST_ARCHIVE_NAME; + std::string archive_path_1 = ss1.str(); + + build_test_archive(archive_path_1, input_archive); + REQUIRE(check_archive_contents(archive_path_1, input_archive, true)); + + plan->runNextProcessor(); // GetFile + plan->runNextProcessor(); // ManipulateArchive + plan->runNextProcessor(); // PutFile 2 (manipulated) + + std::stringstream ss2; + ss2 << dir2 << "/" << TEST_ARCHIVE_NAME; + std::string output_path = ss2.str(); + return check_archive_contents(output_path, output_archive, check_attributes); +} + +bool run_archive_test(TAE_MAP_T input_map, TAE_MAP_T output_map, PROP_MAP_T properties, bool check_attributes = true) { + OrderedTestArchive input_archive, output_archive; + + // An empty vector is treated as "ignore order" + input_archive.order = output_archive.order = FN_VEC_T(); + input_archive.map = input_map; + output_archive.map = output_map; + return run_archive_test(input_archive, output_archive, properties, check_attributes); +} + +TEST_CASE("Test creation of ManipulateArchive", "[manipulatearchiveCreate]") { + TestController testController; + std::shared_ptr processor = std::make_shared("processorname"); + REQUIRE(processor->getName() == "processorname"); + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); +} + +TEST_CASE("Test ManipulateArchive Touch", "[testManipulateArchiveTouch]") { + TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH} + }; + + // The other attributes aren't checked, so we can leave them uninitialized + TestArchiveEntry touched_entry; + touched_entry.name = MODIFY_DEST; + touched_entry.content = ""; + touched_entry.size = 0; + touched_entry.type = AE_IFREG; + + // Copy original map and append touched entry + TAE_MAP_T mod_archive_map(test_archive_map); + mod_archive_map[MODIFY_DEST] = touched_entry; + + REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties, false)); +} + +TEST_CASE("Test ManipulateArchive Copy", "[testManipulateArchiveCopy]") { + TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY} + }; + + TAE_MAP_T mod_archive_map(test_archive_map); + mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC]; + + REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties)); +} + +TEST_CASE("Test ManipulateArchive Move", "[testManipulateArchiveMove]") { + TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE} + }; + + TAE_MAP_T mod_archive_map(test_archive_map); + + mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC]; + mod_archive_map[MODIFY_DEST].name = MODIFY_DEST; + + auto it = mod_archive_map.find(MODIFY_SRC); + mod_archive_map.erase(it); + + REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties)); +} + +TEST_CASE("Test ManipulateArchive Remove", "[testManipulateArchiveRemove]") { + TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_REMOVE} + }; + + TAE_MAP_T mod_archive_map(test_archive_map); + + auto it = mod_archive_map.find(MODIFY_SRC); + mod_archive_map.erase(it); + + REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties)); +} + +TEST_CASE("Test ManipulateArchive Ordered Touch (before)", "[testManipulateArchiveOrderedTouchBefore]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Before.getName(), ORDER_ANCHOR} + }; + + // The other attributes aren't checked, so we can leave them uninitialized + TestArchiveEntry touched_entry; + touched_entry.name = MODIFY_DEST; + touched_entry.content = ""; + touched_entry.size = 0; + touched_entry.type = AE_IFREG; + + // Copy original map and append touched entry + OrderedTestArchive mod_archive = test_archive; + mod_archive.map[MODIFY_DEST] = touched_entry; + + auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + mod_archive.order.insert(it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties, false)); +} + +TEST_CASE("Test ManipulateArchive Ordered Copy (before)", "[testManipulateArchiveOrderedCopyBefore]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Before.getName(), ORDER_ANCHOR} + }; + + OrderedTestArchive mod_archive = test_archive; + mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC]; + auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + mod_archive.order.insert(it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties)); +} + +TEST_CASE("Test ManipulateArchive Ordered Move (before)", "[testManipulateArchiveOrderedMoveBefore]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Before.getName(), ORDER_ANCHOR} + }; + + OrderedTestArchive mod_archive = test_archive; + + // Update map + mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC]; + mod_archive.map[MODIFY_DEST].name = MODIFY_DEST; + auto m_it = mod_archive.map.find(MODIFY_SRC); + mod_archive.map.erase(m_it); + + // Update order + auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC); + mod_archive.order.erase(o_it); + o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + mod_archive.order.insert(o_it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties)); +} + +TEST_CASE("Test ManipulateArchive Ordered Touch (after)", "[testManipulateArchiveOrderedTouchAfter]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH}, + {org::apache::nifi::minifi::processors::ManipulateArchive::After.getName(), ORDER_ANCHOR} + }; + + // The other attributes aren't checked, so we can leave them uninitialized + TestArchiveEntry touched_entry; + touched_entry.name = MODIFY_DEST; + touched_entry.content = ""; + touched_entry.size = 0; + touched_entry.type = AE_IFREG; + + // Copy original map and append touched entry + OrderedTestArchive mod_archive = test_archive; + mod_archive.map[MODIFY_DEST] = touched_entry; + + auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + it++; + mod_archive.order.insert(it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties, false)); +} + +TEST_CASE("Test ManipulateArchive Ordered Copy (after)", "[testManipulateArchiveOrderedCopyAfter]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY}, + {org::apache::nifi::minifi::processors::ManipulateArchive::After.getName(), ORDER_ANCHOR} + }; + + OrderedTestArchive mod_archive = test_archive; + mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC]; + auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + it++; + mod_archive.order.insert(it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties)); +} + +TEST_CASE("Test ManipulateArchive Ordered Move (after)", "[testManipulateArchiveOrderedMoveAfter]") { + OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT); + + PROP_MAP_T properties { + {org::apache::nifi::minifi::processors::ManipulateArchive::Target.getName(), MODIFY_SRC}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Destination.getName(), MODIFY_DEST}, + {org::apache::nifi::minifi::processors::ManipulateArchive::Operation.getName(), + org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE}, + {org::apache::nifi::minifi::processors::ManipulateArchive::After.getName(), ORDER_ANCHOR} + }; + + OrderedTestArchive mod_archive = test_archive; + + // Update map + mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC]; + mod_archive.map[MODIFY_DEST].name = MODIFY_DEST; + auto m_it = mod_archive.map.find(MODIFY_SRC); + mod_archive.map.erase(m_it); + + // Update order + auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC); + mod_archive.order.erase(o_it); + o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR); + o_it++; + mod_archive.order.insert(o_it, MODIFY_DEST); + + REQUIRE(run_archive_test(test_archive, mod_archive, properties)); +} diff --git a/libminifi/test/archive-tests/util/ArchiveTests.cpp b/libminifi/test/archive-tests/util/ArchiveTests.cpp new file mode 100644 index 0000000000..f6e7d7e207 --- /dev/null +++ b/libminifi/test/archive-tests/util/ArchiveTests.cpp @@ -0,0 +1,202 @@ +/** + * @file ArchiveTests.cpp + * Archive test definitions + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ArchiveTests.h" +#include +#include +#include +#include + +#include +#include +#include "../../TestBase.h" + +TAE_MAP_T build_test_archive_map(int NUM_FILES, const char** FILE_NAMES, const char** FILE_CONTENT) { + TAE_MAP_T test_entries; + + for (int i = 0; i < NUM_FILES; i++) { + std::string name {FILE_NAMES[i]}; + TestArchiveEntry entry; + + entry.name = name; + entry.content = FILE_CONTENT[i]; + entry.size = strlen(FILE_CONTENT[i]); + entry.type = AE_IFREG; + entry.perms = 0765; + entry.uid = 12; entry.gid = 34; + entry.mtime = time(nullptr); + entry.mtime_nsec = 3.14159; + + test_entries[name] = entry; + } + + return test_entries; +} + +FN_VEC_T build_test_archive_order(int NUM_FILES, const char** FILE_NAMES) { + FN_VEC_T ret; + for (int i = 0; i < NUM_FILES; i++) ret.push_back(FILE_NAMES[i]); + return ret; +} + +OrderedTestArchive build_ordered_test_archive(int NUM_FILES, const char** FILE_NAMES, const char** FILE_CONTENT) { + OrderedTestArchive ret; + ret.map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT); + ret.order = build_test_archive_order(NUM_FILES, FILE_NAMES); + return ret; +} + +void build_test_archive(std::string path, TAE_MAP_T entries, FN_VEC_T order) { + std::cout << "Creating " << path << std::endl; + archive * test_archive = archive_write_new(); + + archive_write_set_format_ustar(test_archive); + archive_write_open_filename(test_archive, path.c_str()); + struct archive_entry* entry = archive_entry_new(); + + if (order.empty()) { // Use map sort order + for (auto &kvp : entries) + order.push_back(kvp.first); + } + + for (std::string name : order) { + TestArchiveEntry test_entry = entries.at(name); + + std::cout << "Adding entry: " << name << std::endl; + + archive_entry_set_filetype(entry, test_entry.type); + archive_entry_set_pathname(entry, test_entry.name.c_str()); + archive_entry_set_size(entry, test_entry.size); + archive_entry_set_perm(entry, test_entry.perms); + archive_entry_set_uid(entry, test_entry.uid); + archive_entry_set_gid(entry, test_entry.gid); + archive_entry_set_mtime(entry, test_entry.mtime, test_entry.mtime_nsec); + + archive_write_header(test_archive, entry); + archive_write_data(test_archive, test_entry.content, test_entry.size); + + archive_entry_clear(entry); + } + + archive_entry_free(entry); + archive_write_close(test_archive); +} + +void build_test_archive(std::string path, OrderedTestArchive ordered_archive) { + build_test_archive(path, ordered_archive.map, ordered_archive.order); +} + +bool check_archive_contents(std::string path, TAE_MAP_T entries, bool check_attributes, FN_VEC_T order) { + FN_VEC_T read_names; + FN_VEC_T extra_names; + bool ok = true; + struct archive *a = archive_read_new(); + struct archive_entry *entry; + + archive_read_support_format_all(a); + archive_read_support_filter_all(a); + + int r = archive_read_open_filename(a, path.c_str(), 16384); + + if (r != ARCHIVE_OK) { + std::cout << "Unable to open archive " << path << " for checking!" << std::endl; + return false; + } + + while (archive_read_next_header(a, &entry) == ARCHIVE_OK) { + std::string name {archive_entry_pathname(entry)}; + auto it = entries.find(name); + if (it == entries.end()) { + extra_names.push_back(name); + } else { + read_names.push_back(name); + TestArchiveEntry test_entry = it->second; + size_t size = archive_entry_size(entry); + + std::cout << "Checking archive entry: " << name << std::endl; + + REQUIRE(size == test_entry.size); + + if (size > 0) { + int rlen, nlen = 0; + const char* buf[size]; + bool read_ok = true; + + for (;;) { + rlen = archive_read_data(a, buf, size); + nlen += rlen; + if (rlen == 0) break; + if (rlen < 0) { + std::cout << "FAIL: Negative size read?" << std::endl; + read_ok = false; + break; + } + } + + if (read_ok) { + REQUIRE(nlen == size); + REQUIRE(memcmp(buf, test_entry.content, size) == 0); + } + } + + REQUIRE(archive_entry_filetype(entry) == test_entry.type); + + if (check_attributes) { + REQUIRE(archive_entry_uid(entry) == test_entry.uid); + REQUIRE(archive_entry_gid(entry) == test_entry.gid); + REQUIRE(archive_entry_perm(entry) == test_entry.perms); + REQUIRE(archive_entry_mtime(entry) == test_entry.mtime); + } + } + } + + archive_read_close(a); + archive_read_free(a); + + if (!extra_names.empty()) { + ok = false; + std::cout << "Extra files found: "; + for (std::string filename : extra_names) std::cout << filename << " "; + std::cout << std::endl; + } + + REQUIRE(extra_names.empty()); + + if (!order.empty()) { + REQUIRE(order.size() == entries.size()); + } + + if (!order.empty()) { + REQUIRE(read_names == order); + } else { + std::set read_names_set(read_names.begin(), read_names.end()); + std::set test_file_entries_set; + std::transform(entries.begin(), entries.end(), + std::inserter(test_file_entries_set, test_file_entries_set.end()), + [](std::pair p){ return p.first; }); + + REQUIRE(read_names_set == test_file_entries_set); + } + + return ok; +} + +bool check_archive_contents(std::string path, OrderedTestArchive archive, bool check_attributes) { + return check_archive_contents(path, archive.map, check_attributes, archive.order); +}