Skip to content

Commit

Permalink
wasp_c_extensions/_cgc: a new class - CGCSmartPointer (a mix of Concu…
Browse files Browse the repository at this point in the history
…rrentGCItem and SmartPointer)
  • Loading branch information
a1ezzz committed Sep 12, 2022
1 parent d19bf27 commit a620118
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 15 deletions.
93 changes: 87 additions & 6 deletions tests/wasp_c_extensions_cgc_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
Expand All @@ -9,21 +10,33 @@

#include "wasp_c_extensions/_cgc/cgc.hpp"

using namespace std::chrono_literals;

namespace wasp::cgc_test::test_case {
const size_t gc_threads_num = 1024;
const size_t gc_items_per_thread = 10240;
wasp::cgc::ConcurrentGarbageCollector* global_gc = NULL;

const size_t sp_simple_test_runs = 100;
const size_t sp_simple_threads_num = 10240;
const size_t sp_simple_test_runs = 50;
const std::chrono::milliseconds sp_simple_sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(300ms);
const size_t sp_simple_threads_num = 1000;
const size_t sp_simple_acquire_per_thread = 3;

const size_t sp_advanced_test_runs = 1000;
const size_t sp_advanced_test_runs = 100;
const std::chrono::milliseconds sp_advanced_sleep_time =
std::chrono::duration_cast<std::chrono::milliseconds>(300ms);
const size_t sp_advanced_threads_num = 100;
const size_t sp_advanced_acquire_per_thread = 3;

wasp::cgc::SmartPointer* global_sp = NULL;

const size_t gc_sp_advanced_test_runs = 50;
const std::chrono::milliseconds gc_sp_sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(300ms);
const size_t gc_sp_advanced_threads_num = 1000;
const size_t gc_sp_advanced_acquire_per_thread = 3;

wasp::cgc::CGCSmartPointer* global_gc_sp = NULL;

std::atomic<bool> start_event_flag(false);
std::condition_variable start_event_cv;
std::mutex start_event_mutex;
Expand Down Expand Up @@ -146,14 +159,18 @@ class TestSmartPointer:
wasp::cgc_test::test_case::global_gc = new wasp::cgc::ConcurrentGarbageCollector();

for (size_t i=0; i < wasp::cgc_test::test_case::sp_simple_test_runs; i++){
this->threads_test(wasp::cgc_test::test_case::sp_simple_threads_num, TestSmartPointer::sp_simple_threaded_fn);
this->threads_test(
wasp::cgc_test::test_case::sp_simple_threads_num,
TestSmartPointer::sp_simple_threaded_fn,
wasp::cgc_test::test_case::sp_simple_sleep_time
);
}

delete wasp::cgc_test::test_case::global_gc;
wasp::cgc_test::test_case::global_gc = NULL;
}

void threads_test(const size_t threads_num, void (*f) ()){
void threads_test(const size_t threads_num, void (*f) (), std::chrono::milliseconds sleep_time){
std::thread* threads[threads_num];
SampleCGCItem* item_ptr = SampleCGCItem::create();
wasp::cgc_test::test_case::global_sp = new wasp::cgc::SmartPointer(item_ptr);
Expand All @@ -164,6 +181,7 @@ class TestSmartPointer:
threads[i] = new std::thread(f);
}

std::this_thread::sleep_for(sleep_time);
wasp::cgc_test::test_case::global_sp->release();

for (size_t i=0; i < threads_num; i++){
Expand Down Expand Up @@ -195,7 +213,9 @@ class TestSmartPointer:

for (size_t i=0; i < wasp::cgc_test::test_case::sp_advanced_test_runs; i++){
this->threads_test(
wasp::cgc_test::test_case::sp_advanced_threads_num, TestSmartPointer::sp_advanced_threaded_fn
wasp::cgc_test::test_case::sp_advanced_threads_num,
TestSmartPointer::sp_advanced_threaded_fn,
wasp::cgc_test::test_case::sp_advanced_sleep_time
);
}

Expand Down Expand Up @@ -226,6 +246,67 @@ class TestSmartPointer:
}
};

class TestCGCSmartPointer:
public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(TestCGCSmartPointer);
CPPUNIT_TEST(test_concurrency);
CPPUNIT_TEST_SUITE_END();

void test_concurrency(){
wasp::cgc_test::test_case::global_gc = new wasp::cgc::ConcurrentGarbageCollector();

for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_test_runs; i++){
this->threads_test();
}

delete wasp::cgc_test::test_case::global_gc;
wasp::cgc_test::test_case::global_gc = NULL;
}

void threads_test(){
std::thread* threads[wasp::cgc_test::test_case::gc_sp_advanced_threads_num];
SampleCGCItem* item_ptr = SampleCGCItem::create();
wasp::cgc_test::test_case::global_gc_sp = new wasp::cgc::CGCSmartPointer(
&SampleCGCItem::destroy, item_ptr
);

wasp::cgc_test::test_case::global_gc->push(item_ptr);
wasp::cgc_test::test_case::global_gc->push(wasp::cgc_test::test_case::global_gc_sp);

for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_threads_num; i++){
threads[i] = new std::thread(TestCGCSmartPointer::threaded_fn);
}

std::this_thread::sleep_for(wasp::cgc_test::test_case::gc_sp_sleep_time);
wasp::cgc_test::test_case::global_gc_sp->destroyable();
wasp::cgc_test::test_case::global_gc_sp->release();

