Skip to content

Commit

Permalink
Merge pull request #1835
Browse files Browse the repository at this point in the history
Enable remote spawn for stateful actors
  • Loading branch information
Neverlord committed Apr 29, 2024
2 parents c95efd3 + f95d71a commit 84a42f0
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 7 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Expand Up @@ -65,6 +65,7 @@ if(TARGET CAF::io)

# basic remoting
add_io_example(remoting remote_spawn)
add_io_example(remoting stateful_remote_spawn)
add_io_example(remoting distributed_calculator)

if(CAF_ENABLE_CURL_EXAMPLES)
Expand Down
181 changes: 181 additions & 0 deletions examples/remoting/stateful_remote_spawn.cpp
@@ -0,0 +1,181 @@
// This program illustrates how to spawn a simple aggregator
// across the network.
//
// Run server at port 4242:
// - remote_spawn -s -p 4242
//
// Run client at the same host:
// - remote_spawn -H localhost -p 4242

#include "caf/io/middleman.hpp"

#include "caf/actor_from_state.hpp"
#include "caf/actor_system.hpp"
#include "caf/actor_system_config.hpp"
#include "caf/caf_main.hpp"
#include "caf/scoped_actor.hpp"

#include <iostream>
#include <string>
#include <vector>

struct aggregator_trait {
using signatures = caf::type_list<caf::result<void>(caf::add_atom, int32_t),
caf::result<int32_t>(caf::get_atom)>;
};

using aggregator_actor = caf::typed_actor<aggregator_trait>;

CAF_BEGIN_TYPE_ID_BLOCK(remote_spawn, first_custom_type_id)

CAF_ADD_TYPE_ID(remote_spawn, (aggregator_actor))

CAF_END_TYPE_ID_BLOCK(remote_spawn)

using std::string;

using namespace caf;
using namespace std::literals;

struct aggregator_state {
aggregator_state() = default;
explicit aggregator_state(int32_t init) : value(init) {
// nop
}

aggregator_actor::behavior_type make_behavior() {
return {
[this](add_atom, int32_t a) { value += a; },
[this](get_atom) -> result<int32_t> { return value; },
};
}

int32_t value = 0;
};

// Removes leading and trailing white spaces.
string trim(string s) {
auto not_space = [](char c) { return isspace(c) == 0; };
// trim left
s.erase(s.begin(), find_if(s.begin(), s.end(), not_space));
// trim right
s.erase(find_if(s.rbegin(), s.rend(), not_space).base(), s.end());
return s;
}

// Implements our main loop for reading user input.
void client_repl(actor_system& sys, aggregator_actor hdl) {
auto usage = [&sys] {
sys.println("Usage:");
sys.println(" quit : terminate program");
sys.println(" add <x> : adds x to remote aggregator");
sys.println(" get : prints the aggregated value");
sys.println("");
};
usage();
scoped_actor self{sys};
self->link_to(hdl);
// read next line, split it, and evaluate user input
string line;
while (std::getline(std::cin, line)) {
line = trim(std::move(line));
if (line == "quit")
return;
std::vector<string> words;
split(words, line, is_any_of(" "), token_compress_on);
if (words.size() > 2) {
usage();
continue;
}
auto to_int32_t = [](const string& str) -> std::optional<int32_t> {
char* end = nullptr;
auto res = strtol(str.c_str(), &end, 10);
if (end == str.c_str() + str.size())
return static_cast<int32_t>(res);
return std::nullopt;
};
if (words[0] == "add" && words.size() == 2) {
auto x = to_int32_t(words[1]);
if (!x) {
usage();
continue;
}
self->mail(add_atom_v, *x).send(hdl);
} else if (words[0] == "get") {
if (auto value = self->mail(get_atom_v).request(hdl, 1s).receive()) {
sys.println("Aggregated: {}", *value);
} else {
sys.println("Error fetching value: {}", value.error());
}
} else {
usage();
}
}
}

constexpr auto default_port = uint16_t{0};
constexpr auto default_host = "localhost"sv;
constexpr auto default_server_mode = false;

struct config : actor_system_config {
config() {
// Constructor parameters are listed after naming the actor.
add_actor_type("aggregator", actor_from_state<aggregator_state>,
type_list_v<>, type_list_v<int32_t>);
opt_group{custom_options_, "global"}
.add<uint16_t>("port,p", "set port")
.add<std::string>("host,H", "set node (ignored in server mode)")
.add<bool>("server-mode,s", "enable server mode");
}

caf::settings dump_content() const override {
auto result = actor_system_config::dump_content();
put_missing(result, "port", default_port);
put_missing(result, "host", default_host);
put_missing(result, "server-mode", default_server_mode);
return result;
}
};

void server(actor_system& sys, const config& cfg) {
const auto port = get_or(cfg, "port", default_port);
auto res = sys.middleman().open(port);
if (!res) {
sys.println("*** cannot open port: {}", to_string(res.error()));
return;
}
sys.println("*** running on port: {}", *res);
sys.println("*** press <enter> to shutdown server");
getchar();
}

void client(actor_system& sys, const config& cfg) {
auto host = get_or(cfg, "host", default_host);
auto port = get_or(cfg, "port", default_port);
auto node = sys.middleman().connect(host, port);
if (!node) {
sys.println("*** connect failed: {}", node.error());
return;
}
auto type = "aggregator"; // type of the actor we wish to spawn
auto args = make_message(int32_t{100}); // arguments to construct the actor
auto tout = std::chrono::seconds(5); // timeout for the remote spawn
auto aggregator = sys.middleman().remote_spawn<aggregator_actor>(*node, type,
args, tout);
if (!aggregator) {
sys.println("*** remote spawn failed: {}", aggregator.error());
return;
}
// start using worker in main loop
client_repl(sys, *aggregator);
// be a good citizen and terminate remotely spawned actor before exiting
anon_send_exit(*aggregator, exit_reason::kill);
}

