Skip to content

Commit

Permalink
Merge pull request #1759
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Feb 16, 2024
2 parents d28e576 + 091958e commit ab1f6bf
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 4 deletions.
2 changes: 2 additions & 0 deletions libcaf_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ caf_add_component(
${CAF_CORE_HEADERS}
SOURCES
caf/abstract_actor.cpp
caf/abstract_blocking_actor.cpp
caf/abstract_mailbox.cpp
caf/action.cpp
caf/action.test.cpp
Expand Down Expand Up @@ -113,6 +114,7 @@ caf_add_component(
caf/binary_serializer.cpp
caf/blocking_actor.cpp
caf/blocking_actor.test.cpp
caf/blocking_mail.test.cpp
caf/byte_span.cpp
caf/chrono.cpp
caf/chrono.test.cpp
Expand Down
13 changes: 13 additions & 0 deletions libcaf_core/caf/abstract_blocking_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.

#include "caf/abstract_blocking_actor.hpp"

namespace caf {

abstract_blocking_actor::~abstract_blocking_actor() {
// nop
}

} // namespace caf
31 changes: 31 additions & 0 deletions libcaf_core/caf/abstract_blocking_actor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.

#pragma once

#include "caf/actor_clock.hpp"
#include "caf/fwd.hpp"
#include "caf/local_actor.hpp"

namespace caf {

/// A thread-mapped or context-switching actor using a blocking
/// receive rather than a behavior-stack based message processing.
/// @extends local_actor
class CAF_CORE_EXPORT abstract_blocking_actor : public local_actor {
public:
template <class>
friend class blocking_response_handle;

using super = local_actor;

using super::super;

~abstract_blocking_actor() override;

private:
virtual void do_receive(message_id mid, behavior& bhvr, timespan timeout) = 0;
};

} // namespace caf
11 changes: 11 additions & 0 deletions libcaf_core/caf/blocking_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,15 @@ void blocking_actor::force_close_mailbox() {
close_mailbox(make_error(exit_reason::unreachable));
}

void blocking_actor::do_receive(message_id mid, behavior& bhvr,
timespan timeout) {
accept_one_cond cond;
auto tmp = after(timeout) >> [&] {
auto err = make_message(make_error(sec::request_timeout));
bhvr(err);
};
auto fun = detail::make_blocking_behavior(&bhvr, std::move(tmp));
receive_impl(cond, mid, fun);
}

} // namespace caf
17 changes: 14 additions & 3 deletions libcaf_core/caf/blocking_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

#pragma once

#include "caf/abstract_blocking_actor.hpp"
#include "caf/abstract_mailbox.hpp"
#include "caf/actor_config.hpp"
#include "caf/actor_traits.hpp"
#include "caf/after.hpp"
#include "caf/behavior.hpp"
#include "caf/blocking_mail.hpp"
#include "caf/detail/apply_args.hpp"
#include "caf/detail/blocking_behavior.hpp"
#include "caf/detail/core_export.hpp"
#include "caf/detail/default_mailbox.hpp"
#include "caf/detail/type_list.hpp"
#include "caf/detail/type_traits.hpp"
#include "caf/dynamically_typed.hpp"
#include "caf/extend.hpp"
#include "caf/fwd.hpp"
#include "caf/intrusive/stack.hpp"
Expand All @@ -38,8 +41,8 @@ namespace caf {
/// receive rather than a behavior-stack based message processing.
/// @extends local_actor
class CAF_CORE_EXPORT blocking_actor
: public extend<local_actor, blocking_actor>::with<mixin::sender,
mixin::requester>,
: public extend<abstract_blocking_actor,
blocking_actor>::with<mixin::sender, mixin::requester>,
public dynamically_typed_actor_base,
public blocking_actor_base {
public:
Expand Down Expand Up @@ -275,6 +278,12 @@ class CAF_CORE_EXPORT blocking_actor
/// is signalized to other actors after `act()` returns.
void fail_state(error err);

template <class... Args>
auto mail(Args&&... args) {
return blocking_mail(dynamically_typed{}, this,
std::forward<Args>(args)...);
}

// -- customization points ---------------------------------------------------

/// Blocks until at least one message is in the mailbox.
Expand All @@ -285,7 +294,7 @@ class CAF_CORE_EXPORT blocking_actor
virtual bool await_data(timeout_type timeout);

/// Returns the next element from the mailbox or `nullptr`.
virtual mailbox_element_ptr dequeue();
mailbox_element_ptr dequeue();

/// Returns the queue for storing incoming messages.
abstract_mailbox& mailbox() {
Expand Down Expand Up @@ -341,6 +350,8 @@ class CAF_CORE_EXPORT blocking_actor
/// @endcond

private:
void do_receive(message_id mid, behavior& bhvr, timespan timeout) override;

size_t attach_functor(const actor&);

size_t attach_functor(const actor_addr&);
Expand Down
145 changes: 145 additions & 0 deletions libcaf_core/caf/blocking_mail.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.

#pragma once

#include "caf/abstract_blocking_actor.hpp"
#include "caf/async_mail.hpp"
#include "caf/blocking_response_handle.hpp"

namespace caf {

/// Provides a fluent interface for sending asynchronous messages to actors at a
/// specific point in time.
template <message_priority Priority, class Trait, class... Args>
class blocking_scheduled_mail_t
: public async_scheduled_mail_t<Priority, Trait, Args...> {
public:
using super = async_scheduled_mail_t<Priority, Trait, Args...>;

blocking_scheduled_mail_t(abstract_blocking_actor* self, message&& content,
actor_clock::time_point timeout)
: super(self, std::move(content), timeout) {
// nop
}

/// Sends the message to `receiver` as a request message and returns a handle
/// for processing the response.
/// @param receiver The actor that should receive the message.
/// @param relative_timeout The maximum time to wait for a response.
/// @param ref_tag Either `strong_ref` or `weak_ref`. When passing
/// `strong_ref`, the system will keep a strong reference to
/// the receiver until the message has been delivered.
/// Otherwise, the system will only keep a weak reference to
/// the receiver and the message will be dropped if the
/// receiver has been garbage collected in the meantime.
/// @param self_ref_tag Either `strong_self_ref` or `weak_self_ref`. When
/// passing `strong_self_ref`, the system will keep a
/// strong reference to the sender until the message has
/// been delivered. Otherwise, the system will only keep
/// a weak reference to the sender and the message will be
/// dropped if the sender has been garbage collected in
/// the meantime.
template <class Handle, class RefTag = strong_ref_t,
class SelfRefTag = strong_self_ref_t>
[[nodiscard]] auto request(const Handle& receiver, timespan relative_timeout,
RefTag ref_tag = {},
SelfRefTag self_ref_tag = {}) && {
detail::send_type_check<typename Trait::signatures, Handle, Args...>();
using response_type = response_type_t<typename Handle::signatures, Args...>;
auto mid = self()->new_request_id(Priority);
disposable in_flight;
if (receiver) {
auto& clock = self()->clock();
in_flight = clock.schedule_message(actor_cast(super::self_, self_ref_tag),
actor_cast(receiver, ref_tag),
super::timeout_, mid,
std::move(super::content_));

} else {
self()->enqueue(make_mailbox_element(self()->ctrl(), mid.response_id(),
make_error(sec::invalid_request)),
self()->context());
}
using hdl_t = blocking_response_handle<response_type>;
auto hdl = hdl_t{self(), mid.response_id(), relative_timeout};
return std::pair{std::move(hdl), std::move(in_flight)};
}

private:
abstract_blocking_actor* self() {
return static_cast<abstract_blocking_actor*>(super::self_);
}
};

/// Provides a fluent interface for sending asynchronous messages to actors.
template <message_priority Priority, class Trait, class... Args>
class blocking_mail_t : public async_mail_base_t<Priority, Trait, Args...> {
public:
using super = async_mail_base_t<Priority, Trait, Args...>;

blocking_mail_t(abstract_blocking_actor* self, message&& content)
: super(self, std::move(content)) {
// nop
}

/// Tags the message as urgent, i.e., sends it with high priority.
template <message_priority P = Priority,
class E = std::enable_if_t<P == message_priority::normal>>
[[nodiscard]] auto urgent() && {
using result_t = blocking_mail_t<message_priority::high, Trait, Args...>;
return result_t{self(), std::move(super::content_)};
}

/// Schedules the message for delivery with an absolute timeout.
[[nodiscard]] auto schedule(actor_clock::time_point timeout) && {
using result_t = blocking_scheduled_mail_t<Priority, Trait, Args...>;
return result_t{self(), std::move(super::content_), timeout};
}

/// Schedules the message for delivery with a relative timeout.
[[nodiscard]] auto delay(actor_clock::duration_type timeout) && {
using clock = actor_clock::clock_type;
using result_t = blocking_scheduled_mail_t<Priority, Trait, Args...>;
return result_t{self(), std::move(super::content_), clock::now() + timeout};
}

/// Sends the message to `receiver` as a request message and returns a handle
/// for processing the response.
template <class Handle>
[[nodiscard]] auto
request(const Handle& receiver, timespan relative_timeout) && {
detail::send_type_check<typename Trait::signatures, Handle, Args...>();
using response_type = response_type_t<typename Handle::signatures, Args...>;
auto mid = self()->new_request_id(Priority);
if (receiver) {
auto* ptr = actor_cast<abstract_actor*>(receiver);
ptr->enqueue(make_mailbox_element(self()->ctrl(), mid,
std::move(super::content_)),
self()->context());
} else {
self()->enqueue(make_mailbox_element(self()->ctrl(), mid.response_id(),
make_error(sec::invalid_request)),
self()->context());
}
using hdl_t = blocking_response_handle<response_type>;
return hdl_t{self(), mid.response_id(), relative_timeout};
}

private:
abstract_blocking_actor* self() {
return static_cast<abstract_blocking_actor*>(super::self_);
}
};

/// Entry point for sending an event-based message to an actor.
template <class Trait, class... Args>
[[nodiscard]] auto
blocking_mail(Trait, abstract_blocking_actor* self, Args&&... args) {
using result_t = blocking_mail_t<message_priority::normal, Trait,
detail::strip_and_convert_t<Args>...>;
return result_t{self, make_message(std::forward<Args>(args)...)};
}

} // namespace caf

0 comments on commit ab1f6bf

Please sign in to comment.