diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 1ec6239ee..e3e4703df 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -1,2 +1,3 @@ #available benchmarks add_subdirectory(message) +add_subdirectory(actors) diff --git a/benchmark/actors/CMakeLists.txt b/benchmark/actors/CMakeLists.txt new file mode 100644 index 000000000..3d4d66ade --- /dev/null +++ b/benchmark/actors/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(singlethreaded) +add_subdirectory(multithreaded) diff --git a/benchmark/actors/multithreaded/CMakeLists.txt b/benchmark/actors/multithreaded/CMakeLists.txt new file mode 100644 index 000000000..3e92b4da4 --- /dev/null +++ b/benchmark/actors/multithreaded/CMakeLists.txt @@ -0,0 +1,23 @@ +cmake_minimum_required(VERSION 3.0) + +set(project benchmark_actors_multithreaded) +if (CMAKE_VERSION VERSION_LESS 3.0) + PROJECT(${project} CXX) +else () + cmake_policy(SET CMP0048 NEW) + PROJECT(${project} VERSION "${CMAKE_PROJECT_VERSION}" LANGUAGES CXX) +endif () +set(${PROJECT_NAME}_SOURCES + counters.cpp + main.cpp +) +set(${PROJECT_NAME}_HEADERS + define_actor.hpp + define_supervisor.hpp + fixtures.hpp +) +message(STATUS "PROJECT_NAME = ${PROJECT_NAME}") +add_executable(${PROJECT_NAME} + ${${PROJECT_NAME}_SOURCES} + ${${PROJECT_NAME}_HEADERS}) +target_link_libraries(${PROJECT_NAME} actor-zeta CONAN_PKG::benchmark) diff --git a/benchmark/actors/multithreaded/counters.cpp b/benchmark/actors/multithreaded/counters.cpp new file mode 100644 index 000000000..e7c29e4d7 --- /dev/null +++ b/benchmark/actors/multithreaded/counters.cpp @@ -0,0 +1,3 @@ +#include "counters.h" + +std::atomic_int counter_g(0); diff --git a/benchmark/actors/multithreaded/counters.h b/benchmark/actors/multithreaded/counters.h new file mode 100644 index 000000000..d9094afbc --- /dev/null +++ b/benchmark/actors/multithreaded/counters.h @@ -0,0 +1,5 @@ +#pragma once + +#include + +extern std::atomic_int counter_g; diff --git a/benchmark/actors/multithreaded/define_actor.hpp b/benchmark/actors/multithreaded/define_actor.hpp new file mode 100644 index 000000000..afca47683 --- /dev/null +++ b/benchmark/actors/multithreaded/define_actor.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +template +struct counter_args_t { + static constexpr size_t value = sizeof...(Args); +}; + +#define DEFINE_ACTOR(actor_name, supervisor_name, ...) \ + auto set_done(supervisor_name* sup_ptr_)->void; \ + class actor_name final : public actor_zeta::basic_async_actor { \ + supervisor_name* sup_ptr_; \ + std::map address_book_; \ + template \ + auto perform(command_t cmd, actor_zeta::type_traits::index_sequence) -> void { \ + actor_zeta::send(address_book_.begin()->second, address(), cmd, I...); \ + } \ + \ + public: \ + actor_name(supervisor_name* ptr) \ + : actor_zeta::basic_async_actor(ptr, names::actor) \ + , sup_ptr_(ptr) { \ + add_handler(command_t::add_address, &actor_name::add_address); \ + add_handler(command_t::start, &actor_name::start); \ + add_handler(command_t::ping, &actor_name::ping); \ + add_handler(command_t::pong, &actor_name::pong); \ + } \ + \ + ~actor_name() override = default; \ + \ + void add_address(actor_zeta::address_t address, name_t name) { \ + if (address && this != address.get()) { \ + address_book_.emplace(name, std::move(address)); \ + } \ + } \ + \ + void start() { \ + perform(command_t::ping, actor_zeta::type_traits::make_index_sequence::value>()); \ + } \ + \ + void ping(__VA_ARGS__) { \ + std::cout << std::this_thread::get_id() << " :: PING " << reinterpret_cast(this) << std::endl; \ + counter_g++; \ + perform(command_t::pong, actor_zeta::type_traits::make_index_sequence::value>()); \ + } \ + \ + void pong(__VA_ARGS__) { \ + std::cout << std::this_thread::get_id() << " :: PONG " << reinterpret_cast(this) << std::endl; \ + counter_g++; \ + set_done(sup_ptr_); \ + } \ + } diff --git a/benchmark/actors/multithreaded/define_supervisor.hpp b/benchmark/actors/multithreaded/define_supervisor.hpp new file mode 100644 index 000000000..9b015c037 --- /dev/null +++ b/benchmark/actors/multithreaded/define_supervisor.hpp @@ -0,0 +1,80 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +auto thread_pool_deleter = [](actor_zeta::scheduler_abstract_t* ptr) { + ptr->stop(); + delete ptr; +}; + +#define DEFINE_SUPERVISOR(supervisor_name, actor_name, num_worker_threads) \ + class supervisor_name final : public actor_zeta::cooperative_supervisor { \ + public: \ + supervisor_name(memory_resource* ptr) \ + : actor_zeta::cooperative_supervisor(ptr, names::supervisor) \ + , sync_() \ + , e_(new actor_zeta::scheduler_t( \ + num_worker_threads, \ + 100), \ + thread_pool_deleter) { \ + e_->start(); \ + add_handler(command_t::prepare, &supervisor_name::prepare); \ + add_handler(command_t::send, &supervisor_name::send); \ + add_handler(command_t::done, &supervisor_name::done); \ + } \ + \ + void prepare() { \ + auto address_0 = spawn_actor([this](actor_name* ptr) { \ + actors_.emplace(name_t::actor_0, ptr); \ + }); \ + auto address_1 = spawn_actor([this](actor_name* ptr) { \ + actors_.emplace(name_t::actor_1, ptr); \ + }); \ + address_book_.emplace(name_t::actor_0, std::move(address_0)); \ + address_book_.emplace(name_t::actor_1, std::move(address_1)); \ + \ + actor_zeta::send(address_book_.at(name_t::actor_0), address(), command_t::add_address, address_book_.at(name_t::actor_1), name_t::actor_1); \ + actor_zeta::send(address_book_.at(name_t::actor_1), address(), command_t::add_address, address_book_.at(name_t::actor_0), name_t::actor_0); \ + } \ + \ + void send() { \ + assert(actors_.size() == 2); \ + actors_.at(name_t::actor_0)->start(); \ + sync_.sem_wait_(); \ + std::cout << std::this_thread::get_id() << " :: sem_wait_ RETURNED " << reinterpret_cast(this) << std::endl; \ + } \ + void done() { \ + sync_.sem_post_(); \ + std::cout << std::this_thread::get_id() << " :: sem_post_ RETURNED " << reinterpret_cast(this) << std::endl; \ + } \ + \ + protected: \ + auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* override { \ + return e_.get(); \ + } \ + \ + auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final { \ + { \ + auto ptr = msg.get(); \ + set_current_message(std::move(msg)); \ + execute(this, current_message()); \ + delete ptr; \ + } \ + } \ + \ + private: \ + base_single_t sync_; \ + std::unique_ptr e_; \ + std::map address_book_; \ + std::map actors_; \ + }; \ + auto set_done(supervisor_name* sup_ptr_)->void { \ + std::cout << std::this_thread::get_id() << " :: Going to sem done ... .. . " << reinterpret_cast(sup_ptr_) << std::endl; \ + sup_ptr_->done(); \ + } diff --git a/benchmark/actors/multithreaded/fixtures.hpp b/benchmark/actors/multithreaded/fixtures.hpp new file mode 100644 index 000000000..d1eb628c8 --- /dev/null +++ b/benchmark/actors/multithreaded/fixtures.hpp @@ -0,0 +1,141 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "define_actor.hpp" +#include "define_supervisor.hpp" +#include "counters.h" + +using actor_zeta::detail::pmr::memory_resource; + +enum class command_t : uint64_t { + add_link, + add_address, + start, + ping, + pong, + prepare, + send, + done, +}; + +enum class name_t : uint64_t { + actor_0, + actor_1, + sup, +}; + +namespace names { + + static const std::string actor("storage"); + static const std::string supervisor("supervisor"); + +} // namespace names + +struct sync_t { + sem_t prodToRoutine; +}; + +struct base_single_t { + sync_t sync_; + + base_single_t() { + sem_init_(); + } + virtual ~base_single_t() { + sem_destroy_(); + } + + auto sem_init_() -> int { + return sem_init(&sync_.prodToRoutine, 0, 0); + } + + auto sem_post_() -> int { + return sem_post(&sync_.prodToRoutine); + } + + auto sem_wait_() -> int { + return sem_wait(&sync_.prodToRoutine); + } + + auto sem_destroy_() -> int { + return sem_destroy(&sync_.prodToRoutine); + } +}; + +#define DEFINE_SET(actor, supervisor, num_worker_threads, fixture, ...) \ + class supervisor; \ + DEFINE_ACTOR(actor, supervisor, __VA_ARGS__); \ + DEFINE_SUPERVISOR(supervisor, actor, num_worker_threads); \ + class fixture : public ::benchmark::Fixture { \ + public: \ + fixture() \ + : sup_(nullptr, actor_zeta::deleter(nullptr)) {} \ + virtual void SetUp(__attribute__((unused)) const ::benchmark::State& state) final { \ + auto* resource = actor_zeta::detail::pmr::get_default_resource(); \ + sup_ = actor_zeta::spawn_supervisor(resource); \ + actor_zeta::send(sup_, actor_zeta::address_t::empty_address(), command_t::prepare); \ + std::this_thread::sleep_for(std::chrono::milliseconds(15)); \ + counter_g = 0; \ + } \ + virtual void SetUp(::benchmark::State& state) final { \ + SetUp(static_cast(state)); \ + } \ + virtual void TearDown(__attribute__((unused)) const ::benchmark::State& state) final { \ + sup_.reset(); \ + } \ + virtual void TearDown(::benchmark::State& state) final { \ + TearDown(static_cast(state)); \ + } \ + \ + std::unique_ptr sup_; \ + } + +#define REGISTER_BENCHMARK(fixture, bm_name) \ + BENCHMARK_DEFINE_F(fixture, bm_name) \ + (benchmark::State & state) { \ + while (state.KeepRunning()) { \ + std::cout << "+++ STARTED +++" << std::endl; \ + counter_g = 0; \ + sup_->send(); \ + /*std::this_thread::sleep_for(std::chrono::milliseconds(5));*/ \ + std::cout << "--- STOPPED counter_g = " << counter_g << std::endl; \ + assert(counter_g == 2); \ + } \ + } \ + BENCHMARK_REGISTER_F(fixture, bm_name)->DenseRange(0, 32, 8) + +#define REGISTER_BENCHMARKS(type, num_worker_threads) \ + DEFINE_SET(actor_0_##type, supervisor_0_##type, num_worker_threads, fixture_0_##type); \ + DEFINE_SET(actor_1_##type, supervisor_1_##type, num_worker_threads, fixture_1_##type, type); \ + DEFINE_SET(actor_2_##type, supervisor_2_##type, num_worker_threads, fixture_2_##type, type, type); \ + DEFINE_SET(actor_3_##type, supervisor_3_##type, num_worker_threads, fixture_3_##type, type, type, type); \ + DEFINE_SET(actor_4_##type, supervisor_4_##type, num_worker_threads, fixture_4_##type, type, type, type, type); \ + DEFINE_SET(actor_5_##type, supervisor_5_##type, num_worker_threads, fixture_5_##type, type, type, type, type, type); \ + DEFINE_SET(actor_6_##type, supervisor_6_##type, num_worker_threads, fixture_6_##type, type, type, type, type, type, type); \ + DEFINE_SET(actor_7_##type, supervisor_7_##type, num_worker_threads, fixture_7_##type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_8_##type, supervisor_8_##type, num_worker_threads, fixture_8_##type, type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_9_##type, supervisor_9_##type, num_worker_threads, fixture_9_##type, type, type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_10_##type, supervisor_10_##type, num_worker_threads, fixture_10_##type, type, type, type, type, type, type, type, type, type, type); \ + REGISTER_BENCHMARK(fixture_0_##type, ping_pong_single_threaded_0_##type); \ + REGISTER_BENCHMARK(fixture_1_##type, ping_pong_single_threaded_1_##type); \ + REGISTER_BENCHMARK(fixture_2_##type, ping_pong_single_threaded_2_##type); \ + REGISTER_BENCHMARK(fixture_3_##type, ping_pong_single_threaded_3_##type); \ + REGISTER_BENCHMARK(fixture_4_##type, ping_pong_single_threaded_4_##type); \ + REGISTER_BENCHMARK(fixture_5_##type, ping_pong_single_threaded_5_##type); \ + REGISTER_BENCHMARK(fixture_6_##type, ping_pong_single_threaded_6_##type); \ + REGISTER_BENCHMARK(fixture_7_##type, ping_pong_single_threaded_7_##type); \ + REGISTER_BENCHMARK(fixture_8_##type, ping_pong_single_threaded_8_##type); \ + REGISTER_BENCHMARK(fixture_9_##type, ping_pong_single_threaded_9_##type); \ + REGISTER_BENCHMARK(fixture_10_##type, ping_pong_single_threaded_10_##type) diff --git a/benchmark/actors/multithreaded/main.cpp b/benchmark/actors/multithreaded/main.cpp new file mode 100644 index 000000000..aaffc4bb1 --- /dev/null +++ b/benchmark/actors/multithreaded/main.cpp @@ -0,0 +1,38 @@ +#include + +#include "fixtures.hpp" + +/** + * @brief + * + * This multithreaded benchmark demonstrates CRITICAL BUG in thread synchronization on workers + * and blocks indefinitly on condition variable wait. + * @TODO CRITICAL BUG on condition in cooperative_actor::enqueue_impl: 'flags() != static_cast(state::empty)' + * infinite waiting on condition_variable in work_sharing::dequeue + */ + +REGISTER_BENCHMARKS(size_t, 2); +REGISTER_BENCHMARKS(int8_t, 2); +REGISTER_BENCHMARKS(int16_t, 2); +REGISTER_BENCHMARKS(int32_t, 2); +REGISTER_BENCHMARKS(int64_t, 2); +REGISTER_BENCHMARKS(short, 2); + +class memory_manager_t : public benchmark::MemoryManager { + void Start() BENCHMARK_OVERRIDE {} + void Stop(Result* result) BENCHMARK_OVERRIDE { + } +}; + +// Run the benchmark +int main(int argc, char** argv) { + benchmark::Initialize(&argc, argv); + if (benchmark::ReportUnrecognizedArguments(argc, argv)) + return 1; + std::unique_ptr mm(new memory_manager_t()); + benchmark::RegisterMemoryManager(mm.get()); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); + benchmark::RegisterMemoryManager(nullptr); + return 0; +} diff --git a/benchmark/actors/singlethreaded/CMakeLists.txt b/benchmark/actors/singlethreaded/CMakeLists.txt new file mode 100644 index 000000000..ffa861315 --- /dev/null +++ b/benchmark/actors/singlethreaded/CMakeLists.txt @@ -0,0 +1,22 @@ +cmake_minimum_required(VERSION 3.0) + +set(project benchmark_actors_singlethreaded) +if (CMAKE_VERSION VERSION_LESS 3.0) + PROJECT(${project} CXX) +else () + cmake_policy(SET CMP0048 NEW) + PROJECT(${project} VERSION "${CMAKE_PROJECT_VERSION}" LANGUAGES CXX) +endif () +set(${PROJECT_NAME}_SOURCES + main.cpp +) +set(${PROJECT_NAME}_HEADERS + define_actor.hpp + define_supervisor.hpp + fixtures.hpp +) +message(STATUS "PROJECT_NAME = ${PROJECT_NAME}") +add_executable(${PROJECT_NAME} + ${${PROJECT_NAME}_SOURCES} + ${${PROJECT_NAME}_HEADERS}) +target_link_libraries(${PROJECT_NAME} actor-zeta CONAN_PKG::benchmark) diff --git a/benchmark/actors/singlethreaded/define_actor.hpp b/benchmark/actors/singlethreaded/define_actor.hpp new file mode 100644 index 000000000..ec3426eef --- /dev/null +++ b/benchmark/actors/singlethreaded/define_actor.hpp @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +template +struct counter_args_t { + static constexpr size_t value = sizeof...(Args); +}; + +#define DEFINE_ACTOR(actor_name, supervisor_name, ...) \ + class actor_name final : public actor_zeta::basic_async_actor { \ + std::map address_book_; \ + template \ + auto perform(command_t cmd, actor_zeta::type_traits::index_sequence) -> void { \ + actor_zeta::send(address_book_.begin()->second, address(), cmd, I...); \ + } \ + \ + public: \ + actor_name(supervisor_name* ptr) \ + : actor_zeta::basic_async_actor(ptr, names::actor) { \ + add_handler(command_t::add_address, &actor_name::add_address); \ + add_handler(command_t::start, &actor_name::start); \ + add_handler(command_t::ping, &actor_name::ping); \ + add_handler(command_t::pong, &actor_name::pong); \ + } \ + \ + ~actor_name() override = default; \ + \ + void add_address(actor_zeta::address_t address, name_t name) { \ + if (address && this != address.get()) { \ + address_book_.emplace(name, std::move(address)); \ + } \ + } \ + \ + void start() { \ + perform(command_t::ping, actor_zeta::type_traits::make_index_sequence::value>()); \ + } \ + \ + void ping(__VA_ARGS__) { \ + /*std::cout << " PING " << reinterpret_cast(this) << std::endl;*/ \ + perform(command_t::pong, actor_zeta::type_traits::make_index_sequence::value>()); \ + } \ + \ + void pong(__VA_ARGS__) { \ + /*std::cout << " PONG " << reinterpret_cast(this) << std::endl;*/ \ + } \ + \ + private: \ + auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final { \ + { \ + auto ptr = msg.get(); \ + set_current_message(std::move(msg)); \ + execute(this, current_message()); \ + delete ptr; \ + } \ + } \ + } diff --git a/benchmark/actors/singlethreaded/define_supervisor.hpp b/benchmark/actors/singlethreaded/define_supervisor.hpp new file mode 100644 index 000000000..d4a363d9f --- /dev/null +++ b/benchmark/actors/singlethreaded/define_supervisor.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#define DEFINE_SUPERVISOR(supervisor_name, actor_name) \ + class supervisor_name final : public actor_zeta::cooperative_supervisor { \ + public: \ + supervisor_name(memory_resource* ptr) \ + : actor_zeta::cooperative_supervisor(ptr, names::supervisor) { \ + add_handler(command_t::prepare, &supervisor_name::prepare); \ + add_handler(command_t::send, &supervisor_name::send); \ + } \ + \ + void prepare() { \ + auto address_0 = spawn_actor([this](actor_name* ptr) { \ + actors_.emplace(name_t::actor_0, ptr); \ + }); \ + auto address_1 = spawn_actor([this](actor_name* ptr) { \ + actors_.emplace(name_t::actor_1, ptr); \ + }); \ + address_book_.emplace(name_t::actor_0, std::move(address_0)); \ + address_book_.emplace(name_t::actor_1, std::move(address_1)); \ + \ + actor_zeta::send(address_book_.at(name_t::actor_0), address(), command_t::add_address, address_book_.at(name_t::actor_1), name_t::actor_1); \ + actor_zeta::send(address_book_.at(name_t::actor_1), address(), command_t::add_address, address_book_.at(name_t::actor_0), name_t::actor_0); \ + } \ + \ + void send() { \ + assert(actors_.size() == 2); \ + actors_.at(name_t::actor_0)->start(); \ + } \ + \ + protected: \ + auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* override { \ + return nullptr; \ + } \ + \ + auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final { \ + { \ + auto ptr = msg.get(); \ + set_current_message(std::move(msg)); \ + execute(this, current_message()); \ + delete ptr; \ + } \ + } \ + \ + private: \ + std::map address_book_; \ + std::map actors_; \ + } diff --git a/benchmark/actors/singlethreaded/fixtures.hpp b/benchmark/actors/singlethreaded/fixtures.hpp new file mode 100644 index 000000000..72ac88ddb --- /dev/null +++ b/benchmark/actors/singlethreaded/fixtures.hpp @@ -0,0 +1,98 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "define_actor.hpp" +#include "define_supervisor.hpp" + +using actor_zeta::detail::pmr::memory_resource; + +enum class command_t : uint64_t { + add_link, + add_address, + start, + ping, + pong, + prepare, + send, +}; + +enum class name_t : uint64_t { + actor_0, + actor_1, +}; + +namespace names { + + static const std::string actor("storage"); + static const std::string supervisor("supervisor"); + +} // namespace names + +#define DEFINE_SET(actor, supervisor, fixture, ...) \ + class supervisor; \ + DEFINE_ACTOR(actor, supervisor, __VA_ARGS__); \ + DEFINE_SUPERVISOR(supervisor, actor); \ + class fixture : public ::benchmark::Fixture { \ + public: \ + fixture() \ + : sup_(nullptr, actor_zeta::deleter(nullptr)) {} \ + virtual void SetUp(__attribute__((unused)) const ::benchmark::State& state) final { \ + auto* resource = actor_zeta::detail::pmr::get_default_resource(); \ + sup_ = actor_zeta::spawn_supervisor(resource); \ + actor_zeta::send(sup_, actor_zeta::address_t::empty_address(), command_t::prepare); \ + } \ + virtual void SetUp(::benchmark::State& state) final { \ + SetUp(static_cast(state)); \ + } \ + virtual void TearDown(__attribute__((unused)) const ::benchmark::State& state) final { \ + sup_.reset(); \ + } \ + virtual void TearDown(::benchmark::State& state) final { \ + TearDown(static_cast(state)); \ + } \ + \ + std::unique_ptr sup_; \ + } + +#define REGISTER_BENCHMARK(fixture, bm_name) \ + BENCHMARK_DEFINE_F(fixture, bm_name) \ + (benchmark::State & state) { \ + while (state.KeepRunning()) { \ + sup_->send(); \ + } \ + } \ + BENCHMARK_REGISTER_F(fixture, bm_name)->DenseRange(0, 32, 8) + +#define REGISTER_BENCHMARKS(type) \ + DEFINE_SET(actor_0_##type, supervisor_0_##type, fixture_0_##type); \ + DEFINE_SET(actor_1_##type, supervisor_1_##type, fixture_1_##type, type); \ + DEFINE_SET(actor_2_##type, supervisor_2_##type, fixture_2_##type, type, type); \ + DEFINE_SET(actor_3_##type, supervisor_3_##type, fixture_3_##type, type, type, type); \ + DEFINE_SET(actor_4_##type, supervisor_4_##type, fixture_4_##type, type, type, type, type); \ + DEFINE_SET(actor_5_##type, supervisor_5_##type, fixture_5_##type, type, type, type, type, type); \ + DEFINE_SET(actor_6_##type, supervisor_6_##type, fixture_6_##type, type, type, type, type, type, type); \ + DEFINE_SET(actor_7_##type, supervisor_7_##type, fixture_7_##type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_8_##type, supervisor_8_##type, fixture_8_##type, type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_9_##type, supervisor_9_##type, fixture_9_##type, type, type, type, type, type, type, type, type, type); \ + DEFINE_SET(actor_10_##type, supervisor_10_##type, fixture_10_##type, type, type, type, type, type, type, type, type, type, type); \ + REGISTER_BENCHMARK(fixture_0_##type, ping_pong_single_threaded_0_##type); \ + REGISTER_BENCHMARK(fixture_1_##type, ping_pong_single_threaded_1_##type); \ + REGISTER_BENCHMARK(fixture_2_##type, ping_pong_single_threaded_2_##type); \ + REGISTER_BENCHMARK(fixture_3_##type, ping_pong_single_threaded_3_##type); \ + REGISTER_BENCHMARK(fixture_4_##type, ping_pong_single_threaded_4_##type); \ + REGISTER_BENCHMARK(fixture_5_##type, ping_pong_single_threaded_5_##type); \ + REGISTER_BENCHMARK(fixture_6_##type, ping_pong_single_threaded_6_##type); \ + REGISTER_BENCHMARK(fixture_7_##type, ping_pong_single_threaded_7_##type); \ + REGISTER_BENCHMARK(fixture_8_##type, ping_pong_single_threaded_8_##type); \ + REGISTER_BENCHMARK(fixture_9_##type, ping_pong_single_threaded_9_##type); \ + REGISTER_BENCHMARK(fixture_10_##type, ping_pong_single_threaded_10_##type) diff --git a/benchmark/actors/singlethreaded/main.cpp b/benchmark/actors/singlethreaded/main.cpp new file mode 100644 index 000000000..63d69e81c --- /dev/null +++ b/benchmark/actors/singlethreaded/main.cpp @@ -0,0 +1,29 @@ +#include + +#include "fixtures.hpp" + +REGISTER_BENCHMARKS(size_t); +REGISTER_BENCHMARKS(int8_t); +REGISTER_BENCHMARKS(int16_t); +REGISTER_BENCHMARKS(int32_t); +REGISTER_BENCHMARKS(int64_t); +REGISTER_BENCHMARKS(short); + +class memory_manager_t : public benchmark::MemoryManager { + void Start() BENCHMARK_OVERRIDE {} + void Stop(Result* result) BENCHMARK_OVERRIDE { + } +}; + +// Run the benchmark +int main(int argc, char** argv) { + benchmark::Initialize(&argc, argv); + if (benchmark::ReportUnrecognizedArguments(argc, argv)) + return 1; + std::unique_ptr mm(new memory_manager_t()); + benchmark::RegisterMemoryManager(mm.get()); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); + benchmark::RegisterMemoryManager(nullptr); + return 0; +} diff --git a/header/actor-zeta/base/cooperative_actor.hpp b/header/actor-zeta/base/cooperative_actor.hpp index c47c6c161..6522048e9 100644 --- a/header/actor-zeta/base/cooperative_actor.hpp +++ b/header/actor-zeta/base/cooperative_actor.hpp @@ -29,11 +29,13 @@ namespace actor_zeta { namespace base { cooperative_actor(Supervisor* ptr, std::string type) : cooperative_actor(static_cast(ptr), std::move(type)){}; - void enqueue_impl(mailbox::message_ptr, scheduler::execution_unit*) final; + void enqueue_impl(mailbox::message_ptr, scheduler::execution_unit*); // Non thread-safe method auto current_message() -> mailbox::message* ; + auto set_current_message(mailbox::message_ptr msg) -> void; + private: cooperative_actor(supervisor_abstract*, std::string); diff --git a/header/actor-zeta/impl/actor/cooperative_actor.ipp b/header/actor-zeta/impl/actor/cooperative_actor.ipp index 7dcebdbc4..d48f072ea 100644 --- a/header/actor-zeta/impl/actor/cooperative_actor.ipp +++ b/header/actor-zeta/impl/actor/cooperative_actor.ipp @@ -1,6 +1,6 @@ #pragma once + #include -#include // clang-format off #include @@ -79,6 +79,8 @@ namespace actor_zeta { namespace base { } } + // @TODO CRITICAL BUG on condition 'flags() != static_cast(state::empty)' + /* switch ( mailbox().enqueue(msg.release())) { case detail::enqueue_result::unblocked_reader: { @@ -199,6 +201,10 @@ namespace actor_zeta { namespace base { return current_message_; } + auto cooperative_actor::set_current_message(mailbox::message_ptr msg) -> void { + current_message_ = msg.release(); + } + scheduler::execution_unit* cooperative_actor::context() const { return executor_; } diff --git a/header/actor-zeta/scheduler/policy/work_sharing.hpp b/header/actor-zeta/scheduler/policy/work_sharing.hpp index c0c020333..7e6287e20 100644 --- a/header/actor-zeta/scheduler/policy/work_sharing.hpp +++ b/header/actor-zeta/scheduler/policy/work_sharing.hpp @@ -62,6 +62,8 @@ namespace actor_zeta { namespace scheduler { resumable* dequeue(Worker* self) { auto& parent_data = cast(self->parent()); std::unique_lock guard(parent_data.lock); + // @TODO CRITICAL BUG on condition in cooperative_actor::enqueue_impl: 'flags() != static_cast(state::empty)' + // infinite waiting on condition_variable parent_data.cv.wait(guard, [&] { return !parent_data.queue.empty(); }); resumable* job = parent_data.queue.front(); parent_data.queue.pop_front(); diff --git a/test/spawn-actor/CMakeLists.txt b/test/spawn-actor/CMakeLists.txt new file mode 100644 index 000000000..2da0e2919 --- /dev/null +++ b/test/spawn-actor/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.0) + +PROJECT(spawn-actor CXX) +set(SOURCE_FILES main.cpp) +message(STATUS "PROJECT_NAME = ${PROJECT_NAME}") +add_executable(${PROJECT_NAME} ${SOURCE_FILES}) +target_link_libraries(${PROJECT_NAME} actor-zeta tooltestsuites CONAN_PKG::catch2) + +include(CTest) +include(Catch) +catch_discover_tests(${PROJECT_NAME}) diff --git a/test/spawn-actor/classes.hpp b/test/spawn-actor/classes.hpp new file mode 100644 index 000000000..6f70f09be --- /dev/null +++ b/test/spawn-actor/classes.hpp @@ -0,0 +1 @@ +#pragma once diff --git a/test/spawn-actor/main.cpp b/test/spawn-actor/main.cpp new file mode 100644 index 000000000..19db4036d --- /dev/null +++ b/test/spawn-actor/main.cpp @@ -0,0 +1,88 @@ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include + +#include "classes.hpp" +#include + +#include +#include + +#include +#include +#include + +using actor_zeta::detail::pmr::memory_resource; +class dummy_supervisor; + +static std::atomic_int actor_counter{0}; + +constexpr const static auto update_id = actor_zeta::make_message_id(0); +constexpr const static auto find_id = actor_zeta::make_message_id(1); +constexpr const static auto remove_id = actor_zeta::make_message_id(2); + +class storage_t final : public actor_zeta::basic_async_actor { +public: + storage_t(dummy_supervisor* ptr) + : actor_zeta::basic_async_actor(ptr, "storage") { + add_handler(update_id, []() -> void {}); + add_handler(find_id, []() -> void {}); + add_handler(remove_id, []() -> void {}); + + REQUIRE(std::string("storage") == type()); + auto tmp = message_types(); + REQUIRE(tmp.size() == 3); + std::set control = {update_id, remove_id, find_id}; + std::set diff; + std::set_difference(tmp.begin(), tmp.end(), control.begin(), control.end(), std::inserter(diff, diff.begin())); + REQUIRE(diff.empty()); + actor_counter++; + } + + ~storage_t() override = default; +}; + +constexpr const static auto create_id = actor_zeta::make_message_id(0); + +class dummy_supervisor final : public actor_zeta::cooperative_supervisor { +public: + dummy_supervisor(memory_resource* ptr) + : actor_zeta::cooperative_supervisor(ptr, "dummy_supervisor") + , executor_(new actor_zeta::test::scheduler_test_t(1, 1)) { + add_handler(create_id, &dummy_supervisor::create); + scheduler()->start(); + } + + auto scheduler_test() noexcept -> actor_zeta::test::scheduler_test_t* { + return executor_.get(); + } + + void create() { + spawn_actor([this](storage_t* ptr) { + actors_.emplace_back(ptr); + }); + } + +protected: + auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* override { + return executor_.get(); + } + + auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final { + { + set_current_message(std::move(msg)); + execute(this,current_message()); + } + } + +private: + std::unique_ptr executor_; + std::vector actors_; +}; + +TEST_CASE("spawn-actor actor") { + auto* mr_ptr = actor_zeta::detail::pmr::get_default_resource(); + auto supervisor = actor_zeta::spawn_supervisor(mr_ptr); + actor_zeta::send(supervisor.get(), actor_zeta::address_t::empty_address(), create_id); + supervisor->scheduler_test()->run_once(); + REQUIRE(actor_counter == 1); +} diff --git a/test/spawn/classes.hpp b/test/spawn/classes.hpp index 6f70f09be..7b9637ef9 100644 --- a/test/spawn/classes.hpp +++ b/test/spawn/classes.hpp @@ -1 +1 @@ -#pragma once +#pragma once \ No newline at end of file