Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restricting thread_data to creating only with intrusive_pointers #1765

Merged
merged 3 commits into from Oct 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 6 additions & 9 deletions hpx/runtime/threads/policies/thread_queue.hpp
Expand Up @@ -146,6 +146,7 @@ namespace hpx { namespace threads { namespace policies
threads::thread_init_data& data, thread_state_enum state, Lock& lk)
{
HPX_ASSERT(lk.owns_lock());
HPX_ASSERT(data.stacksize != 0);

std::ptrdiff_t stacksize = data.stacksize;

Expand Down Expand Up @@ -213,12 +214,8 @@ namespace hpx { namespace threads { namespace policies
hpx::util::unlock_guard<Lock> ull(lk);

// Allocate a new thread object.
if (data.stacksize != 0)
thrd.reset(new (memory_pool_) threads::thread_data(
data, memory_pool_, state));
else
thrd.reset(new threads::stackless_thread_data(
data, &memory_pool_, state));
thrd = threads::thread_data::create(
data, memory_pool_, state);
}
}

Expand Down Expand Up @@ -280,7 +277,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
HPX_ASSERT(thrd->is_created_from(&memory_pool_));
HPX_ASSERT(thrd->get_pool() == &memory_pool_);
}

if (added) {
Expand Down Expand Up @@ -730,7 +727,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd.get()) != thread_map_.end());
HPX_ASSERT(thrd->is_created_from(&memory_pool_));
HPX_ASSERT(thrd->get_pool() == &memory_pool_);

