MINIFICPP-2398 Only commit once per time_slice in EventDrivenSchedulingAgent #1815

Open · wants to merge 5 commits into base: main
Changes from 4 commits
6 changes: 6 additions & 0 deletions docker/test/integration/features/kafka.feature
@@ -46,6 +46,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And the "success" relationship of the GetFile processor is connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

@@ -93,6 +94,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

@@ -120,6 +122,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

@@ -150,6 +153,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

@@ -178,6 +182,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And an ssl context service is set up for PublishKafka
And the "success" relationship of the GetFile processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

@@ -203,6 +208,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And an ssl context service is set up for PublishKafka
And the "success" relationship of the GetFile processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka

And a kafka broker is set up in correspondence with the PublishKafka

3 changes: 1 addition & 2 deletions extensions/rocksdb-repos/tests/SwapTests.cpp
@@ -33,7 +33,6 @@ namespace org::apache::nifi::minifi::test {
class OutputProcessor : public core::Processor {
public:
using core::Processor::Processor;
using core::Processor::onTrigger;

static constexpr const char* Description = "Processor used for testing cycles";
static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
@@ -112,7 +111,7 @@ TEST_CASE("Connection will on-demand swap flow files") {
auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);

for (size_t i = 0; i < 200; ++i) {
processor->onTrigger(context, session_factory);
processor->triggerAndCommit(context, session_factory);
}

REQUIRE(connection->getQueueSize() == processor->flow_files_.size());
6 changes: 3 additions & 3 deletions libminifi/include/EventDrivenSchedulingAgent.h
@@ -52,11 +52,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {

void schedule(core::Processor* processor) override;

utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &process_context,
const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;

private:
std::chrono::milliseconds time_slice_;
std::chrono::milliseconds time_slice_{};
};

} // namespace org::apache::nifi::minifi
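A side note on the header change above: the added empty-brace initializer on time_slice_ value-initializes the member, so a default-constructed agent starts from a zero duration rather than an indeterminate one. A minimal standalone sketch (the Agent struct here is a hypothetical stand-in, not the real class):

#include <chrono>
#include <iostream>

struct Agent {
  std::chrono::milliseconds time_slice_{};  // value-initialized to 0ms
};

int main() {
  Agent agent;
  std::cout << agent.time_slice_.count() << "\n";  // prints 0, never garbage
}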
7 changes: 6 additions & 1 deletion libminifi/include/SchedulingAgent.h
@@ -86,9 +86,14 @@ class SchedulingAgent {
logger_->log_trace("Destroying scheduling agent");
}

nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
bool processorYields(core::Processor* processor) const;

nonstd::expected<void, std::exception_ptr> triggerAndCommit(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
nonstd::expected<bool, std::exception_ptr> trigger(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSession>& process_session);

void start() {
running_ = true;
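The new trigger() declared above returns nonstd::expected<bool, std::exception_ptr>, which carries two layers of information: the outer expected is empty when onTrigger threw, and the inner bool is false when the processor yielded instead of doing work. A standalone sketch of how a caller distinguishes the two cases (assumes the expected-lite library; fakeTrigger is a hypothetical stand-in for the real call):

#include <nonstd/expected.hpp>

#include <iostream>
#include <stdexcept>

nonstd::expected<bool, std::exception_ptr> fakeTrigger(int mode) {
  if (mode == 0) { return false; }  // processor yielded, no work done
  if (mode == 1) { return true; }   // processor ran
  return nonstd::make_unexpected(
      std::make_exception_ptr(std::runtime_error("onTrigger failed")));
}

int main() {
  for (int mode : {0, 1, 2}) {
    const auto result = fakeTrigger(mode);
    if (!result) {  // outer layer: an exception escaped onTrigger
      std::cout << "error\n";
      continue;
    }
    std::cout << (*result ? "ran" : "yielded") << "\n";  // inner bool
  }
}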
3 changes: 2 additions & 1 deletion libminifi/include/ThreadedSchedulingAgent.h
@@ -54,9 +54,10 @@ class ThreadedSchedulingAgent : public SchedulingAgent {

void stop() override;

private:
protected:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();

private:
std::set<utils::Identifier> processors_running_; // Set just for easy usage
};

11 changes: 8 additions & 3 deletions libminifi/include/core/Processor.h
@@ -153,12 +153,12 @@ class Processor : public Connectable, public ConfigurableComponent, public state
}

void incrementActiveTasks() {
active_tasks_++;
++active_tasks_;
}

void decrementActiveTask() {
if (active_tasks_ > 0)
active_tasks_--;
--active_tasks_;
}

void clearActiveTask() {
@@ -187,7 +187,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
void initialize() override {
}

virtual void onTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory);
virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory);
void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session);

virtual void onTriggerSharedPtr(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
onTrigger(*context, *session);
@@ -219,6 +220,10 @@ class Processor : public Connectable, public ConfigurableComponent, public state
return metrics_;
}

auto getMetrics() const {
return metrics_;
}

static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};

static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
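The Processor changes above split the old virtual onTrigger(context, session_factory) into two entry points: triggerAndCommit(), which owns the session lifecycle for one invocation, and trigger(), which runs against a caller-supplied session so that a single commit can cover many invocations. A simplified standalone model of that split (Session, SessionFactory, and Processor here are hypothetical stand-ins, not the MiNiFi classes):

#include <iostream>
#include <memory>

struct Session {
  void commit() { std::cout << "commit\n"; }
};

struct SessionFactory {
  std::shared_ptr<Session> createSession() { return std::make_shared<Session>(); }
};

struct Processor {
  void onTrigger(const std::shared_ptr<Session>&) { std::cout << "work\n"; }

  // Caller owns the session and decides when to commit.
  void trigger(const std::shared_ptr<Session>& session) { onTrigger(session); }

  // One session and one commit per invocation.
  void triggerAndCommit(const std::shared_ptr<SessionFactory>& factory) {
    auto session = factory->createSession();
    onTrigger(session);
    session->commit();
  }
};

int main() {
  auto factory = std::make_shared<SessionFactory>();
  Processor p;
  p.triggerAndCommit(factory);  // work + commit

  auto session = factory->createSession();
  p.trigger(session);           // work
  p.trigger(session);           // work
  session->commit();            // one commit covering both triggers
}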
2 changes: 1 addition & 1 deletion libminifi/include/core/ProcessorMetrics.h
@@ -78,7 +78,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
};

[[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;

std::mutex transferred_relationships_mutex_;
std::unordered_map<std::string, size_t> transferred_relationships_;
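On the ProcessorMetrics.h change above: since C++17 a static constexpr data member is implicitly inline, so odr-using it (for example, binding a reference to it) requires no out-of-line definition, which the previous static const form would have needed. A minimal illustration (the Metrics struct is a hypothetical stand-in):

#include <cstdint>

struct Metrics {
  static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
};

int main() {
  // Binding a reference odr-uses the member; fine without a separate
  // definition because constexpr static members are implicitly inline.
  const uint8_t& ref = Metrics::STORED_ON_TRIGGER_RUNTIME_COUNT;
  return ref == 10 ? 0 : 1;
}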
15 changes: 6 additions & 9 deletions libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -19,7 +19,6 @@
*/
#include "CronDrivenSchedulingAgent.h"
#include <chrono>
#include <memory>
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSessionFactory.h"
@@ -36,30 +35,28 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
using std::chrono::system_clock;

if (this->running_ && processor->isRunning()) {
auto uuid = processor->getUUID();
auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now()));
const auto uuid = processor->getUUID();
const auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now()));
std::lock_guard<std::mutex> lock(mutex_);

schedules_.emplace(uuid, utils::Cron(processor->getCronPeriod()));
last_exec_.emplace(uuid, current_time.get_local_time());

auto last_trigger = last_exec_[uuid];
auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger);
const auto last_trigger = last_exec_[uuid];
const auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger);
if (!next_to_last_trigger)
return utils::TaskRescheduleInfo::Done();

if (*next_to_last_trigger > current_time.get_local_time())
return utils::TaskRescheduleInfo::RetryIn(*next_to_last_trigger-current_time.get_local_time());

auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);

if (on_trigger_result)
if (this->triggerAndCommit(processor, processContext, sessionFactory))
last_exec_[uuid] = current_time.get_local_time();

if (processor->isYield())
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());

if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
if (const auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
return utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time());
}
return utils::TaskRescheduleInfo::Done();
68 changes: 57 additions & 11 deletions libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -35,20 +35,66 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
ThreadedSchedulingAgent::schedule(processor);
}

utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_) {
auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
if (!this->running_) {
return utils::TaskRescheduleInfo::Done();
}
if (processorYields(processor)) {
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
}

const auto start_time = std::chrono::steady_clock::now();
// trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
[Review comment — Member] Suggested change:
-  // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
+  // trigger processor while it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice

const auto process_session = session_factory->createSession();
process_session->setMetrics(processor->getMetrics());
bool needs_commit = true;

while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
const auto trigger_result = this->trigger(processor, process_context, process_session);
if (!trigger_result) {
try {
std::rethrow_exception(trigger_result.error());
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})",
exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
needs_commit = false;
break;
} catch (...) {
logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName());
needs_commit = false;
break;
}
}
return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
if (!*trigger_result) {
logger_->log_trace("Processor {} ({}) yielded", processor->getUUIDStr(), processor->getName());
break;
}
}
return utils::TaskRescheduleInfo::Done();
try {
if (needs_commit) {
process_session->commit();
} else {
process_session->rollback();
}
} catch (const std::exception& exception) {
logger_->log_warn("Caught \"{}\" ({}) during ProcessSession::commit after triggering processor: {} ({})",
exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
process_session->rollback();
throw;
} catch (...) {
logger_->log_warn("Caught unknown exception during ProcessSession::commit after triggering processor: {} ({})", processor->getUUIDStr(), processor->getName());
process_session->rollback();
throw;
}

if (processor->isYield()) {
return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
}

return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
}

} // namespace org::apache::nifi::minifi
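The rewritten run() above is the heart of MINIFICPP-2398: rather than committing a session inside every onTrigger call, the agent creates one session up front, triggers the processor repeatedly within the configured time slice, and commits (or rolls back) exactly once afterwards. A condensed standalone model of that control flow (FakeSession is a hypothetical stand-in for the real ProcessSession):

#include <chrono>
#include <iostream>

struct FakeSession {
  int batched = 0;
  void commit() { std::cout << "committed " << batched << " triggers\n"; }
  void rollback() { std::cout << "rolled back\n"; }
};

int main() {
  using clock = std::chrono::steady_clock;
  const auto time_slice = std::chrono::milliseconds(10);

  FakeSession session;
  bool needs_commit = true;

  const auto start_time = clock::now();
  while (clock::now() - start_time < time_slice) {
    ++session.batched;  // stands in for processor->trigger(context, session)
    // on a trigger error the real code sets needs_commit = false and breaks
  }

  if (needs_commit) {
    session.commit();    // exactly one commit per time slice
  } else {
    session.rollback();  // failed batches are rolled back instead
  }
}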
61 changes: 54 additions & 7 deletions libminifi/src/SchedulingAgent.cpp
@@ -36,13 +36,10 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {

namespace org::apache::nifi::minifi {

nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
const std::shared_ptr<core::ProcessContext> &process_context,
const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
gsl_Expects(processor);
bool SchedulingAgent::processorYields(core::Processor* processor) const {
if (processor->isYield()) {
logger_->log_debug("Not running {} since it must yield", processor->getName());
return {};
return true;
}

// No need to yield, reset yield expiration to 0
@@ -52,11 +49,22 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc

if (!hasWorkToDo(processor)) {
processor->yield(bored_yield_duration);
return {};
return true;
}
if (processor->isThrottledByBackpressure()) {
logger_->log_debug("backpressure applied because too much outgoing for {} {}", processor->getUUIDStr(), processor->getName());
processor->yield(bored_yield_duration);
return true;
}

return false;
}

nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
gsl_Expects(processor);
if (processorYields(processor)) {
return {};
}

@@ -74,8 +82,9 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc

processor->incrementActiveTasks();
auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });

try {
processor->onTrigger(process_context, session_factory);
processor->triggerAndCommit(process_context, session_factory);
} catch (const std::exception& exception) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}",
processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
@@ -90,6 +99,44 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc
return {};
}

nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Processor* processor,
const std::shared_ptr<core::ProcessContext>& process_context,
const std::shared_ptr<core::ProcessSession>& process_session) {
gsl_Expects(processor);
if (processorYields(processor)) {
return false;
}

auto schedule_it = scheduled_processors_.end();

{
std::lock_guard<std::mutex> lock(watchdog_mtx_);
schedule_it = scheduled_processors_.emplace(processor).first;
}

const auto guard = gsl::finally([this, &schedule_it](){
std::lock_guard<std::mutex> lock(watchdog_mtx_);
scheduled_processors_.erase(schedule_it);
});

processor->incrementActiveTasks();
auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
try {
processor->trigger(process_context, process_session);
} catch (const std::exception& exception) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}",
processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
processor->yield(admin_yield_duration_);
return nonstd::make_unexpected(std::current_exception());
} catch (...) {
logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}",
processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
processor->yield(admin_yield_duration_);
return nonstd::make_unexpected(std::current_exception());
}
return true;
}

void SchedulingAgent::watchDogFunc() {
std::lock_guard<std::mutex> lock(watchdog_mtx_);
auto now = std::chrono::steady_clock::now();
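Both trigger paths in SchedulingAgent.cpp register the processor with the watchdog for the duration of the call and unregister it on every exit path, including exceptions, via a scope guard. A standalone sketch of that pattern (a hand-rolled Finally stands in for gsl::finally; runGuarded and the integer ids are hypothetical):

#include <iostream>
#include <mutex>
#include <set>
#include <utility>

template <typename F>
struct Finally {
  F f;
  ~Finally() { f(); }
};

template <typename F>
Finally<F> finally(F f) { return Finally<F>{std::move(f)}; }

std::mutex watchdog_mtx;
std::set<int> scheduled_processors;

void runGuarded(int processor_id) {
  {
    std::lock_guard<std::mutex> lock(watchdog_mtx);
    scheduled_processors.insert(processor_id);
  }
  const auto guard = finally([processor_id] {
    std::lock_guard<std::mutex> lock(watchdog_mtx);
    scheduled_processors.erase(processor_id);
  });
  std::cout << "triggering processor " << processor_id << "\n";
}  // the guard removes the processor here, even if triggering threw

int main() {
  runGuarded(42);
}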
8 changes: 4 additions & 4 deletions libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -28,11 +28,11 @@ namespace org::apache::nifi::minifi {
utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_ && processor->isRunning()) {
auto trigger_start_time = std::chrono::steady_clock::now();
this->onTrigger(processor, processContext, sessionFactory);
const auto trigger_start_time = std::chrono::steady_clock::now();
this->triggerAndCommit(processor, processContext, sessionFactory);

auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod();
auto yield_expiration_time = processor->getYieldExpirationTime();
const auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod();
const auto yield_expiration_time = processor->getYieldExpirationTime();
return utils::TaskRescheduleInfo::RetryAfter(std::max(next_scheduled_run, yield_expiration_time));
}
return utils::TaskRescheduleInfo::Done();