Skip to content

Commit

Permalink
wasp_c_extensions/_pqueue: a new class -- RenewableSmartLock (interme…
Browse files Browse the repository at this point in the history
…diate changes)
  • Loading branch information
a1ezzz committed Feb 26, 2023
1 parent b87676f commit 9618f54
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 72 deletions.
63 changes: 58 additions & 5 deletions tests/tests_fixtures.cpp
Expand Up @@ -22,31 +22,84 @@

using namespace wasp::tests_fixtures;

void ThreadsRunner::start_threads(std::string tag, size_t count, void (*threaded_fn)()){
template<>
std::list<size_t> wasp::tests_fixtures::sequence_generator<0>() {
return {};
};

void ThreadsRunner::start_threads(std::string tag, size_t count, std::function<void()>threaded_fn, bool delayed_start){
std::atomic<bool>* flag = NULL;
std::mutex* mutex = NULL;
std::condition_variable* cv = NULL;
threads_vector _threads;
tagged_threads::iterator it = this->threads.find(tag);
std::function<void()> wrapped_function = threaded_fn;

if (it != this->threads.end()){
throw std::invalid_argument("The specified threads are started already");
}

if (delayed_start){
flag = new std::atomic<bool>(false);
mutex = new std::mutex();
cv = new std::condition_variable();

wrapped_function = [&flag, &mutex, &cv, &threaded_fn](){
while (! flag->load(std::memory_order_seq_cst)){
std::unique_lock<std::mutex> lock(*mutex);
cv->wait(lock);
}
threaded_fn();
};
}

for (size_t i = 0; i < count; i++){
_threads.push_back(new std::thread(threaded_fn));
}

this->threads.insert(std::make_pair(tag, _threads));
this->threads.insert(std::make_pair(tag, threads_tuple(_threads, flag, mutex, cv)));
};

void ThreadsRunner::resume_threads(std::string tag){
tagged_threads::iterator it = this->threads.find(tag);

if (it == this->threads.end()){
throw std::invalid_argument("The specified threads hasn't been started yet");
}

if (! this->get_item<1>(it)){
throw std::invalid_argument("The specified threads doesn't await");
}

{
this->get_item<1>(it)->store(true, std::memory_order_seq_cst);
std::lock_guard<std::mutex> lock(*(this->get_item<2>(it)));
this->get_item<3>(it)->notify_all();
}
}

void ThreadsRunner::join_threads(std::string tag){
tagged_threads::iterator it = this->threads.find(tag);

if (it == this->threads.end()){
throw std::invalid_argument("The specified threads hasn't been started yet");
}

for (size_t i = 0; i < it->second.size(); i++){
it->second[i]->join();
delete it->second[i];
for (size_t i = 0; i < this->get_item<0>(it).size(); i++){
this->get_item<0>(it)[i]->join();
delete this->get_item<0>(it)[i];
}

if (this->get_item<1>(it)){
delete this->get_item<1>(it);
}

if (this->get_item<2>(it)){
delete this->get_item<2>(it);
}

if (this->get_item<3>(it)){
delete this->get_item<3>(it);
}

this->threads.erase(it);
Expand Down
26 changes: 24 additions & 2 deletions tests/tests_fixtures.hpp
Expand Up @@ -19,22 +19,44 @@
//along with wasp-c-extensions. If not, see <http://www.gnu.org/licenses/>.

#include <vector>
#include <atomic>
#include <map>
#include <list>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>

#include <cppunit/TestCase.h>

namespace wasp::tests_fixtures {

template<size_t count>
std::list<size_t> sequence_generator(){
std::list<size_t> result = sequence_generator<count - 1>();
result.push_back(count);
return result;
};

template<>
std::list<size_t> sequence_generator<0>();

class ThreadsRunner:
public CppUnit::TestFixture
{
typedef std::vector<std::thread*> threads_vector;
typedef std::map<std::string, threads_vector> tagged_threads;
typedef std::tuple<threads_vector, std::atomic<bool>*, std::mutex*, std::condition_variable*> threads_tuple;
typedef std::map<std::string, threads_tuple> tagged_threads;
tagged_threads threads;

template<int a>
auto get_item(tagged_threads::iterator it){
return std::get<a>(it->second);
};

public:
void start_threads(std::string tag, size_t count, void (*threaded_fn)());
void start_threads(std::string tag, size_t count, std::function<void()>threaded_fn, bool delayed_start=false);
void resume_threads(std::string tag);
void join_threads(std::string tag);
};

Expand Down
46 changes: 45 additions & 1 deletion tests/wasp_c_extensions_smart_ptr__dp_test.cpp
Expand Up @@ -19,10 +19,10 @@ class TestSmartDualPointer:
CPPUNIT_TEST(test_create_setup_destroy);
CPPUNIT_TEST(test_multiple_setup_seq);
CPPUNIT_TEST(test_multiple_setup_all);
// CPPUNIT_TEST_PARAMETERIZED(test_concurrency, wasp::tests_fixtures::sequence_generator<10>());
CPPUNIT_TEST_SUITE_END();

static const size_t items_count = 100;
static const size_t acquire_threads_count = 50;
wasp::cgc::ConcurrentGarbageCollector* gc;
wasp::cgc::ConcurrentGCItem* items[items_count];
wasp::cgc::CGCSmartPointer<wasp::cgc::ConcurrentGCItem>* smart_ptrs[items_count];
Expand Down Expand Up @@ -81,6 +81,50 @@ class TestSmartDualPointer:
delete dp;
}

void test_concurrency(size_t){
wasp::cgc::SmartDualPointer<wasp::cgc::ConcurrentGCItem> *dp =
new wasp::cgc::SmartDualPointer<wasp::cgc::ConcurrentGCItem>();

volatile bool acquire_is_on = true;

this->start_threads(
"acquire_threads",
3,
[&dp, &acquire_is_on](){TestSmartDualPointer::concurrency_acquire_thread_fn(dp, &acquire_is_on);},
true
);

dp->setup_next(this->smart_ptrs[0]);
this->resume_threads("acquire_threads");

// TODO: test it with the parallel setup_next
for (size_t i = 1; i < this->items_count; i++){
dp->setup_next(this->smart_ptrs[i]);
std::this_thread::sleep_for(10ms);
}

acquire_is_on = false;
this->join_threads("acquire_threads" );

delete dp;
}

static void concurrency_acquire_thread_fn(
wasp::cgc::SmartDualPointer<wasp::cgc::ConcurrentGCItem>* dp,
volatile bool *acquire_is_on
){
wasp::cgc::CGCSmartPointer<wasp::cgc::ConcurrentGCItem>* acquire_result = NULL;

while (*acquire_is_on){
do {
acquire_result = dp->acquire_next();
}
while (! acquire_result);

acquire_result->release();
}
}

public:
void setUp(){
this->gc = new wasp::cgc::ConcurrentGarbageCollector();
Expand Down
82 changes: 82 additions & 0 deletions tests/wasp_c_extensions_smart_ptr__r7esl_test.cpp
@@ -0,0 +1,82 @@

#include <cppunit/extensions/HelperMacros.h>

#include "wasp_c_extensions/_cgc/smart_ptr.hpp"

#include "tests/tests_fixtures.hpp"


using namespace std::chrono_literals;


namespace wasp::smart_ptr_test_case {

class TestRenewableSmartLock:
public wasp::tests_fixtures::ThreadsRunner
{
CPPUNIT_TEST_SUITE(TestRenewableSmartLock);
CPPUNIT_TEST(test_create_destroy);
CPPUNIT_TEST(test_acquire_release);
CPPUNIT_TEST_PARAMETERIZED(test_concurrency, wasp::tests_fixtures::sequence_generator<10>());
CPPUNIT_TEST_SUITE_END();

void test_create_destroy(){
wasp::cgc::RenewableSmartLock* sl = new wasp::cgc::RenewableSmartLock();
CPPUNIT_ASSERT(sl->release());
delete sl;
}

void test_acquire_release(){
wasp::cgc::RenewableSmartLock* sl = new wasp::cgc::RenewableSmartLock();

for (size_t i = 0; i < 10; i++){
CPPUNIT_ASSERT(sl->acquire());
CPPUNIT_ASSERT(! sl->release());
}

CPPUNIT_ASSERT(sl->release());
delete sl;
}

void test_concurrency(size_t){
volatile bool acquire_is_on = true;
wasp::cgc::RenewableSmartLock* sl = new wasp::cgc::RenewableSmartLock();

this->start_threads(
"acquire_threads",
50,
[&sl, &acquire_is_on](){TestRenewableSmartLock::acquire_thread_fn(sl, 10, &acquire_is_on);}
);

for (size_t i = 0; i < 10000; i++){
sl->release();
while (! sl->renew());
std::this_thread::sleep_for(1ms);
}

acquire_is_on = false;
this->join_threads("acquire_threads");

CPPUNIT_ASSERT(sl->release());
delete sl;
}

// TODO: add test where renew is called from concurrent threads

public:
static void acquire_thread_fn(
wasp::cgc::RenewableSmartLock* sl, size_t sleep_ms, volatile bool* is_running
){
do {
if (sl->acquire()){
sl->release();
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
}
while(*is_running);
}
};

}; // namespace wasp::smart_ptr_test_case

CPPUNIT_TEST_SUITE_REGISTRATION(wasp::smart_ptr_test_case::TestRenewableSmartLock);
48 changes: 46 additions & 2 deletions wasp_c_extensions/_cgc/smart_ptr.cpp
Expand Up @@ -39,10 +39,10 @@ size_t ResourceSmartLock::counter(){

void ResourceSmartLock::resurrect(){
bool true_v = true;
assert(this->dead_flag.compare_exchange_strong(true_v, false
, std::memory_order_seq_cst));
assert(this->dead_flag.load(std::memory_order_seq_cst));
this->usage_counter.store(1, std::memory_order_seq_cst);
this->concurrency_liveness_flag.store(false, std::memory_order_seq_cst);
assert(this->dead_flag.compare_exchange_strong(true_v, false, std::memory_order_seq_cst));
}

bool ResourceSmartLock::acquire(){
Expand Down Expand Up @@ -81,6 +81,50 @@ bool ResourceSmartLock::is_dead(){
return this->dead_flag.load(std::memory_order_seq_cst);
}

RenewableSmartLock::RenewableSmartLock():
ResourceSmartLock(),
is_renewing(false),
renew_lock()
{}

bool RenewableSmartLock::acquire(){
if (this->renew_lock.acquire()){
if (this->ResourceSmartLock::acquire()){
this->renew_lock.release();
return true;
}
this->renew_lock.release();
}
return false;
}

bool RenewableSmartLock::release(){
bool result = this->ResourceSmartLock::release();

if (result){
this->renew_lock.release();
}
return result;
}

bool RenewableSmartLock::renew(){

if (this->is_renewing.test_and_set(std::memory_order_seq_cst)){
return false;
}

if (! this->renew_lock.is_dead()){
this->is_renewing.clear();
return false;
}

this->resurrect();
this->renew_lock.resurrect();

this->is_renewing.clear();
return true;
};

SmartPointerBase::SmartPointerBase():
pointer_lock(),
pointer(NULL)
Expand Down

0 comments on commit 9618f54

Please sign in to comment.