From a6201181c575f7d0c62e18d433f9195bfd55177b Mon Sep 17 00:00:00 2001 From: Ildar Gafurov Date: Mon, 12 Sep 2022 21:00:45 +0300 Subject: [PATCH] wasp_c_extensions/_cgc: a new class - CGCSmartPointer (a mix of ConcurrentGCItem and SmartPointer) --- tests/wasp_c_extensions_cgc_test.cpp | 93 ++++++++++++++++++++++++++-- wasp_c_extensions/_cgc/cgc.cpp | 77 +++++++++++++++++++++-- wasp_c_extensions/_cgc/cgc.hpp | 29 ++++++++- 3 files changed, 184 insertions(+), 15 deletions(-) diff --git a/tests/wasp_c_extensions_cgc_test.cpp b/tests/wasp_c_extensions_cgc_test.cpp index 9e3335c..5b56193 100644 --- a/tests/wasp_c_extensions_cgc_test.cpp +++ b/tests/wasp_c_extensions_cgc_test.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -9,21 +10,33 @@ #include "wasp_c_extensions/_cgc/cgc.hpp" +using namespace std::chrono_literals; + namespace wasp::cgc_test::test_case { const size_t gc_threads_num = 1024; const size_t gc_items_per_thread = 10240; wasp::cgc::ConcurrentGarbageCollector* global_gc = NULL; - const size_t sp_simple_test_runs = 100; - const size_t sp_simple_threads_num = 10240; + const size_t sp_simple_test_runs = 50; + const std::chrono::milliseconds sp_simple_sleep_time = std::chrono::duration_cast(300ms); + const size_t sp_simple_threads_num = 1000; const size_t sp_simple_acquire_per_thread = 3; - const size_t sp_advanced_test_runs = 1000; + const size_t sp_advanced_test_runs = 100; + const std::chrono::milliseconds sp_advanced_sleep_time = + std::chrono::duration_cast(300ms); const size_t sp_advanced_threads_num = 100; const size_t sp_advanced_acquire_per_thread = 3; wasp::cgc::SmartPointer* global_sp = NULL; + const size_t gc_sp_advanced_test_runs = 50; + const std::chrono::milliseconds gc_sp_sleep_time = std::chrono::duration_cast(300ms); + const size_t gc_sp_advanced_threads_num = 1000; + const size_t gc_sp_advanced_acquire_per_thread = 3; + + wasp::cgc::CGCSmartPointer* global_gc_sp = NULL; + std::atomic start_event_flag(false); std::condition_variable start_event_cv; std::mutex start_event_mutex; @@ -146,14 +159,18 @@ class TestSmartPointer: wasp::cgc_test::test_case::global_gc = new wasp::cgc::ConcurrentGarbageCollector(); for (size_t i=0; i < wasp::cgc_test::test_case::sp_simple_test_runs; i++){ - this->threads_test(wasp::cgc_test::test_case::sp_simple_threads_num, TestSmartPointer::sp_simple_threaded_fn); + this->threads_test( + wasp::cgc_test::test_case::sp_simple_threads_num, + TestSmartPointer::sp_simple_threaded_fn, + wasp::cgc_test::test_case::sp_simple_sleep_time + ); } delete wasp::cgc_test::test_case::global_gc; wasp::cgc_test::test_case::global_gc = NULL; } - void threads_test(const size_t threads_num, void (*f) ()){ + void threads_test(const size_t threads_num, void (*f) (), std::chrono::milliseconds sleep_time){ std::thread* threads[threads_num]; SampleCGCItem* item_ptr = SampleCGCItem::create(); wasp::cgc_test::test_case::global_sp = new wasp::cgc::SmartPointer(item_ptr); @@ -164,6 +181,7 @@ class TestSmartPointer: threads[i] = new std::thread(f); } + std::this_thread::sleep_for(sleep_time); wasp::cgc_test::test_case::global_sp->release(); for (size_t i=0; i < threads_num; i++){ @@ -195,7 +213,9 @@ class TestSmartPointer: for (size_t i=0; i < wasp::cgc_test::test_case::sp_advanced_test_runs; i++){ this->threads_test( - wasp::cgc_test::test_case::sp_advanced_threads_num, TestSmartPointer::sp_advanced_threaded_fn + wasp::cgc_test::test_case::sp_advanced_threads_num, + TestSmartPointer::sp_advanced_threaded_fn, + wasp::cgc_test::test_case::sp_advanced_sleep_time ); } @@ -226,6 +246,67 @@ class TestSmartPointer: } }; +class TestCGCSmartPointer: + public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(TestCGCSmartPointer); + CPPUNIT_TEST(test_concurrency); + CPPUNIT_TEST_SUITE_END(); + + void test_concurrency(){ + wasp::cgc_test::test_case::global_gc = new wasp::cgc::ConcurrentGarbageCollector(); + + for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_test_runs; i++){ + this->threads_test(); + } + + delete wasp::cgc_test::test_case::global_gc; + wasp::cgc_test::test_case::global_gc = NULL; + } + + void threads_test(){ + std::thread* threads[wasp::cgc_test::test_case::gc_sp_advanced_threads_num]; + SampleCGCItem* item_ptr = SampleCGCItem::create(); + wasp::cgc_test::test_case::global_gc_sp = new wasp::cgc::CGCSmartPointer( + &SampleCGCItem::destroy, item_ptr + ); + + wasp::cgc_test::test_case::global_gc->push(item_ptr); + wasp::cgc_test::test_case::global_gc->push(wasp::cgc_test::test_case::global_gc_sp); + + for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_threads_num; i++){ + threads[i] = new std::thread(TestCGCSmartPointer::threaded_fn); + } + + std::this_thread::sleep_for(wasp::cgc_test::test_case::gc_sp_sleep_time); + wasp::cgc_test::test_case::global_gc_sp->destroyable(); + wasp::cgc_test::test_case::global_gc_sp->release(); + + for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_threads_num; i++){ + threads[i]->join(); + delete threads[i]; + } + + wasp::cgc_test::test_case::global_gc_sp = NULL; + } + + static void threaded_fn(){ + if (wasp::cgc_test::test_case::global_gc_sp->acquire()){ + + for (size_t i=1; i < wasp::cgc_test::test_case::sp_advanced_acquire_per_thread; i++){ + CPPUNIT_ASSERT(wasp::cgc_test::test_case::global_gc_sp->acquire() != NULL); + } + + for (size_t i=0; i < wasp::cgc_test::test_case::sp_advanced_acquire_per_thread; i++){ + wasp::cgc_test::test_case::global_gc_sp->release(); + wasp::cgc_test::test_case::global_gc->collect(); + } + } + } +}; + CPPUNIT_TEST_SUITE_REGISTRATION(TestWaspCGCTest); CPPUNIT_TEST_SUITE_REGISTRATION(TestSmartPointer); + +CPPUNIT_TEST_SUITE_REGISTRATION(TestCGCSmartPointer); diff --git a/wasp_c_extensions/_cgc/cgc.cpp b/wasp_c_extensions/_cgc/cgc.cpp index 316111a..0960b44 100644 --- a/wasp_c_extensions/_cgc/cgc.cpp +++ b/wasp_c_extensions/_cgc/cgc.cpp @@ -191,11 +191,6 @@ bool ResourceSmartLock::acquire(){ this->concurrency_call_counter.fetch_add(1, std::memory_order_seq_cst); - if(! this->concurrency_liveness_flag.load(std::memory_order_seq_cst)){ - this->concurrency_call_counter.fetch_sub(1, std::memory_order_seq_cst); - return false; - } - this->concurrency_liveness_flag.store(true, std::memory_order_seq_cst); this->usage_counter.fetch_add(1, std::memory_order_seq_cst); @@ -296,4 +291,74 @@ bool SmartPointer::replace(PointerDestructor* new_ptr){ } return false; -} \ No newline at end of file +} + +CGCSmartPointer::CGCSmartPointer(void (*destroy_fn)(PointerDestructor*), PointerDestructor* p): + ConcurrentGCItem(destroy_fn), + SmartPointer(p), + destroyable_request_flag(false), + destroyable_commit_flag(false), + pending_releases(1) +{} + +CGCSmartPointer::~CGCSmartPointer() +{ + assert(this->destroyable_request_flag); + assert(this->destroyable_commit_flag.load(std::memory_order_seq_cst)); + assert(this->pending_releases.load(std::memory_order_seq_cst) == 0); +} + +void CGCSmartPointer::destroyable(){ + this->destroyable_request_flag = true; + this->check_and_fall(); +} + +void CGCSmartPointer::check_and_fall() +{ + if (! this->destroyable_commit_flag.load(std::memory_order_seq_cst)){ + if (this->destroyable_request_flag){ + + if (this->pending_releases.load(std::memory_order_seq_cst) == 0){ + if (! this->destroyable_commit_flag.exchange(true, std::memory_order_seq_cst)){ + this->ConcurrentGCItem::destroyable(); + } + } + } + } +} + +PointerDestructor* CGCSmartPointer::acquire(){ + PointerDestructor* result = NULL; + + this->pending_releases.fetch_add(1, std::memory_order_seq_cst); + + if (! this->destroyable_request_flag){ + result = this->SmartPointer::acquire(); + if (result){ + return result; + } + } + + this->pending_releases.fetch_sub(1, std::memory_order_seq_cst); + this->check_and_fall(); + return NULL; +} + +void CGCSmartPointer::release(){ + this->SmartPointer::release(); + this->pending_releases.fetch_sub(1, std::memory_order_seq_cst); + this->check_and_fall(); +} + +bool CGCSmartPointer::replace(PointerDestructor* new_ptr){ + if (! this->destroyable_request_flag){ + if (this->SmartPointer::replace(new_ptr)){ + this->pending_releases.fetch_add(1, std::memory_order_seq_cst); + return true; + }; + return false; + } + + this->check_and_fall(); + return false; +} diff --git a/wasp_c_extensions/_cgc/cgc.hpp b/wasp_c_extensions/_cgc/cgc.hpp index 299cf70..0dcec8c 100644 --- a/wasp_c_extensions/_cgc/cgc.hpp +++ b/wasp_c_extensions/_cgc/cgc.hpp @@ -79,7 +79,7 @@ class ConcurrentGCItem: ConcurrentGCItem(void (*destroy_fn)(PointerDestructor*)); virtual ~ConcurrentGCItem(); - void destroyable(); + virtual void destroyable(); }; class ResourceSmartLock{ @@ -109,7 +109,6 @@ class ResourceSmartLock{ class SmartPointer { - ResourceSmartLock pointer_lock; std::atomic pointer; std::atomic zombie_pointer; // this pointer protect the pointer_lock.reset method @@ -121,8 +120,32 @@ class SmartPointer virtual PointerDestructor* acquire(); virtual void release(); + virtual bool replace(PointerDestructor* new_ptr); +}; + +class CGCSmartPointer: + public ConcurrentGCItem, + public SmartPointer +{ + volatile bool destroyable_request_flag; + std::atomic destroyable_commit_flag; + std::atomic pending_releases; + + void check_and_fall(); + + public: + CGCSmartPointer(void (*destroy_fn)(PointerDestructor*), PointerDestructor*); + virtual ~CGCSmartPointer(); + + // SmartPointer methods + + virtual PointerDestructor* acquire(); + virtual void release(); + virtual bool replace(PointerDestructor* new_ptr); + + // ConcurrentGCItem method - bool replace(PointerDestructor* new_ptr); + virtual void destroyable(); // concurrency with release/replace }; }; // namespace wasp::cgc