Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: actor-framework/actor-framework
...
head fork: actor-framework/actor-framework
Checking mergeability… Don't worry, you can still create the pull request.
  • 15 commits
  • 27 files changed
  • 0 commit comments
  • 1 contributor
Showing with 608 additions and 568 deletions.
  1. +33 −33 cppa.files
  2. +7 −4 cppa/cppa.hpp
  3. +21 −0 cppa/guard_expr.hpp
  4. +7 −0 cppa/local_actor.hpp
  5. +7 −0 cppa/option.hpp
  6. +5 −5 cppa/partial_function.hpp
  7. +6 −0 cppa/self.hpp
  8. +5 −0 cppa/util/callable_trait.hpp
  9. +3 −3 examples/CMakeLists.txt
  10. +7 −11 examples/hello_world.cpp
  11. +23 −29 examples/message_passing/{math_actor.cpp → calculator.cpp}
  12. +33 −15 examples/message_passing/dancing_kirby.cpp
  13. +14 −16 examples/{event_based_api → message_passing}/dining_philosophers.cpp
  14. +223 −0 examples/remote_actors/distributed_calculator.cpp
  15. +0 −269 examples/remote_actors/distributed_math_actor.cpp
  16. +28 −42 examples/remote_actors/group_chat.cpp
  17. +2 −1  examples/remote_actors/group_server.cpp
  18. +40 −31 examples/type_system/announce_1.cpp
  19. +15 −12 examples/type_system/announce_2.cpp
  20. +15 −11 examples/type_system/announce_3.cpp
  21. +28 −20 examples/type_system/announce_4.cpp
  22. +58 −46 examples/type_system/announce_5.cpp
  23. +2 −2 src/scheduled_actor.cpp
  24. +16 −8 src/self.cpp
  25. +1 −1  unit_testing/ping_pong.cpp
  26. +7 −7 unit_testing/test_spawn.cpp
  27. +2 −2 unit_testing/test_sync_send.cpp
