diff --git a/hpx/runtime/threads/policies/thread_queue.hpp b/hpx/runtime/threads/policies/thread_queue.hpp index e2ecee69bd4b..b6f177c9d30e 100644 --- a/hpx/runtime/threads/policies/thread_queue.hpp +++ b/hpx/runtime/threads/policies/thread_queue.hpp @@ -146,6 +146,7 @@ namespace hpx { namespace threads { namespace policies threads::thread_init_data& data, thread_state_enum state, Lock& lk) { HPX_ASSERT(lk.owns_lock()); + HPX_ASSERT(data.stacksize != 0); std::ptrdiff_t stacksize = data.stacksize; @@ -213,12 +214,8 @@ namespace hpx { namespace threads { namespace policies hpx::util::unlock_guard ull(lk); // Allocate a new thread object. - if (data.stacksize != 0) - thrd.reset(new (memory_pool_) threads::thread_data( - data, memory_pool_, state)); - else - thrd.reset(new threads::stackless_thread_data( - data, &memory_pool_, state)); + thrd = threads::thread_data::create( + data, memory_pool_, state); } } @@ -280,7 +277,7 @@ namespace hpx { namespace threads { namespace policies // this thread has to be in the map now HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); - HPX_ASSERT(thrd->is_created_from(&memory_pool_)); + HPX_ASSERT(thrd->get_pool() == &memory_pool_); } if (added) { @@ -730,7 +727,7 @@ namespace hpx { namespace threads { namespace policies // this thread has to be in the map now HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end()); - HPX_ASSERT(thrd->is_created_from(&memory_pool_)); + HPX_ASSERT(thrd->get_pool() == &memory_pool_); // push the new thread in the pending queue thread if (initial_state == pending) @@ -861,7 +858,7 @@ namespace hpx { namespace threads { namespace policies /// Destroy the passed thread as it has been terminated bool destroy_thread(threads::thread_data_base* thrd, boost::int64_t& busy_count) { - if (thrd->is_created_from(&memory_pool_)) + if (thrd->get_pool() == &memory_pool_) { terminated_items_.push(thrd); diff --git a/hpx/runtime/threads/thread_data.hpp b/hpx/runtime/threads/thread_data.hpp index 02a02f320ee7..1f957efc3068 100644 --- a/hpx/runtime/threads/thread_data.hpp +++ b/hpx/runtime/threads/thread_data.hpp @@ -121,6 +121,13 @@ namespace hpx { namespace threads struct tag {}; typedef util::spinlock_pool mutex_type; + struct pool_base + { + virtual ~pool_base() {} + virtual thread_data_base* allocate() = 0; + virtual void deallocate(thread_data_base*) = 0; + }; + /// Construct a new \a thread thread_data_base(thread_init_data& init_data, thread_state_enum newstate) : current_state_(thread_state(newstate)), @@ -584,7 +591,7 @@ namespace hpx { namespace threads return stacksize_; } - virtual bool is_created_from(void* pool) const = 0; + virtual pool_base* get_pool() = 0; virtual thread_state_enum operator()() = 0; virtual thread_id_type get_thread_id() const = 0; virtual std::size_t get_thread_phase() const = 0; @@ -662,17 +669,42 @@ namespace hpx { namespace threads thread_data_base* this_() { return this; } public: - typedef boost::lockfree::caching_freelist pool_type; + struct pool_type: thread_data_base::pool_base + { + pool_type(std::size_t size) + : pool_(size) + {} - thread_data(thread_init_data& init_data, - pool_type& pool, thread_state_enum newstate) - : thread_data_base(init_data, newstate), - coroutine_(std::move(init_data.func), std::move(init_data.target), - this_(), init_data.stacksize), - pool_(&pool) + virtual thread_data* allocate() + { + return pool_.allocate(); + } + + virtual void deallocate(thread_data_base* p) + { + pool_.deallocate(static_cast(p)); + } + + private: + boost::lockfree::caching_freelist pool_; + }; + + static boost::intrusive_ptr create( + thread_init_data& init_data, pool_type& pool, + thread_state_enum newstate) { - HPX_ASSERT(init_data.stacksize != 0); - HPX_ASSERT(coroutine_.is_ready()); + thread_data* ret = pool.allocate(); + if (ret == 0) + { + HPX_THROW_EXCEPTION(out_of_memory, + "thread_data::operator new", + "could not allocate memory for thread_data"); + } +#ifdef HPX_DEBUG_THREAD_POOL + using namespace std; // some systems have memset in namespace std + memset (ret, initial_value, sizeof(thread_data)); +#endif + return new (ret) thread_data(init_data, pool, newstate); } ~thread_data() @@ -697,22 +729,9 @@ namespace hpx { namespace threads HPX_ASSERT(coroutine_.is_ready()); } - /////////////////////////////////////////////////////////////////////// - // Memory management - static void* operator new(std::size_t size, pool_type&); - static void operator delete(void* p, std::size_t size); - static void operator delete(void*, pool_type&); - - // Won't be called. - static void* operator new(std::size_t) throw() + virtual pool_base* get_pool() { - HPX_ASSERT(false); - return NULL; - } - - bool is_created_from(void* pool) const - { - return pool_ == pool; + return pool_; } /// \brief Execute the thread function @@ -767,113 +786,24 @@ namespace hpx { namespace threads //} private: - coroutine_type coroutine_; - pool_type* pool_; - }; - - typedef thread_data::pool_type thread_pool; - - /////////////////////////////////////////////////////////////////////////// - class stackless_thread_data : public thread_data_base - { - // Avoid warning about using 'this' in initializer list - thread_data_base* this_() { return this; } + friend HPX_EXPORT void intrusive_ptr_release(thread_data_base*); - public: - stackless_thread_data(thread_init_data& init_data, - void* pool, thread_state_enum newstate) + thread_data(thread_init_data& init_data, + pool_type& pool, thread_state_enum newstate) : thread_data_base(init_data, newstate), - coroutine_(std::move(init_data.func), std::move(init_data.target), this_()), - pool_(pool) - { - HPX_ASSERT(init_data.stacksize == 0); - } - - ~stackless_thread_data() - { - LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128 - << get_description() << "), phase(" - << get_thread_phase() << ")"; - } - - void rebind(thread_init_data& init_data, thread_state_enum newstate) - { - LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128 - << get_description() << "), phase(" - << get_thread_phase() << "), rebind"; - - this->thread_data_base::rebind_base(init_data, newstate); - - coroutine_.rebind(std::move(init_data.func), - std::move(init_data.target), this_()); - - HPX_ASSERT(init_data.stacksize == 0); - } - - /////////////////////////////////////////////////////////////////////// - // Memory management - bool is_created_from(void* pool) const - { - return pool_ == pool; - } - - /// \brief Execute the thread function - /// - /// \returns This function returns the thread state the thread - /// should be scheduled from this point on. The thread - /// manager will use the returned value to set the - /// thread's scheduling status. - thread_state_enum operator()() - { - thread_state_ex current_state_ex = get_state_ex(); - current_state_ex_.store(thread_state_ex(wait_signaled, - current_state_ex.get_tag() + 1), boost::memory_order_release); - - return coroutine_(current_state_ex); - } - - thread_id_type get_thread_id() const - { - return thread_id_type( - reinterpret_cast(coroutine_.get_thread_id()) - ); - } - - std::size_t get_thread_phase() const - { -#ifndef HPX_HAVE_THREAD_PHASE_INFORMATION - return 0; -#else - return coroutine_.get_thread_phase(); -#endif - } - -#ifdef HPX_HAVE_THREAD_LOCAL_STORAGE - std::size_t get_thread_data() const - { - return coroutine_.get_thread_data(); - } - std::size_t set_thread_data(std::size_t data) + coroutine_(std::move(init_data.func), std::move(init_data.target), + this_(), init_data.stacksize), + pool_(&pool) { - return coroutine_.set_thread_data(data); + HPX_ASSERT(init_data.stacksize != 0); + HPX_ASSERT(coroutine_.is_ready()); } -#endif - - /// This function will be called when the thread is about to be deleted - //void reset() - //{ - // thread_data_base::reset(); - // coroutine_.reset(); - //} - - private: - typedef util::coroutines::stackless_coroutine< - thread_function_sig - > coroutine_type; coroutine_type coroutine_; - void* pool_; + pool_type* pool_; }; + + typedef thread_data::pool_type thread_pool; }} #include diff --git a/src/runtime/threads/thread_data.cpp b/src/runtime/threads/thread_data.cpp index f1ccc320028c..f758b1a35b46 100644 --- a/src/runtime/threads/thread_data.cpp +++ b/src/runtime/threads/thread_data.cpp @@ -35,56 +35,10 @@ namespace hpx { namespace threads void intrusive_ptr_release(thread_data_base* p) { if (0 == --p->count_) - delete p; - } - - /////////////////////////////////////////////////////////////////////////// - void* thread_data::operator new(std::size_t size, thread_pool& pool) - { - HPX_ASSERT(sizeof(thread_data) == size); - - void *ret = reinterpret_cast(pool.allocate()); - if (0 == ret) - { - HPX_THROW_EXCEPTION(out_of_memory, - "thread_data::operator new", - "could not allocate memory for thread_data"); - } - -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (ret, initial_value, sizeof(thread_data)); -#endif - return ret; - } - - void thread_data::operator delete(void *p, std::size_t size) - { - HPX_ASSERT(sizeof(thread_data) == size); - - if (0 != p) - { - thread_data* pt = static_cast(p); - thread_pool* pool = pt->pool_; - HPX_ASSERT(pool); - -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (static_cast(pt), freed_value, sizeof(thread_data)); //-V598 -#endif - pool->deallocate(pt); - } - } - - void thread_data::operator delete(void *p, thread_pool& pool) - { - if (0 != p) { -#ifdef HPX_DEBUG_THREAD_POOL - using namespace std; // some systems have memset in namespace std - memset (p, freed_value, sizeof(thread_data)); -#endif - pool.deallocate(static_cast(p)); + thread_data_base::pool_base* pool = p->get_pool(); + p->~thread_data_base(); + pool->deallocate(p); } }