for (size_t i=0; i < wasp::cgc_test::test_case::gc_sp_advanced_threads_num; i++){
threads[i]->join();
delete threads[i];
}

wasp::cgc_test::test_case::global_gc_sp = NULL;
}

static void threaded_fn(){
if (wasp::cgc_test::test_case::global_gc_sp->acquire()){

for (size_t i=1; i < wasp::cgc_test::test_case::sp_advanced_acquire_per_thread; i++){
CPPUNIT_ASSERT(wasp::cgc_test::test_case::global_gc_sp->acquire() != NULL);
}

for (size_t i=0; i < wasp::cgc_test::test_case::sp_advanced_acquire_per_thread; i++){
wasp::cgc_test::test_case::global_gc_sp->release();
wasp::cgc_test::test_case::global_gc->collect();
}
}
}
};

CPPUNIT_TEST_SUITE_REGISTRATION(TestWaspCGCTest);

CPPUNIT_TEST_SUITE_REGISTRATION(TestSmartPointer);

CPPUNIT_TEST_SUITE_REGISTRATION(TestCGCSmartPointer);
77 changes: 71 additions & 6 deletions wasp_c_extensions/_cgc/cgc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ bool ResourceSmartLock::acquire(){

this->concurrency_call_counter.fetch_add(1, std::memory_order_seq_cst);

if(! this->concurrency_liveness_flag.load(std::memory_order_seq_cst)){
this->concurrency_call_counter.fetch_sub(1, std::memory_order_seq_cst);
return false;
}

this->concurrency_liveness_flag.store(true, std::memory_order_seq_cst);

this->usage_counter.fetch_add(1, std::memory_order_seq_cst);
Expand Down Expand Up @@ -296,4 +291,74 @@ bool SmartPointer::replace(PointerDestructor* new_ptr){
}

return false;
}
}

CGCSmartPointer::CGCSmartPointer(void (*destroy_fn)(PointerDestructor*), PointerDestructor* p):
ConcurrentGCItem(destroy_fn),
SmartPointer(p),
destroyable_request_flag(false),
destroyable_commit_flag(false),
pending_releases(1)
{}

CGCSmartPointer::~CGCSmartPointer()
{
assert(this->destroyable_request_flag);
assert(this->destroyable_commit_flag.load(std::memory_order_seq_cst));
assert(this->pending_releases.load(std::memory_order_seq_cst) == 0);
}

void CGCSmartPointer::destroyable(){
this->destroyable_request_flag = true;
this->check_and_fall();
}

void CGCSmartPointer::check_and_fall()
{
if (! this->destroyable_commit_flag.load(std::memory_order_seq_cst)){
if (this->destroyable_request_flag){

if (this->pending_releases.load(std::memory_order_seq_cst) == 0){
if (! this->destroyable_commit_flag.exchange(true, std::memory_order_seq_cst)){
this->ConcurrentGCItem::destroyable();
}
}
}
}
}

PointerDestructor* CGCSmartPointer::acquire(){
PointerDestructor* result = NULL;

this->pending_releases.fetch_add(1, std::memory_order_seq_cst);

if (! this->destroyable_request_flag){
result = this->SmartPointer::acquire();
if (result){
return result;
}
}

this->pending_releases.fetch_sub(1, std::memory_order_seq_cst);
this->check_and_fall();
return NULL;
}

void CGCSmartPointer::release(){
this->SmartPointer::release();
this->pending_releases.fetch_sub(1, std::memory_order_seq_cst);
this->check_and_fall();
}

bool CGCSmartPointer::replace(PointerDestructor* new_ptr){
if (! this->destroyable_request_flag){
if (this->SmartPointer::replace(new_ptr)){
this->pending_releases.fetch_add(1, std::memory_order_seq_cst);
return true;
};
return false;
}

this->check_and_fall();
return false;
}
29 changes: 26 additions & 3 deletions wasp_c_extensions/_cgc/cgc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class ConcurrentGCItem:
ConcurrentGCItem(void (*destroy_fn)(PointerDestructor*));
virtual ~ConcurrentGCItem();

void destroyable();
virtual void destroyable();
};

class ResourceSmartLock{
Expand Down Expand Up @@ -109,7 +109,6 @@ class ResourceSmartLock{

class SmartPointer
{

ResourceSmartLock pointer_lock;
std::atomic<PointerDestructor*> pointer;
std::atomic<PointerDestructor*> zombie_pointer; // this pointer protect the pointer_lock.reset method
Expand All @@ -121,8 +120,32 @@ class SmartPointer

virtual PointerDestructor* acquire();
virtual void release();
virtual bool replace(PointerDestructor* new_ptr);
};

class CGCSmartPointer:
public ConcurrentGCItem,
public SmartPointer
{
volatile bool destroyable_request_flag;
std::atomic<bool> destroyable_commit_flag;
std::atomic<size_t> pending_releases;

void check_and_fall();

public:
CGCSmartPointer(void (*destroy_fn)(PointerDestructor*), PointerDestructor*);
virtual ~CGCSmartPointer();

// SmartPointer methods

virtual PointerDestructor* acquire();
virtual void release();
virtual bool replace(PointerDestructor* new_ptr);

// ConcurrentGCItem method

bool replace(PointerDestructor* new_ptr);
virtual void destroyable(); // concurrency with release/replace
};

}; // namespace wasp::cgc
Expand Down

0 comments on commit a620118

Please sign in to comment.