From 7f052c84312ba988a016de126f334576f6f121ed Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 30 Jun 2017 10:05:15 -0400 Subject: [PATCH 1/8] MINIFI-338: Convert processor threads to use thread pools --- .../include/EventDrivenSchedulingAgent.h | 2 +- libminifi/include/SchedulingAgent.h | 2 +- libminifi/include/ThreadedSchedulingAgent.h | 56 +++++- .../include/TimerDrivenSchedulingAgent.h | 2 +- libminifi/include/core/Processor.h | 5 +- libminifi/include/utils/ThreadPool.h | 180 +++++++++++++++++- libminifi/src/EventDrivenSchedulingAgent.cpp | 13 +- libminifi/src/FlowController.cpp | 15 +- libminifi/src/SchedulingAgent.cpp | 4 +- libminifi/src/ThreadedSchedulingAgent.cpp | 44 +++-- libminifi/src/TimerDrivenSchedulingAgent.cpp | 13 +- libminifi/test/unit/SocketTests.cpp | 2 +- libminifi/test/unit/ThreadPoolTests.cpp | 2 +- 13 files changed, 281 insertions(+), 59 deletions(-) diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index c838b11f2b..ca9f0211e1 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -46,7 +46,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { virtual ~EventDrivenSchedulingAgent() { } // Run function for the thread - void run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); + uint64_t run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); private: // Prevent default copy constructor and assignment operation diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 1ff3faca76..130c0884bc 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -84,7 +84,7 @@ class SchedulingAgent { running_ = true; } // stop - void stop() { + virtual void stop() { running_ = false; component_lifecycle_thread_pool_.shutdown(); 
} diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index b4db4bf301..27b8b3a731 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -20,6 +20,7 @@ #ifndef __THREADED_SCHEDULING_AGENT_H__ #define __THREADED_SCHEDULING_AGENT_H__ +#include #include "properties/Configure.h" #include "core/logging/LoggerConfiguration.h" #include "core/Processor.h" @@ -32,6 +33,47 @@ namespace apache { namespace nifi { namespace minifi { +/** + * Uses the wait time for a given worker to determine if it is eligible to run + */ +class TimerAwareMonitor : public utils::AfterExecute { + public: + TimerAwareMonitor(std::atomic *run_monitor) + : run_monitor_(run_monitor), + current_wait_(0) { + + } + explicit TimerAwareMonitor(TimerAwareMonitor &&other) + : AfterExecute(std::move(other)), + run_monitor_(std::move(other.run_monitor_)) { + current_wait_.store(other.current_wait_.load()); + } + virtual bool isFinished(const uint64_t &result) { + current_wait_.store(result); + if (*run_monitor_) { + return false; + } + return true; + } + virtual bool isCancelled(const uint64_t &result) { + if (*run_monitor_) { + return false; + } + return true; + } + /** + * Time to wait before re-running this task if necessary + * @return milliseconds to wait, relative to now, before we are eligible to re-run this task. + */ + virtual int64_t wait_time() { + return current_wait_.load(); + } + private: + + std::atomic current_wait_; + std::atomic *run_monitor_; +}; + /** * An abstract scheduling agent which creates and manages a pool of threads for * each processor scheduled. 
@@ -48,13 +90,18 @@ class ThreadedSchedulingAgent : public SchedulingAgent { std::shared_ptr configuration) : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration), logger_(logging::LoggerFactory::getLogger()) { + + utils::ThreadPool pool = utils::ThreadPool(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); + thread_pool_ = std::move(pool); + thread_pool_.start(); + } // Destructor virtual ~ThreadedSchedulingAgent() { } // Run function for the thread - virtual void run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0; + virtual uint64_t run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) = 0; public: // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent @@ -62,9 +109,12 @@ class ThreadedSchedulingAgent : public SchedulingAgent { // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent virtual void unschedule(std::shared_ptr processor); + virtual void stop(); + protected: + utils::ThreadPool thread_pool_; + protected: - // Threads - std::map> _threads; + private: // Prevent default copy constructor and assignment operation diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 816bcec656..1502c4761c 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -49,7 +49,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /** * Run function that accepts the processor, context and session factory. 
*/ - void run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); + uint64_t run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory); private: // Prevent default copy constructor and assignment operation diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 251ec4772d..0853c11594 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -220,6 +220,9 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: virtual void onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory) { } + // Check all incoming connections for work + bool isWorkAvailable(); + protected: // Processor state @@ -246,8 +249,6 @@ class Processor : public Connectable, public ConfigurableComponent, public std:: // Yield Expiration std::atomic yield_expiration_; - // Check all incoming connections for work - bool isWorkAvailable(); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Processor(const Processor &parent); diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 77772cdde6..8ff39751fb 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -32,6 +32,35 @@ namespace nifi { namespace minifi { namespace utils { +/** + * Worker task helper that determines + * whether or not we will run + */ +template +class AfterExecute { + public: + virtual ~AfterExecute() { + + } + + explicit AfterExecute() { + + } + + explicit AfterExecute(AfterExecute &&other) { + + } + virtual bool isFinished(const T &result) = 0; + virtual bool isCancelled(const T &result) = 0; + /** + * Time to wait before re-running this task if necessary + * @return milliseconds to wait, relative to now, before we are eligible to re-run this task. 
+ */ + virtual int64_t wait_time() { + return 0; + } +}; + /** * Worker task * purpose: Provides a wrapper for the functor @@ -40,12 +69,29 @@ namespace utils { template class Worker { public: - explicit Worker(std::function &task) - : task(task) { + explicit Worker(std::function &task, const std::string &identifier, std::unique_ptr> run_determinant) + : task(task), + run_determinant_(std::move(run_determinant)), + identifier_(identifier), + time_slice_(0) { + promise = std::make_shared>(); + } + + explicit Worker(std::function &task, const std::string &identifier) + : task(task), + run_determinant_(nullptr), + identifier_(identifier), + time_slice_(0) { promise = std::make_shared>(); } - explicit Worker() { + explicit Worker(const std::string identifier = "") + : identifier_(identifier), + time_slice_(0) { + } + + virtual ~Worker() { + } /** @@ -53,16 +99,35 @@ class Worker { */ Worker(Worker &&other) : task(std::move(other.task)), - promise(other.promise) { + promise(other.promise), + time_slice_(std::move(other.time_slice_)), + identifier_(std::move(other.identifier_)), + run_determinant_(std::move(other.run_determinant_)) { } /** - * Runs the task and takes the output from the funtor + * Runs the task and takes the output from the functor * setting the result into the promise + * @return whether or not to continue running + * false == finished || error + * true == run again */ - void run() { + virtual bool run() { T result = task(); - promise->set_value(result); + if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) { + promise->set_value(result); + return false; + } + time_slice_ = increment_time(run_determinant_->wait_time()); + return true; + } + + virtual void setIdentifier(const std::string identifier) { + identifier_ = identifier; + } + + virtual uint64_t getTimeSlice() { + return time_slice_; } Worker(const Worker&) = delete; @@ -72,8 +137,22 @@ class Worker { std::shared_ptr> getPromise(); - 
private: + const std::string &getIdentifier() { + return identifier_; + } + protected: + + inline uint64_t increment_time(const uint64_t &time) { + std::chrono::time_point now = + std::chrono::system_clock::now(); + auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); + return millis + time; + } + + std::string identifier_; + uint64_t time_slice_; std::function task; + std::unique_ptr> run_determinant_; std::shared_ptr> promise; }; @@ -81,6 +160,9 @@ template Worker& Worker::operator =(Worker && other) { task = std::move(other.task); promise = other.promise; + time_slice_ = std::move(other.time_slice_); + identifier_ = std::move(other.identifier_); + run_determinant_ = std::move(other.run_determinant_); return *this; } @@ -125,6 +207,21 @@ class ThreadPool { * @return true if future can be created and thread pool is in a running state. */ bool execute(Worker &&task, std::future &future); + + /** + * attempts to stop tasks with the provided identifier. + * @param identifier for worker tasks. Note that these tasks won't + * immediately stop. + */ + void stopTasks(const std::string &identifier); + + /** + * Returns true if a task is running. 
+ */ + bool isRunning(const std::string &identifier) { + return task_status_[identifier] == true; + } + /** * Starts the Thread Pool */ @@ -199,6 +296,8 @@ class ThreadPool { moodycamel::ConcurrentQueue> worker_queue_; // notification for available work std::condition_variable tasks_available_; + // map from task identifier to whether tasks with that identifier should keep running + std::map task_status_; // manager mutex std::recursive_mutex manager_mutex_; // work queue mutex @@ -218,6 +317,10 @@ class ThreadPool { template bool ThreadPool::execute(Worker &&task, std::future &future) { + { + std::unique_lock lock(worker_queue_mutex_); + task_status_[task.getIdentifier()] = true; + } future = std::move(task.getPromise()->get_future()); bool enqueued = worker_queue_.enqueue(std::move(task)); if (running_) { @@ -246,15 +349,67 @@ void ThreadPool::startWorkers() { template void ThreadPool::run_tasks() { auto waitperiod = std::chrono::milliseconds(1) * 100; + uint64_t wait_decay_ = 0; while (running_.load()) { + // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible + // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially + // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should + // be more likely to run. This is intentional. 
+ if (wait_decay_ > 1000) { + std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); + } Worker task; if (!worker_queue_.try_dequeue(task)) { + std::unique_lock lock(worker_queue_mutex_); tasks_available_.wait_for(lock, waitperiod); continue; } - task.run(); + else { + + std::unique_lock lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { + continue; + } + } + + bool wait_to_run = false; + if (task.getTimeSlice() > 1) { + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto ms = std::chrono::duration_cast(now); + if (task.getTimeSlice() > ms.count()) { + wait_to_run = true; + } + } + // if we have to wait we re-queue the worker. + if (wait_to_run) { + { + std::unique_lock lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { + continue; + } + } + worker_queue_.enqueue(std::move(task)); + + wait_decay_ += 100; + continue; + } + + const bool task_renew = task.run(); + wait_decay_ = 0; + if (task_renew) { + + { + // even if we have more work to do we will not re-queue a task whose identifier has been stopped + std::unique_lock lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { + continue; + } + } + worker_queue_.enqueue(std::move(task)); + + } } current_workers_--; @@ -271,6 +426,12 @@ void ThreadPool::start() { } } +template +void ThreadPool::stopTasks(const std::string &identifier) { + std::unique_lock lock(worker_queue_mutex_); + task_status_[identifier] = false; +} + template void ThreadPool::shutdown() { if (running_.load()) { @@ -278,6 +439,7 @@ void ThreadPool::shutdown() { running_.store(false); drain(); + task_status_.clear(); if (manager_thread_.joinable()) manager_thread_.join(); { diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 8a2a874588..db5ca0815f 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -32,22 +32,27 @@ namespace apache { namespace nifi { namespace minifi { -void 
EventDrivenSchedulingAgent::run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { +uint64_t EventDrivenSchedulingAgent::run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { while (this->running_) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield - std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); + return processor->getYieldTime(); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_)); + return this->bored_yield_duration_; } // Block until work is available + processor->waitForWork(1000); + + if (!processor->isWorkAvailable()) { + return 1000; + } } - return; + return 0; } } /* namespace minifi */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 6358ed0515..32fd29801e 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -183,17 +183,16 @@ void FlowController::stop(bool force) { std::lock_guard < std::recursive_mutex > flow_lock(mutex_); if (running_) { // immediately indicate that we are not running - running_ = false; - logger_->log_info("Stop Flow Controller"); - this->timer_scheduler_->stop(); - this->event_scheduler_->stop(); - this->flow_file_repo_->stop(); - this->provenance_repo_->stop(); - // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); if (this->root_) this->root_->stopProcessing(this->timer_scheduler_.get(), this->event_scheduler_.get()); + this->flow_file_repo_->stop(); + this->provenance_repo_->stop(); + // stop after we've attempted to stop the processors. 
+ this->timer_scheduler_->stop(); + this->event_scheduler_->stop(); + running_ = false; + } } diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 1060830ae7..e228ba5d8e 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -46,7 +46,7 @@ void SchedulingAgent::enableControllerService(std::shared_ptrenable(); }; // create a functor that will be submitted to the thread pool. - utils::Worker functor(f_ex); + utils::Worker functor(f_ex, serviceNode->getUUIDStr()); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. std::future future; @@ -59,7 +59,7 @@ void SchedulingAgent::disableControllerService(std::shared_ptrdisable(); }; // create a functor that will be submitted to the thread pool. - utils::Worker functor(f_ex); + utils::Worker functor(f_ex, serviceNode->getUUIDStr()); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. 
std::future future; diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 7b4ce85687..d6b8fae43b 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -61,8 +61,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo return; } - std::map>::iterator it = _threads.find(processor->getUUIDStr()); - if (it != _threads.end()) { + if (thread_pool_.isRunning(processor->getUUIDStr())) { logger_->log_info("Can not schedule threads for processor %s because there are existing threads running"); return; } @@ -74,20 +73,33 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo processor->onSchedule(processContext.get(), sessionFactory.get()); std::vector threads; + + ThreadedSchedulingAgent *agent = this; for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) { - ThreadedSchedulingAgent *agent = this; - std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () { - agent->run(processor, processContext.get(), sessionFactory.get()); - }); - thread->detach(); - threads.push_back(thread); - logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), processor->getName().c_str()); + + // reference the disable function from serviceNode + std::function f_ex = [agent, processor, processContext, sessionFactory] () { + return agent->run(processor, processContext.get(), sessionFactory.get()); + }; + // create a functor that will be submitted to the thread pool. + std::unique_ptr monitor = std::unique_ptr(new TimerAwareMonitor(&running_)); + utils::Worker functor(f_ex, processor->getUUIDStr(), std::move(monitor)); + // move the functor into the thread pool. While a future is returned + // we aren't terribly concerned with the result. 
+ std::future future; + thread_pool_.execute(std::move(functor), future); + } - _threads[processor->getUUIDStr().c_str()] = threads; + logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str()); return; } +void ThreadedSchedulingAgent::stop() { + SchedulingAgent::stop(); + thread_pool_.shutdown(); +} + void ThreadedSchedulingAgent::unschedule(std::shared_ptr processor) { std::lock_guard < std::mutex > lock(mutex_); logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str()); @@ -97,18 +109,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr proces return; } - std::map>::iterator it = _threads.find(processor->getUUIDStr()); + thread_pool_.stopTasks(processor->getUUIDStr()); - if (it == _threads.end()) { - logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str()); - return; - } - for (std::vector::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) { - std::thread *thread = *itThread; - logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), processor->getName().c_str()); - delete thread; - } - _threads.erase(processor->getUUIDStr()); processor->clearActiveTask(); } diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index b9a41eaa5b..32764706db 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -29,19 +29,22 @@ namespace apache { namespace nifi { namespace minifi { -void TimerDrivenSchedulingAgent::run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { +uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr processor, core::ProcessContext *processContext, core::ProcessSessionFactory 
*sessionFactory) { while (this->running_) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield - std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); + return processor->getYieldTime(); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_)); + //std::this_thread::sleep_for(std::chrono::milliseconds(x)); + return this->bored_yield_duration_; } - std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); + return processor->getSchedulingPeriodNano() / 1000000; + //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); } - return; + return 0; + //return; } } /* namespace minifi */ diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index a791b3f0b5..0576d5fdb0 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -200,7 +200,7 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") { std::vector> futures; for (int i = 0; i < 20; i++) { std::function f_ex = createSocket; - utils::Worker functor(f_ex); + utils::Worker functor(f_ex, "id"); std::future fut; REQUIRE(true == pool.execute(std::move(functor), fut)); futures.push_back(std::move(fut)); diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 0bba76794a..670958ae5c 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -29,7 +29,7 @@ bool function() { TEST_CASE("ThreadPoolTest1", "[TPT1]") { utils::ThreadPool pool(5); std::function f_ex = function; - utils::Worker functor(f_ex); + utils::Worker functor(f_ex, "id"); pool.start(); std::future fut; REQUIRE(true == pool.execute(std::move(functor), fut)); From 
97c8a7f11e4e5cd4fd86439dd1b8744382fd7e2c Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Wed, 5 Jul 2017 12:13:45 -0400 Subject: [PATCH 2/8] MINIFI-338: Address linter errors --- libminifi/src/FlowController.cpp | 1 - libminifi/src/ThreadedSchedulingAgent.cpp | 4 +--- libminifi/src/TimerDrivenSchedulingAgent.cpp | 3 --- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 32fd29801e..b0fbffa116 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -192,7 +192,6 @@ void FlowController::stop(bool force) { this->timer_scheduler_->stop(); this->event_scheduler_->stop(); running_ = false; - } } diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index d6b8fae43b..82d4dfdfc5 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -76,7 +77,6 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo ThreadedSchedulingAgent *agent = this; for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) { - // reference the disable function from serviceNode std::function f_ex = [agent, processor, processContext, sessionFactory] () { return agent->run(processor, processContext.get(), sessionFactory.get()); @@ -88,10 +88,8 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo // we aren't terribly concerned with the result. 
std::future future; thread_pool_.execute(std::move(functor), future); - } logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str()); - return; } diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 32764706db..c3aaa697b8 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -37,14 +37,11 @@ uint64_t TimerDrivenSchedulingAgent::run(std::shared_ptr proces return processor->getYieldTime(); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - //std::this_thread::sleep_for(std::chrono::milliseconds(x)); return this->bored_yield_duration_; } return processor->getSchedulingPeriodNano() / 1000000; - //std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); } return 0; - //return; } } /* namespace minifi */ From 9d500354a3a4c5538ee425162482cb5e8af1bf00 Mon Sep 17 00:00:00 2001 From: Marc Date: Thu, 20 Jul 2017 19:28:26 -0400 Subject: [PATCH 3/8] MINIFI-338: Improve wait decay per pull request comments --- libminifi/include/utils/ThreadPool.h | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 8ff39751fb..5335c81a2d 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -129,6 +129,10 @@ class Worker { virtual uint64_t getTimeSlice() { return time_slice_; } + + virtual uint64_t getWaitTime(){ + return run_determinant_->wait_time(); + } Worker(const Worker&) = delete; Worker& operator =(const Worker&) = delete; @@ -352,11 +356,19 @@ void ThreadPool::run_tasks() { uint64_t wait_decay_ = 0; while (running_.load()) { + // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning + 
// they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state + // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from + // there. This ensures we don't have arbitrarily long sleep cycles. + if (wait_decay_ > 500000000L){ + wait_decay_ = 100000000L; + } // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should // be more likely to run. This is intentional. - if (wait_decay_ > 1000) { + + if (wait_decay_ > 2000) { std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); } Worker task; @@ -376,9 +388,12 @@ void ThreadPool::run_tasks() { bool wait_to_run = false; if (task.getTimeSlice() > 1) { + double wt = (double)task.getWaitTime(); auto now = std::chrono::system_clock::now().time_since_epoch(); - auto ms = std::chrono::duration_cast(now); - if (task.getTimeSlice() > ms.count()) { + auto ms = std::chrono::duration_cast(now).count(); + // if our differential is < 10% of the wait time we will not put the task into a wait state + // since requeuing will break the time slice contract. + if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) { wait_to_run = true; } } @@ -392,7 +407,7 @@ void ThreadPool::run_tasks() { } worker_queue_.enqueue(std::move(task)); - wait_decay_ += 100; + wait_decay_ += 25; continue; } From e98ff6844b059763d587b0151c016c466a330461 Mon Sep 17 00:00:00 2001 From: "Andrew I. 
Christianson" Date: Tue, 8 Aug 2017 13:16:19 -0400 Subject: [PATCH 4/8] MINIFI-367 port tests to use boost::filesystem vs. stat.h for better portability This closes #124. Signed-off-by: Marc Parisi --- libminifi/CMakeLists.txt | 3 ++- libminifi/test/unit/PutFileTests.cpp | 22 ++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index a78fd8fca4..5e63a30517 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -80,9 +80,10 @@ target_link_libraries (minifi ${ZLIB_LIBRARIES}) if (NOT IOS) # Include Boost System -find_package(Boost COMPONENTS system REQUIRED) +find_package(Boost COMPONENTS system filesystem REQUIRED) find_package(CURL) target_link_libraries(minifi ${Boost_SYSTEM_LIBRARY}) +target_link_libraries(minifi ${Boost_FILESYSTEM_LIBRARY}) if (CURL_FOUND) include_directories(${CURL_INCLUDE_DIRS}) diff --git a/libminifi/test/unit/PutFileTests.cpp b/libminifi/test/unit/PutFileTests.cpp index c18c72c5cd..024b6fa689 100644 --- a/libminifi/test/unit/PutFileTests.cpp +++ b/libminifi/test/unit/PutFileTests.cpp @@ -25,6 +25,8 @@ #include #include +#include + #include "../TestBase.h" #include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" @@ -45,18 +47,6 @@ TEST_CASE("Test Creation of PutFile", "[getfileCreate]") { REQUIRE(processor->getName() == "processorname"); } -uint64_t getModificationTime(std::string filename) { - struct stat result; - if (stat(filename.c_str(), &result) == 0) { -#if !defined(_POSIX_C_SOURCE) || defined(_DARWIN_C_SOURCE) - return result.st_mtimespec.tv_sec; -#else - return result.st_mtime; -#endif - } - return 0; -} - TEST_CASE("PutFileTest", "[getfileputpfile]") { TestController testController; @@ -231,7 +221,7 @@ TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") { file.open(movedFile.str(), std::ios::out); file << "tempFile"; file.close(); - uint64_t filemodtime = getModificationTime(movedFile.str()); + auto 
filemodtime = boost::filesystem::last_write_time(movedFile.str()); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); plan->reset(); @@ -252,7 +242,7 @@ TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") { // verify that the fle was moved REQUIRE(false == std::ifstream(ss.str()).good()); REQUIRE(true == std::ifstream(movedFile.str()).good()); - REQUIRE(filemodtime == getModificationTime(movedFile.str())); + REQUIRE(filemodtime == boost::filesystem::last_write_time(movedFile.str())); LogTestController::getInstance().reset(); } @@ -299,7 +289,7 @@ TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") { file.open(movedFile.str(), std::ios::out); file << "tempFile"; file.close(); - uint64_t filemodtime = getModificationTime(movedFile.str()); + auto filemodtime = boost::filesystem::last_write_time(movedFile.str()); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); plan->reset(); @@ -320,7 +310,7 @@ TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") { // verify that the fle was moved REQUIRE(false == std::ifstream(ss.str()).good()); REQUIRE(true == std::ifstream(movedFile.str()).good()); - REQUIRE(filemodtime != getModificationTime(movedFile.str())); + REQUIRE(filemodtime != boost::filesystem::last_write_time(movedFile.str())); LogTestController::getInstance().reset(); } From 058bb84ba18be6e3da26573849d05bde8ce8fdb0 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Tue, 15 Aug 2017 15:05:41 -0400 Subject: [PATCH 5/8] MINIFI-375: Remove forward slash from urls This closes #127. 
Signed-off-by: Aldrin Piri --- libminifi/src/RemoteProcessorGroupPort.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 3c88e8f870..05d5f7af73 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -212,7 +212,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { if (this->host_.empty() || this->port_ == -1 || this->protocol_.empty()) return; - std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller/"; + std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/controller"; this->site2site_port_ = -1; configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_); @@ -221,7 +221,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { std::string token; if (!rest_user_name_.empty()) { - std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token/"; + std::string loginUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/access/token"; token = utils::get_token(loginUrl, this->rest_user_name_, this->rest_password_, this->securityConfig_); logger_->log_debug("Token from NiFi REST Api endpoint %s", token); if (token.empty()) @@ -285,7 +285,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); } } else { - logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo"); + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", http_code, fullUrl); } } else { logger_->log_error( From 951c03e3db705c0472f052dd3d883dca01913c80 Mon Sep 17 00:00:00 2001 From: 
"Andrew I. Christianson" Date: Wed, 16 Aug 2017 11:56:34 -0400 Subject: [PATCH 6/8] MINIFI-376 removed defunct references to curlbuild.h This closes #128. Signed-off-by: Aldrin Piri --- libminifi/include/utils/HTTPUtils.h | 1 - libminifi/src/HttpConfigurationListener.cpp | 1 - libminifi/src/RemoteProcessorGroupPort.cpp | 1 - libminifi/src/processors/InvokeHTTP.cpp | 1 - 4 files changed, 4 deletions(-) diff --git a/libminifi/include/utils/HTTPUtils.h b/libminifi/include/utils/HTTPUtils.h index 46aa67ad07..e47bc118ba 100644 --- a/libminifi/include/utils/HTTPUtils.h +++ b/libminifi/include/utils/HTTPUtils.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include "ByteInputCallBack.h" diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp index c16ca75eca..6b3a061482 100644 --- a/libminifi/src/HttpConfigurationListener.cpp +++ b/libminifi/src/HttpConfigurationListener.cpp @@ -18,7 +18,6 @@ #include "HttpConfigurationListener.h" #include "FlowController.h" -#include #include #include #include diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 05d5f7af73..bcc3d4957b 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -21,7 +21,6 @@ #include "RemoteProcessorGroupPort.h" #include -#include #include #include #include diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp index 7dc75d2463..81271a50ae 100644 --- a/libminifi/src/processors/InvokeHTTP.cpp +++ b/libminifi/src/processors/InvokeHTTP.cpp @@ -18,7 +18,6 @@ #include "processors/InvokeHTTP.h" #include -#include #include #include #include From 893e87d145031b48b98b353d6bf9137a0cc19d63 Mon Sep 17 00:00:00 2001 From: "Andrew I. Christianson" Date: Wed, 9 Aug 2017 11:53:04 -0400 Subject: [PATCH 7/8] MINIFI-368 exclude hidden files when scanning for src files This closes #125. 
Signed-off-by: Aldrin Piri --- cmake/BuildTests.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 2502537141..29603bfab9 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -21,7 +21,7 @@ MACRO(GETSOURCEFILES result curdir) FILE(GLOB children RELATIVE ${curdir} ${curdir}/*) SET(dirlist "") FOREACH(child ${children}) - IF( "${curdir}/${child}" MATCHES .*\\.cpp) + IF( "${child}" MATCHES ^[^.].*\\.cpp) LIST(APPEND dirlist ${child}) ENDIF() From 264cefe1f129c278e03aab8e5eb7d79164e47340 Mon Sep 17 00:00:00 2001 From: Bin Qiu Date: Thu, 24 Aug 2017 10:04:32 -0700 Subject: [PATCH 8/8] Merge Content processor --- README.md | 1 + libminifi/include/core/FlowConfiguration.h | 1 + libminifi/include/core/ProcessSession.h | 2 + libminifi/include/processors/BinFiles.h | 296 +++++++ libminifi/include/processors/LoadProcessors.h | 1 + libminifi/include/processors/MergeContent.h | 206 +++++ libminifi/src/core/ProcessSession.cpp | 4 + libminifi/src/processors/BinFiles.cpp | 303 ++++++++ libminifi/src/processors/MergeContent.cpp | 283 +++++++ libminifi/test/unit/MergeFileTests.cpp | 720 ++++++++++++++++++ 10 files changed, 1817 insertions(+) create mode 100644 libminifi/include/processors/BinFiles.h create mode 100644 libminifi/include/processors/MergeContent.h create mode 100644 libminifi/src/processors/BinFiles.cpp create mode 100644 libminifi/src/processors/MergeContent.cpp create mode 100644 libminifi/test/unit/MergeFileTests.cpp diff --git a/README.md b/README.md index 8830aaa862..db10ab492b 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a * ListenSyslog * PutFile * TailFile + * MergeContent * Provenance events generation is supported and are persisted using levelDB. 
## System Requirements diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 43d2bc04a3..d9ebc72189 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -35,6 +35,7 @@ #include "processors/LogAttribute.h" #include "processors/ExecuteProcess.h" #include "processors/AppendHostInfo.h" +#include "processors/MergeContent.h" #include "core/Processor.h" #include "core/logging/LoggerConfiguration.h" #include "core/ProcessContext.h" diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index d853e9bf31..3a3b1431e9 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -79,6 +79,8 @@ class ProcessSession { std::shared_ptr create(std::shared_ptr &parent) { return create(parent); } + // Add a FlowFile to the session + void add(std::shared_ptr &flow); // Clone a new UUID FlowFile from parent both for content resource claim and attributes std::shared_ptr clone(std::shared_ptr &parent); // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim diff --git a/libminifi/include/processors/BinFiles.h b/libminifi/include/processors/BinFiles.h new file mode 100644 index 0000000000..6c619a8189 --- /dev/null +++ b/libminifi/include/processors/BinFiles.h @@ -0,0 +1,296 @@ +/** + * @file BinFiles.h + * BinFiles class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __BIN_FILES_H__ +#define __BIN_FILES_H__ + +#include +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// Bin Class +class Bin { + public: + // Constructor + /*! + * Create a new Bin. Note: this object is not thread safe + */ + explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const int &minEntries, const int & maxEntries, + const std::string &fileCount, const std::string &groupId) + : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(fileCount), + groupId_(groupId), logger_(logging::LoggerFactory::getLogger()) { + queued_data_size_ = 0; + creation_dated_ = getTimeMillis(); + std::shared_ptr id_generator = utils::IdGenerator::getIdGenerator(); + char uuidStr[37] = { 0 }; + id_generator->generate(uuid_); + uuid_unparse_lower(uuid_, uuidStr); + uuid_str_ = uuidStr; + logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_); + } + virtual ~Bin() { + logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_); + } + // check whether the bin is full + bool isFull() { + if (queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_) + return true; + else + return false; + } + // check whether the bin meet the min required size and entries so that it can be processed for 
merge + bool isReadyForMerge() { + return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); + } + // check whether the bin is older than the time specified in msec + bool isOlderThan(const uint64_t &duration) { + uint64_t currentTime = getTimeMillis(); + if (currentTime > (creation_dated_ + duration)) + return true; + else + return false; + } + std::deque> & getFlowFile() { + return queue_; + } + // offer the flowfile to the bin + bool offer(std::shared_ptr flow) { + if (!fileCount_.empty()) { + std::string value; + if (flow->getAttribute(fileCount_, value)) { + try { + // for defrag case using the identification + int count = std::stoi(value); + maxEntries_ = count; + minEntries_ = count; + } catch (...) { + + } + } + } + + if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) + return false; + + queue_.push_back(flow); + queued_data_size_ += flow->getSize(); + logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", + uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); + + return true; + } + // getBinAge + uint64_t getBinAge() { + return creation_dated_; + } + int getSize() { + return queue_.size(); + } + // Get the UUID as string + std::string getUUIDStr() { + return uuid_str_; + } + std::string getGroupId() { + return groupId_; + } + + protected: + + private: + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minEntries_; + // Queued data size + uint64_t queued_data_size_; + // Queue for the Flow File + std::deque> queue_; + uint64_t creation_dated_; + std::string fileCount_; + std::string groupId_; + std::shared_ptr logger_; + // A global unique identifier + uuid_t uuid_; + // UUID string + std::string uuid_str_; +}; + +// BinManager Class +class BinManager { + public: + // Constructor + /*! 
+ * Create a new BinManager + */ + BinManager() + : minSize_(0), maxSize_(ULLONG_MAX), maxEntries_(INT_MAX), minEntries_(1), binAge_(ULLONG_MAX), binCount_(0), + logger_(logging::LoggerFactory::getLogger()) { + } + virtual ~BinManager() { + purge(); + } + void setMinSize(const uint64_t &size) { + minSize_ = size; + } + void setMaxSize(const uint64_t &size) { + maxSize_ = size; + } + void setMaxEntries(const int &entries) { + maxEntries_ = entries; + } + void setMinEntries(const int &entries) { + minEntries_ = entries; + } + void setBinAge(const uint64_t &age) { + binAge_ = age; + } + int getBinCount() { + return binCount_; + } + void setFileCount(const std::string &value) { + fileCount_ = value; + } + void purge() { + std::lock_guard < std::mutex > lock(mutex_); + groupBinMap_.clear(); + binCount_ = 0; + } + // Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary. + bool offer(const std::string &group, std::shared_ptr flow); + // gather ready bins once the bin are full enough or exceed bin age + void gatherReadyBins(); + // remove oldest bin + void removeOldestBin(); + // get ready bin from binManager + void getReadyBin(std::deque> &retBins); + + protected: + + private: + std::mutex mutex_; + uint64_t minSize_; + uint64_t maxSize_; + int maxEntries_; + int minEntries_; + std::string fileCount_; + // Bin Age in msec + uint64_t binAge_; + std::map>>> groupBinMap_; + std::deque> readyBin_; + int binCount_; + std::shared_ptr logger_; +}; + +// BinFiles Class +class BinFiles : public core::Processor { + public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit BinFiles(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { + maxBinCount_ = 100; + } + // Destructor + virtual ~BinFiles() { + } + // Processor Name + static constexpr char const* ProcessorName = "BinFiles"; + // Supported Properties + static core::Property MinSize; + static core::Property MaxSize; + static core::Property MinEntries; + static core::Property MaxEntries; + static core::Property MaxBinCount; + static core::Property MaxBinAge; + + // Supported Relationships + static core::Relationship Failure; + static core::Relationship Original; + + // attributes + static const char *FRAGMENT_ID_ATTRIBUTE; + static const char *FRAGMENT_INDEX_ATTRIBUTE; + static const char *FRAGMENT_COUNT_ATTRIBUTE; + + static const char *SEGMENT_ID_ATTRIBUTE; + static const char *SEGMENT_INDEX_ATTRIBUTE; + static const char *SEGMENT_COUNT_ATTRIBUTE; + static const char *SEGMENT_ORIGINAL_FILENAME; + + public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi BinFiles + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + // Initialize, over write by NiFi BinFiles + virtual void initialize(void); + + protected: + // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). + virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr flow); + // Returns a group ID representing a bin. 
This allows flow files to be binned into like groups + virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr flow) { + return ""; + } + // Processes a single bin. + virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin) { + return false; + } + // transfer flows to failure in bin + void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin); + // add flows to session + void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin); + + BinManager binManager_; + + private: + std::shared_ptr logger_; + int maxBinCount_; +}; + +REGISTER_RESOURCE(BinFiles); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif diff --git a/libminifi/include/processors/LoadProcessors.h b/libminifi/include/processors/LoadProcessors.h index 3e6cfcfe99..e8d207a174 100644 --- a/libminifi/include/processors/LoadProcessors.h +++ b/libminifi/include/processors/LoadProcessors.h @@ -29,5 +29,6 @@ #include "LogAttribute.h" #include "PutFile.h" #include "TailFile.h" +#include "MergeContent.h" #endif /* LIBMINIFI_INCLUDE_PROCESSORS_LOADPROCESSORS_H_ */ diff --git a/libminifi/include/processors/MergeContent.h b/libminifi/include/processors/MergeContent.h new file mode 100644 index 0000000000..3f4c74a5f1 --- /dev/null +++ b/libminifi/include/processors/MergeContent.h @@ -0,0 +1,206 @@ +/** + * @file MergeContent.h + * MergeContent class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __MERGE_CONTENT_H__ +#define __MERGE_CONTENT_H__ + +#include "processors/BinFiles.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MERGE_STRATEGY_BIN_PACK "Bin-Packing Algorithm" +#define MERGE_STRATEGY_DEFRAGMENT "Defragment" +#define MERGE_FORMAT_TAR_VALUE "TAR" +#define MERGE_FORMAT_ZIP_VALUE "ZIP" +#define MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE "FlowFile Stream, v3" +#define MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE "FlowFile Stream, v2" +#define MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE "FlowFile Tar, v1" +#define MERGE_FORMAT_CONCAT_VALUE "Binary Concatenation" +#define MERGE_FORMAT_AVRO_VALUE "Avro" +#define DELIMITER_STRATEGY_FILENAME "Filename" +#define DELIMITER_STRATEGY_TEXT "Text" + +// MergeBin Class +class MergeBin { +public: + virtual std::string getMergedContentType() = 0; + // merge the flows in the bin + virtual std::shared_ptr merge(core::ProcessContext *context, core::ProcessSession *session, + std::deque> &flows, std::string &header, std::string &footer, std::string &demarcator) = 0; +}; + +// BinaryConcatenationMerge Class +class BinaryConcatenationMerge : public MergeBin { +public: + static const char *mimeType; + std::string getMergedContentType() { + return mimeType; + } + std::shared_ptr merge(core::ProcessContext *context, core::ProcessSession *session, + std::deque> &flows, std::string &header, std::string &footer, std::string &demarcator); + // Nest Callback Class for read stream + class ReadCallback : public InputStreamCallback { + public: + 
ReadCallback(uint64_t size, std::shared_ptr stream) + : buffer_size_(size), stream_(stream) { + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) + read_size = stream->getSize(); + else + read_size = buffer_size_; + ret = stream_->write(buffer, read_size); + return ret; + } + uint64_t buffer_size_; + std::shared_ptr stream_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: + WriteCallback(std::string &header, std::string &footer, std::string &demarcator, std::deque> &flows, core::ProcessSession *session) : + header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), session_(session) { + } + std::string &header_; + std::string &footer_; + std::string &demarcator_; + std::deque> &flows_; + core::ProcessSession *session_; + int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + if (!header_.empty()) { + int64_t len = stream->write(reinterpret_cast(const_cast(header_.data())), header_.size()); + if (len < 0) + return len; + ret += len; + } + bool isFirst = true; + for (auto flow : flows_) { + if (!isFirst && !demarcator_.empty()) { + int64_t len = stream->write(reinterpret_cast(const_cast(demarcator_.data())), demarcator_.size()); + if (len < 0) + return len; + ret += len; + } + ReadCallback readCb(flow->getSize(), stream); + session_->read(flow, &readCb); + ret += flow->getSize(); + isFirst = false; + } + if (!footer_.empty()) { + int64_t len = stream->write(reinterpret_cast(const_cast(footer_.data())), footer_.size()); + if (len < 0) + return len; + ret += len; + } + return ret; + } + }; +}; + + +// MergeContent Class +class MergeContent : public processors::BinFiles { + public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit MergeContent(std::string name, uuid_t uuid = NULL) + : processors::BinFiles(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { + mergeStratgey_ = MERGE_STRATEGY_DEFRAGMENT; + mergeFormat_ = MERGE_FORMAT_CONCAT_VALUE; + delimiterStratgey_ = DELIMITER_STRATEGY_FILENAME; + keepPath_ = false; + } + // Destructor + virtual ~MergeContent() { + } + // Processor Name + static constexpr char const* ProcessorName = "MergeContent"; + // Supported Properties + static core::Property MergeStrategy; + static core::Property MergeFormat; + static core::Property CorrelationAttributeName; + static core::Property DelimiterStratgey; + static core::Property KeepPath; + static core::Property Header; + static core::Property Footer; + static core::Property Demarcator; + + // Supported Relationships + static core::Relationship Merge; + + public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi MergeContent + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + // Initialize, over write by NiFi MergeContent + virtual void initialize(void); + virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin); + + protected: + // Returns a group ID representing a bin. 
This allows flow files to be binned into like groups + virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr flow); + // check whether the defragment bin is validate + bool checkDefragment(std::unique_ptr &bin); + + private: + std::shared_ptr logger_; + std::string mergeStratgey_; + std::string mergeFormat_; + std::string correlationAttributeName_; + bool keepPath_; + std::string delimiterStratgey_; + std::string header_; + std::string footer_; + std::string demarcator_; + std::string headerContent_; + std::string footerContent_; + std::string demarcatorContent_; + // readContent + std::string readContent(std::string path); +}; + +REGISTER_RESOURCE(MergeContent); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index c69b361957..b3035cb5b9 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -50,6 +50,10 @@ std::shared_ptr ProcessSession::create() { return record; } +void ProcessSession::add(std::shared_ptr &record) { + _addedFlowFiles[record->getUUIDStr()] = record; +} + std::shared_ptr ProcessSession::create(std::shared_ptr &&parent) { std::map empty; std::shared_ptr record = std::make_shared(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty); diff --git a/libminifi/src/processors/BinFiles.cpp b/libminifi/src/processors/BinFiles.cpp new file mode 100644 index 0000000000..bd4afca489 --- /dev/null +++ b/libminifi/src/processors/BinFiles.cpp @@ -0,0 +1,303 @@ +/** + * @file BinFiles.cpp + * BinFiles class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "processors/BinFiles.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size of for the bundle", "0"); +core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", ""); +core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1"); +core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", ""); +core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is