From faca78cd576ed344237561ef56cad99c07930bee Mon Sep 17 00:00:00 2001 From: Yauheni Akhotnikau Date: Mon, 16 Oct 2023 15:43:07 +0300 Subject: [PATCH] A couple of new unit-tests. These unit-test check that use of the same disp_binder with fifo_t::cooperation for several coops leads to use separate event queue for every cooperation. --- .../so_5/disp/adv_thread_pool/CMakeLists.txt | 1 + .../so_5/disp/adv_thread_pool/build_tests.rb | 1 + .../cooperation_fifo_2/CMakeLists.txt | 2 + .../cooperation_fifo_2/main.cpp | 196 +++++++++++++++++ .../adv_thread_pool/cooperation_fifo_2/prj.rb | 11 + .../cooperation_fifo_2/prj.ut.rb | 7 + dev/test/so_5/disp/thread_pool/CMakeLists.txt | 1 + dev/test/so_5/disp/thread_pool/build_tests.rb | 1 + .../cooperation_fifo_2/CMakeLists.txt | 2 + .../thread_pool/cooperation_fifo_2/main.cpp | 197 ++++++++++++++++++ .../thread_pool/cooperation_fifo_2/prj.rb | 11 + .../thread_pool/cooperation_fifo_2/prj.ut.rb | 7 + 12 files changed, 437 insertions(+) create mode 100644 dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/CMakeLists.txt create mode 100644 dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/main.cpp create mode 100644 dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb create mode 100644 dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb create mode 100644 dev/test/so_5/disp/thread_pool/cooperation_fifo_2/CMakeLists.txt create mode 100644 dev/test/so_5/disp/thread_pool/cooperation_fifo_2/main.cpp create mode 100644 dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.rb create mode 100644 dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.ut.rb diff --git a/dev/test/so_5/disp/adv_thread_pool/CMakeLists.txt b/dev/test/so_5/disp/adv_thread_pool/CMakeLists.txt index b471e563..73716f0c 100644 --- a/dev/test/so_5/disp/adv_thread_pool/CMakeLists.txt +++ b/dev/test/so_5/disp/adv_thread_pool/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(chstate_in_safe) add_subdirectory(cooperation_fifo) +add_subdirectory(cooperation_fifo_2) add_subdirectory(individual_fifo) add_subdirectory(simple) add_subdirectory(subscr_in_safe) diff --git a/dev/test/so_5/disp/adv_thread_pool/build_tests.rb b/dev/test/so_5/disp/adv_thread_pool/build_tests.rb index d232f52e..b32054e6 100644 --- a/dev/test/so_5/disp/adv_thread_pool/build_tests.rb +++ b/dev/test/so_5/disp/adv_thread_pool/build_tests.rb @@ -7,6 +7,7 @@ required_prj( "test/so_5/disp/adv_thread_pool/chstate_in_safe/prj.ut.rb" ) required_prj( "test/so_5/disp/adv_thread_pool/subscr_in_safe/prj.ut.rb" ) required_prj( "test/so_5/disp/adv_thread_pool/cooperation_fifo/prj.ut.rb" ) + required_prj( "test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb" ) required_prj( "test/so_5/disp/adv_thread_pool/individual_fifo/prj.ut.rb" ) required_prj( "test/so_5/disp/adv_thread_pool/unsafe_after_safe/prj.ut.rb" ) required_prj( "test/so_5/disp/adv_thread_pool/custom_work_thread/prj.ut.rb" ) diff --git a/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/CMakeLists.txt b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/CMakeLists.txt new file mode 100644 index 00000000..0b65e907 --- /dev/null +++ b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/CMakeLists.txt @@ -0,0 +1,2 @@ +set(UNITTEST _unit.test.disp.adv_thread_pool.cooperation_fifo_2) +include(${CMAKE_SOURCE_DIR}/cmake/unittest.cmake) diff --git a/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/main.cpp b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/main.cpp new file mode 100644 index 00000000..151e9858 --- /dev/null +++ b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/main.cpp @@ -0,0 +1,196 @@ +/* + * A test for adv_thread_pool dispatcher when a single binder with + * cooperative_fifo is used for several coops. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "../for_each_lock_factory.hpp" + +namespace tp_disp = so_5::disp::adv_thread_pool; + +typedef std::set< so_5::current_thread_id_t > thread_id_set_t; + +class thread_id_collector_t + { + public : + void add_current_thread() + { + std::lock_guard< so_5::default_spinlock_t > l( m_lock ); + + m_set.insert( so_5::query_current_thread_id() ); + } + + std::size_t set_size() const + { + return m_set.size(); + } + + const thread_id_set_t & + query_set() const + { + return m_set; + } + + private : + so_5::default_spinlock_t m_lock; + thread_id_set_t m_set; + }; + +struct msg_shutdown : public so_5::signal_t {}; + +class a_test_t : public so_5::agent_t +{ + public: + a_test_t( + so_5::environment_t & env, + thread_id_collector_t & collector, + const so_5::mbox_t & shutdowner_mbox ) + : so_5::agent_t( env ) + , m_collector( collector ) + , m_shutdowner_mbox( shutdowner_mbox ) + { + } + + void + so_evt_start() override + { + // Block the current thread for some time. + // Because of that so_evt_start for an agent from a different + // coop has to be started on a separate thread. + std::this_thread::sleep_for( std::chrono::milliseconds{ 250 } ); + + m_collector.add_current_thread(); + + so_5::send< msg_shutdown >( m_shutdowner_mbox ); + } + + private : + thread_id_collector_t & m_collector; + const so_5::mbox_t m_shutdowner_mbox; +}; + +class a_shutdowner_t : public so_5::agent_t +{ + public : + a_shutdowner_t( + so_5::environment_t & env, + std::size_t working_agents ) + : so_5::agent_t( env ) + , m_working_agents( working_agents ) + {} + + void + so_define_agent() override + { + so_subscribe_self().event( [this](mhood_t< msg_shutdown >) { + --m_working_agents; + if( !m_working_agents ) + so_environment().stop(); + } ); + } + + private : + std::size_t m_working_agents; +}; + +const std::size_t cooperation_count = 4; + +void +run_sobjectizer( + tp_disp::queue_traits::lock_factory_t factory, + thread_id_collector_t & collector ) +{ + duration_meter_t duration( "running of test cooperations" ); + + so_5::launch( + [&]( so_5::environment_t & env ) + { + so_5::mbox_t shutdowner_mbox; + { + auto c = env.make_coop(); + auto a = c->make_agent< a_shutdowner_t >( cooperation_count ); + shutdowner_mbox = a->so_direct_mbox(); + env.register_coop( std::move( c ) ); + } + + auto disp = tp_disp::make_dispatcher( + env, + "adv_thread_pool", + tp_disp::disp_params_t{} + .thread_count( cooperation_count ) + .set_queue_params( tp_disp::queue_traits::queue_params_t{} + .lock_factory( factory ) ) ); + + auto params = tp_disp::bind_params_t{} + .fifo( tp_disp::fifo_t::cooperation ); + auto the_same_binder = disp.binder( params ); + + for( std::size_t i = 0; i != cooperation_count; ++i ) + { + auto c = env.make_coop( the_same_binder ); + c->make_agent< a_test_t >( collector, shutdowner_mbox ); + env.register_coop( std::move( c ) ); + } + } ); +} + +void +analyze_results( const thread_id_collector_t & collector ) +{ + if( cooperation_count != collector.set_size() ) + { + throw std::runtime_error{ + "there is a set with size: " + std::to_string( collector.set_size() ) + }; + } +} + +void +run_and_check( + tp_disp::queue_traits::lock_factory_t factory ) +{ + thread_id_collector_t collector; + + run_sobjectizer( factory, collector ); + + analyze_results( collector ); +} + +int +main() +{ + try + { + for_each_lock_factory( []( tp_disp::queue_traits::lock_factory_t factory ) { + run_with_time_limit( + [&]() + { + run_and_check( factory ); + }, + 240 ); + } ); + } + catch( const std::exception & ex ) + { + std::cerr << "Error: " << ex.what() << std::endl; + return 1; + } + + return 0; +} + diff --git a/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb new file mode 100644 index 00000000..db14aba2 --- /dev/null +++ b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb @@ -0,0 +1,11 @@ +require 'mxx_ru/cpp' + +MxxRu::Cpp::exe_target { + + required_prj( "so_5/prj.rb" ) + + target( "_unit.test.disp.adv_thread_pool.cooperation_fifo_2" ) + + cpp_source( "main.cpp" ) +} + diff --git a/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb new file mode 100644 index 00000000..b4b83491 --- /dev/null +++ b/dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb @@ -0,0 +1,7 @@ +require 'mxx_ru/binary_unittest' + +Mxx_ru::setup_target( + Mxx_ru::Binary_unittest_target.new( + "test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb", + "test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb" ) +) diff --git a/dev/test/so_5/disp/thread_pool/CMakeLists.txt b/dev/test/so_5/disp/thread_pool/CMakeLists.txt index a2f5f6ec..596ecfae 100644 --- a/dev/test/so_5/disp/thread_pool/CMakeLists.txt +++ b/dev/test/so_5/disp/thread_pool/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(simple) add_subdirectory(cooperation_fifo) +add_subdirectory(cooperation_fifo_2) add_subdirectory(individual_fifo) add_subdirectory(threshold) add_subdirectory(custom_work_thread) diff --git a/dev/test/so_5/disp/thread_pool/build_tests.rb b/dev/test/so_5/disp/thread_pool/build_tests.rb index 5e2efaca..855aa1e0 100644 --- a/dev/test/so_5/disp/thread_pool/build_tests.rb +++ b/dev/test/so_5/disp/thread_pool/build_tests.rb @@ -7,6 +7,7 @@ required_prj( "#{path}/simple/prj.ut.rb" ) required_prj( "#{path}/cooperation_fifo/prj.ut.rb" ) + required_prj( "#{path}/cooperation_fifo_2/prj.ut.rb" ) required_prj( "#{path}/individual_fifo/prj.ut.rb" ) required_prj( "#{path}/threshold/prj.ut.rb" ) required_prj( "#{path}/custom_work_thread/prj.ut.rb" ) diff --git a/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/CMakeLists.txt b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/CMakeLists.txt new file mode 100644 index 00000000..c8d6aa33 --- /dev/null +++ b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/CMakeLists.txt @@ -0,0 +1,2 @@ +set(UNITTEST _unit.test.disp.thread_pool.cooperation_fifo_2) +include(${CMAKE_SOURCE_DIR}/cmake/unittest.cmake) diff --git a/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/main.cpp b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/main.cpp new file mode 100644 index 00000000..5c660efe --- /dev/null +++ b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/main.cpp @@ -0,0 +1,197 @@ +/* + * A test for thread_pool dispatcher when a single binder with + * cooperative_fifo is used for several coops. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "../for_each_lock_factory.hpp" + +namespace tp_disp = so_5::disp::thread_pool; + +typedef std::set< so_5::current_thread_id_t > thread_id_set_t; + +class thread_id_collector_t + { + public : + void add_current_thread() + { + std::lock_guard< so_5::default_spinlock_t > l( m_lock ); + + m_set.insert( so_5::query_current_thread_id() ); + } + + std::size_t set_size() const + { + return m_set.size(); + } + + const thread_id_set_t & + query_set() const + { + return m_set; + } + + private : + so_5::default_spinlock_t m_lock; + thread_id_set_t m_set; + }; + +struct msg_shutdown : public so_5::signal_t {}; + +class a_test_t : public so_5::agent_t +{ + public: + a_test_t( + so_5::environment_t & env, + thread_id_collector_t & collector, + const so_5::mbox_t & shutdowner_mbox ) + : so_5::agent_t( env ) + , m_collector( collector ) + , m_shutdowner_mbox( shutdowner_mbox ) + { + } + + void + so_evt_start() override + { + // Block the current thread for some time. + // Because of that so_evt_start for an agent from a different + // coop has to be started on a separate thread. + std::this_thread::sleep_for( std::chrono::milliseconds{ 250 } ); + + m_collector.add_current_thread(); + + so_5::send< msg_shutdown >( m_shutdowner_mbox ); + } + + private : + thread_id_collector_t & m_collector; + const so_5::mbox_t m_shutdowner_mbox; +}; + +class a_shutdowner_t : public so_5::agent_t +{ + public : + a_shutdowner_t( + so_5::environment_t & env, + std::size_t working_agents ) + : so_5::agent_t( env ) + , m_working_agents( working_agents ) + {} + + void + so_define_agent() override + { + so_subscribe_self().event( [this](mhood_t< msg_shutdown >) { + --m_working_agents; + if( !m_working_agents ) + so_environment().stop(); + } ); + } + + private : + std::size_t m_working_agents; +}; + +const std::size_t cooperation_count = 4; + +void +run_sobjectizer( + tp_disp::queue_traits::lock_factory_t factory, + thread_id_collector_t & collector ) +{ + duration_meter_t duration( "running of test cooperations" ); + + so_5::launch( + [&]( so_5::environment_t & env ) + { + so_5::mbox_t shutdowner_mbox; + { + auto c = env.make_coop(); + auto a = c->make_agent< a_shutdowner_t >( cooperation_count ); + shutdowner_mbox = a->so_direct_mbox(); + env.register_coop( std::move( c ) ); + } + + auto disp = tp_disp::make_dispatcher( + env, + "thread_pool", + tp_disp::disp_params_t{} + .thread_count( cooperation_count ) + .set_queue_params( tp_disp::queue_traits::queue_params_t{} + .lock_factory( factory ) ) ); + + auto params = tp_disp::bind_params_t{} + .max_demands_at_once( 1024 ) + .fifo( tp_disp::fifo_t::cooperation ); + auto the_same_binder = disp.binder( params ); + + for( std::size_t i = 0; i != cooperation_count; ++i ) + { + auto c = env.make_coop( the_same_binder ); + c->make_agent< a_test_t >( collector, shutdowner_mbox ); + env.register_coop( std::move( c ) ); + } + } ); +} + +void +analyze_results( const thread_id_collector_t & collector ) +{ + if( cooperation_count != collector.set_size() ) + { + throw std::runtime_error{ + "there is a set with size: " + std::to_string( collector.set_size() ) + }; + } +} + +void +run_and_check( + tp_disp::queue_traits::lock_factory_t factory ) +{ + thread_id_collector_t collector; + + run_sobjectizer( factory, collector ); + + analyze_results( collector ); +} + +int +main() +{ + try + { + for_each_lock_factory( []( tp_disp::queue_traits::lock_factory_t factory ) { + run_with_time_limit( + [&]() + { + run_and_check( factory ); + }, + 240 ); + } ); + } + catch( const std::exception & ex ) + { + std::cerr << "Error: " << ex.what() << std::endl; + return 1; + } + + return 0; +} + diff --git a/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.rb b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.rb new file mode 100644 index 00000000..16ac3691 --- /dev/null +++ b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.rb @@ -0,0 +1,11 @@ +require 'mxx_ru/cpp' + +MxxRu::Cpp::exe_target { + + required_prj( "so_5/prj.rb" ) + + target( "_unit.test.disp.thread_pool.cooperation_fifo_2" ) + + cpp_source( "main.cpp" ) +} + diff --git a/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.ut.rb b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.ut.rb new file mode 100644 index 00000000..d6a1cba3 --- /dev/null +++ b/dev/test/so_5/disp/thread_pool/cooperation_fifo_2/prj.ut.rb @@ -0,0 +1,7 @@ +require 'mxx_ru/binary_unittest' + +Mxx_ru::setup_target( + Mxx_ru::Binary_unittest_target.new( + "test/so_5/disp/thread_pool/cooperation_fifo_2/prj.ut.rb", + "test/so_5/disp/thread_pool/cooperation_fifo_2/prj.rb" ) +)