diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index a21cd1fe58..99ebd62c8e 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -91,7 +91,7 @@ class SchedulingAgent { nonstd::expected triggerAndCommit(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& session_factory); - nonstd::expected trigger(core::Processor* processor, + nonstd::expected trigger(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& process_session); diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index f27253d42e..61b338b958 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -38,38 +38,48 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) { utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& session_factory) { - if (this->running_) { - const auto start_time = std::chrono::steady_clock::now(); - // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice + if (!this->running_) { + return utils::TaskRescheduleInfo::Done(); + } + if (processorYields(processor)) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); + } + + const auto start_time = std::chrono::steady_clock::now(); + // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice - const auto process_session = session_factory->createSession(); - process_session->setMetrics(processor->getMetrics()); + const auto process_session = session_factory->createSession(); + process_session->setMetrics(processor->getMetrics()); - try { - while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { - this->trigger(processor, process_context, process_session); - if (processor->isYield()) { - process_session->commit(); - return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); - } + + try { + const auto run_commit = gsl::finally([&]() { + process_session->commit(); + }); + while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { + const auto trigger_result = this->trigger(processor, process_context, process_session); + if (!trigger_result || !*trigger_result) { + break; } - process_session->commit(); - } catch (const std::exception& exception) { - logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", - exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); - processor->yield(admin_yield_duration_); - process_session->rollback(); - throw; - } catch (...) { - logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName()); - processor->yield(admin_yield_duration_); - process_session->rollback(); - throw; } + } catch (const std::exception& exception) { + logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", + exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); + processor->yield(admin_yield_duration_); + process_session->rollback(); + throw; + } catch (...) { + logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName()); + processor->yield(admin_yield_duration_); + process_session->rollback(); + throw; + } - return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available + if (processor->isYield()) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); } - return utils::TaskRescheduleInfo::Done(); + + return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available } } // namespace org::apache::nifi::minifi diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index c4a3ff8a1e..f42ca057f6 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -99,12 +99,12 @@ nonstd::expected SchedulingAgent::triggerAndCommit(cor return {}; } -nonstd::expected SchedulingAgent::trigger(core::Processor* processor, +nonstd::expected SchedulingAgent::trigger(core::Processor* processor, const std::shared_ptr& process_context, const std::shared_ptr& process_session) { gsl_Expects(processor); if (processorYields(processor)) { - return {}; + return false; } auto schedule_it = scheduled_processors_.end(); @@ -134,7 +134,7 @@ nonstd::expected SchedulingAgent::trigger(core::Proces processor->yield(admin_yield_duration_); return nonstd::make_unexpected(std::current_exception()); } - return {}; + return true; } void SchedulingAgent::watchDogFunc() { diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 4dfc122db8..757055d2a3 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -78,7 +78,8 @@ ProcessSession::ProcessSession(std::shared_ptr processContext) ProcessSession::~ProcessSession() { if (stateManager_ && stateManager_->isTransactionInProgress()) { - logger_->log_error("Session has ended without decision on state (commit or rollback)."); + logger_->log_critical("Session has ended without decision on state (commit or rollback)."); + std::terminate(); } removeReferences(); }