Skip to content

Commit

Permalink
Actor benchmark (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
GremSnoort committed May 27, 2022
1 parent 67b2572 commit 662f96c
Show file tree
Hide file tree
Showing 21 changed files with 735 additions and 3 deletions.
1 change: 1 addition & 0 deletions benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#available benchmarks
add_subdirectory(message)
add_subdirectory(actors)
2 changes: 2 additions & 0 deletions benchmark/actors/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
add_subdirectory(singlethreaded)
add_subdirectory(multithreaded)
23 changes: 23 additions & 0 deletions benchmark/actors/multithreaded/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.0)

set(project benchmark_actors_multithreaded)
if (CMAKE_VERSION VERSION_LESS 3.0)
PROJECT(${project} CXX)
else ()
cmake_policy(SET CMP0048 NEW)
PROJECT(${project} VERSION "${CMAKE_PROJECT_VERSION}" LANGUAGES CXX)
endif ()
set(${PROJECT_NAME}_SOURCES
counters.cpp
main.cpp
)
set(${PROJECT_NAME}_HEADERS
define_actor.hpp
define_supervisor.hpp
fixtures.hpp
)
message(STATUS "PROJECT_NAME = ${PROJECT_NAME}")
add_executable(${PROJECT_NAME}
${${PROJECT_NAME}_SOURCES}
${${PROJECT_NAME}_HEADERS})
target_link_libraries(${PROJECT_NAME} actor-zeta CONAN_PKG::benchmark)
3 changes: 3 additions & 0 deletions benchmark/actors/multithreaded/counters.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "counters.h"

std::atomic_int counter_g(0);
5 changes: 5 additions & 0 deletions benchmark/actors/multithreaded/counters.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#pragma once

#include <atomic>

extern std::atomic_int counter_g;
60 changes: 60 additions & 0 deletions benchmark/actors/multithreaded/define_actor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#pragma once

#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include <actor-zeta.hpp>
#include <actor-zeta/detail/type_traits.hpp>

template<class... Args>
struct counter_args_t {
static constexpr size_t value = sizeof...(Args);
};

#define DEFINE_ACTOR(actor_name, supervisor_name, ...) \
auto set_done(supervisor_name* sup_ptr_)->void; \
class actor_name final : public actor_zeta::basic_async_actor { \
supervisor_name* sup_ptr_; \
std::map<name_t, actor_zeta::address_t> address_book_; \
template<size_t... I> \
auto perform(command_t cmd, actor_zeta::type_traits::index_sequence<I...>) -> void { \
actor_zeta::send(address_book_.begin()->second, address(), cmd, I...); \
} \
\
public: \
actor_name(supervisor_name* ptr) \
: actor_zeta::basic_async_actor(ptr, names::actor) \
, sup_ptr_(ptr) { \
add_handler(command_t::add_address, &actor_name::add_address); \
add_handler(command_t::start, &actor_name::start); \
add_handler(command_t::ping, &actor_name::ping); \
add_handler(command_t::pong, &actor_name::pong); \
} \
\
~actor_name() override = default; \
\
void add_address(actor_zeta::address_t address, name_t name) { \
if (address && this != address.get()) { \
address_book_.emplace(name, std::move(address)); \
} \
} \
\
void start() { \
perform(command_t::ping, actor_zeta::type_traits::make_index_sequence<counter_args_t<__VA_ARGS__>::value>()); \
} \
\
void ping(__VA_ARGS__) { \
std::cout << std::this_thread::get_id() << " :: PING " << reinterpret_cast<void*>(this) << std::endl; \
counter_g++; \
perform(command_t::pong, actor_zeta::type_traits::make_index_sequence<counter_args_t<__VA_ARGS__>::value>()); \
} \
\
void pong(__VA_ARGS__) { \
std::cout << std::this_thread::get_id() << " :: PONG " << reinterpret_cast<void*>(this) << std::endl; \
counter_g++; \
set_done(sup_ptr_); \
} \
}
80 changes: 80 additions & 0 deletions benchmark/actors/multithreaded/define_supervisor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#pragma once

#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include <actor-zeta.hpp>

auto thread_pool_deleter = [](actor_zeta::scheduler_abstract_t* ptr) {
ptr->stop();
delete ptr;
};

#define DEFINE_SUPERVISOR(supervisor_name, actor_name, num_worker_threads) \
class supervisor_name final : public actor_zeta::cooperative_supervisor<supervisor_name> { \
public: \
supervisor_name(memory_resource* ptr) \
: actor_zeta::cooperative_supervisor<supervisor_name>(ptr, names::supervisor) \
, sync_() \
, e_(new actor_zeta::scheduler_t<actor_zeta::work_sharing>( \
num_worker_threads, \
100), \
thread_pool_deleter) { \
e_->start(); \
add_handler(command_t::prepare, &supervisor_name::prepare); \
add_handler(command_t::send, &supervisor_name::send); \
add_handler(command_t::done, &supervisor_name::done); \
} \
\
void prepare() { \
auto address_0 = spawn_actor([this](actor_name* ptr) { \
actors_.emplace(name_t::actor_0, ptr); \
}); \
auto address_1 = spawn_actor([this](actor_name* ptr) { \
actors_.emplace(name_t::actor_1, ptr); \
}); \
address_book_.emplace(name_t::actor_0, std::move(address_0)); \
address_book_.emplace(name_t::actor_1, std::move(address_1)); \
\
actor_zeta::send(address_book_.at(name_t::actor_0), address(), command_t::add_address, address_book_.at(name_t::actor_1), name_t::actor_1); \
actor_zeta::send(address_book_.at(name_t::actor_1), address(), command_t::add_address, address_book_.at(name_t::actor_0), name_t::actor_0); \
} \
\
void send() { \
assert(actors_.size() == 2); \
actors_.at(name_t::actor_0)->start(); \
sync_.sem_wait_(); \
std::cout << std::this_thread::get_id() << " :: sem_wait_ RETURNED " << reinterpret_cast<void*>(this) << std::endl; \
} \
void done() { \
sync_.sem_post_(); \
std::cout << std::this_thread::get_id() << " :: sem_post_ RETURNED " << reinterpret_cast<void*>(this) << std::endl; \
} \
\
protected: \
auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* override { \
return e_.get(); \
} \
\
auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final { \
{ \
auto ptr = msg.get(); \
set_current_message(std::move(msg)); \
execute(this, current_message()); \
delete ptr; \
} \
} \
\
private: \
base_single_t sync_; \
std::unique_ptr<actor_zeta::scheduler_abstract_t, decltype(thread_pool_deleter)> e_; \
std::map<name_t, actor_zeta::address_t> address_book_; \
std::map<name_t, actor_name*> actors_; \
}; \
auto set_done(supervisor_name* sup_ptr_)->void { \
std::cout << std::this_thread::get_id() << " :: Going to sem done ... .. . " << reinterpret_cast<void*>(sup_ptr_) << std::endl; \
sup_ptr_->done(); \
}
Loading

0 comments on commit 662f96c

Please sign in to comment.