void caf_main(actor_system& sys, const config& cfg) {
const auto server_mode = get_or(cfg, "server-mode", default_server_mode);
auto f = server_mode ? server : client;
f(sys, cfg);
}

CAF_MAIN(id_block::remote_spawn, io::middleman)
42 changes: 38 additions & 4 deletions libcaf_core/caf/actor_factory.hpp
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include "caf/actor_addr.hpp"
#include "caf/actor_system.hpp"
#include "caf/detail/type_traits.hpp"
#include "caf/infer_handle.hpp"
Expand Down Expand Up @@ -89,6 +88,7 @@ struct message_verifier<spawn_mode::function_with_selfptr,
}
};

// Helper function for creating actor factories for function based actors.
template <class F>
actor_factory make_actor_factory(F fun) {
return [fun](actor_system& sys, actor_config& cfg,
Expand Down Expand Up @@ -118,6 +118,8 @@ actor_factory make_actor_factory(F fun) {
};
}

namespace detail {

template <class Handle, class T, class... Ts>
struct dyn_spawn_class_helper {
Handle& result;
Expand All @@ -140,13 +142,45 @@ dyn_spawn_class(actor_system& sys, actor_config& cfg, message& msg) {
sys.message_types<handle>()};
}

template <class... Ts>
struct actor_from_state_helper;

template <class Handle, class T, class... Ts>
struct actor_from_state_helper<Handle, T, type_list<Ts...>> {
Handle& result;
actor_system& sys;
void operator()(Ts... xs) {
result = sys.spawn(T{}, xs...);
}
};

template <class T, class... Ts>
actor_factory_result
spawn_actor_from_state(actor_system& sys, actor_config&, message& msg) {
using handle = typename T::handle_type;
handle hdl;
message_handler factory{actor_from_state_helper<handle, T, Ts>{hdl, sys}...};
factory(msg);
return {actor_cast<strong_actor_ptr>(std::move(hdl)),
sys.message_types<handle>()};
}

} // namespace detail

// Helper function for creating `actor_from_state<T>` factories.
template <class T, class... Ts>
actor_factory make_actor_factory(T, Ts...) {
static_assert((detail::is_type_list_v<Ts> && ...),
"All Ts must be type_list values");
return &detail::spawn_actor_from_state<T, Ts...>;
}

// Helper function for creating factories for class based actor implementations.
template <class T, class... Ts>
actor_factory make_actor_factory() {
static_assert((std::is_lvalue_reference_v<Ts> && ...),
"all Ts must be lvalue references");
static_assert(std::is_base_of_v<local_actor, T>,
"T is not derived from local_actor");
return &dyn_spawn_class<T, Ts...>;
return &detail::dyn_spawn_class<T, Ts...>;
}

} // namespace caf
24 changes: 21 additions & 3 deletions libcaf_core/caf/actor_system_config.hpp
Expand Up @@ -98,14 +98,32 @@ class CAF_CORE_EXPORT actor_system_config {
return add_actor_factory(std::move(name), make_actor_factory<T, Ts...>());
}

/// Allows other nodes to spawn actors implemented as an `actor_from_state<T>`
/// dynamically by using `name` as identifier.
/// @param t conveys the state constructor signature as a type list.
/// @param ts type lists conveying alternative constructor signatures.
/// @experimental
template <class F, class T, class... Ts>
actor_system_config& add_actor_type(std::string name, F f, T t, Ts... ts) {
return add_actor_factory(std::move(name), make_actor_factory(f, t, ts...));
}

/// Allows other nodes to spawn actors implemented by function `f`
/// dynamically by using `name` as identifier.
/// @experimental
template <class F>
actor_system_config& add_actor_type(std::string name, F f) {
using handle = infer_handle_from_fun_t<F>;
static_assert(detail::is_complete<type_id<handle>>);
return add_actor_factory(std::move(name), make_actor_factory(std::move(f)));
if constexpr (detail::has_handle_type_alias_v<F>) {
// F represents an actor_from_state spawnable wrapper.
return add_actor_factory(std::move(name),
make_actor_factory(f, type_list<>{}));
} else {
// F represents a function based actor callable
using handle = infer_handle_from_fun_t<F>;
static_assert(detail::is_complete<type_id<handle>>);
return add_actor_factory(std::move(name),
make_actor_factory(std::move(f)));
}
}

/// Loads module `T`.
Expand Down
2 changes: 2 additions & 0 deletions libcaf_core/caf/detail/type_traits.hpp
Expand Up @@ -493,6 +493,8 @@ CAF_HAS_ALIAS_TRAIT(key_type);

CAF_HAS_ALIAS_TRAIT(mapped_type);

CAF_HAS_ALIAS_TRAIT(handle_type);

// -- constexpr functions for use in enable_if & friends -----------------------

/// Checks whether T behaves like `std::map`.
Expand Down
3 changes: 3 additions & 0 deletions libcaf_core/caf/type_list.hpp
Expand Up @@ -14,4 +14,7 @@ struct type_list {
}
};

template <class... Ts>
constexpr auto type_list_v = type_list<Ts...>{};

} // namespace caf

0 comments on commit 84a42f0

Please sign in to comment.