// push the new thread in the pending queue thread
if (initial_state == pending)
Expand Down Expand Up @@ -861,7 +858,7 @@ namespace hpx { namespace threads { namespace policies
/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data_base* thrd, boost::int64_t& busy_count)
{
if (thrd->is_created_from(&memory_pool_))
if (thrd->get_pool() == &memory_pool_)
{
terminated_items_.push(thrd);

Expand Down
180 changes: 55 additions & 125 deletions hpx/runtime/threads/thread_data.hpp
Expand Up @@ -121,6 +121,13 @@ namespace hpx { namespace threads
struct tag {};
typedef util::spinlock_pool<tag> mutex_type;

struct pool_base
{
virtual ~pool_base() {}
virtual thread_data_base* allocate() = 0;
virtual void deallocate(thread_data_base*) = 0;
};

/// Construct a new \a thread
thread_data_base(thread_init_data& init_data, thread_state_enum newstate)
: current_state_(thread_state(newstate)),
Expand Down Expand Up @@ -584,7 +591,7 @@ namespace hpx { namespace threads
return stacksize_;
}

virtual bool is_created_from(void* pool) const = 0;
virtual pool_base* get_pool() = 0;
virtual thread_state_enum operator()() = 0;
virtual thread_id_type get_thread_id() const = 0;
virtual std::size_t get_thread_phase() const = 0;
Expand Down Expand Up @@ -662,17 +669,42 @@ namespace hpx { namespace threads
thread_data_base* this_() { return this; }

public:
typedef boost::lockfree::caching_freelist<thread_data> pool_type;
struct pool_type: thread_data_base::pool_base
{
pool_type(std::size_t size)
: pool_(size)
{}

thread_data(thread_init_data& init_data,
pool_type& pool, thread_state_enum newstate)
: thread_data_base(init_data, newstate),
coroutine_(std::move(init_data.func), std::move(init_data.target),
this_(), init_data.stacksize),
pool_(&pool)
virtual thread_data* allocate()
{
return pool_.allocate();
}

virtual void deallocate(thread_data_base* p)
{
pool_.deallocate(static_cast<thread_data*>(p));
}

private:
boost::lockfree::caching_freelist<thread_data> pool_;
};

static boost::intrusive_ptr<thread_data> create(
thread_init_data& init_data, pool_type& pool,
thread_state_enum newstate)
{
HPX_ASSERT(init_data.stacksize != 0);
HPX_ASSERT(coroutine_.is_ready());
thread_data* ret = pool.allocate();
if (ret == 0)
{
HPX_THROW_EXCEPTION(out_of_memory,
"thread_data::operator new",
"could not allocate memory for thread_data");
}
#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (ret, initial_value, sizeof(thread_data));
#endif
return new (ret) thread_data(init_data, pool, newstate);
}

~thread_data()
Expand All @@ -697,22 +729,9 @@ namespace hpx { namespace threads
HPX_ASSERT(coroutine_.is_ready());
}

///////////////////////////////////////////////////////////////////////
// Memory management
static void* operator new(std::size_t size, pool_type&);
static void operator delete(void* p, std::size_t size);
static void operator delete(void*, pool_type&);

// Won't be called.
static void* operator new(std::size_t) throw()
virtual pool_base* get_pool()
{
HPX_ASSERT(false);
return NULL;
}

bool is_created_from(void* pool) const
{
return pool_ == pool;
return pool_;
}

/// \brief Execute the thread function
Expand Down Expand Up @@ -767,113 +786,24 @@ namespace hpx { namespace threads
//}

private:
coroutine_type coroutine_;
pool_type* pool_;
};

typedef thread_data::pool_type thread_pool;

///////////////////////////////////////////////////////////////////////////
class stackless_thread_data : public thread_data_base
{
// Avoid warning about using 'this' in initializer list
thread_data_base* this_() { return this; }
friend HPX_EXPORT void intrusive_ptr_release(thread_data_base*);

public:
stackless_thread_data(thread_init_data& init_data,
void* pool, thread_state_enum newstate)
thread_data(thread_init_data& init_data,
pool_type& pool, thread_state_enum newstate)
: thread_data_base(init_data, newstate),
coroutine_(std::move(init_data.func), std::move(init_data.target), this_()),
pool_(pool)
{
HPX_ASSERT(init_data.stacksize == 0);
}

~stackless_thread_data()
{
LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128
<< get_description() << "), phase("
<< get_thread_phase() << ")";
}

void rebind(thread_init_data& init_data, thread_state_enum newstate)
{
LTM_(debug) << "~stackless_thread(" << this << "), description(" //-V128
<< get_description() << "), phase("
<< get_thread_phase() << "), rebind";

this->thread_data_base::rebind_base(init_data, newstate);

coroutine_.rebind(std::move(init_data.func),
std::move(init_data.target), this_());

HPX_ASSERT(init_data.stacksize == 0);
}

///////////////////////////////////////////////////////////////////////
// Memory management
bool is_created_from(void* pool) const
{
return pool_ == pool;
}

/// \brief Execute the thread function
///
/// \returns This function returns the thread state the thread
/// should be scheduled from this point on. The thread
/// manager will use the returned value to set the
/// thread's scheduling status.
thread_state_enum operator()()
{
thread_state_ex current_state_ex = get_state_ex();
current_state_ex_.store(thread_state_ex(wait_signaled,
current_state_ex.get_tag() + 1), boost::memory_order_release);

return coroutine_(current_state_ex);
}

thread_id_type get_thread_id() const
{
return thread_id_type(
reinterpret_cast<thread_data_base*>(coroutine_.get_thread_id())
);
}

std::size_t get_thread_phase() const
{
#ifndef HPX_HAVE_THREAD_PHASE_INFORMATION
return 0;
#else
return coroutine_.get_thread_phase();
#endif
}

#ifdef HPX_HAVE_THREAD_LOCAL_STORAGE
std::size_t get_thread_data() const
{
return coroutine_.get_thread_data();
}
std::size_t set_thread_data(std::size_t data)
coroutine_(std::move(init_data.func), std::move(init_data.target),
this_(), init_data.stacksize),
pool_(&pool)
{
return coroutine_.set_thread_data(data);
HPX_ASSERT(init_data.stacksize != 0);
HPX_ASSERT(coroutine_.is_ready());
}
#endif

/// This function will be called when the thread is about to be deleted
//void reset()
//{
// thread_data_base::reset();
// coroutine_.reset();
//}

private:
typedef util::coroutines::stackless_coroutine<
thread_function_sig
> coroutine_type;

coroutine_type coroutine_;
void* pool_;
pool_type* pool_;
};

typedef thread_data::pool_type thread_pool;
}}

#include <hpx/config/warnings_suffix.hpp>
Expand Down
52 changes: 3 additions & 49 deletions src/runtime/threads/thread_data.cpp
Expand Up @@ -35,56 +35,10 @@ namespace hpx { namespace threads
void intrusive_ptr_release(thread_data_base* p)
{
if (0 == --p->count_)
delete p;
}

///////////////////////////////////////////////////////////////////////////
void* thread_data::operator new(std::size_t size, thread_pool& pool)
{
HPX_ASSERT(sizeof(thread_data) == size);

void *ret = reinterpret_cast<void*>(pool.allocate());
if (0 == ret)
{
HPX_THROW_EXCEPTION(out_of_memory,
"thread_data::operator new",
"could not allocate memory for thread_data");
}

#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (ret, initial_value, sizeof(thread_data));
#endif
return ret;
}

void thread_data::operator delete(void *p, std::size_t size)
{
HPX_ASSERT(sizeof(thread_data) == size);

if (0 != p)
{
thread_data* pt = static_cast<thread_data*>(p);
thread_pool* pool = pt->pool_;
HPX_ASSERT(pool);

#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (static_cast<void*>(pt), freed_value, sizeof(thread_data)); //-V598
#endif
pool->deallocate(pt);
}
}

void thread_data::operator delete(void *p, thread_pool& pool)
{
if (0 != p)
{
#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (p, freed_value, sizeof(thread_data));
#endif
pool.deallocate(static_cast<thread_data*>(p));
thread_data_base::pool_base* pool = p->get_pool();
p->~thread_data_base();
pool->deallocate(p);
}
}

Expand Down