From 3a04ab5e06857dd87d15b87be0cab1c0d3027f9d Mon Sep 17 00:00:00 2001 From: AntonBikineev Date: Sat, 26 Sep 2015 12:23:59 -0500 Subject: [PATCH 1/3] Restricting thread_data to creating only with intrusive_pointers --- hpx/runtime/threads/policies/thread_queue.hpp | 4 +- hpx/runtime/threads/thread_data.hpp | 49 ++++++++------- src/runtime/threads/thread_data.cpp | 59 ++++--------------- 3 files changed, 40 insertions(+), 72 deletions(-) diff --git a/hpx/runtime/threads/policies/thread_queue.hpp b/hpx/runtime/threads/policies/thread_queue.hpp index e2ecee69bd4b..6a93605aec9e 100644 --- a/hpx/runtime/threads/policies/thread_queue.hpp +++ b/hpx/runtime/threads/policies/thread_queue.hpp @@ -214,8 +214,8 @@ namespace hpx { namespace threads { namespace policies // Allocate a new thread object. if (data.stacksize != 0) - thrd.reset(new (memory_pool_) threads::thread_data( - data, memory_pool_, state)); + thrd = threads::thread_data::create( + data, memory_pool_, state); else thrd.reset(new threads::stackless_thread_data( data, &memory_pool_, state)); diff --git a/hpx/runtime/threads/thread_data.hpp b/hpx/runtime/threads/thread_data.hpp index 02a02f320ee7..0fd4b95e7d2d 100644 --- a/hpx/runtime/threads/thread_data.hpp +++ b/hpx/runtime/threads/thread_data.hpp @@ -664,15 +664,22 @@ namespace hpx { namespace threads public: typedef boost::lockfree::caching_freelist pool_type; - thread_data(thread_init_data& init_data, - pool_type& pool, thread_state_enum newstate) - : thread_data_base(init_data, newstate), - coroutine_(std::move(init_data.func), std::move(init_data.target), - this_(), init_data.stacksize), - pool_(&pool) + static boost::intrusive_ptr create( + thread_init_data& init_data, pool_type& pool, + thread_state_enum newstate) { - HPX_ASSERT(init_data.stacksize != 0); - HPX_ASSERT(coroutine_.is_ready()); + thread_data* ret = pool.allocate(); + if (ret == 0) + { + HPX_THROW_EXCEPTION(out_of_memory, + "thread_data::operator new", + "could not allocate memory for thread_data"); + } +#ifdef HPX_DEBUG_THREAD_POOL + using namespace std; // some systems have memset in namespace std + memset (ret, initial_value, sizeof(thread_data)); +#endif + return new (ret) thread_data(init_data, pool, newstate); } ~thread_data() @@ -697,19 +704,6 @@ namespace hpx { namespace threads HPX_ASSERT(coroutine_.is_ready()); } - /////////////////////////////////////////////////////////////////////// - // Memory management - static void* operator new(std::size_t size, pool_type&); - static void operator delete(void* p, std::size_t size); - static void operator delete(void*, pool_type&); - - // Won't be called. - static void* operator new(std::size_t) throw() - { - HPX_ASSERT(false); - return NULL; - } - bool is_created_from(void* pool) const { return pool_ == pool; @@ -767,6 +761,19 @@ namespace hpx { namespace threads //} private: + friend HPX_EXPORT void intrusive_ptr_release(thread_data_base*); + + thread_data(thread_init_data& init_data, + pool_type& pool, thread_state_enum newstate) + : thread_data_base(init_data, newstate), + coroutine_(std::move(init_data.func), std::move(init_data.target), + this_(), init_data.stacksize), + pool_(&pool) + { + HPX_ASSERT(init_data.stacksize != 0); + HPX_ASSERT(coroutine_.is_ready()); + } + coroutine_type coroutine_; pool_type* pool_; }; diff --git a/src/runtime/threads/thread_data.cpp b/src/runtime/threads/thread_data.cpp index f1ccc320028c..a21e248050ce 100644 --- a/src/runtime/threads/thread_data.cpp +++ b/src/runtime/threads/thread_data.cpp @@ -35,56 +35,17 @@ namespace hpx { namespace threads void intrusive_ptr_release(thread_data_base* p) { if (0 == --p->count_) - delete p; - } - - /////////////////////////////////////////////////////////////////////////// - void* thread_data::operator new(std::size_t size, thread_pool& pool) - { - HPX_ASSERT(sizeof(thread_data) == size); - - void *ret = reinterpret_cast(pool.allocate()); - if (0 == ret) - { - HPX_THROW_EXCEPTION(out_of_memory, - "thread_data::operator new", - "could not allocate memory for thread_data"); - } - -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (ret, initial_value, sizeof(thread_data)); -#endif - return ret; - } - - void thread_data::operator delete(void *p, std::size_t size) - { - HPX_ASSERT(sizeof(thread_data) == size); - - if (0 != p) { - thread_data* pt = static_cast(p); - thread_pool* pool = pt->pool_; - HPX_ASSERT(pool); - -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (static_cast(pt), freed_value, sizeof(thread_data)); //-V598 -#endif - pool->deallocate(pt); - } - } - - void thread_data::operator delete(void *p, thread_pool& pool) - { - if (0 != p) - { -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (p, freed_value, sizeof(thread_data)); -#endif - pool.deallocate(static_cast(p)); + if (thread_data* td = dynamic_cast(p)) + { + thread_pool* pool = td->pool_; + td->~thread_data(); + pool->deallocate(td); + } + else + { + delete p; + } } } From 973de3fb326cc8f3d3e224d674497c17163ad5cf Mon Sep 17 00:00:00 2001 From: AntonBikineev Date: Mon, 28 Sep 2015 11:55:48 -0500 Subject: [PATCH 2/3] Removing stackless_thread_data class --- hpx/runtime/threads/policies/thread_queue.hpp | 9 +- hpx/runtime/threads/thread_data.hpp | 102 ------------------ 2 files changed, 3 insertions(+), 108 deletions(-) diff --git a/hpx/runtime/threads/policies/thread_queue.hpp b/hpx/runtime/threads/policies/thread_queue.hpp index 6a93605aec9e..4f3e390a0905 100644 --- a/hpx/runtime/threads/policies/thread_queue.hpp +++ b/hpx/runtime/threads/policies/thread_queue.hpp @@ -146,6 +146,7 @@ namespace hpx { namespace threads { namespace policies threads::thread_init_data& data, thread_state_enum state, Lock& lk) { HPX_ASSERT(lk.owns_lock()); + HPX_ASSERT(data.stacksize != 0); std::ptrdiff_t stacksize = data.stacksize; @@ -213,12 +214,8 @@ namespace hpx { namespace threads { namespace policies hpx::util::unlock_guard ull(lk); // Allocate a new thread object. - if (data.stacksize != 0) - thrd = threads::thread_data::create( - data, memory_pool_, state); - else - thrd.reset(new threads::stackless_thread_data( - data, &memory_pool_, state)); + thrd = threads::thread_data::create( + data, memory_pool_, state); } } diff --git a/hpx/runtime/threads/thread_data.hpp b/hpx/runtime/threads/thread_data.hpp index 0fd4b95e7d2d..7898edd958aa 100644 --- a/hpx/runtime/threads/thread_data.hpp +++ b/hpx/runtime/threads/thread_data.hpp @@ -779,108 +779,6 @@ namespace hpx { namespace threads }; typedef thread_data::pool_type thread_pool; - - /////////////////////////////////////////////////////////////////////////// - class stackless_thread_data : public thread_data_base - { - // Avoid warning about using 'this' in initializer list - thread_data_base* this_() { return this; } - - public: - stackless_thread_data(thread_init_data& init_data, - void* pool, thread_state_enum newstate) - : thread_data_base(init_data, newstate), - coroutine_(std::move(init_data.func), std::move(init_data.target), this_()), - pool_(pool) - { - HPX_ASSERT(init_data.stacksize == 0); - } - - ~stackless_thread_data() - { - LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128 - << get_description() << "), phase(" - << get_thread_phase() << ")"; - } - - void rebind(thread_init_data& init_data, thread_state_enum newstate) - { - LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128 - << get_description() << "), phase(" - << get_thread_phase() << "), rebind"; - - this->thread_data_base::rebind_base(init_data, newstate); - - coroutine_.rebind(std::move(init_data.func), - std::move(init_data.target), this_()); - - HPX_ASSERT(init_data.stacksize == 0); - } - - /////////////////////////////////////////////////////////////////////// - // Memory management - bool is_created_from(void* pool) const - { - return pool_ == pool; - } - - /// \brief Execute the thread function - /// - /// \returns This function returns the thread state the thread - /// should be scheduled from this point on. The thread - /// manager will use the returned value to set the - /// thread's scheduling status. - thread_state_enum operator()() - { - thread_state_ex current_state_ex = get_state_ex(); - current_state_ex_.store(thread_state_ex(wait_signaled, - current_state_ex.get_tag() + 1), boost::memory_order_release); - - return coroutine_(current_state_ex); - } - - thread_id_type get_thread_id() const - { - return thread_id_type( - reinterpret_cast(coroutine_.get_thread_id()) - ); - } - - std::size_t get_thread_phase() const - { -#ifndef HPX_HAVE_THREAD_PHASE_INFORMATION - return 0; -#else - return coroutine_.get_thread_phase(); -#endif - } - -#ifdef HPX_HAVE_THREAD_LOCAL_STORAGE - std::size_t get_thread_data() const - { - return coroutine_.get_thread_data(); - } - std::size_t set_thread_data(std::size_t data) - { - return coroutine_.set_thread_data(data); - } -#endif - - /// This function will be called when the thread is about to be deleted - //void reset() - //{ - // thread_data_base::reset(); - // coroutine_.reset(); - //} - - private: - typedef util::coroutines::stackless_coroutine< - thread_function_sig - > coroutine_type; - - coroutine_type coroutine_; - void* pool_; - }; }} #include From f9c47b75d1a2e766834e92eef868d060b56ecc18 Mon Sep 17 00:00:00 2001 From: AntonBikineev Date: Mon, 28 Sep 2015 11:59:34 -0500 Subject: [PATCH 3/3] Removing dynamic_cast from thread_data deallocation implementation --- hpx/runtime/threads/policies/thread_queue.hpp | 6 ++-- hpx/runtime/threads/thread_data.hpp | 33 ++++++++++++++++--- src/runtime/threads/thread_data.cpp | 13 ++------ 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/hpx/runtime/threads/policies/thread_queue.hpp b/hpx/runtime/threads/policies/thread_queue.hpp index 4f3e390a0905..b6f177c9d30e 100644 --- a/hpx/runtime/threads/policies/thread_queue.hpp +++ b/hpx/runtime/threads/policies/thread_queue.hpp @@ -277,7 +277,7 @@ namespace hpx { namespace threads { namespace policies // this thread has to be in the map now HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); - HPX_ASSERT(thrd->is_created_from(&memory_pool_)); + HPX_ASSERT(thrd->get_pool() == &memory_pool_); } if (added) { @@ -727,7 +727,7 @@ namespace hpx { namespace threads { namespace policies // this thread has to be in the map now HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); - HPX_ASSERT(thrd->is_created_from(&memory_pool_)); + HPX_ASSERT(thrd->get_pool() == &memory_pool_); // push the new thread in the pending queue thread if (initial_state == pending) @@ -858,7 +858,7 @@ namespace hpx { namespace threads { namespace policies /// Destroy the passed thread as it has been terminated bool destroy_thread(threads::thread_data_base* thrd, boost::int64_t& busy_count) { - if (thrd->is_created_from(&memory_pool_)) + if (thrd->get_pool() == &memory_pool_) { terminated_items_.push(thrd); diff --git a/hpx/runtime/threads/thread_data.hpp b/hpx/runtime/threads/thread_data.hpp index 7898edd958aa..1f957efc3068 100644 --- a/hpx/runtime/threads/thread_data.hpp +++ b/hpx/runtime/threads/thread_data.hpp @@ -121,6 +121,13 @@ namespace hpx { namespace threads struct tag {}; typedef util::spinlock_pool mutex_type; + struct pool_base + { + virtual ~pool_base() {} + virtual thread_data_base* allocate() = 0; + virtual void deallocate(thread_data_base*) = 0; + }; + /// Construct a new \a thread thread_data_base(thread_init_data& init_data, thread_state_enum newstate) : current_state_(thread_state(newstate)), @@ -584,7 +591,7 @@ namespace hpx { namespace threads return stacksize_; } - virtual bool is_created_from(void* pool) const = 0; + virtual pool_base* get_pool() = 0; virtual thread_state_enum operator()() = 0; virtual thread_id_type get_thread_id() const = 0; virtual std::size_t get_thread_phase() const = 0; @@ -662,7 +669,25 @@ namespace hpx { namespace threads thread_data_base* this_() { return this; } public: - typedef boost::lockfree::caching_freelist pool_type; + struct pool_type: thread_data_base::pool_base + { + pool_type(std::size_t size) + : pool_(size) + {} + + virtual thread_data* allocate() + { + return pool_.allocate(); + } + + virtual void deallocate(thread_data_base* p) + { + pool_.deallocate(static_cast(p)); + } + + private: + boost::lockfree::caching_freelist pool_; + }; static boost::intrusive_ptr create( thread_init_data& init_data, pool_type& pool, @@ -704,9 +729,9 @@ namespace hpx { namespace threads HPX_ASSERT(coroutine_.is_ready()); } - bool is_created_from(void* pool) const + virtual pool_base* get_pool() { - return pool_ == pool; + return pool_; } /// \brief Execute the thread function diff --git a/src/runtime/threads/thread_data.cpp b/src/runtime/threads/thread_data.cpp index a21e248050ce..f758b1a35b46 100644 --- a/src/runtime/threads/thread_data.cpp +++ b/src/runtime/threads/thread_data.cpp @@ -36,16 +36,9 @@ namespace hpx { namespace threads { if (0 == --p->count_) { - if (thread_data* td = dynamic_cast(p)) - { - thread_pool* pool = td->pool_; - td->~thread_data(); - pool->deallocate(td); - } - else - { - delete p; - } + thread_data_base::pool_base* pool = p->get_pool(); + p->~thread_data_base(); + pool->deallocate(p); } }