View
66 cppa.files
@@ -47,14 +47,12 @@ cppa/detail/pseudo_tuple.hpp
cppa/detail/ptype_to_type.hpp
cppa/detail/receive_loop_helper.hpp
cppa/detail/receive_policy.hpp
-cppa/mailbox_element.hpp
-cppa/mailbox_based.hpp
cppa/detail/scheduled_actor_dummy.hpp
cppa/detail/serialize_tuple.hpp
cppa/detail/singleton_manager.hpp
cppa/detail/singleton_mixin.hpp
-cppa/stacked.hpp
cppa/detail/swap_bytes.hpp
+cppa/detail/sync_request_bouncer.hpp
cppa/detail/tdata.hpp
cppa/detail/thread_pool_scheduler.hpp
cppa/detail/to_uniform_name.hpp
@@ -72,6 +70,7 @@ cppa/enable_weak_ptr.hpp
cppa/event_based_actor.hpp
cppa/exception.hpp
cppa/exit_reason.hpp
+cppa/extend.hpp
cppa/factory.hpp
cppa/from_string.hpp
cppa/get.hpp
@@ -82,8 +81,11 @@ cppa/intrusive/single_reader_queue.hpp
cppa/intrusive_ptr.hpp
cppa/local_actor.hpp
cppa/logging.hpp
+cppa/mailbox_based.hpp
+cppa/mailbox_element.hpp
cppa/match.hpp
cppa/match_expr.hpp
+cppa/memory_cached.hpp
cppa/memory_managed.hpp
cppa/message_future.hpp
cppa/message_id.hpp
@@ -107,6 +109,13 @@ cppa/network/output_stream.hpp
cppa/network/protocol.hpp
cppa/object.hpp
cppa/on.hpp
+cppa/opencl.hpp
+cppa/opencl/actor_facade.hpp
+cppa/opencl/command.hpp
+cppa/opencl/command_dispatcher.hpp
+cppa/opencl/global.hpp
+cppa/opencl/program.hpp
+cppa/opencl/smart_ptr.hpp
cppa/opt.hpp
cppa/option.hpp
cppa/partial_function.hpp
@@ -122,7 +131,11 @@ cppa/scheduled_actor.hpp
cppa/scheduler.hpp
cppa/self.hpp
cppa/serializer.hpp
+cppa/singletons.hpp
+cppa/spawn_options.hpp
+cppa/stacked.hpp
cppa/thread_mapped_actor.hpp
+cppa/threaded.hpp
cppa/timeout_definition.hpp
cppa/to_string.hpp
cppa/tpartial_function.hpp
@@ -139,13 +152,16 @@ cppa/util/compare_tuples.hpp
cppa/util/conjunction.hpp
cppa/util/deduce_ref_type.hpp
cppa/util/disjunction.hpp
+cppa/util/dptr.hpp
cppa/util/duration.hpp
cppa/util/element_at.hpp
cppa/util/fiber.hpp
cppa/util/fixed_vector.hpp
+cppa/util/get_result_type.hpp
cppa/util/int_list.hpp
cppa/util/is_array_of.hpp
cppa/util/is_builtin.hpp
+cppa/util/is_callable.hpp
cppa/util/is_comparable.hpp
cppa/util/is_forward_iterator.hpp
cppa/util/is_iterable.hpp
@@ -158,6 +174,7 @@ cppa/util/producer_consumer_list.hpp
cppa/util/pt_dispatch.hpp
cppa/util/pt_token.hpp
cppa/util/purge_refs.hpp
+cppa/util/rebindable_reference.hpp
cppa/util/replace_type.hpp
cppa/util/ripemd_160.hpp
cppa/util/rm_option.hpp
@@ -174,15 +191,15 @@ cppa/util/wrapped.hpp
cppa/weak_intrusive_ptr.hpp
cppa/weak_ptr_anchor.hpp
cppa/wildcard_position.hpp
-examples/event_based_api/dining_philosophers.cpp
examples/hello_world.cpp
+examples/message_passing/calculator.cpp
examples/message_passing/dancing_kirby.cpp
-examples/message_passing/math_actor.cpp
+examples/message_passing/dining_philosophers.cpp
examples/qtsupport/chatwidget.cpp
examples/qtsupport/chatwidget.hpp
examples/qtsupport/chatwindow.ui
examples/qtsupport/qt_group_chat.cpp
-examples/remote_actors/distributed_math_actor.cpp
+examples/remote_actors/distributed_calculator.cpp
examples/remote_actors/group_chat.cpp
examples/remote_actors/group_server.cpp
examples/type_system/announce_1.cpp
@@ -219,6 +236,7 @@ src/duration.cpp
src/empty_tuple.cpp
src/event_based_actor.cpp
src/exception.cpp
+src/exit_reason.cpp
src/factory.cpp
src/fd_util.cpp
src/fiber.cpp
@@ -228,6 +246,7 @@ src/ipv4_acceptor.cpp
src/ipv4_io_stream.cpp
src/local_actor.cpp
src/logging.cpp
+src/mailbox_element.cpp
src/match.cpp
src/memory.cpp
src/memory_managed.cpp
@@ -235,12 +254,16 @@ src/message_header.cpp
src/middleman.cpp
src/object.cpp
src/object_array.cpp
+src/on.cpp
+src/opencl/actor_facade.cpp
+src/opencl/command_dispatcher.cpp
+src/opencl/global.cpp
+src/opencl/program.cpp
src/opt.cpp
src/partial_function.cpp
src/primitive_variant.cpp
src/process_information.cpp
src/protocol.cpp
-src/mailbox_element.cpp
src/ref_counted.cpp
src/response_handle.cpp
src/ripemd_160.cpp
@@ -261,6 +284,7 @@ src/weak_ptr_anchor.cpp
src/yield_interface.cpp
unit_testing/ping_pong.cpp
unit_testing/ping_pong.hpp
+unit_testing/test.cpp
unit_testing/test.hpp
unit_testing/test_atom.cpp
unit_testing/test_fixed_vector.cpp
@@ -268,6 +292,8 @@ unit_testing/test_intrusive_containers.cpp
unit_testing/test_intrusive_ptr.cpp
unit_testing/test_local_group.cpp
unit_testing/test_match.cpp
+unit_testing/test_metaprogramming.cpp
+unit_testing/test_opencl.cpp
unit_testing/test_primitive_variant.cpp
unit_testing/test_remote_actor.cpp
unit_testing/test_ripemd_160.cpp
@@ -275,31 +301,5 @@ unit_testing/test_serialization.cpp
unit_testing/test_spawn.cpp
unit_testing/test_sync_send.cpp
unit_testing/test_tuple.cpp
-unit_testing/test_metaprogramming.cpp
unit_testing/test_uniform_type.cpp
unit_testing/test_yield_interface.cpp
-unit_testing/test.cpp
-cppa/opencl/program.hpp
-cppa/opencl/actor_facade.hpp
-cppa/opencl/command.hpp
-cppa/opencl/command_dispatcher.hpp
-src/opencl/command_dispatcher.cpp
-unit_testing/test_opencl.cpp
-cppa/opencl/global.hpp
-src/opencl/global.cpp
-src/opencl/program.cpp
-src/opencl/actor_facade.cpp
-cppa/opencl/smart_ptr.hpp
-src/exit_reason.cpp
-src/on.cpp
-cppa/spawn_options.hpp
-cppa/extend.hpp
-cppa/detail/sync_request_bouncer.hpp
-cppa/memory_cached.hpp
-cppa/singletons.hpp
-cppa/threaded.hpp
-cppa/util/dptr.hpp
-cppa/util/is_callable.hpp
-cppa/util/get_result_type.hpp
-cppa/util/rebindable_reference.hpp
-cppa/opencl.hpp
View
11 cppa/cppa.hpp
@@ -856,12 +856,15 @@ actor_ptr remote_actor(network::io_stream_ptr_pair connection);
void shutdown(); // note: implemented in singleton_manager.cpp
/**
- * @brief Causes @p whom to quit with @p reason.
- * @note Does nothing if <tt>reason == exit_reason::normal</tt>.
+ * @brief Sends an exit message to @p whom with @p reason.
+ *
+ * This function is syntactic sugar for
+ * <tt>send(whom, atom("EXIT"), reason)</tt>.
+ * @pre <tt>reason != exit_reason::normal</tt>
*/
-inline void quit_actor(const actor_ptr& whom, std::uint32_t reason) {
+inline void send_exit(const actor_ptr& whom, std::uint32_t reason) {
CPPA_REQUIRE(reason != exit_reason::normal);
- send(whom.get(), atom("EXIT"), reason);
+ send(whom, atom("EXIT"), reason);
}
/**
View
21 cppa/guard_expr.hpp
@@ -338,6 +338,14 @@ struct guard_placeholder {
};
+template<typename T>
+struct ge_value {
+ T value;
+};
+
+template<typename T>
+ge_value<T> gval(T val) { return {std::move(val)}; }
+
// result type computation
template<typename T, class Tuple>
@@ -352,6 +360,9 @@ struct ge_unbound<std::reference_wrapper<T>, Tuple> { typedef T type; };
template<typename T, class Tuple>
struct ge_unbound<std::reference_wrapper<const T>, Tuple> { typedef T type; };
+template<typename T, class Tuple>
+struct ge_unbound<ge_value<T>, Tuple> { typedef T type; };
+
// unbound type of placeholder
template<int X, typename... Ts>
struct ge_unbound<guard_placeholder<X>, detail::tdata<Ts...> > {
@@ -386,6 +397,11 @@ struct is_ge_type<guard_expr<OP, First, Second> > {
static constexpr bool value = true;
};
+template<typename T>
+struct is_ge_type<ge_value<T>> {
+ static constexpr bool value = true;
+};
+
template<operator_id OP, typename T1, typename T2>
guard_expr<OP, typename detail::strip_and_convert<T1>::type,
typename detail::strip_and_convert<T2>::type>
@@ -516,6 +532,11 @@ inline const T& ge_resolve(const Tuple&, const util::rebindable_reference<const
return value.get();
}
+template<class Tuple, typename T>
+inline const T& ge_resolve(const Tuple&, const ge_value<T>& wrapped_value) {
+ return wrapped_value.value;
+}
+
template<class Tuple, int X>
inline auto ge_resolve(const Tuple& tup, guard_placeholder<X>)
-> decltype(get<X>(tup).get()) {
View
7 cppa/local_actor.hpp
@@ -222,6 +222,13 @@ class local_actor : public extend<actor>::with<memory_cached> {
}
/**
+ * @brief Checks wheter this actor has a user-defined sync failure handler.
+ */
+ inline bool has_sync_failure_handler() {
+ return static_cast<bool>(m_sync_failure_handler);
+ }
+
+ /**
* @brief Calls <tt>on_sync_timeout(fun); on_sync_failure(fun);</tt>.
*/
inline void on_sync_timeout_or_failure(std::function<void()> fun) {
View
7 cppa/option.hpp
@@ -62,6 +62,13 @@ class option {
*/
option(T value) : m_valid(false) { cr(std::move(value)); }
+ template<typename T0, typename T1, typename... Ts>
+ option(T0&& arg0, T1&& arg1, Ts&&... args) {
+ cr(T(std::forward<T0>(arg0),
+ std::forward<T1>(arg1),
+ std::forward<Ts>(args)...));
+ }
+
option(const option& other) : m_valid(false) {
if (other.m_valid) cr(other.m_value);
}
View
10 cppa/partial_function.hpp
@@ -79,8 +79,8 @@ class partial_function {
partial_function& operator=(partial_function&&) = default;
partial_function& operator=(const partial_function&) = default;
- template<typename... Cs>
- partial_function(const match_expr<Cs...>& mexpr);
+ template<typename... Cs, typename... Ts>
+ partial_function(const match_expr<Cs...>& mexpr, const Ts&... args);
/**
* @brief Returns @p true if this partial function is defined for the
@@ -141,9 +141,9 @@ match_expr_convert(const T0& arg0, const T1& arg1, const Ts&... args) {
* inline and template member function implementations *
******************************************************************************/
-template<typename... Cs>
-partial_function::partial_function(const match_expr<Cs...>& mexpr)
-: m_impl(mexpr.as_behavior_impl()) { }
+template<typename... Cs, typename... Ts>
+partial_function::partial_function(const match_expr<Cs...>& arg, const Ts&... args)
+: m_impl(detail::match_expr_concat(arg, args...)) { }
inline bool partial_function::defined_at(const any_tuple& value) {
return (m_impl) && m_impl->defined_at(value);
View
6 cppa/self.hpp
@@ -146,6 +146,12 @@ class scoped_self_setter {
};
+// disambiguation (compiler gets confused with cast operator otherwise)
+bool operator==(const actor_ptr& lhs, const self_type& rhs);
+bool operator==(const self_type& lhs, const actor_ptr& rhs);
+bool operator!=(const actor_ptr& lhs, const self_type& rhs);
+bool operator!=(const self_type& lhs, const actor_ptr& rhs);
+
#endif // CPPA_DOCUMENTATION
} // namespace cppa
View
5 cppa/util/callable_trait.hpp
@@ -31,6 +31,7 @@
#ifndef CPPA_UTIL_CALLABLE_TRAIT
#define CPPA_UTIL_CALLABLE_TRAIT
+#include <functional>
#include <type_traits>
#include "cppa/util/rm_ref.hpp"
@@ -46,6 +47,7 @@ template<class C, typename Result, typename... Ts>
struct callable_trait<Result (C::*)(Ts...) const> {
typedef Result result_type;
typedef type_list<Ts...> arg_types;
+ typedef std::function<Result (Ts...)> fun_type;
};
// member function pointer
@@ -53,6 +55,7 @@ template<class C, typename Result, typename... Ts>
struct callable_trait<Result (C::*)(Ts...)> {
typedef Result result_type;
typedef type_list<Ts...> arg_types;
+ typedef std::function<Result (Ts...)> fun_type;
};
// good ol' function
@@ -60,6 +63,7 @@ template<typename Result, typename... Ts>
struct callable_trait<Result (Ts...)> {
typedef Result result_type;
typedef type_list<Ts...> arg_types;
+ typedef std::function<Result (Ts...)> fun_type;
};
// good ol' function pointer
@@ -67,6 +71,7 @@ template<typename Result, typename... Ts>
struct callable_trait<Result (*)(Ts...)> {
typedef Result result_type;
typedef type_list<Ts...> arg_types;
+ typedef std::function<Result (Ts...)> fun_type;
};
// matches (IsFun || IsMemberFun)
View
6 examples/CMakeLists.txt
@@ -15,10 +15,10 @@ add(announce_3 type_system)
add(announce_4 type_system)
add(announce_5 type_system)
add(dancing_kirby message_passing)
-add(dining_philosophers event_based_api)
+add(dining_philosophers message_passing)
add(hello_world .)
-add(math_actor message_passing)
-add(distributed_math_actor remote_actors)
+add(calculator message_passing)
+add(distributed_calculator remote_actors)
add(group_server remote_actors)
add(group_chat remote_actors)
View
18 examples/hello_world.cpp
@@ -9,26 +9,23 @@ void mirror() {
become (
// invoke this lambda expression if we receive a string
on_arg_match >> [](const std::string& what) {
- // prints "Hello World!"
- std::cout << what << std::endl;
+ // prints "Hello World!" via aout (thread-safe cout wrapper)
+ aout << what << std::endl;
// replies "!dlroW olleH"
reply(std::string(what.rbegin(), what.rend()));
- // terminates this actor
+ // terminates this actor (become otherwise 'loops' forever)
self->quit();
}
);
}
void hello_world(const actor_ptr& buddy) {
- // send "Hello World!" to the mirror
- send(buddy, "Hello World!");
- // wait for messages
- become (
+ // send "Hello World!" to our buddy ...
+ sync_send(buddy, "Hello World!").then(
+ // ... and wait for a response
on_arg_match >> [](const std::string& what) {
// prints "!dlroW olleH"
- std::cout << what << std::endl;
- // terminate this actor
- self->quit();
+ aout << what << std::endl;
}
);
}
@@ -42,5 +39,4 @@ int main() {
await_all_others_done();
// run cleanup code before exiting main
shutdown();
- return 0;
}
View
52 examples/message_passing/math_actor.cpp → examples/message_passing/calculator.cpp
@@ -34,8 +34,7 @@ void blocking_math_fun() {
).until(gref(done));
}
-// implementation using the event-based API
-void math_fun() {
+void calculator() {
// execute this behavior until actor terminates
become (
on(atom("plus"), arg_match) >> [](int a, int b) {
@@ -44,43 +43,38 @@ void math_fun() {
on(atom("minus"), arg_match) >> [](int a, int b) {
reply(atom("result"), a - b);
},
- // the [=] capture copies the 'this' pointer into the lambda
- // thus, it has access to all members and member functions
- on(atom("quit")) >> [=] {
+ on(atom("quit")) >> [] {
// terminate actor with normal exit reason
self->quit();
}
);
}
-// utility function
-int fetch_result(actor_ptr& calculator, atom_value operation, int a, int b) {
- // send request
- send(calculator, operation, a, b);
- // wait for result
- int result;
- receive(on<atom("result"), int>() >> [&](int r) { result = r; });
- // print and return result
- cout << a << " " << to_string(operation) << " " << b << " = " << result << endl;
- return result;
+void tester(const actor_ptr& testee) {
+ self->link_to(testee);
+ // will be invoked if we receive an unexpected response message
+ self->on_sync_failure([] {
+ aout << "AUT (actor under test) failed" << endl;
+ self->quit(exit_reason::user_defined);
+ });
+ // first test: 2 + 1 = 3
+ sync_send(testee, atom("plus"), 2, 1).then(
+ on(atom("result"), 3) >> [=] {
+ // second test: 2 - 1 = 1
+ sync_send(testee, atom("minus"), 2, 1).then(
+ on(atom("result"), 1) >> [=] {
+ // both tests succeeded
+ aout << "AUT (actor under test) seems to be ok" << endl;
+ send(testee, atom("quit"));
+ }
+ );
+ }
+ );
}
int main() {
- // spawn a context-switching actor that invokes math_fun
- auto a1 = spawn<blocking_api>(blocking_math_fun);
- // spawn an event-based math actor
- auto a2 = spawn(math_fun);
- // do some testing on both implementations
- assert((fetch_result(a1, atom("plus"), 1, 2) == 3));
- assert((fetch_result(a2, atom("plus"), 1, 2) == 3));
- assert((fetch_result(a1, atom("minus"), 2, 1) == 1));
- assert((fetch_result(a2, atom("minus"), 2, 1) == 1));
- // tell both actors to terminate
- send(a1, atom("quit"));
- send(a2, atom("quit"));
- // wait until all spawned actors are terminated
+ spawn(tester, spawn(calculator));
await_all_others_done();
- // done
shutdown();
return 0;
}
View
48 examples/message_passing/dancing_kirby.cpp
@@ -9,6 +9,8 @@
using std::cout;
using std::endl;
+using std::pair;
+
using namespace cppa;
// ASCII art figures
@@ -18,44 +20,60 @@ constexpr const char* figures[] = {
"(>^.^)>"
};
+struct animation_step { size_t figure_idx; size_t offset; };
+
// array of {figure, offset} pairs
-constexpr size_t animation_steps[][2] = {
+constexpr animation_step animation_steps[] = {
{1, 7}, {0, 7}, {0, 6}, {0, 5}, {1, 5}, {2, 5}, {2, 6},
{2, 7}, {2, 8}, {2, 9}, {2, 10}, {1, 10}, {0, 10}, {0, 9},
{1, 9}, {2, 10}, {2, 11}, {2, 12}, {2, 13}, {1, 13}, {0, 13},
{0, 12}, {0, 11}, {0, 10}, {0, 9}, {0, 8}, {0, 7}, {1, 7}
};
+template<typename T, size_t S>
+constexpr size_t array_size(const T (&) [S]) {
+ return S;
+}
+
constexpr size_t animation_width = 20;
-// "draws" an animation step: {offset_whitespaces}{figure}{padding}
-void draw_kirby(size_t const (&animation)[2]) {
+// "draws" an animation step by printing "{offset_whitespaces}{figure}{padding}"
+void draw_kirby(const animation_step& animation) {
cout.width(animation_width);
+ // override last figure
cout << '\r';
- std::fill_n(std::ostream_iterator<char>{cout}, animation[1], ' ');
- cout << figures[animation[0]];
+ // print offset
+ std::fill_n(std::ostream_iterator<char>{cout}, animation.offset, ' ');
+ // print figure
+ cout << figures[animation.figure_idx];
+ // print padding
cout.fill(' ');
+ // make sure figure is printed
cout.flush();
}
+// uses a message-based loop to iterate over all animation steps
void dancing_kirby() {
// let's get it started
- send(self, atom("Step"));
- // iterate over animation_steps
- auto i = std::begin(animation_steps);
- receive_for(i, std::end(animation_steps)) (
- on<atom("Step")>() >> [&]() {
- draw_kirby(*i);
+ send(self, atom("Step"), size_t{0});
+ become (
+ on(atom("Step"), array_size(animation_steps)) >> [] {
+ // we've printed all animation steps (done)
+ cout << endl;
+ self->quit();
+ },
+ on(atom("Step"), arg_match) >> [](size_t step) {
+ // print given step
+ draw_kirby(animation_steps[step]);
// animate next step in 150ms
- delayed_send(self, std::chrono::milliseconds(150), atom("Step"));
+ delayed_send(self, std::chrono::milliseconds(150), atom("Step"), step + 1);
}
);
}
int main() {
- cout << endl;
- dancing_kirby();
- cout << endl;
+ spawn(dancing_kirby);
+ await_all_others_done();
shutdown();
return 0;
}
View
30 examples/event_based_api/dining_philosophers.cpp → examples/message_passing/dining_philosophers.cpp
@@ -24,7 +24,8 @@ void chopstick() {
send(philos, atom("taken"), self);
// await 'put' message and reject other 'take' messages
become(
- keep_behavior, // "enables" unbecome()
+ // allows us to return to the previous behavior
+ keep_behavior,
on(atom("take"), arg_match) >> [=](const actor_ptr& other) {
send(other, atom("busy"), self);
},
@@ -70,7 +71,7 @@ void chopstick() {
* [ X = right => Y = left ]
*/
-struct philosopher : sb_actor<philosopher> {
+struct philosopher : event_based_actor {
std::string name; // the name of this philosopher
actor_ptr left; // left chopstick
@@ -82,7 +83,6 @@ struct philosopher : sb_actor<philosopher> {
behavior hungry;
behavior denied;
behavior eating;
- behavior init_state;
// wait for second chopstick
behavior waiting_for(const actor_ptr& what) {
@@ -107,7 +107,7 @@ struct philosopher : sb_actor<philosopher> {
}
philosopher(const std::string& n, const actor_ptr& l, const actor_ptr& r)
- : name(n), left(l), right(r) {
+ : name(n), left(l), right(r) {
// a philosopher that receives {eat} stops thinking and becomes hungry
thinking = (
on(atom("eat")) >> [=] {
@@ -151,14 +151,19 @@ struct philosopher : sb_actor<philosopher> {
become(thinking);
}
);
+ }
+
+ void init() {
// philosophers start to think after receiving {think}
- init_state = (
+ become (
on(atom("think")) >> [=] {
aout << name << " starts to think\n";
delayed_send(this, seconds(5), atom("eat"));
become(thinking);
}
);
+ // start thinking
+ send(this, atom("think"));
}
};
@@ -172,19 +177,12 @@ int main(int, char**) {
aout << " " << chopsticks.back()->id();
}
aout << endl;
- // a group to address all philosophers
- auto dinner_club = group::anonymous();
- // spawn five philosopher, each joining the Dinner Club
- std::vector<std::string> names = { "Plato", "Hume", "Kant",
- "Nietzsche", "Descartes" };
+ // spawn five philosophers
+ std::vector<std::string> names { "Plato", "Hume", "Kant",
+ "Nietzsche", "Descartes" };
for (size_t i = 0; i < 5; ++i) {
- spawn_in_group<philosopher>(dinner_club,
- names[i],
- chopsticks[i],
- chopsticks[(i+1) % chopsticks.size()]);
+ spawn<philosopher>(names[i], chopsticks[i], chopsticks[(i+1)%5]);
}
- // tell all philosophers to start thinking
- send(dinner_club, atom("think"));
// real philosophers are never done
await_all_others_done();
shutdown();
View
223 examples/remote_actors/distributed_calculator.cpp
@@ -0,0 +1,223 @@
+/******************************************************************************\
+ * This program is a distributed version of the math_actor example. *
+ * Client and server use a stateless request/response protocol and the client *
+ * is failure resilient by using a FIFO request queue. *
+ * The client auto-reconnects and also allows for server reconfiguration. *
+ * *
+ * Run server at port 4242: *
+ * - ./build/bin/distributed_math_actor -s -p 4242 *
+ * *
+ * Run client at the same host: *
+ * - ./build/bin/distributed_math_actor -c -p 4242 *
+\******************************************************************************/
+
+#include <vector>
+#include <string>
+#include <sstream>
+#include <cassert>
+#include <iostream>
+#include <functional>
+
+#include "cppa/opt.hpp"
+#include "cppa/cppa.hpp"
+
+using namespace std;
+using namespace cppa;
+using namespace cppa::placeholders;
+
+// our "service"
+void calculator() {
+ become (
+ on(atom("plus"), arg_match) >> [](int a, int b) {
+ reply(atom("result"), a + b);
+ },
+ on(atom("minus"), arg_match) >> [](int a, int b) {
+ reply(atom("result"), a - b);
+ },
+ on(atom("quit")) >> [=]() {
+ self->quit();
+ }
+ );
+}
+
+inline string trim(std::string s) {
+ auto not_space = [](char c) { return !isspace(c); };
+ // trim left
+ s.erase(s.begin(), find_if(s.begin(), s.end(), not_space));
+ // trim right
+ s.erase(find_if(s.rbegin(), s.rend(), not_space).base(), s.end());
+ return std::move(s);
+}
+
+void client_bhvr(const string& host, uint16_t port, const actor_ptr& server) {
+ // recover from sync failures by trying to reconnect to server
+ if (!self->has_sync_failure_handler()) {
+ self->on_sync_failure([=] {
+ aout << "*** lost connection to " << host << ":" << port << endl;
+ client_bhvr(host, port, nullptr);
+ });
+ }
+ // connect to server if needed
+ if (!server) {
+ aout << "*** try to connect to " << host << ":" << port << endl;
+ try {
+ auto new_serv = remote_actor(host, port);
+ self->monitor(new_serv);
+ aout << "reconnection succeeded" << endl;
+ client_bhvr(host, port, new_serv);
+ return;
+ }
+ catch (exception&) {
+ aout << "connection failed, try again in 3s" << endl;
+ delayed_send(self, chrono::seconds(3), atom("reconnect"));
+ }
+ }
+ become (
+ on_arg_match.when(_x1.in({atom("plus"), atom("minus")}) && gval(server) != nullptr) >> [=](atom_value op, int lhs, int rhs) {
+ sync_send_tuple(server, self->last_dequeued()).then(
+ on(atom("result"), arg_match) >> [=](int result) {
+ aout << lhs << " "
+ << to_string(op) << " "
+ << rhs << " = "
+ << result << endl;
+ }
+ );
+ },
+ on(atom("DOWN"), arg_match) >> [=](uint32_t) {
+ aout << "*** server down, try to reconnect ..." << endl;
+ client_bhvr(host, port, nullptr);
+ },
+ on(atom("rebind"), arg_match) >> [=](const string& host, uint16_t port) {
+ aout << "*** rebind to new server: " << host << ":" << port << endl;
+ client_bhvr(host, port, nullptr);
+ },
+ on(atom("reconnect")) >> [=] {
+ client_bhvr(host, port, nullptr);
+ }
+ );
+}
+
+void client_repl(const string& host, uint16_t port) {
+ typedef cow_tuple<atom_value, int, int> request;
+ // keeps track of requests and tries to reconnect on server failures
+ aout << "Usage:\n"
+ "quit Quit the program\n"
+ "<x> + <y> Calculate <x>+<y> and print result\n"
+ "<x> - <y> Calculate <x>-<y> and print result\n"
+ "connect <host> <port> Reconfigure server"
+ << endl << endl;
+ string line;
+ auto client = spawn(client_bhvr, host, port, nullptr);
+ const char connect[] = "connect ";
+ while (getline(cin, line)) {
+ line = trim(std::move(line)); // ignore leading and trailing whitespaces
+ if (line == "quit") {
+ // force client to quit
+ send_exit(client, exit_reason::user_defined);
+ return;
+ }
+ // the STL way of line.starts_with("connect")
+ else if (equal(begin(connect), end(connect) - 1, begin(line))) {
+ match_split(line, ' ') (
+ on("connect", arg_match) >> [&](string& host, string& sport) {
+ try {
+ auto lport = std::stoul(sport);
+ if (lport < std::numeric_limits<uint16_t>::max()) {
+ auto port = static_cast<uint16_t>(lport);
+ send(client, atom("rebind"), move(host), port);
+ }
+ else {
+ aout << lport << " is not a valid port" << endl;
+ }
+ }
+ catch (std::exception& e) {
+ aout << "\"" << sport << "\" is not an unsigned integer"
+ << endl;
+ }
+ },
+ others() >> [] {
+ aout << "*** usage: connect <host> <port>" << endl;
+ }
+ );
+ }
+ else {
+ auto toint = [](const string& str) -> option<int> {
+ try { return {std::stoi(str)}; }
+ catch (std::exception&) {
+ aout << "\"" << str << "\" is not an integer" << endl;
+ return {};
+ }
+ };
+
+ bool success = false;
+ auto first = begin(line);
+ auto last = end(line);
+ auto pos = find_if(first, last, [](char c) { return c == '+' || c == '-'; });
+ if (pos != last) {
+ auto lsub = trim(string(first, pos));
+ auto rsub = trim(string(pos + 1, last));
+ auto lhs = toint(lsub);
+ auto rhs = toint(rsub);
+ if (lhs && rhs) {
+ auto op = (*pos == '+') ? atom("plus") : atom("minus");
+ send(client, op, *lhs, *rhs);
+ }
+ }
+ else if (!success) {
+ aout << "*** invalid format; usage: <x> [+|-] <y>" << endl;
+ }
+ }
+ }
+}
+
+int main(int argc, char** argv) {
+ string mode;
+ string host;
+ uint16_t port = 0;
+ options_description desc;
+ auto set_mode = [&](const string& arg) -> function<bool()> {
+ return [arg,&mode]() -> bool {
+ if (!mode.empty()) {
+ cerr << "mode already set to " << mode << endl;
+ return false;
+ }
+ mode = move(arg);
+ return true;
+ };
+ };
+ string copts = "client options";
+ string sopts = "server options";
+ bool args_valid = match_stream<string> (argv + 1, argv + argc) (
+ on_opt1('p', "port", &desc, "set port") >> rd_arg(port),
+ on_opt1('H', "host", &desc, "set host (default: localhost)", copts) >> rd_arg(host),
+ on_opt0('s', "server", &desc, "run in server mode", sopts) >> set_mode("server"),
+ on_opt0('c', "client", &desc, "run in client mode", copts) >> set_mode("client"),
+ on_opt0('h', "help", &desc, "print help") >> print_desc_and_exit(&desc)
+ );
+ if (!args_valid || port == 0 || mode.empty()) {
+ if (port == 0) cerr << "*** no port specified" << endl;
+ if (mode.empty()) cerr << "*** no mode specified" << endl;
+ cerr << endl;
+ auto description_printer = print_desc(&desc, cerr);
+ description_printer();
+ return -1;
+ }
+ if (mode == "server") {
+ try {
+ // try to publish math actor at given port
+ publish(spawn(calculator), port);
+ }
+ catch (exception& e) {
+ cerr << "*** unable to publish math actor at port " << port << "\n"
+ << to_verbose_string(e) // prints exception type and e.what()
+ << endl;
+ }
+ }
+ else {
+ if (host.empty()) host = "localhost";
+ client_repl(host, port);
+ }
+ await_all_others_done();
+ shutdown();
+ return 0;
+}
View
269 examples/remote_actors/distributed_math_actor.cpp
@@ -1,269 +0,0 @@
-/******************************************************************************\
- * This program is a distributed version of the math_actor example. *
- * Client and server use a stateless request/response protocol and the client *
- * is failure resilient by using a FIFO request queue. *
- * The client auto-reconnects and also allows for server reconfiguration. *
- * *
- * Run server at port 4242: *
- * - ./build/bin/distributed_math_actor -s -p 4242 *
- * *
- * Run client at the same host: *
- * - ./build/bin/distributed_math_actor -c -p 4242 *
-\******************************************************************************/
-
-#include <vector>
-#include <string>
-#include <sstream>
-#include <cassert>
-#include <iostream>
-#include <functional>
-
-#include "cppa/opt.hpp"
-#include "cppa/cppa.hpp"
-
-using namespace std;
-using namespace cppa;
-using namespace cppa::placeholders;
-
-// our service provider
-struct math_actor : event_based_actor {
- void init() {
- // execute this behavior until actor terminates
- become (
- on(atom("plus"), arg_match) >> [](int a, int b) {
- reply(atom("result"), a + b);
- },
- on(atom("minus"), arg_match) >> [](int a, int b) {
- reply(atom("result"), a - b);
- },
- on(atom("quit")) >> [=]() {
- quit();
- }
- );
- }
-};
-
-inline string& ltrim(string &s) {
- s.erase(s.begin(), find_if(s.begin(), s.end(), [](char c) { return !isspace(c); }));
- return s;
-}
-
-inline string& rtrim(std::string& s) {
- s.erase(find_if(s.rbegin(), s.rend(), [](char c) { return !isspace(c); }).base(), s.end());
- return s;
-}
-
-inline string& trim(std::string& s) {
- return ltrim(rtrim(s));
-}
-
-option<int> toint(const string& str) {
- if (str.empty()) return {};
- char* endptr = nullptr;
- int result = static_cast<int>(strtol(str.c_str(), &endptr, 10));
- if (endptr != nullptr && *endptr == '\0') {
- return result;
- }
- return {};
-}
-
-template<typename T>
-struct project_helper;
-
-template<>
-struct project_helper<string> {
- template<typename T>
- inline option<T> convert(const string& from, typename enable_if<is_integral<T>::value>::type* = 0) {
- char* endptr = nullptr;
- auto result = static_cast<T>(strtol(from.c_str(), &endptr, 10));
- if (endptr != nullptr && *endptr == '\0') return result;
- return {};
- }
-};
-
-template<typename From, typename To>
-option<To> projection(const From& from) {
- project_helper<From> f;
- return f.template convert<To>(from);
-}
-
-void client_repl(actor_ptr server, string host, uint16_t port) {
- typedef cow_tuple<atom_value, int, int> request;
- // keeps track of requests and tries to reconnect on server failures
- auto client = factory::event_based([=](actor_ptr* serv, vector<request>* q) {
- self->monitor(*serv);
- auto send_next_request = [=] {
- if (!q->empty()) {
- *serv << q->front();
- }
- };
- self->become (
- on<atom_value, int, int>().when(_x1.in({atom("plus"), atom("minus")})) >> [=] {
- if (q->empty()) {
- *serv << self->last_dequeued();
- }
- q->push_back(*tuple_cast<atom_value, int, int>(self->last_dequeued()));
- },
- on(atom("result"), arg_match) >> [=](int result) {
- if (q->empty()) {
- aout << "received a result, but didn't send a request\n";
- return;
- }
- ostringstream oss;
- auto& r = q->front();
- aout << get<1>(r) << " "
- << to_string(get<0>(r)) << " "
- << get<2>(r)
- << " = " << result
- << endl;
- q->erase(q->begin());
- send_next_request();
- },
- on(atom("DOWN"), arg_match) >> [=](uint32_t reason) {
- if (*serv == self->last_sender()) {
- serv->reset(); // sets *serv = nullptr
- ostringstream oss;
- aout << "*** server exited with reason = " << reason
- << ", try to reconnect"
- << endl;
- send(self, atom("reconnect"));
- }
- },
- on(atom("reconnect")) >> [=] {
- if (*serv != nullptr) return;
- try {
- *serv = remote_actor(host, port);
- self->monitor(*serv);
- aout << "reconnection succeeded" << endl;
- send_next_request();
- }
- catch (exception&) {
- delayed_send(self, chrono::seconds(3), atom("reconnect"));
- }
- },
- on(atom("rebind"), arg_match) >> [=](string& host, uint16_t port) {
- actor_ptr new_serv;
- try {
- new_serv = remote_actor(move(host), port);
- self->monitor(new_serv);
- aout << "rebind succeeded" << endl;
- *serv = new_serv;
- send_next_request();
- }
- catch (exception& e) {
- aout << "*** rebind failed: "
- << to_verbose_string(e) << endl;
- }
- },
- on(atom("quit")) >> [=] {
- self->quit();
- },
- others() >> [] {
- aout << "unexpected message: " << self->last_dequeued() << endl;
- }
- );
- }).spawn(server);
- aout << "quit Quit the program\n"
- "<x> + <y> Calculate <x>+<y> and print result\n"
- "<x> - <y> Calculate <x>-<y> and print result\n"
- "connect <host> <port> Reconfigure server"
- << endl;
- string line;
- const char connect[] = "connect ";
- while (getline(cin, line)) {
- trim(line);
- if (line == "quit") {
- send(client, atom("quit"));
- return;
- }
- // the STL way of line.starts_with("connect")
- else if (equal(begin(connect), end(connect) - 1, begin(line))) {
- match_split(line, ' ') (
- on("connect", val<string>, projection<string,uint16_t>) >> [&](string& host, uint16_t port) {
- send(client, atom("rebind"), move(host), port);
- },
- others() >> [] {
- aout << "illegal host/port definition" << endl;
- }
- );
- }
- else {
- bool success = false;
- auto pos = find_if(begin(line), end(line), [](char c) { return c == '+' || c == '-'; });
- if (pos != end(line)) {
- string lsub(begin(line), pos);
- string rsub(pos + 1, end(line));
- auto lhs = toint(trim(lsub));
- auto rhs = toint(trim(rsub));
- if (lhs && rhs) {
- auto op = (*pos == '+') ? atom("plus") : atom("minus");
- send(client, op, *lhs, *rhs);
- }
- else {
- if (!lhs) aout << "\"" + lsub + "\" is not an integer" << endl;
- if (!rhs) aout << "\"" + rsub + "\" is not an integer" << endl;
- }
- }
- else if (!success) aout << "*** invalid format; use: X +/- Y" << endl;
- }
- }
-}
-
-int main(int argc, char** argv) {
- string mode;
- string host;
- uint16_t port = 0;
- options_description desc;
- auto set_mode = [&](const string& arg) -> function<bool()> {
- return [arg,&mode]() -> bool {
- if (!mode.empty()) {
- cerr << "mode already set to " << mode << endl;
- return false;
- }
- mode = move(arg);
- return true;
- };
- };
- string copts = "client options";
- string sopts = "server options";
- bool args_valid = match_stream<string> (argv + 1, argv + argc) (
- on_opt1('p', "port", &desc, "set port") >> rd_arg(port),
- on_opt1('H', "host", &desc, "set host (default: localhost)", copts) >> rd_arg(host),
- on_opt0('s', "server", &desc, "run in server mode", sopts) >> set_mode("server"),
- on_opt0('c', "client", &desc, "run in client mode", copts) >> set_mode("client"),
- on_opt0('h', "help", &desc, "print help") >> print_desc_and_exit(&desc)
- );
- if (!args_valid || port == 0 || mode.empty()) {
- if (port == 0) cerr << "*** no port specified" << endl;
- if (mode.empty()) cerr << "*** no mode specified" << endl;
- cerr << endl;
- print_desc(&desc, cerr)();
- return -1;
- }
- if (mode == "server") {
- try {
- // try to publish math actor at given port
- publish(spawn<math_actor>(), port);
- }
- catch (exception& e) {
- cerr << "*** unable to publish math actor at port " << port << "\n"
- << to_verbose_string(e) // prints exception type and e.what()
- << endl;
- }
- }
- else {
- if (host.empty()) host = "localhost";
- try {
- auto server = remote_actor(host, port);
- client_repl(server, host, port);
- }
- catch (exception& e) {
- cerr << "unable to connect to remote actor at host \""
- << host << "\" on port " << port << "\n"
- << to_verbose_string(e) << endl;
- }
- }
- await_all_others_done();
- shutdown();
- return 0;
-}
View
70 examples/remote_actors/group_chat.cpp
@@ -29,7 +29,7 @@ istream& operator>>(istream& is, line& l) {
return is;
}
-string s_last_line;
+namespace { string s_last_line; }
any_tuple split_line(const line& l) {
istringstream strs(l.str);
@@ -42,46 +42,32 @@ any_tuple split_line(const line& l) {
return any_tuple::view(std::move(result));
}
-class client : public event_based_actor {
-
- public:
-
- client(string name) : m_name(move(name)) { }
-
- protected:
-
- void init() {
- become (
- on(atom("broadcast"), arg_match) >> [=](const string& message) {
- for(auto& dest : joined_groups()) {
- send(dest, m_name + ": " + message);
- }
- },
- on(atom("join"), arg_match) >> [=](const group_ptr& what) {
- for (auto g : joined_groups()) {
- cout << "*** leave " << to_string(g) << endl;
- send(g, m_name + " has left the chatroom");
- leave(g);
- }
- cout << "*** join " << to_string(what) << endl;
- join(what);
- send(what, m_name + " has entered the chatroom");
- },
- on<string>() >> [=](const string& txt) {
- // don't print own messages
- if (last_sender() != this) cout << txt << endl;
- },
- others() >> [=]() {
- cout << "unexpected: " << to_string(last_dequeued()) << endl;
+void client(const string& name) {
+ become (
+ on(atom("broadcast"), arg_match) >> [=](const string& message) {
+ for(auto& dest : self->joined_groups()) {
+ send(dest, name + ": " + message);
}
- );
- }
-
- private:
-
- string m_name;
-
-};
+ },
+ on(atom("join"), arg_match) >> [=](const group_ptr& what) {
+ for (auto g : self->joined_groups()) {
+ cout << "*** leave " << to_string(g) << endl;
+ send(g, name + " has left the chatroom");
+ self->leave(g);
+ }
+ cout << "*** join " << to_string(what) << endl;
+ self->join(what);
+ send(what, name + " has entered the chatroom");
+ },
+ on<string>() >> [=](const string& txt) {
+ // don't print own messages
+ if (self->last_sender() != self) cout << txt << endl;
+ },
+ others() >> [=]() {
+ cout << "unexpected: " << to_string(self->last_dequeued()) << endl;
+ }
+ );
+}
int main(int argc, char** argv) {
@@ -105,7 +91,7 @@ int main(int argc, char** argv) {
}
cout << "*** starting client, type '/help' for a list of commands" << endl;
- auto client_actor = spawn<client>(name);
+ auto client_actor = spawn(client, name);
// evaluate group parameters
if (!group_id.empty()) {
@@ -156,7 +142,7 @@ int main(int argc, char** argv) {
}
);
// force actor to quit
- quit_actor(client_actor, exit_reason::user_defined);
+ send_exit(client_actor, exit_reason::user_defined);
await_all_others_done();
shutdown();
return 0;
View
3  examples/remote_actors/group_server.cpp
@@ -30,7 +30,8 @@ int main(int argc, char** argv) {
}
if (!args_valid) {
// print_desc(&desc) returns a function printing the stored help text
- print_desc(&desc)();
+ auto desc_printer = print_desc(&desc);
+ desc_printer();
return 1;
}
try {
View
71 examples/type_system/announce_1.cpp
@@ -41,6 +41,38 @@ bool operator==( const foo2& lhs, const foo2& rhs ) {
return lhs.a == rhs.a && lhs.b == rhs.b;
}
+// receives `remaining` messages
+void testee(size_t remaining) {
+ auto set_next_behavior = [=] {
+ if (remaining > 1) testee(remaining - 1);
+ else self->quit();
+ };
+ become (
+ // note: we sent a foo_pair2, but match on foo_pair
+ // that's safe because both are aliases for std::pair<int,int>
+ on<foo_pair>() >> [=](const foo_pair& val) {
+ cout << "foo_pair("
+ << val.first << ","
+ << val.second << ")"
+ << endl;
+ set_next_behavior();
+ },
+ on<foo>() >> [=](const foo& val) {
+ cout << "foo({";
+ auto i = val.a.begin();
+ auto end = val.a.end();
+ if (i != end) {
+ cout << *i;
+ while (++i != end) {
+ cout << "," << *i;
+ }
+ }
+ cout << "}," << val.b << ")" << endl;
+ set_next_behavior();
+ }
+ );
+}
+
int main(int, char**) {
// announces foo to the libcppa type system;
@@ -75,41 +107,18 @@ int main(int, char**) {
// std::pair<int,int> is already announced
assert(announce<foo_pair2>(&foo_pair2::first, &foo_pair2::second) == false);
- // send a foo to ourselves
- send(self, foo{std::vector<int>{1, 2, 3, 4}, 5});
- // send a foo_pair2 to ourselves
- send(self, foo_pair2{3, 4});
- // quits the program
- send(self, atom("done"));
-
// libcppa returns the same uniform_type_info
// instance for foo_pair and foo_pair2
assert(uniform_typeid<foo_pair>() == uniform_typeid<foo_pair2>());
- // receive two messages
- int i = 0;
- receive_for(i, 2) (
- // note: we sent a foo_pair2, but match on foo_pair
- // that's safe because both are aliases for std::pair<int,int>
- on<foo_pair>() >> [](const foo_pair& val) {
- cout << "foo_pair("
- << val.first << ","
- << val.second << ")"
- << endl;
- },
- on<foo>() >> [](const foo& val) {
- cout << "foo({";
- auto i = val.a.begin();
- auto end = val.a.end();
- if (i != end) {
- cout << *i;
- while (++i != end) {
- cout << "," << *i;
- }
- }
- cout << "}," << val.b << ")" << endl;
- }
- );
+ // spawn a testee that receives two messages
+ auto t = spawn(testee, 2);
+ // send t a foo
+ send(t, foo{std::vector<int>{1, 2, 3, 4}, 5});
+ // send t a foo_pair2
+ send(t, foo_pair2{3, 4});
+
+ await_all_others_done();
shutdown();
return 0;
}
View
27 examples/type_system/announce_2.cpp
@@ -35,29 +35,32 @@ class foo {
};
-// announce requires foo to have the equal operator implemented
+// announce requires foo to be comparable
bool operator==(const foo& lhs, const foo& rhs) {
return lhs.a() == rhs.a()
&& lhs.b() == rhs.b();
}
-int main(int, char**) {
- // if a class uses getter and setter member functions,
- // we pass those to the announce function as { getter, setter } pairs.
- announce<foo>(make_pair(&foo::a, &foo::set_a),
- make_pair(&foo::b, &foo::set_b));
-
- // send a foo to ourselves ...
- send(self, foo{1,2});
- receive (
- // ... and receive it
+void testee() {
+ become (
on<foo>() >> [](const foo& val) {
- cout << "foo("
+ aout << "foo("
<< val.a() << ","
<< val.b() << ")"
<< endl;
+ self->quit();
}
);
+}
+
+int main(int, char**) {
+ // if a class uses getter and setter member functions,
+ // we pass those to the announce function as { getter, setter } pairs.
+ announce<foo>(make_pair(&foo::a, &foo::set_a),
+ make_pair(&foo::b, &foo::set_b));
+ auto t = spawn(testee);
+ send(t, foo{1,2});
+ await_all_others_done();
shutdown();
return 0;
}
View
26 examples/type_system/announce_3.cpp
@@ -46,6 +46,18 @@ typedef int (foo::*foo_getter)() const;
// a member function pointer to set an attribute of foo
typedef void (foo::*foo_setter)(int);
+void testee() {
+ become (
+ on<foo>() >> [](const foo& val) {
+ aout << "foo("
+ << val.a() << ","
+ << val.b() << ")"
+ << endl;
+ self->quit();
+ }
+ );
+}
+
int main(int, char**) {
// since the member function "a" is ambiguous, the compiler
// also needs a type to select the correct overload
@@ -65,17 +77,9 @@ int main(int, char**) {
make_pair(static_cast<foo_getter>(&foo::b),
static_cast<foo_setter>(&foo::b)));
- // send a foo to ourselves ...
- send(self, foo{1,2});
- receive (
- // ... and receive it
- on<foo>() >> [](const foo& val) {
- cout << "foo("
- << val.a() << ","
- << val.b() << ")"
- << endl;
- }
- );
+ // spawn a new testee and send it a foo
+ send(spawn(testee), foo{1,2});
+ await_all_others_done();
shutdown();
return 0;
}
View
48 examples/type_system/announce_4.cpp
@@ -78,6 +78,29 @@ bool operator==(const baz& lhs, const baz& rhs) {
&& lhs.b == rhs.b;
}
+// receives `remaining` messages
+void testee(size_t remaining) {
+ auto set_next_behavior = [=] {
+ if (remaining > 1) testee(remaining - 1);
+ else self->quit();
+ };
+ become (
+ on<bar>() >> [=](const bar& val) {
+ aout << "bar(foo("
+ << val.f.a() << ","
+ << val.f.b() << "),"
+ << val.i << ")"
+ << endl;
+ set_next_behavior();
+ },
+ on<baz>() >> [=](const baz& val) {
+ // prints: baz ( foo ( 1, 2 ), bar ( foo ( 3, 4 ), 5 ) )
+ aout << to_string(object::from(val)) << endl;
+ set_next_behavior();
+ }
+ );
+}
+
int main(int, char**) {
// bar has a non-trivial data member f, thus, we have to told
// announce how to serialize/deserialize this member;
@@ -102,26 +125,11 @@ int main(int, char**) {
make_pair(&foo::b, &foo::set_b)),
&bar::i));
- // send a bar to ourselves
- send(self, bar{foo{1,2},3});
- // send a baz to ourselves
- send(self, baz{foo{1,2},bar{foo{3,4},5}});
-
- // receive two messages
- int i = 0;
- receive_for(i, 2) (
- on<bar>() >> [](const bar& val) {
- cout << "bar(foo("
- << val.f.a() << ","
- << val.f.b() << "),"
- << val.i << ")"
- << endl;
- },
- on<baz>() >> [](const baz& val) {
- // prints: baz ( foo ( 1, 2 ), bar ( foo ( 3, 4 ), 5 ) )
- cout << to_string(object::from(val)) << endl;
- }
- );
+ // spawn a testee that receives two messages
+ auto t = spawn(testee, 2);
+ send(t, bar{foo{1,2},3});
+ send(t, baz{foo{1,2},bar{foo{3,4},5}});
+ await_all_others_done();
shutdown();
return 0;
}
View
104 examples/type_system/announce_5.cpp
@@ -39,18 +39,18 @@ struct tree_node {
void print() const {
// format is: value { children0, children1, ..., childrenN }
// e.g., 10 { 20 { 21, 22 }, 30 }
- cout << value;
+ aout << value;
if (children.empty() == false) {
- cout << " { ";
+ aout << " { ";
auto begin = children.begin();
auto end = children.end();
for (auto i = begin; i != end; ++i) {
if (i != begin) {
- cout << ", ";
+ aout << ", ";
}
i->print();
}
- cout << " } ";
+ aout << " } ";
}
}
@@ -62,9 +62,9 @@ struct tree {
// print tree to stdout
void print() const {
- cout << "tree::print: ";
+ aout << "tree::print: ";
root.print();
- cout << endl;
+ aout << endl;
}
};
@@ -138,17 +138,53 @@ class tree_type_info : public util::abstract_uniform_type_info<tree> {
};
+typedef std::vector<tree> tree_vector;
+
+// receives `remaining` messages
+void testee(size_t remaining) {
+ auto set_next_behavior = [=] {
+ if (remaining > 1) testee(remaining - 1);
+ else self->quit();
+ };
+ become (
+ on_arg_match >> [=](const tree& tmsg) {
+ // prints the tree in its serialized format:
+ // @<> ( { tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } ) } )
+ aout << "to_string(self->last_dequeued()): "
+ << to_string(self->last_dequeued())
+ << endl;
+ // prints the tree using the print member function:
+ // 0 { 10 { 11, 12, 13 } , 20 { 21, 22 } }
+ tmsg.print();
+ set_next_behavior();
+ },
+ on_arg_match >> [=](const tree_vector& trees) {
+ // prints "received 2 trees"
+ aout << "received " << trees.size() << " trees" << endl;
+ // prints:
+ // @<> ( {
+ // std::vector<tree,std::allocator<tree>> ( {
+ // tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } ),
+ // tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } )
+ // )
+ // } )
+ aout << "to_string: " << to_string(self->last_dequeued()) << endl;
+ set_next_behavior();
+ }
+ );
+}
+
int main() {
// the tree_type_info is owned by libcppa after this function call
announce(typeid(tree), new tree_type_info);
- tree t; // create a tree and fill it with some data
+ tree t0; // create a tree and fill it with some data
- t.root.add_child(10);
- t.root.children.back().add_child(11).add_child(12).add_child(13);
+ t0.root.add_child(10);
+ t0.root.children.back().add_child(11).add_child(12).add_child(13);
- t.root.add_child(20);
- t.root.children.back().add_child(21).add_child(22);
+ t0.root.add_child(20);
+ t0.root.children.back().add_child(21).add_child(22);
/*
tree t is now:
@@ -162,44 +198,20 @@ int main() {
11 12 13 21 22
*/
- // send a tree to ourselves ...
- send(self, t);
+ // spawn a testee that receives two messages
+ auto t = spawn(testee, 2);
- // send a vector of trees to ourselves
- typedef std::vector<tree> tree_vector;
+ // send a tree
+ send(t, t0);
+
+ // send a vector of trees
announce<tree_vector>();
tree_vector tvec;
- tvec.push_back(t);
- tvec.push_back(t);
- send(self, tvec);
-
- // receive both messages
- int i = 0;
- receive_for(i, 2) (
- // ... and receive it
- on<tree>() >> [](const tree& tmsg) {
- // prints the tree in its serialized format:
- // @<> ( { tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } ) } )
- cout << "to_string(self->last_dequeued()): "
- << to_string(self->last_dequeued())
- << endl;
- // prints the tree using the print member function:
- // 0 { 10 { 11, 12, 13 } , 20 { 21, 22 } }
- tmsg.print();
- },
- on<tree_vector>() >> [](const tree_vector& trees) {
- // prints "received 2 trees"
- cout << "received " << trees.size() << " trees" << endl;
- // prints:
- // @<> ( {
- // std::vector<tree,std::allocator<tree>> ( {
- // tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } ),
- // tree ( 0, { 10, { 11, { }, 12, { }, 13, { } }, 20, { 21, { }, 22, { } } } )
- // )
- // } )
- cout << "to_string: " << to_string(self->last_dequeued()) << endl;
- }
- );
+ tvec.push_back(t0);
+ tvec.push_back(t0);
+ send(t, tvec);
+
+ await_all_others_done();
shutdown();
return 0;
}
View
4 src/scheduled_actor.cpp
@@ -43,6 +43,7 @@ scheduled_actor::scheduled_actor(actor_state init_state, bool chained_send)
void scheduled_actor::attach_to_scheduler(scheduler* sched, bool hidden) {
CPPA_REQUIRE(sched != nullptr);
+ m_scheduler = sched;
m_hidden = hidden;
// init is called by the spawning actor, manipulate self to
// point to this actor
@@ -52,7 +53,6 @@ void scheduled_actor::attach_to_scheduler(scheduler* sched, bool hidden) {
catch (...) { }
// make sure scheduler is not set until init() is done
std::atomic_thread_fence(std::memory_order_seq_cst);
- m_scheduler = sched;
}
bool scheduled_actor::initialized() const {
@@ -95,7 +95,7 @@ bool scheduled_actor::enqueue(actor_state next_state,
mailbox_element* e) {
CPPA_REQUIRE( next_state == actor_state::ready
|| next_state == actor_state::pending);
- CPPA_REQUIRE(e->marked == false);
+ CPPA_REQUIRE(e != nullptr && e->marked == false);
switch (m_mailbox.enqueue(e)) {
case intrusive::first_enqueued: {
auto state = m_state.load();
View
24 src/self.cpp
@@ -96,14 +96,6 @@ void tss_reset(local_actor* ptr, bool inc_ref_count = true) {
} // namespace <anonymous>
-bool operator==(const actor_ptr& lhs, const self_type& rhs) {
- return lhs.get() == rhs.get();
-}
-
-bool operator!=(const self_type& lhs, const actor_ptr& rhs) {
- return lhs.get() != rhs.get();
-}
-
void self_type::cleanup_fun(cppa::local_actor* what) {
if (what) {
auto ptr = dynamic_cast<thread_mapped_actor*>(what);
@@ -139,4 +131,20 @@ self_type::pointer self_type::release_impl() {
return tss_release();
}
+bool operator==(const actor_ptr& lhs, const self_type& rhs) {
+ return lhs.get() == rhs.get();
+}
+
+bool operator==(const self_type& lhs, const actor_ptr& rhs) {
+ return rhs == lhs;
+}
+
+bool operator!=(const actor_ptr& lhs, const self_type& rhs) {
+ return !(lhs == rhs);
+}
+
+bool operator!=(const self_type& lhs, const actor_ptr& rhs) {
+ return !(rhs == lhs);
+}
+
} // namespace cppa
View
2  unit_testing/ping_pong.cpp
@@ -22,7 +22,7 @@ behavior ping_behavior(size_t num_pings) {
CPPA_LOGF_ERROR_IF(!self->last_sender(), "last_sender() == nullptr");
//cout << to_string(self->last_dequeued()) << endl;
if (++s_pongs >= num_pings) {
- quit_actor(self->last_sender(), exit_reason::user_defined);
+ send_exit(self->last_sender(), exit_reason::user_defined);
self->quit();
}
else reply(atom("ping"), value);
View
14 unit_testing/test_spawn.cpp
@@ -202,7 +202,7 @@ string behavior_test(actor_ptr et) {
throw runtime_error(testee_name + " does not reply");
}
);
- quit_actor(et, exit_reason::user_defined);
+ send_exit(et, exit_reason::user_defined);
await_all_others_done();
return result;
}
@@ -309,7 +309,7 @@ int main() {
on("hello mirror") >> CPPA_CHECKPOINT_CB(),
others() >> CPPA_UNEXPECTED_MSG_CB()
);
- quit_actor(mirror, exit_reason::user_defined);
+ send_exit(mirror, exit_reason::user_defined);
receive (
on(atom("DOWN"), exit_reason::user_defined) >> CPPA_CHECKPOINT_CB(),
others() >> CPPA_UNEXPECTED_MSG_CB()
@@ -325,7 +325,7 @@ int main() {
on("hello mirror") >> CPPA_CHECKPOINT_CB(),
others() >> CPPA_UNEXPECTED_MSG_CB()
);
- quit_actor(mirror, exit_reason::user_defined);
+ send_exit(mirror, exit_reason::user_defined);
receive (
on(atom("DOWN"), exit_reason::user_defined) >> CPPA_CHECKPOINT_CB(),
others() >> CPPA_UNEXPECTED_MSG_CB()
@@ -429,7 +429,7 @@ int main() {
CPPA_CHECK(values == expected);
}
// terminate st
- quit_actor(st, exit_reason::user_defined);
+ send_exit(st, exit_reason::user_defined);
await_all_others_done();
CPPA_CHECKPOINT();
@@ -625,8 +625,8 @@ int main() {
CPPA_CHECK_EQUAL(name, "bob");
}
);
- quit_actor(a1, exit_reason::user_defined);
- quit_actor(a2, exit_reason::user_defined);
+ send_exit(a1, exit_reason::user_defined);
+ send_exit(a2, exit_reason::user_defined);
await_all_others_done();
factory::event_based([](int* i) {
@@ -652,7 +652,7 @@ int main() {
}
become(others() >> CPPA_UNEXPECTED_MSG_CB());
});
- quit_actor(legion, exit_reason::user_defined);
+ send_exit(legion, exit_reason::user_defined);
await_all_others_done();
CPPA_CHECKPOINT();
self->trap_exit(true);
View
4 unit_testing/test_sync_send.cpp
@@ -215,7 +215,7 @@ int main() {
.continue_with([&] { continuation_called = true; });
self->exec_behavior_stack();
CPPA_CHECK_EQUAL(continuation_called, true);
- quit_actor(mirror, exit_reason::user_defined);
+ send_exit(mirror, exit_reason::user_defined);
await_all_others_done();
CPPA_CHECKPOINT();
auto await_success_message = [&] {
@@ -271,7 +271,7 @@ int main() {
sync_send(c, atom("gogo")).then(CPPA_CHECKPOINT_CB())
.continue_with(CPPA_CHECKPOINT_CB());
self->exec_behavior_stack();
- quit_actor(c, exit_reason::user_defined);
+ send_exit(c, exit_reason::user_defined);
await_all_others_done();
CPPA_CHECKPOINT();

No commit comments for this range

Something went wrong with that request. Please try again.