Merge pull request #1765 from STEllAR-GROUP/fixing_1763_2
Restricting thread_data to creating only with intrusive_pointers
sithhell committed Oct 1, 2015
2 parents 7d2bb84 + f9c47b7 commit 38b5740
Showing 3 changed files with 64 additions and 183 deletions.
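For orientation, the following is a minimal, self-contained sketch of the ownership pattern these diffs introduce; it is not HPX code. The names pooled, heap_pool, and the int payload are illustrative stand-ins for thread_data, its freelist-backed pool_type, and the thread's state, and plain malloc/free plus std::atomic stand in for boost::lockfree::caching_freelist and HPX's atomic reference count. The shape of create(), get_pool(), and intrusive_ptr_release() mirrors the hunks below: objects are obtained only through a static create() that placement-constructs into pool memory and returns a boost::intrusive_ptr, and intrusive_ptr_release() destroys the object and returns its storage to the originating pool through the new pool_base interface.

// Minimal sketch (not HPX code) of "creation only via intrusive_ptr from a pool".
#include <boost/intrusive_ptr.hpp>
#include <atomic>
#include <cstdlib>
#include <new>

// Counterpart of the new thread_data_base::pool_base interface.
struct pool_base
{
    virtual ~pool_base() {}
    virtual void* allocate() = 0;
    virtual void deallocate(void* p) = 0;
};

class pooled;
void intrusive_ptr_add_ref(pooled* p);
void intrusive_ptr_release(pooled* p);

class pooled
{
public:
    // The only way to obtain an instance, mirroring thread_data::create().
    static boost::intrusive_ptr<pooled> create(pool_base& pool, int value)
    {
        void* mem = pool.allocate();
        if (mem == 0)
            throw std::bad_alloc();     // HPX throws out_of_memory here
        return boost::intrusive_ptr<pooled>(new (mem) pooled(pool, value));
    }

    pool_base* get_pool() { return pool_; }   // mirrors thread_data::get_pool()
    int value() const { return value_; }

private:
    // Constructor and destructor are private: instances live only behind
    // an intrusive_ptr and are torn down by intrusive_ptr_release().
    pooled(pool_base& pool, int value)
      : count_(0), pool_(&pool), value_(value)
    {}
    ~pooled() {}

    friend void intrusive_ptr_add_ref(pooled* p);
    friend void intrusive_ptr_release(pooled* p);

    std::atomic<long> count_;
    pool_base* pool_;
    int value_;
};

// Stand-in for the boost::lockfree::caching_freelist-backed pool_type.
struct heap_pool : pool_base
{
    void* allocate() { return std::malloc(sizeof(pooled)); }
    void deallocate(void* p) { std::free(p); }
};

void intrusive_ptr_add_ref(pooled* p) { ++p->count_; }

// Same shape as the new intrusive_ptr_release() in thread_data.cpp: destroy
// the object in place, then hand the raw memory back to the pool it came from.
void intrusive_ptr_release(pooled* p)
{
    if (0 == --p->count_)
    {
        pool_base* pool = p->get_pool();
        p->~pooled();
        pool->deallocate(p);
    }
}

int main()
{
    heap_pool pool;
    boost::intrusive_ptr<pooled> obj = pooled::create(pool, 42);
    return obj->value() == 42 ? 0 : 1;
}   // obj released here; its memory goes back to pool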
15 changes: 6 additions & 9 deletions hpx/runtime/threads/policies/thread_queue.hpp
@@ -146,6 +146,7 @@ namespace hpx { namespace threads { namespace policies
threads::thread_init_data& data, thread_state_enum state, Lock& lk)
{
HPX_ASSERT(lk.owns_lock());
HPX_ASSERT(data.stacksize != 0);

std::ptrdiff_t stacksize = data.stacksize;

@@ -213,12 +214,8 @@ namespace hpx { namespace threads { namespace policies
hpx::util::unlock_guard<Lock> ull(lk);

// Allocate a new thread object.
if (data.stacksize != 0)
thrd.reset(new (memory_pool_) threads::thread_data(
data, memory_pool_, state));
else
thrd.reset(new threads::stackless_thread_data(
data, &memory_pool_, state));
thrd = threads::thread_data::create(
data, memory_pool_, state);
}
}

@@ -280,7 +277,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
HPX_ASSERT(thrd->is_created_from(&memory_pool_));
HPX_ASSERT(thrd->get_pool() == &memory_pool_);
}

if (added) {
@@ -730,7 +727,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
HPX_ASSERT(thrd->is_created_from(&memory_pool_));
HPX_ASSERT(thrd->get_pool() == &memory_pool_);

// push the new thread in the pending queue thread
if (initial_state == pending)
@@ -861,7 +858,7 @@ namespace hpx { namespace threads { namespace policies
/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data_base* thrd, boost::int64_t& busy_count)
{
if (thrd->is_created_from(&memory_pool_))
if (thrd->get_pool() == &memory_pool_)
{
terminated_items_.push(thrd);

180 changes: 55 additions & 125 deletions hpx/runtime/threads/thread_data.hpp
@@ -121,6 +121,13 @@ namespace hpx { namespace threads
struct tag {};
typedef util::spinlock_pool<tag> mutex_type;

struct pool_base
{
virtual ~pool_base() {}
virtual thread_data_base* allocate() = 0;
virtual void deallocate(thread_data_base*) = 0;
};

/// Construct a new \a thread
thread_data_base(thread_init_data& init_data, thread_state_enum newstate)
: current_state_(thread_state(newstate)),
@@ -584,7 +591,7 @@ namespace hpx { namespace threads
return stacksize_;
}

virtual bool is_created_from(void* pool) const = 0;
virtual pool_base* get_pool() = 0;
virtual thread_state_enum operator()() = 0;
virtual thread_id_type get_thread_id() const = 0;
virtual std::size_t get_thread_phase() const = 0;
@@ -662,17 +669,42 @@ namespace hpx { namespace threads
thread_data_base* this_() { return this; }

public:
typedef boost::lockfree::caching_freelist<thread_data> pool_type;
struct pool_type: thread_data_base::pool_base
{
pool_type(std::size_t size)
: pool_(size)
{}

thread_data(thread_init_data& init_data,
pool_type& pool, thread_state_enum newstate)
: thread_data_base(init_data, newstate),
coroutine_(std::move(init_data.func), std::move(init_data.target),
this_(), init_data.stacksize),
pool_(&pool)
virtual thread_data* allocate()
{
return pool_.allocate();
}

virtual void deallocate(thread_data_base* p)
{
pool_.deallocate(static_cast<thread_data*>(p));
}

private:
boost::lockfree::caching_freelist<thread_data> pool_;
};

static boost::intrusive_ptr<thread_data> create(
thread_init_data& init_data, pool_type& pool,
thread_state_enum newstate)
{
HPX_ASSERT(init_data.stacksize != 0);
HPX_ASSERT(coroutine_.is_ready());
thread_data* ret = pool.allocate();
if (ret == 0)
{
HPX_THROW_EXCEPTION(out_of_memory,
"thread_data::operator new",
"could not allocate memory for thread_data");
}
#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (ret, initial_value, sizeof(thread_data));
#endif
return new (ret) thread_data(init_data, pool, newstate);
}

~thread_data()
@@ -697,22 +729,9 @@ namespace hpx { namespace threads
HPX_ASSERT(coroutine_.is_ready());
}

///////////////////////////////////////////////////////////////////////
// Memory management
static void* operator new(std::size_t size, pool_type&);
static void operator delete(void* p, std::size_t size);
static void operator delete(void*, pool_type&);

// Won't be called.
static void* operator new(std::size_t) throw()
virtual pool_base* get_pool()
{
HPX_ASSERT(false);
return NULL;
}

bool is_created_from(void* pool) const
{
return pool_ == pool;
return pool_;
}

/// \brief Execute the thread function
@@ -767,113 +786,24 @@ namespace hpx { namespace threads
//}

private:
coroutine_type coroutine_;
pool_type* pool_;
};

typedef thread_data::pool_type thread_pool;

///////////////////////////////////////////////////////////////////////////
class stackless_thread_data : public thread_data_base
{
// Avoid warning about using 'this' in initializer list
thread_data_base* this_() { return this; }
friend HPX_EXPORT void intrusive_ptr_release(thread_data_base*);

public:
stackless_thread_data(thread_init_data& init_data,
void* pool, thread_state_enum newstate)
thread_data(thread_init_data& init_data,
pool_type& pool, thread_state_enum newstate)
: thread_data_base(init_data, newstate),
coroutine_(std::move(init_data.func), std::move(init_data.target), this_()),
pool_(pool)
{
HPX_ASSERT(init_data.stacksize == 0);
}

~stackless_thread_data()
{
LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128
<< get_description() << "), phase("
<< get_thread_phase() << ")";
}

void rebind(thread_init_data& init_data, thread_state_enum newstate)
{
LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128
<< get_description() << "), phase("
<< get_thread_phase() << "), rebind";

this->thread_data_base::rebind_base(init_data, newstate);

coroutine_.rebind(std::move(init_data.func),
std::move(init_data.target), this_());

HPX_ASSERT(init_data.stacksize == 0);
}

///////////////////////////////////////////////////////////////////////
// Memory management
bool is_created_from(void* pool) const
{
return pool_ == pool;
}

/// \brief Execute the thread function
///
/// \returns This function returns the thread state the thread
/// should be scheduled from this point on. The thread
/// manager will use the returned value to set the
/// thread's scheduling status.
thread_state_enum operator()()
{
thread_state_ex current_state_ex = get_state_ex();
current_state_ex_.store(thread_state_ex(wait_signaled,
current_state_ex.get_tag() + 1), boost::memory_order_release);

return coroutine_(current_state_ex);
}

thread_id_type get_thread_id() const
{
return thread_id_type(
reinterpret_cast<thread_data_base*>(coroutine_.get_thread_id())
);
}

std::size_t get_thread_phase() const
{
#ifndef HPX_HAVE_THREAD_PHASE_INFORMATION
return 0;
#else
return coroutine_.get_thread_phase();
#endif
}

#ifdef HPX_HAVE_THREAD_LOCAL_STORAGE
std::size_t get_thread_data() const
{
return coroutine_.get_thread_data();
}
std::size_t set_thread_data(std::size_t data)
coroutine_(std::move(init_data.func), std::move(init_data.target),
this_(), init_data.stacksize),
pool_(&pool)
{
return coroutine_.set_thread_data(data);
HPX_ASSERT(init_data.stacksize != 0);
HPX_ASSERT(coroutine_.is_ready());
}
#endif

/// This function will be called when the thread is about to be deleted
//void reset()
//{
// thread_data_base::reset();
// coroutine_.reset();
//}

private:
typedef util::coroutines::stackless_coroutine<
thread_function_sig
> coroutine_type;

coroutine_type coroutine_;
void* pool_;
pool_type* pool_;
};

typedef thread_data::pool_type thread_pool;
}}

#include <hpx/config/warnings_suffix.hpp>
52 changes: 3 additions & 49 deletions src/runtime/threads/thread_data.cpp
@@ -35,56 +35,10 @@ namespace hpx { namespace threads
void intrusive_ptr_release(thread_data_base* p)
{
if (0 == --p->count_)
delete p;
}

///////////////////////////////////////////////////////////////////////////
void* thread_data::operator new(std::size_t size, thread_pool& pool)
{
HPX_ASSERT(sizeof(thread_data) == size);

void *ret = reinterpret_cast<void*>(pool.allocate());
if (0 == ret)
{
HPX_THROW_EXCEPTION(out_of_memory,
"thread_data::operator new",
"could not allocate memory for thread_data");
}

#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (ret, initial_value, sizeof(thread_data));
#endif
return ret;
}

void thread_data::operator delete(void *p, std::size_t size)
{
HPX_ASSERT(sizeof(thread_data) == size);

if (0 != p)
{
thread_data* pt = static_cast<thread_data*>(p);
thread_pool* pool = pt->pool_;
HPX_ASSERT(pool);

#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (static_cast<void*>(pt), freed_value, sizeof(thread_data)); //-V598
#endif
pool->deallocate(pt);
}
}

void thread_data::operator delete(void *p, thread_pool& pool)
{
if (0 != p)
{
#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (p, freed_value, sizeof(thread_data));
#endif
pool.deallocate(static_cast<thread_data*>(p));
thread_data_base::pool_base* pool = p->get_pool();
p->~thread_data_base();
pool->deallocate(p);
}
}

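The new release path above destroys the thread object in place and then returns the raw storage to the pool it was allocated from, rather than calling delete, because the object was placement-constructed into pool memory by thread_data::create(). A tiny standalone illustration of that two-step teardown (not HPX code; plain malloc/free stands in for the freelist pool and widget is a made-up type):

// Why destruction and deallocation are separate steps when memory comes
// from a pool: delete would hand the storage to the global heap instead.
#include <cstdlib>
#include <new>
#include <cassert>

struct widget
{
    int value;
    explicit widget(int v) : value(v) {}
};

int main()
{
    void* storage = std::malloc(sizeof(widget));   // stand-in for pool.allocate()
    assert(storage != 0);

    widget* w = new (storage) widget(42);          // construct in pool memory
    assert(w->value == 42);

    w->~widget();                                  // 1) destroy the object in place
    std::free(storage);                            // 2) return memory (pool.deallocate())
    return 0;
}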
