From ae7241742ebe74e6ce40f2fbba0f270d2f3464d3 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Tue, 28 Mar 2017 15:50:48 -0400 Subject: [PATCH 1/2] MINIFI-236: Make GetFile, PutFile, TailFile, and ExecuteProcess thread safe --- .travis.yml | 2 +- CMakeLists.txt | 15 + .../include/core/ConfigurableComponent.h | 7 +- libminifi/include/core/FlowConfiguration.h | 2 +- libminifi/include/core/ProcessSession.h | 9 +- libminifi/include/core/Processor.h | 2 - libminifi/include/processors/GetFile.h | 72 +++-- libminifi/include/processors/PutFile.h | 22 +- libminifi/include/processors/TailFile.h | 16 +- libminifi/include/utils/ThreadPool.h | 287 ++++++++++++++++++ libminifi/src/core/ConfigurableComponent.cpp | 10 +- libminifi/src/core/FlowConfiguration.cpp | 8 +- libminifi/src/core/ProcessSession.cpp | 88 +++++- libminifi/src/processors/GetFile.cpp | 119 +++++--- libminifi/src/processors/PutFile.cpp | 53 ++-- libminifi/src/processors/TailFile.cpp | 32 +- libminifi/test/TestExecuteProcess.cpp | 136 +++++++++ libminifi/test/unit/ProcessorTests.cpp | 144 +++++++-- libminifi/test/unit/ProvenanceTestHelper.h | 1 - 19 files changed, 851 insertions(+), 174 deletions(-) create mode 100644 libminifi/include/utils/ThreadPool.h create mode 100644 libminifi/test/TestExecuteProcess.cpp diff --git a/.travis.yml b/.travis.yml index 31cb731344..faa291bbd3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,4 +47,4 @@ matrix: - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew outdated ${package} || brew upgrade ${package}; } || brew install ${package} script: - - mkdir ./build && cd ./build && cmake .. && make && ./tests + - mkdir ./build && cd ./build && cmake .. && make && make test diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d7875e790..b84706da0a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,6 +136,21 @@ enable_testing(test) target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance") target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) add_test(NAME LibMinifiTests COMMAND tests) + + file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp") + add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} ${SPD_SOURCES}) + target_include_directories(testExecuteProcess PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") + target_include_directories(testExecuteProcess PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) + target_include_directories(testExecuteProcess PRIVATE BEFORE "include") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/core/repository") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/io") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/utils") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/processors") + target_include_directories(testExecuteProcess PRIVATE BEFORE "libminifi/include/provenance") + target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) + add_test(NAME ExecuteProcess COMMAND testExecuteProcess) # Create a custom build target called "docker" that will invoke DockerBuild.sh and create the NiFi-MiNiFi-CPP Docker image add_custom_target( diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index c0cc623651..b0b8739cdc 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -32,7 +32,7 @@ namespace core { * Represents a configurable component * Purpose: Extracts configuration items for all components and localized them */ -class ConfigurableComponent { +class ConfigurableComponent { public: @@ -89,10 +89,13 @@ class ConfigurableComponent { virtual bool canEdit()= 0; std::mutex configuration_mutex_; - std::shared_ptr logger_; + // Supported properties std::map properties_; + private: + std::shared_ptr my_logger_; + }; } /* namespace core */ diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index c7eedd21b7..87505abcba 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -26,9 +26,9 @@ #include "processors/PutFile.h" #include "processors/TailFile.h" #include "processors/ListenSyslog.h" +#include "processors/ListenHTTP.h" #include "processors/GenerateFlowFile.h" #include "processors/RealTimeDataCollector.h" -#include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" #include "processors/ExecuteProcess.h" #include "processors/AppendHostInfo.h" diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index b516817f4e..acde2456da 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -77,7 +77,13 @@ class ProcessSession { // Create a new UUID FlowFile with no content resource claim and without parent std::shared_ptr create(); // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent - std::shared_ptr create(std::shared_ptr &parent); + std::shared_ptr create( + std::shared_ptr &&parent); + + std::shared_ptr create( + std::shared_ptr &parent){ + return create(parent); + } // Clone a new UUID FlowFile from parent both for content resource claim and attributes std::shared_ptr clone( std::shared_ptr &parent); @@ -122,6 +128,7 @@ class ProcessSession { void penalize(std::shared_ptr &flow); void penalize(std::shared_ptr &&flow); // Import the existed file into the flow + void importFrom(io::DataStream &stream, std::shared_ptr &&flow); void import(std::string source, std::shared_ptr &flow, bool keepSource = true, uint64_t offset = 0); void import(std::string source, std::shared_ptr &&flow, diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index fd0411f73b..4a71816cef 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -251,8 +251,6 @@ class Processor : public Connectable, public ConfigurableComponent, // Check all incoming connections for work bool isWorkAvailable(); - // Logger - std::shared_ptr logger_; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Processor(const Processor &parent); diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index f1f069491c..4e96624d7d 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -1,6 +1,4 @@ /** - * @file GetFile.h - * GetFile class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +18,7 @@ #ifndef __GET_FILE_H__ #define __GET_FILE_H__ +#include #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" @@ -31,6 +30,20 @@ namespace nifi { namespace minifi { namespace processors { + struct GetFileRequest{ + std::string directory = "."; + bool recursive = true; + bool keepSourceFile = false; + int64_t minAge = 0; + int64_t maxAge = 0; + int64_t minSize = 0; + int64_t maxSize = 0; + bool ignoreHiddenFile = true; + int64_t pollInterval = 0; + int64_t batchSize = 10; + std::string fileFilter= "[^\\.].*"; + }; + // GetFile Class class GetFile : public core::Processor { public: @@ -38,21 +51,9 @@ class GetFile : public core::Processor { /*! * Create a new processor */ - GetFile(std::string name, uuid_t uuid = NULL) + explicit GetFile(std::string name, uuid_t uuid = NULL) : Processor(name, uuid) { - logger_ = logging::Logger::getLogger(); - _directory = "."; - _recursive = true; - _keepSourceFile = false; - _minAge = 0; - _maxAge = 0; - _minSize = 0; - _maxSize = 0; - _ignoreHiddenFile = true; - _pollInterval = 0; - _batchSize = 10; - _lastDirectoryListingTime = getTimeMillis(); - _fileFilter = "[^\\.].*"; + } // Destructor virtual ~GetFile() { @@ -79,16 +80,27 @@ class GetFile : public core::Processor { virtual void onTrigger( core::ProcessContext *context, core::ProcessSession *session); + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule( + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); // Initialize, over write by NiFi GetFile virtual void initialize(void); // perform directory listing - void performListing(std::string dir); + void performListing(std::string dir,const GetFileRequest &request); + + uint64_t getLastListingTime(const std::string &directory); + void updateListingTime(const std::string &directory); protected: private: - // Logger - std::shared_ptr logger_; + // Queue for store directory list std::queue _dirList; // Get Listing size @@ -101,23 +113,17 @@ class GetFile : public core::Processor { // Put full path file name into directory listing void putListing(std::string fileName); // Poll directory listing for files - void pollListing(std::queue &list, int maxSize); + void pollListing(std::queue &list,const GetFileRequest &request); // Check whether file can be added to the directory listing - bool acceptFile(std::string fullName, std::string name); + bool acceptFile(std::string fullName, std::string name, const GetFileRequest &request); + // Get file request object. + GetFileRequest request_; // Mutex for protection of the directory listing + std::mutex mutex_; - std::string _directory; - bool _recursive; - bool _keepSourceFile; - int64_t _minAge; - int64_t _maxAge; - int64_t _minSize; - int64_t _maxSize; - bool _ignoreHiddenFile; - int64_t _pollInterval; - int64_t _batchSize; - uint64_t _lastDirectoryListingTime; - std::string _fileFilter; + + std::map> last_listing_times_; + }; } /* namespace processors */ diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index 7653faca9c..c0effafd85 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -45,7 +45,6 @@ class PutFile : public core::Processor { */ PutFile(std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid) { - logger_ = logging::Logger::getLogger(); } // Destructor virtual ~PutFile() { @@ -59,10 +58,18 @@ class PutFile : public core::Processor { static core::Relationship Success; static core::Relationship Failure; + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi PutFile - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi PutFile virtual void initialize(void); @@ -84,8 +91,11 @@ class PutFile : public core::Processor { protected: private: - // Logger - std::shared_ptr logger_; + + // directory + std::string directory_; + // conflict resolution type. + std::string conflict_resolution_; bool putFile(core::ProcessSession *session, std::shared_ptr flowFile, diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index 5be76e4b72..17507cdfe3 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -38,9 +38,8 @@ class TailFile : public core::Processor { /*! * Create a new processor */ - TailFile(std::string name, uuid_t uuid = NULL) + explicit TailFile(std::string name, uuid_t uuid = NULL) : core::Processor(name, uuid) { - logger_ = logging::Logger::getLogger(); _stateRecovered = false; } // Destructor @@ -57,9 +56,8 @@ class TailFile : public core::Processor { public: // OnTrigger method, implemented by NiFi TailFile - virtual void onTrigger( - core::ProcessContext *context, - core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); // Initialize, over write by NiFi TailFile virtual void initialize(void); // recoverState @@ -70,11 +68,7 @@ class TailFile : public core::Processor { protected: private: - // Logger - std::shared_ptr logger_; - std::string _fileLocation; - // Property Specified Tailed File Name - std::string _fileName; + std::mutex tail_file_mutex_; // File to save state std::string _stateFile; // State related to the tailed file @@ -86,7 +80,7 @@ class TailFile : public core::Processor { std::string trimLeft(const std::string& s); std::string trimRight(const std::string& s); void parseStateFileLine(char *buf); - void checkRollOver(); + void checkRollOver(std::string, std::string); }; diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h new file mode 100644 index 0000000000..750890089b --- /dev/null +++ b/libminifi/include/utils/ThreadPool.h @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H +#define LIBMINIFI_INCLUDE_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * Worker task + * purpose: Provides a wrapper for the functor + * and returns a future based on the template argument. + */ +template< typename T> +class Worker{ +public: + explicit Worker(std::function &task) : task(task) + { + promise = std::make_shared>(); + } + + /** + * Move constructor for worker tasks + */ + Worker(Worker &&other) : task (std::move(other.task)), + promise(other.promise) + { + } + + + /** + * Runs the task and takes the output from the funtor + * setting the result into the promise + */ + void run() + { + T result = task(); + promise->set_value(result); + } + + Worker(const Worker&) = delete; + Worker& operator = (const Worker&) = delete; + + Worker& operator = (Worker&&) ; + + std::shared_ptr> getPromise(); + +private: + std::function task; + std::shared_ptr> promise; +}; + +template< typename T> +Worker& Worker::operator = (Worker&& other) +{ + task = std::move(other.task); + promise = other.promise; + return *this; +} + + +template +std::shared_ptr> Worker::getPromise(){ + return promise; + } + +/** + * Thread pool + * Purpose: Provides a thread pool with basic functionality similar to + * ThreadPoolExecutor + * Design: Locked control over a manager thread that controls the worker threads + */ +template +class ThreadPool + { + public: + ThreadPool(int max_worker_threads, bool daemon_threads=false) : max_worker_threads_(max_worker_threads) + ,daemon_threads_(daemon_threads), running_(false){ + current_workers_ = 0; + } + virtual ~ThreadPool(){ + shutdown(); + } + + /** + * Execute accepts a worker task and returns + * a future + * @param task this thread pool will subsume ownership of + * the worker task + * @return future with the impending result. + */ + std::future execute(Worker &&task); + /** + * Starts the Thread Pool + */ + void start(); + /** + * Shutdown the thread pool and clear any + * currently running activities + */ + void shutdown(); + /** + * Set the max concurrent tasks. When this is done + * we must start and restart the thread pool if + * the number of tasks is less than the currently configured number + */ + void setMaxConcurrentTasks(uint16_t max) + { + std::lock_guard lock(manager_mutex_); + if (running_) + { + shutdown(); + } + max_worker_threads_= max; + if (!running_) + start(); + } + + protected: + + /** + * Drain will notify tasks to stop following notification + */ + void drain() + { + while(current_workers_ > 0) + { + tasks_available_.notify_one(); + } + } + // determines if threads are detached + bool daemon_threads_; + // max worker threads + int max_worker_threads_; + // current worker tasks. + std::atomic current_workers_; + // thread queue + std::vector thread_queue_; + // manager thread + std::thread manager_thread_; + // atomic running boolean + std::atomic running_; + // worker queue of worker objects + std::queue> worker_queue_; + // notification for available work + std::condition_variable tasks_available_; + // manager mutex + std::recursive_mutex manager_mutex_; + // work queue mutex + std::mutex worker_queue_mutex_; + + /** + * Call for the manager to start worker threads + */ + void startWorkers(); + + /** + * Runs worker tasks + */ + void run_tasks(); + }; + +template +std::future ThreadPool::execute(Worker &&task){ + + std::unique_lock lock(worker_queue_mutex_); + bool wasEmpty = worker_queue_.empty(); + std::future future = task.getPromise()->get_future(); + worker_queue_.push(std::move(task)); + if (wasEmpty) + { + tasks_available_.notify_one(); + } + return future; +} + +template< typename T> +void ThreadPool::startWorkers(){ + for (int i = 0; i < max_worker_threads_; i++) + { + thread_queue_.push_back( std::thread(&ThreadPool::run_tasks, this)); + current_workers_++; + } + + if (daemon_threads_) + { + for (auto &thread : thread_queue_){ + thread.detach(); + } + } + for (auto &thread : thread_queue_) + { + if (thread.joinable()) + thread.join(); + } +} +template< typename T> +void ThreadPool::run_tasks() +{ + while (running_.load()) + { + std::unique_lock lock(worker_queue_mutex_); + if (worker_queue_.empty()) + { + + tasks_available_.wait(lock); + } + + if (!running_.load()) + break; + + if (worker_queue_.empty()) + continue; + Worker task = std::move(worker_queue_.front()); + worker_queue_.pop(); + task.run(); + } + current_workers_--; + +} +template< typename T> + void ThreadPool::start() +{ + std::lock_guard lock(manager_mutex_); + if (!running_) + { + running_ = true; + manager_thread_ = std::thread(&ThreadPool::startWorkers, this); + + } +} + +template< typename T> +void ThreadPool::shutdown(){ + + std::lock_guard lock(manager_mutex_); + if (running_.load()) + { + + running_.store(false); + + drain(); + if (manager_thread_.joinable()) + manager_thread_.join(); + { + std::unique_lock lock(worker_queue_mutex_); + thread_queue_.clear(); + current_workers_ = 0; + while(!worker_queue_.empty()) + worker_queue_.pop(); + } + } +} + + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + + +#endif diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index e5703d116e..67a43dd641 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -28,14 +28,14 @@ namespace minifi { namespace core { ConfigurableComponent::ConfigurableComponent(std::shared_ptr logger) - : logger_(logger) { + : my_logger_(logger) { } ConfigurableComponent::ConfigurableComponent( const ConfigurableComponent &&other) : properties_(std::move(other.properties_)), - logger_(std::move(other.logger_)) { + my_logger_(std::move(other.my_logger_)) { } ConfigurableComponent::~ConfigurableComponent() { @@ -57,7 +57,7 @@ bool ConfigurableComponent::getProperty(const std::string name, if (it != properties_.end()) { Property item = it->second; value = item.getValue(); - logger_->log_info("Processor %s property name %s value %s", name.c_str(), + my_logger_->log_info("Processor %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { @@ -79,7 +79,7 @@ bool ConfigurableComponent::setProperty(const std::string name, Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("Component %s property name %s value %s", name.c_str(), + my_logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { @@ -101,7 +101,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("property name %s value %s", prop.getName().c_str(), + my_logger_->log_info("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str()); return true; } else { diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index c6472cc773..772e58369a 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -39,6 +39,10 @@ std::shared_ptr FlowConfiguration::createProcessor( == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) { processor = std::make_shared< org::apache::nifi::minifi::processors::LogAttribute>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); } else if (name == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) { processor = std::make_shared< @@ -63,10 +67,6 @@ std::shared_ptr FlowConfiguration::createProcessor( == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) { processor = std::make_shared< org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); } else if (name == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) { processor = std::make_shared< diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index e6fa7c4051..c4ba14c271 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -50,7 +50,7 @@ std::shared_ptr ProcessSession::create() { return record; } -std::shared_ptr ProcessSession::create(std::shared_ptr &parent) { +std::shared_ptr ProcessSession::create(std::shared_ptr &&parent) { std::map empty; std::shared_ptr record = std::make_shared(process_context_->getProvenanceRepository(), empty); @@ -530,6 +530,91 @@ void ProcessSession::read(std::shared_ptr &&flow, } } +void ProcessSession::importFrom(io::DataStream &stream, + std::shared_ptr &&flow) { + std::shared_ptr claim = std::make_shared(); + + int max_read = getpagesize(); + std::vector charBuffer; + charBuffer.resize(max_read); + + try { + std::ofstream fs; + uint64_t startTime = getTimeMillis(); + fs.open(claim->getContentFullPath().c_str(), + std::fstream::out | std::fstream::binary | std::fstream::trunc); + + + if (fs.is_open() ) { + + size_t position = 0; + const size_t max_size = stream.getSize(); + size_t read_size = max_read; + while(position < max_size) + { + if ((max_size - position) > max_read) + { + read_size = max_read; + } + else + { + read_size = max_size - position; + } + charBuffer.clear(); + stream.readData(charBuffer,read_size); + + fs.write((const char*)charBuffer.data(),read_size); + position+=read_size; + } + // Open the source file and stream to the flow file + + if (fs.good() && fs.tellp() >= 0) { + flow->setSize(fs.tellp()); + flow->setOffset(0); + if (flow->getResourceClaim() != nullptr) { + // Remove the old claim + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + flow->setResourceClaim(claim); + claim->increaseFlowFileRecordOwnedCount(); + + logger_->log_debug( + "Import offset %d length %d into content %s for FlowFile UUID %s", + flow->getOffset(), flow->getSize(), + flow->getResourceClaim()->getContentFullPath().c_str(), + flow->getUUIDStr().c_str()); + + fs.close(); + std::string details = process_context_->getProcessorNode().getName() + + " modify flow record content " + flow->getUUIDStr(); + uint64_t endTime = getTimeMillis(); + provenance_report_->modifyContent(flow, details, endTime - startTime); + } else { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } else { + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + + } catch (std::exception &exception) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (flow && flow->getResourceClaim() == claim) { + flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + flow->clearResourceClaim(); + } + logger_->log_debug("Caught Exception during process session write"); + throw; + } +} + void ProcessSession::import(std::string source, std::shared_ptr &flow, bool keepSource, uint64_t offset) { @@ -639,6 +724,7 @@ void ProcessSession::import(std::string source, fs.write(buf, input.gcount()); } + if (fs.good() && fs.tellp() >= 0) { flow->setSize(fs.tellp()); flow->setOffset(0); diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index cf05657d65..4640ea9ec1 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -1,6 +1,4 @@ /** - * @file GetFile.cpp - * GetFile class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -87,8 +85,8 @@ core::Property GetFile::FileFilter( "File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*"); -core::Relationship GetFile::Success( - "success", "All files are routed to success"); +core::Relationship GetFile::Success("success", + "All files are routed to success"); void GetFile::initialize() { // Set the supported properties @@ -111,77 +109,78 @@ void GetFile::initialize() { setSupportedRelationships(relationships); } -void GetFile::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { +void GetFile::onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory) { std::string value; logger_->log_info("onTrigger GetFile"); if (context->getProperty(Directory.getName(), value)) { - _directory = value; + request_.directory = value; } if (context->getProperty(BatchSize.getName(), value)) { - core::Property::StringToInt(value, _batchSize); + core::Property::StringToInt(value, request_.batchSize); } if (context->getProperty(IgnoreHiddenFile.getName(), value)) { org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, _ignoreHiddenFile); + value, request_.ignoreHiddenFile); } if (context->getProperty(KeepSourceFile.getName(), value)) { org::apache::nifi::minifi::utils::StringUtils::StringToBool( - value, _keepSourceFile); + value, request_.keepSourceFile); } logger_->log_info("onTrigger GetFile"); if (context->getProperty(MaxAge.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, _maxAge, - unit) - && core::Property::ConvertTimeUnitToMS( - _maxAge, unit, _maxAge)) { + if (core::Property::StringToTime(value, request_.maxAge, unit) + && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, + request_.maxAge)) { } } if (context->getProperty(MinAge.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, _minAge, - unit) - && core::Property::ConvertTimeUnitToMS( - _minAge, unit, _minAge)) { + if (core::Property::StringToTime(value, request_.minAge, unit) + && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, + request_.minAge)) { } } if (context->getProperty(MaxSize.getName(), value)) { - core::Property::StringToInt(value, _maxSize); + core::Property::StringToInt(value, request_.maxSize); } if (context->getProperty(MinSize.getName(), value)) { - core::Property::StringToInt(value, _minSize); + core::Property::StringToInt(value, request_.minSize); } if (context->getProperty(PollInterval.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, - _pollInterval, - unit) - && core::Property::ConvertTimeUnitToMS( - _pollInterval, unit, _pollInterval)) { + if (core::Property::StringToTime(value, request_.pollInterval, unit) + && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, + request_.pollInterval)) { } } if (context->getProperty(Recurse.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, - _recursive); + org::apache::nifi::minifi::utils::StringUtils::StringToBool( + value, request_.recursive); } if (context->getProperty(FileFilter.getName(), value)) { - _fileFilter = value; + request_.fileFilter = value; } +} + +void GetFile::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { // Perform directory list logger_->log_info("Is listing empty %i", isListingEmpty()); if (isListingEmpty()) { - if (_pollInterval == 0 - || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) { - performListing(_directory); + auto last_listing_time_ = getLastListingTime(request_.directory); + + if (request_.pollInterval == 0 + || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { + performListing(request_.directory, request_); } } logger_->log_info("Is listing empty %i", isListingEmpty()); @@ -189,7 +188,7 @@ void GetFile::onTrigger( if (!isListingEmpty()) { try { std::queue list; - pollListing(list, _batchSize); + pollListing(list, request_); while (!list.empty()) { std::string fileName = list.front(); @@ -205,7 +204,7 @@ void GetFile::onTrigger( flowFile->updateKeyedAttribute(FILENAME, name); flowFile->updateKeyedAttribute(PATH, path); flowFile->addKeyedAttribute(ABSOLUTE_PATH, fileName); - session->import(fileName, flowFile, _keepSourceFile); + session->import(fileName, flowFile, request_.keepSourceFile); session->transfer(flowFile, Success); } } catch (std::exception &exception) { @@ -218,6 +217,24 @@ void GetFile::onTrigger( } +uint64_t GetFile::getLastListingTime(const std::string &directory) { + std::lock_guard lock(mutex_); + auto listing_time = last_listing_times_.find(directory); + if (listing_time == last_listing_times_.end()) { + last_listing_times_[directory] = getTimeMillis(); + listing_time = last_listing_times_.find(directory); + } + return listing_time->second.load(); +} + +void GetFile::updateListingTime(const std::string &directory) { + std::lock_guard lock(mutex_); + auto listing_time = last_listing_times_.find(directory); + if (listing_time != last_listing_times_.end()) { + listing_time->second.store(getTimeMillis()); + } +} + bool GetFile::isListingEmpty() { std::lock_guard lock(mutex_); @@ -230,10 +247,12 @@ void GetFile::putListing(std::string fileName) { _dirList.push(fileName); } -void GetFile::pollListing(std::queue &list, int maxSize) { +void GetFile::pollListing(std::queue &list, + const GetFileRequest &request) { std::lock_guard lock(mutex_); - while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize)) { + while (!_dirList.empty() + && (request.maxSize == 0 || list.size() < request.maxSize)) { std::string fileName = _dirList.front(); _dirList.pop(); list.push(fileName); @@ -242,37 +261,38 @@ void GetFile::pollListing(std::queue &list, int maxSize) { return; } -bool GetFile::acceptFile(std::string fullName, std::string name) { +bool GetFile::acceptFile(std::string fullName, std::string name, + const GetFileRequest &request) { struct stat statbuf; if (stat(fullName.c_str(), &statbuf) == 0) { - if (_minSize > 0 && statbuf.st_size < _minSize) + if (request.minSize > 0 && statbuf.st_size < request.minSize) return false; - if (_maxSize > 0 && statbuf.st_size > _maxSize) + if (request.maxSize > 0 && statbuf.st_size > request.maxSize) return false; uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); uint64_t fileAge = getTimeMillis() - modifiedTime; - if (_minAge > 0 && fileAge < _minAge) + if (request.minAge > 0 && fileAge < request.minAge) return false; - if (_maxAge > 0 && fileAge > _maxAge) + if (request.maxAge > 0 && fileAge > request.maxAge) return false; - if (_ignoreHiddenFile && fullName.c_str()[0] == '.') + if (request.ignoreHiddenFile && fullName.c_str()[0] == '.') return false; if (access(fullName.c_str(), R_OK) != 0) return false; - if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) + if (request.keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) return false; #ifdef __GNUC__ #if (__GNUC__ >= 4) #if (__GNUC_MINOR__ < 9) regex_t regex; - int ret = regcomp(®ex, _fileFilter.c_str(), 0); + int ret = regcomp(®ex, request.fileFilter.c_str(), 0); if (ret) return false; ret = regexec(®ex, name.c_str(), (size_t) 0, NULL, 0); @@ -281,7 +301,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name) { return false; #else try { - std::regex re(_fileFilter); + std::regex re(fileFilter); if (!std::regex_match(name, re)) { return false; @@ -301,7 +321,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name) { return false; } -void GetFile::performListing(std::string dir) { +void GetFile::performListing(std::string dir, const GetFileRequest &request) { logger_->log_info("Performing file listing against %s", dir.c_str()); DIR *d; d = opendir(dir.c_str()); @@ -310,6 +330,7 @@ void GetFile::performListing(std::string dir) { // only perform a listing while we are not empty logger_->log_info("Performing file listing against %s", dir.c_str()); while (isRunning()) { + updateListingTime(request.directory); struct dirent *entry; entry = readdir(d); if (!entry) @@ -317,14 +338,14 @@ void GetFile::performListing(std::string dir) { std::string d_name = entry->d_name; if ((entry->d_type & DT_DIR)) { // if this is a directory - if (_recursive && strcmp(d_name.c_str(), "..") != 0 + if (request.recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) { std::string path = dir + "/" + d_name; - performListing(path); + performListing(path, request); } } else { std::string fileName = dir + "/" + d_name; - if (acceptFile(fileName, d_name)) { + if (acceptFile(fileName, d_name, request)) { // check whether we can take this file putListing(fileName); } diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index 85cf09b420..51fbb6f4fa 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -24,13 +24,13 @@ #include #include +#include "io/validation.h" #include "utils/StringUtils.h" #include "utils/TimeUtil.h" #include "processors/PutFile.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" - namespace org { namespace apache { namespace nifi { @@ -43,15 +43,16 @@ const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail"); const std::string PutFile::ProcessorName("PutFile"); -core::Property PutFile::Directory( - "Output Directory", "The output directory to which to put files", "."); +core::Property PutFile::Directory("Output Directory", + "The output directory to which to put files", + "."); core::Property PutFile::ConflictResolution( "Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory", CONFLICT_RESOLUTION_STRATEGY_FAIL); -core::Relationship PutFile::Success( - "success", "All files are routed to success"); +core::Relationship PutFile::Success("success", + "All files are routed to success"); core::Relationship PutFile::Failure( "failure", "Failed files (conflict, write failure, etc.) are transferred to failure"); @@ -69,25 +70,30 @@ void PutFile::initialize() { setSupportedRelationships(relationships); } -void PutFile::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { - std::string directory; +void PutFile::onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory) { - if (!context->getProperty(Directory.getName(), directory)) { + if (!context->getProperty(Directory.getName(), directory_)) { logger_->log_error("Directory attribute is missing or invalid"); - return; } - std::string conflictResolution; - - if (!context->getProperty(ConflictResolution.getName(), conflictResolution)) { + if (!context->getProperty(ConflictResolution.getName(), + conflict_resolution_)) { logger_->log_error( "Conflict Resolution Strategy attribute is missing or invalid"); + } + +} +void PutFile::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + + if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) { + context->yield(); return; } - std::shared_ptr flowFile = std::static_pointer_cast(session->get()); + std::shared_ptr flowFile = std::static_pointer_cast< + FlowFileRecord>(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -103,17 +109,17 @@ void PutFile::onTrigger( uuid_generate(tmpFileUuid); uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr); std::stringstream tmpFileSs; - tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr; + tmpFileSs << directory_ << "/." << filename << "." << tmpFileUuidStr; std::string tmpFile = tmpFileSs.str(); logger_->log_info("PutFile using temporary file %s", tmpFile.c_str()); // Determine dest full file paths std::stringstream destFileSs; - destFileSs << directory << "/" << filename; + destFileSs << directory_ << "/" << filename; std::string destFile = destFileSs.str(); logger_->log_info("PutFile writing file %s into directory %s", - filename.c_str(), directory.c_str()); + filename.c_str(), directory_.c_str()); // If file exists, apply conflict resolution strategy struct stat statResult; @@ -121,11 +127,11 @@ void PutFile::onTrigger( if (stat(destFile.c_str(), &statResult) == 0) { logger_->log_info( "Destination file %s exists; applying Conflict Resolution Strategy: %s", - destFile.c_str(), conflictResolution.c_str()); + destFile.c_str(), conflict_resolution_.c_str()); - if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE) { + if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) { putFile(session, flowFile, tmpFile, destFile); - } else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE) { + } else if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_IGNORE) { session->transfer(flowFile, Success); } else { session->transfer(flowFile, Failure); @@ -136,8 +142,8 @@ void PutFile::onTrigger( } bool PutFile::putFile(core::ProcessSession *session, - std::shared_ptr flowFile, const std::string &tmpFile, - const std::string &destFile) { + std::shared_ptr flowFile, + const std::string &tmpFile, const std::string &destFile) { ReadCallback cb(tmpFile, destFile); session->read(flowFile, &cb); @@ -205,7 +211,6 @@ PutFile::ReadCallback::~ReadCallback() { unlink(_tmpFile.c_str()); } - } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index 859daa6bdf..0c0b6bf2f8 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -145,10 +145,10 @@ static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { return (i.modifiedTime < j.modifiedTime); } -void TailFile::checkRollOver() { +void TailFile::checkRollOver(std::string fileLocation, std::string fileName) { struct stat statbuf; std::vector matchedFiles; - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + std::string fullPath = fileLocation + "/" + _currentTailFileName; if (stat(fullPath.c_str(), &statbuf) == 0) { if (statbuf.st_size > this->_currentTailFilePosition) @@ -157,12 +157,12 @@ void TailFile::checkRollOver() { uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); - std::string pattern = _fileName; - std::size_t found = _fileName.find_last_of("."); + std::string pattern = fileName; + std::size_t found = fileName.find_last_of("."); if (found != std::string::npos) - pattern = _fileName.substr(0, found); + pattern = fileName.substr(0, found); DIR *d; - d = opendir(this->_fileLocation.c_str()); + d = opendir(fileLocation.c_str()); if (!d) return; while (1) { @@ -173,7 +173,7 @@ void TailFile::checkRollOver() { std::string d_name = entry->d_name; if (!(entry->d_type & DT_DIR)) { std::string fileName = d_name; - std::string fileFullName = this->_fileLocation + "/" + d_name; + std::string fileFullName = fileLocation + "/" + d_name; if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) { if (((uint64_t) (statbuf.st_mtime) * 1000) @@ -215,24 +215,28 @@ void TailFile::checkRollOver() { void TailFile::onTrigger( core::ProcessContext *context, core::ProcessSession *session) { + + std::lock_guard tail_lock(tail_file_mutex_); std::string value; + std::string fileLocation=""; + std::string fileName=""; if (context->getProperty(FileName.getName(), value)) { std::size_t found = value.find_last_of("/\\"); - this->_fileLocation = value.substr(0, found); - this->_fileName = value.substr(found + 1); + fileLocation = value.substr(0, found); + fileName = value.substr(found + 1); } if (context->getProperty(StateFile.getName(), value)) { _stateFile = value + "." + getUUIDStr(); } if (!this->_stateRecovered) { _stateRecovered = true; - this->_currentTailFileName = _fileName; + this->_currentTailFileName = fileName; this->_currentTailFilePosition = 0; // recover the state if we have not done so this->recoverState(); } - checkRollOver(); - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + checkRollOver(fileLocation,fileName); + std::string fullPath = fileLocation + "/" + _currentTailFileName; struct stat statbuf; if (stat(fullPath.c_str(), &statbuf) == 0) { if (statbuf.st_size <= this->_currentTailFilePosition) @@ -241,13 +245,13 @@ void TailFile::onTrigger( context->yield(); return; } - std::shared_ptr flowFile = std::static_pointer_cast(session->create());; + std::shared_ptr flowFile = std::static_pointer_cast(session->create()); if (!flowFile) return; std::size_t found = _currentTailFileName.find_last_of("."); std::string baseName = _currentTailFileName.substr(0, found); std::string extension = _currentTailFileName.substr(found + 1); - flowFile->updateKeyedAttribute(PATH, _fileLocation); + flowFile->updateKeyedAttribute(PATH, fileLocation); flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); session->import(fullPath, flowFile, true, this->_currentTailFilePosition); session->transfer(flowFile, Success); diff --git a/libminifi/test/TestExecuteProcess.cpp b/libminifi/test/TestExecuteProcess.cpp new file mode 100644 index 0000000000..cbf9df6e96 --- /dev/null +++ b/libminifi/test/TestExecuteProcess.cpp @@ -0,0 +1,136 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "FlowController.h" +#include "unit/ProvenanceTestHelper.h" +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "processors/GetFile.h" +#include "core/core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" + +int main(int argc, char **argv) +{ + + std::ostringstream oss; + std::unique_ptr outputLogger = std::unique_ptr< + logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + + + outputLogger = std::unique_ptr( + new org::apache::nifi::minifi::core::logging::NullAppender()); + logger->updateLogger(std::move(outputLogger)); + + std::shared_ptr processor = std::make_shared< + org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess"); + processor->setMaxConcurrentTasks(1); + + std::shared_ptr test_repo = + std::make_shared(); + + std::shared_ptr repo = + std::static_pointer_cast(test_repo); + std::shared_ptr controller = std::make_shared< + TestFlowController>(test_repo, test_repo); + + uuid_t processoruuid; + assert(true == processor->getUUID(processoruuid)); + + std::shared_ptr connection = std::make_shared< + minifi::Connection>(test_repo, "executeProcessConnection"); + connection->setRelationship(core::Relationship("success", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(processor); + connection->setDestination(processor); + + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(processoruuid); + + processor->addConnection(connection); + assert(processor->getName() == "executeProcess"); + + std::shared_ptr record; + processor->setScheduledState(core::ScheduledState::RUNNING); + + processor->initialize(); + + std::atomic is_ready(false); + + std::vector processor_workers; + + core::ProcessorNode node2(processor); + std::shared_ptr contextset = std::make_shared< + core::ProcessContext>(node2, test_repo); + core::ProcessSessionFactory factory(contextset.get()); + // processor->onSchedule(contextset.get(), &factory); + /* + for (int i = 0; i < 1; i++) { + // + processor_workers.push_back( + std::thread( + [processor,test_repo,&is_ready]() + { + core::ProcessorNode node(processor); + std::shared_ptr context = std::make_shared(node, test_repo); + context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,"sleep 0.5"); + //context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::CommandArguments," 1 >>" + ss.str()); + std::shared_ptr session = std::make_shared(context.get()); + while(!is_ready.load(std::memory_order_relaxed)) { + + } +*/ + core::ProcessorNode node(processor); + std::shared_ptr context = std::make_shared(node, test_repo); + context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,"sleep 0.5"); + //context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::CommandArguments," 1 >>" + ss.str()); + std::shared_ptr session = std::make_shared(context.get()); + processor->onTrigger(context.get(), session.get()); +/* + })); + } + + is_ready.store(true, std::memory_order_relaxed); + //is_ready.store(true); + + std::for_each(processor_workers.begin(), processor_workers.end(), + [](std::thread &t) + { + t.join(); + }); +*/ + outputLogger = std::unique_ptr( + new org::apache::nifi::minifi::core::logging::NullAppender()); + logger->updateLogger(std::move(outputLogger)); + + + std::shared_ptr execp = + std::static_pointer_cast< + org::apache::nifi::minifi::processors::ExecuteProcess>(processor); + +} diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 4f08d5df54..68cc14ef64 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -41,17 +41,16 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { testController.enableDebug(); - - std::shared_ptr processor = std::make_shared< org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - std::shared_ptr test_repo = std::make_shared(); - - std::shared_ptr repo = std::static_pointer_cast(test_repo); - std::shared_ptr controller = std::make_shared(test_repo, test_repo); + std::shared_ptr test_repo = + std::make_shared(); - + std::shared_ptr repo = + std::static_pointer_cast(test_repo); + std::shared_ptr controller = std::make_shared< + TestFlowController>(test_repo, test_repo); char format[] = "/tmp/gt.XXXXXX"; char *dir = testController.createTempDirectory(format); @@ -59,9 +58,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { uuid_t processoruuid; REQUIRE(true == processor->getUUID(processoruuid)); - std::shared_ptr connection = std::make_shared< - minifi::Connection>(test_repo,"getfileCreate2Connection"); + minifi::Connection>(test_repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -77,11 +75,12 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { core::ProcessorNode node(processor); core::ProcessContext context(node, test_repo); + core::ProcessSessionFactory factory(&context); context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); core::ProcessSession session(&context); - + processor->onSchedule(&context, &factory); REQUIRE(processor->getName() == "getfileCreate2"); std::shared_ptr record; @@ -108,7 +107,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { rmdir(dir); reporter = session.getProvenanceReporter(); - REQUIRE( processor->getName() == "getfileCreate2"); + REQUIRE(processor->getName() == "getfileCreate2"); records = reporter->getEvents(); @@ -141,7 +140,104 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { if (!found) throw std::runtime_error("Did not find record"); + } + +} + +TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { + + TestController testController; + + testController.enableDebug(); + + std::shared_ptr processor = std::make_shared< + org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + + std::shared_ptr test_repo = + std::make_shared(); + + std::shared_ptr repo = + std::static_pointer_cast(test_repo); + std::shared_ptr controller = std::make_shared< + TestFlowController>(test_repo, test_repo); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + + std::shared_ptr connection = std::make_shared< + minifi::Connection>(test_repo, "getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(processor); + connection->setDestination(processor); + + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(processoruuid); + + processor->addConnection(connection); + REQUIRE(dir != NULL); + + core::ProcessorNode node(processor); + core::ProcessContext context(node, test_repo); + core::ProcessSessionFactory factory(&context); + context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, + dir); + // replicate 10 threads + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onSchedule(&context, &factory); + + for (int i = 0; i < 10; i++) { + + core::ProcessSession session(&context); + REQUIRE(processor->getName() == "getfileCreate2"); + + std::shared_ptr record; + + processor->onTrigger(&context, &session); + + provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + std::set records = + reporter->getEvents(); + record = session.get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(&context, &session); + unlink(ss.str().c_str()); + rmdir(dir); + reporter = session.getProvenanceReporter(); + + REQUIRE(processor->getName() == "getfileCreate2"); + + records = reporter->getEvents(); + + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == processor->getName()); + } + session.commit(); + std::shared_ptr ffr = session.get(); + + REQUIRE(2 == repo->getRepoMap().size()); + + for (auto entry : repo->getRepoMap()) { + provenance::ProvenanceEventRecord newRecord; + newRecord.DeSerialize((uint8_t*) entry.second.data(), + entry.second.length()); + + } } } @@ -159,9 +255,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { testController.enableDebug(); - - std::shared_ptr repo = std::make_shared< - TestRepository>(); + std::shared_ptr repo = std::make_shared(); std::shared_ptr processor = std::make_shared< org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); @@ -178,25 +272,22 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { uuid_t logattribute_uuid; REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); - std::shared_ptr connection = std::make_shared< - minifi::Connection>(repo,"getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); std::shared_ptr connection2 = std::make_shared< - minifi::Connection>(repo,"logattribute"); + minifi::Connection>(repo, "logattribute"); connection2->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this connection->setSource(processor); - // link the connections so that we can test results at the end for this connection->setDestination(logAttribute); connection2->setSource(logAttribute); - connection2->setSourceUUID(logattribute_uuid); connection->setSourceUUID(processoruuid); connection->setDestinationUUID(logattribute_uuid); @@ -220,10 +311,15 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { std::shared_ptr record; processor->setScheduledState(core::ScheduledState::RUNNING); + + core::ProcessSessionFactory factory(&context); + processor->onSchedule(&context, &factory); processor->onTrigger(&context, &session); logAttribute->incrementActiveTasks(); logAttribute->setScheduledState(core::ScheduledState::RUNNING); + core::ProcessSessionFactory factory2(&context2); + logAttribute->onSchedule(&context2, &factory2); logAttribute->onTrigger(&context2, &session2); provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); @@ -272,3 +368,13 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { logger->updateLogger(std::move(outputLogger)); } + +int fileSize(const char *add) { + std::ifstream mySource; + mySource.open(add, std::ios_base::binary); + mySource.seekg(0, std::ios_base::end); + int size = mySource.tellg(); + mySource.close(); + return size; +} + diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index cb8f520888..1e16aa6981 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -130,7 +130,6 @@ class TestFlowController : public minifi::FlowController { } protected: void initializePaths(const std::string &adjustedFilename) { - std::cout << "what" << std::endl; } }; From 03759a4f34530844f1d08de1fb21d2e9b6991ea4 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 30 Mar 2017 10:58:15 -0400 Subject: [PATCH 2/2] MINIFI-236: Updates per pull request --- .../include/core/ConfigurableComponent.h | 13 +++++++----- libminifi/include/core/FlowConfiguration.h | 2 +- libminifi/include/core/ProcessSession.h | 8 ++++++- libminifi/include/processors/GetFile.h | 13 +++++++----- libminifi/include/processors/TailFile.h | 5 ++++- libminifi/src/core/FlowConfiguration.cpp | 8 +++---- libminifi/src/core/ProcessSession.cpp | 6 ++++++ libminifi/src/processors/GetFile.cpp | 21 +------------------ libminifi/src/processors/TailFile.cpp | 2 +- libminifi/test/TestExecuteProcess.cpp | 15 +++++-------- 10 files changed, 45 insertions(+), 48 deletions(-) diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index b0b8739cdc..d46216b9b9 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -19,8 +19,14 @@ #ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ #define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_ +#include +#include +#include +#include +#include + +#include "logging/Logger.h" #include "Property.h" -#include "core/logging/Logger.h" namespace org { namespace apache { @@ -32,13 +38,11 @@ namespace core { * Represents a configurable component * Purpose: Extracts configuration items for all components and localized them */ -class ConfigurableComponent { +class ConfigurableComponent { public: - ConfigurableComponent() = delete; - explicit ConfigurableComponent(std::shared_ptr logger); explicit ConfigurableComponent(const ConfigurableComponent &&other); @@ -81,7 +85,6 @@ class ConfigurableComponent { protected: - /** * Returns true if the instance can be edited. * @return true/false diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 87505abcba..e95e6846e0 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -25,8 +25,8 @@ #include "processors/GetFile.h" #include "processors/PutFile.h" #include "processors/TailFile.h" -#include "processors/ListenSyslog.h" #include "processors/ListenHTTP.h" +#include "processors/ListenSyslog.h" #include "processors/GenerateFlowFile.h" #include "processors/RealTimeDataCollector.h" #include "processors/LogAttribute.h" diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index acde2456da..a80769e80e 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -127,8 +127,14 @@ class ProcessSession { // Penalize the flow void penalize(std::shared_ptr &flow); void penalize(std::shared_ptr &&flow); -// Import the existed file into the flow + + /** + * Imports a file from the data stream + * @param stream incoming data stream that contains the data to store into a file + * @param flow flow file + */ void importFrom(io::DataStream &stream, std::shared_ptr &&flow); + // import from the data source. void import(std::string source, std::shared_ptr &flow, bool keepSource = true, uint64_t offset = 0); void import(std::string source, std::shared_ptr &&flow, diff --git a/libminifi/include/processors/GetFile.h b/libminifi/include/processors/GetFile.h index 4e96624d7d..cc3beaaad0 100644 --- a/libminifi/include/processors/GetFile.h +++ b/libminifi/include/processors/GetFile.h @@ -91,12 +91,13 @@ class GetFile : public core::Processor { core::ProcessSessionFactory *sessionFactory); // Initialize, over write by NiFi GetFile virtual void initialize(void); - // perform directory listing + /** + * performs a listeing on the directory. + * @param dir directory to list + * @param request get file request. + */ void performListing(std::string dir,const GetFileRequest &request); - uint64_t getLastListingTime(const std::string &directory); - - void updateListingTime(const std::string &directory); protected: private: @@ -122,7 +123,9 @@ class GetFile : public core::Processor { std::mutex mutex_; - std::map> last_listing_times_; + // last listing time for root directory ( if recursive, we will consider the root + // as the top level time. + std::atomic last_listing_time_; }; diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index 17507cdfe3..c7b7b468f9 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -80,7 +80,10 @@ class TailFile : public core::Processor { std::string trimLeft(const std::string& s); std::string trimRight(const std::string& s); void parseStateFileLine(char *buf); - void checkRollOver(std::string, std::string); + /** + * Check roll over for the provided file. + */ + void checkRollOver(const std::string &, const std::string&); }; diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 772e58369a..f2dda0d708 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -39,10 +39,6 @@ std::shared_ptr FlowConfiguration::createProcessor( == org::apache::nifi::minifi::processors::LogAttribute::ProcessorName) { processor = std::make_shared< org::apache::nifi::minifi::processors::LogAttribute>(name, uuid); - } else if (name - == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); } else if (name == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) { processor = std::make_shared< @@ -67,6 +63,10 @@ std::shared_ptr FlowConfiguration::createProcessor( == org::apache::nifi::minifi::processors::ListenSyslog::ProcessorName) { processor = std::make_shared< org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid); + } else if (name + == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) { + processor = std::make_shared< + org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid); } else if (name == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) { processor = std::make_shared< diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index c4ba14c271..09c3fa3a5a 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -530,6 +530,12 @@ void ProcessSession::read(std::shared_ptr &&flow, } } +/** + * Imports a file from the data stream + * @param stream incoming data stream that contains the data to store into a file + * @param flow flow file + * + */ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr &&flow) { std::shared_ptr claim = std::make_shared(); diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 4640ea9ec1..652caf7749 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -176,11 +176,11 @@ void GetFile::onTrigger(core::ProcessContext *context, // Perform directory list logger_->log_info("Is listing empty %i", isListingEmpty()); if (isListingEmpty()) { - auto last_listing_time_ = getLastListingTime(request_.directory); if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { performListing(request_.directory, request_); + last_listing_time_.store(getTimeMillis()); } } logger_->log_info("Is listing empty %i", isListingEmpty()); @@ -217,24 +217,6 @@ void GetFile::onTrigger(core::ProcessContext *context, } -uint64_t GetFile::getLastListingTime(const std::string &directory) { - std::lock_guard lock(mutex_); - auto listing_time = last_listing_times_.find(directory); - if (listing_time == last_listing_times_.end()) { - last_listing_times_[directory] = getTimeMillis(); - listing_time = last_listing_times_.find(directory); - } - return listing_time->second.load(); -} - -void GetFile::updateListingTime(const std::string &directory) { - std::lock_guard lock(mutex_); - auto listing_time = last_listing_times_.find(directory); - if (listing_time != last_listing_times_.end()) { - listing_time->second.store(getTimeMillis()); - } -} - bool GetFile::isListingEmpty() { std::lock_guard lock(mutex_); @@ -330,7 +312,6 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { // only perform a listing while we are not empty logger_->log_info("Performing file listing against %s", dir.c_str()); while (isRunning()) { - updateListingTime(request.directory); struct dirent *entry; entry = readdir(d); if (!entry) diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index 0c0b6bf2f8..bcdd8fd362 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -145,7 +145,7 @@ static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { return (i.modifiedTime < j.modifiedTime); } -void TailFile::checkRollOver(std::string fileLocation, std::string fileName) { +void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) { struct stat statbuf; std::vector matchedFiles; std::string fullPath = fileLocation + "/" + _currentTailFileName; diff --git a/libminifi/test/TestExecuteProcess.cpp b/libminifi/test/TestExecuteProcess.cpp index cbf9df6e96..6aa51fc4b6 100644 --- a/libminifi/test/TestExecuteProcess.cpp +++ b/libminifi/test/TestExecuteProcess.cpp @@ -88,8 +88,8 @@ int main(int argc, char **argv) std::shared_ptr contextset = std::make_shared< core::ProcessContext>(node2, test_repo); core::ProcessSessionFactory factory(contextset.get()); - // processor->onSchedule(contextset.get(), &factory); - /* + processor->onSchedule(contextset.get(), &factory); + for (int i = 0; i < 1; i++) { // processor_workers.push_back( @@ -104,14 +104,9 @@ int main(int argc, char **argv) while(!is_ready.load(std::memory_order_relaxed)) { } -*/ - core::ProcessorNode node(processor); - std::shared_ptr context = std::make_shared(node, test_repo); - context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,"sleep 0.5"); - //context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::CommandArguments," 1 >>" + ss.str()); - std::shared_ptr session = std::make_shared(context.get()); + processor->onTrigger(context.get(), session.get()); -/* + })); } @@ -123,7 +118,7 @@ int main(int argc, char **argv) { t.join(); }); -*/ + outputLogger = std::unique_ptr( new org::apache::nifi::minifi::core::logging::NullAppender()); logger->updateLogger(std::move(outputLogger));