Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Jun 12, 2024
1 parent df7003f commit 1b23b67
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 31 deletions.
2 changes: 1 addition & 1 deletion libminifi/include/SchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SchedulingAgent {
nonstd::expected<void, std::exception_ptr> triggerAndCommit(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
nonstd::expected<void, std::exception_ptr> trigger(core::Processor* processor,
nonstd::expected<bool, std::exception_ptr> trigger(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSession>& process_session);

Expand Down
62 changes: 36 additions & 26 deletions libminifi/src/EventDrivenSchedulingAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,48 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
if (this->running_) {
const auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
if (!this->running_) {
return utils::TaskRescheduleInfo::Done();
}
if (processorYields(processor)) {
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
}

const auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice

const auto process_session = session_factory->createSession();
process_session->setMetrics(processor->getMetrics());
const auto process_session = session_factory->createSession();
process_session->setMetrics(processor->getMetrics());

try {
while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
this->trigger(processor, process_context, process_session);
if (processor->isYield()) {
process_session->commit();
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
}

try {
const auto run_commit = gsl::finally([&]() {
process_session->commit();
});
while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
const auto trigger_result = this->trigger(processor, process_context, process_session);
if (!trigger_result || !*trigger_result) {
break;
}
process_session->commit();
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})",
exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
processor->yield(admin_yield_duration_);
process_session->rollback();
throw;
} catch (...) {
logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName());
processor->yield(admin_yield_duration_);
process_session->rollback();
throw;
}
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})",
exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
processor->yield(admin_yield_duration_);
process_session->rollback();
throw;
} catch (...) {
logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName());
processor->yield(admin_yield_duration_);
process_session->rollback();
throw;
}

return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
if (processor->isYield()) {
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
}
return utils::TaskRescheduleInfo::Done();

return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
}

} // namespace org::apache::nifi::minifi
6 changes: 3 additions & 3 deletions libminifi/src/SchedulingAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(cor
return {};
}

nonstd::expected<void, std::exception_ptr> SchedulingAgent::trigger(core::Processor* processor,
nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSession>& process_session) {
gsl_Expects(processor);
if (processorYields(processor)) {
return {};
return false;
}

auto schedule_it = scheduled_processors_.end();
Expand Down Expand Up @@ -134,7 +134,7 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::trigger(core::Proces
processor->yield(admin_yield_duration_);
return nonstd::make_unexpected(std::current_exception());
}
return {};
return true;
}

void SchedulingAgent::watchDogFunc() {
Expand Down
3 changes: 2 additions & 1 deletion libminifi/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ ProcessSession::ProcessSession(std::shared_ptr<ProcessContext> processContext)

ProcessSession::~ProcessSession() {
if (stateManager_ && stateManager_->isTransactionInProgress()) {
logger_->log_error("Session has ended without decision on state (commit or rollback).");
logger_->log_critical("Session has ended without decision on state (commit or rollback).");
std::terminate();
}
removeReferences();
}
Expand Down

0 comments on commit 1b23b67

Please sign in to comment.