Skip to content

Commit

Permalink
Give actors more leeway in grouping messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Sep 4, 2020
1 parent b23bf28 commit 243802e
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 45 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Expand Up @@ -127,6 +127,12 @@ is based on [Keep a Changelog](https://keepachangelog.com).
*types*, but did not enforce it because the name was only used for logging.
Since the new metrics use this name for filtering now, we enforce static names
in order to help avoid hard-to-find issues with the filtering mechanism.
- The config parameter `caf.scheduler.max-throughput` was replaced with
`caf.scheduler.cooperative-yield-after`. Further, the new parameter no longer
reflects a hard limit on how many messages an actor may consume in a single
run. Rather, actors are encouraged to allow other actors to run after
consuming that many messages. This gives actors more leeway in how they
process messages.

### Removed

Expand Down Expand Up @@ -183,7 +189,7 @@ is based on [Keep a Changelog](https://keepachangelog.com).
- Datagram servants of UDP socket managers were not added as children to their
parent broker on creation, which prevented proper system shutdown in some
cases. Adding all servants consistently to the broker should make sure UDP
brokers terminate correctly (#1133).
brokers terminate correctly (#1133).

## [0.17.6] - 2020-07-24

Expand Down
2 changes: 1 addition & 1 deletion libcaf_core/caf/defaults.hpp
Expand Up @@ -91,7 +91,7 @@ namespace caf::defaults::scheduler {

constexpr auto policy = string_view{"stealing"};
constexpr auto profiling_output_file = string_view{""};
constexpr auto max_throughput = std::numeric_limits<size_t>::max();
constexpr auto cooperative_yield_after = std::numeric_limits<size_t>::max();
constexpr auto profiling_resolution = timespan(100'000'000);

} // namespace caf::defaults::scheduler
Expand Down
2 changes: 1 addition & 1 deletion libcaf_core/caf/resumable.hpp
Expand Up @@ -64,7 +64,7 @@ class CAF_CORE_EXPORT resumable {

/// Resume any pending computation until it is either finished
/// or needs to be re-scheduled later.
virtual resume_result resume(execution_unit*, size_t max_throughput) = 0;
virtual resume_result resume(execution_unit*, size_t yield_after) = 0;

/// Add a strong reference count to this object.
virtual void intrusive_ptr_add_ref_impl() = 0;
Expand Down
6 changes: 3 additions & 3 deletions libcaf_core/caf/scheduler/abstract_coordinator.hpp
Expand Up @@ -62,8 +62,8 @@ class CAF_CORE_EXPORT abstract_coordinator : public actor_system::module {

const actor_system_config& config() const;

size_t max_throughput() const {
return max_throughput_;
size_t yield_after() const {
return yield_after_;
}

size_t num_workers() const {
Expand Down Expand Up @@ -94,7 +94,7 @@ class CAF_CORE_EXPORT abstract_coordinator : public actor_system::module {
std::atomic<size_t> next_worker_;

/// Number of messages after which an actor should yield to other actors.
size_t max_throughput_;
size_t yield_after_;

/// Configured number of workers.
size_t num_workers_;
Expand Down
2 changes: 1 addition & 1 deletion libcaf_core/caf/scheduler/coordinator.hpp
Expand Up @@ -67,7 +67,7 @@ class coordinator : public abstract_coordinator {
workers_.reserve(num);
// Create worker instances.
for (size_t i = 0; i < num; ++i)
workers_.emplace_back(new worker_type(i, this, init, max_throughput_));
workers_.emplace_back(new worker_type(i, this, init, yield_after_));
// Start all workers.
for (auto& w : workers_)
w->start();
Expand Down
10 changes: 5 additions & 5 deletions libcaf_core/caf/scheduler/worker.hpp
Expand Up @@ -42,7 +42,7 @@ class worker : public execution_unit {
worker(size_t worker_id, coordinator_ptr worker_parent,
const policy_data& init, size_t throughput)
: execution_unit(&worker_parent->system()),
max_throughput_(throughput),
yield_after_(throughput),
id_(worker_id),
parent_(worker_parent),
data_(init) {
Expand Down Expand Up @@ -101,8 +101,8 @@ class worker : public execution_unit {
return data_;
}

size_t max_throughput() {
return max_throughput_;
size_t yield_after() {
return yield_after_;
}

private:
Expand All @@ -115,7 +115,7 @@ class worker : public execution_unit {
CAF_ASSERT(job->subtype() != resumable::io_actor);
CAF_PUSH_AID_FROM_PTR(dynamic_cast<abstract_actor*>(job));
policy_.before_resume(this, job);
auto res = job->resume(this, max_throughput_);
auto res = job->resume(this, yield_after_);
policy_.after_resume(this, job);
switch (res) {
case resumable::resume_later: {
Expand All @@ -142,7 +142,7 @@ class worker : public execution_unit {
}
}
// number of messages after which an actor should yield to other actors
size_t max_throughput_;
size_t yield_after_;
// the worker's thread
std::thread this_thread_;
// the worker's ID received from scheduler
Expand Down
7 changes: 4 additions & 3 deletions libcaf_core/src/actor_system_config.cpp
Expand Up @@ -85,7 +85,8 @@ actor_system_config::actor_system_config()
opt_group{custom_options_, "caf.scheduler"}
.add<string>("policy", "'stealing' (default) or 'sharing'")
.add<size_t>("max-threads", "maximum number of worker threads")
.add<size_t>("max-throughput", "nr. of messages actors can consume per run")
.add<size_t>("cooperative-yield-after",
"nr. of messages actors may consume per run")
.add<bool>("enable-profiling", "enables profiler output")
.add<timespan>("profiling-resolution", "data collection rate")
.add<string>("profiling-output-file", "output file for the profiler");
Expand Down Expand Up @@ -131,8 +132,8 @@ settings actor_system_config::dump_content() const {
// -- scheduler parameters
auto& scheduler_group = caf_group["scheduler"].as_dictionary();
put_missing(scheduler_group, "policy", defaults::scheduler::policy);
put_missing(scheduler_group, "max-throughput",
defaults::scheduler::max_throughput);
put_missing(scheduler_group, "cooperative-yield-after",
defaults::scheduler::cooperative_yield_after);
put_missing(scheduler_group, "enable-profiling", false);
put_missing(scheduler_group, "profiling-resolution",
defaults::scheduler::profiling_resolution);
Expand Down
4 changes: 2 additions & 2 deletions libcaf_core/src/detail/private_thread.cpp
Expand Up @@ -41,13 +41,13 @@ void private_thread::run() {
CAF_PUSH_AID(job->id());
CAF_LOG_TRACE("");
scoped_execution_unit ctx{&job->system()};
auto max_throughput = std::numeric_limits<size_t>::max();
auto yield_after = std::numeric_limits<size_t>::max();
bool resume_later;
for (;;) {
state_ = await_resume_or_shutdown;
do {
resume_later = false;
switch (job->resume(&ctx, max_throughput)) {
switch (job->resume(&ctx, yield_after)) {
case resumable::resume_later:
resume_later = true;
break;
Expand Down
37 changes: 18 additions & 19 deletions libcaf_core/src/scheduled_actor.cpp
Expand Up @@ -279,9 +279,9 @@ void scheduled_actor::intrusive_ptr_release_impl() {
}

resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
size_t max_throughput) {
size_t yield_after) {
CAF_PUSH_AID(id());
CAF_LOG_TRACE(CAF_ARG(max_throughput));
CAF_LOG_TRACE(CAF_ARG(yield_after));
if (!activate(ctx))
return resumable::done;
size_t consumed = 0;
Expand All @@ -299,14 +299,14 @@ resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
}
};
// Callback for handling urgent and normal messages.
auto handle_async = [this, max_throughput, &consumed](mailbox_element& x) {
return run_with_metrics(x, [this, max_throughput, &consumed, &x] {
auto handle_async = [this, &consumed](mailbox_element& x) {
return run_with_metrics(x, [this, &consumed, &x] {
switch (reactivate(x)) {
case activation_result::terminated:
return intrusive::task_result::stop;
case activation_result::success:
return ++consumed < max_throughput ? intrusive::task_result::resume
: intrusive::task_result::stop_all;
++consumed;
return intrusive::task_result::resume;
case activation_result::skipped:
return intrusive::task_result::skip;
default:
Expand All @@ -315,8 +315,8 @@ resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
});
};
// Callback for handling upstream messages (e.g., ACKs).
auto handle_umsg = [this, max_throughput, &consumed](mailbox_element& x) {
return run_with_metrics(x, [this, max_throughput, &consumed, &x] {
auto handle_umsg = [this, &consumed](mailbox_element& x) {
return run_with_metrics(x, [this, &consumed, &x] {
current_mailbox_element(&x);
CAF_LOG_RECEIVE_EVENT((&x));
CAF_BEFORE_PROCESSING(this, x);
Expand All @@ -327,14 +327,14 @@ resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
};
visit(f, um.content);
CAF_AFTER_PROCESSING(this, invoke_message_result::consumed);
return ++consumed < max_throughput ? intrusive::task_result::resume
: intrusive::task_result::stop_all;
++consumed;
return intrusive::task_result::resume;
});
};
// Callback for handling downstream messages (e.g., batches).
auto handle_dmsg = [this, &consumed, max_throughput](stream_slot, auto& q,
mailbox_element& x) {
return run_with_metrics(x, [this, max_throughput, &consumed, &q, &x] {
auto handle_dmsg = [this, &consumed](stream_slot, auto& q,
mailbox_element& x) {
return run_with_metrics(x, [this, &consumed, &q, &x] {
current_mailbox_element(&x);
CAF_LOG_RECEIVE_EVENT((&x));
CAF_BEFORE_PROCESSING(self, x);
Expand Down Expand Up @@ -384,13 +384,13 @@ resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
};
auto res = visit(f, dm.content);
CAF_AFTER_PROCESSING(self, invoke_message_result::consumed);
return ++consumed < max_throughput ? res
: intrusive::task_result::stop_all;
++consumed;
return res;
});
};
std::vector<stream_manager*> managers;
mailbox_element_ptr ptr;
while (consumed < max_throughput) {
while (consumed < yield_after) {
CAF_LOG_DEBUG("start new DRR round");
mailbox_.fetch_more();
auto prev = consumed; // Caches the value before processing more.
Expand Down Expand Up @@ -421,9 +421,8 @@ resumable::resume_result scheduled_actor::resume(execution_unit* ctx,
active_stream_managers(managers);
for (auto mgr : managers)
mgr->push();
} while (
consumed < max_throughput
&& get_downstream_queue().new_round(0, handle_dmsg).consumed_items > 0);
} while (get_downstream_queue().new_round(0, handle_dmsg).consumed_items
> 0);
}
// Update metrics or try returning if the actor consumed nothing.
auto delta = consumed - prev;
Expand Down
6 changes: 3 additions & 3 deletions libcaf_core/src/scheduler/abstract_coordinator.cpp
Expand Up @@ -247,8 +247,8 @@ void abstract_coordinator::start() {

void abstract_coordinator::init(actor_system_config& cfg) {
namespace sr = defaults::scheduler;
max_throughput_ = get_or(cfg, "caf.scheduler.max-throughput",
sr::max_throughput);
yield_after_ = get_or(cfg, "caf.scheduler.cooperative-yield-after",
sr::cooperative_yield_after);
if (auto num_workers = get_if<size_t>(&cfg, "caf.scheduler.max-threads"))
num_workers_ = *num_workers;
else
Expand All @@ -272,7 +272,7 @@ void abstract_coordinator::stop_actors() {
}

abstract_coordinator::abstract_coordinator(actor_system& sys)
: next_worker_(0), max_throughput_(0), num_workers_(0), system_(sys) {
: next_worker_(0), yield_after_(0), num_workers_(0), system_(sys) {
// nop
}

Expand Down
5 changes: 3 additions & 2 deletions libcaf_io/caf/io/network/default_multiplexer.hpp
Expand Up @@ -261,8 +261,9 @@ class CAF_IO_EXPORT default_multiplexer : public multiplexer {
/// Sequential ids for handles of datagram servants
int64_t servant_ids_;

/// Maximum messages per resume run.
size_t max_throughput_;
/// Threshold when actors should return from `resume` in order to give other
/// actors a chance to process their messages.
size_t yield_after_;
};

inline connection_handle conn_hdl_from_socket(native_socket fd) {
Expand Down
9 changes: 5 additions & 4 deletions libcaf_io/src/io/network/default_multiplexer.cpp
Expand Up @@ -153,7 +153,7 @@ default_multiplexer::default_multiplexer(actor_system* sys)
shadow_(1),
pipe_reader_(*this),
servant_ids_(0),
max_throughput_(0) {
yield_after_(0) {
init();
epollfd_ = epoll_create1(EPOLL_CLOEXEC);
if (epollfd_ == -1) {
Expand Down Expand Up @@ -589,8 +589,9 @@ void default_multiplexer::init() {
}
#endif
namespace sr = defaults::scheduler;
max_throughput_ = get_or(system().config(), "caf.scheduler.max-throughput",
sr::max_throughput);
yield_after_ = get_or(system().config(),
"caf.scheduler.cooperative-yield-after",
sr::cooperative_yield_after);
}

bool default_multiplexer::poll_once(bool block) {
Expand All @@ -616,7 +617,7 @@ bool default_multiplexer::poll_once(bool block) {

void default_multiplexer::resume(intrusive_ptr<resumable> ptr) {
CAF_LOG_TRACE("");
switch (ptr->resume(this, max_throughput_)) {
switch (ptr->resume(this, yield_after_)) {
case resumable::resume_later:
// Delay resumable until next cycle.
internally_posted_.emplace_back(ptr.release(), false);
Expand Down

0 comments on commit 243802e

Please sign in to comment.