Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup (shared_mutex, named_thread); fix cellVdecClose #5173

Merged
merged 8 commits into from Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions Utilities/Atomic.h
Expand Up @@ -610,6 +610,12 @@ class atomic_t
return atomic_storage<type>::compare_exchange(m_data, old, exch);
}

// As in std::atomic
bool compare_exchange(type& cmp_and_old, const type& exch)
{
return atomic_storage<type>::compare_exchange(m_data, cmp_and_old, exch);
}

// Atomic operation; returns old value, or pair of old value and return value (cancel op if evaluates to false)
template <typename F, typename RT = std::invoke_result_t<F, T&>>
std::conditional_t<std::is_void_v<RT>, type, std::pair<type, RT>> fetch_op(F&& func)
Expand Down
3 changes: 2 additions & 1 deletion Utilities/Config.h
Expand Up @@ -49,9 +49,10 @@ namespace cfg
_base(type _type, class node* owner, const std::string& name);

public:
// Disallow copy/move constructors and assignments
_base(const _base&) = delete;

_base& operator=(const _base&) = delete;

// Get type
type get_type() const { return m_type; }

Expand Down
2 changes: 1 addition & 1 deletion Utilities/GDBDebugServer.cpp
Expand Up @@ -829,7 +829,7 @@ void GDBDebugServer::on_stop()
this->stop = true;
//just in case we are waiting for breakpoint
this->notify();
named_thread::on_stop();
old_thread::on_stop();
}

void GDBDebugServer::pause_from(cpu_thread* t) {
Expand Down
4 changes: 2 additions & 2 deletions Utilities/GDBDebugServer.h
Expand Up @@ -40,8 +40,8 @@ class wrong_checksum_exception : public std::runtime_error {
const u64 ALL_THREADS = 0xffffffffffffffff;
const u64 ANY_THREAD = 0;

class GDBDebugServer : public named_thread {

class GDBDebugServer : public old_thread
{
socket_t server_socket;
socket_t client_socket;
std::weak_ptr<cpu_thread> selected_thread;
Expand Down
1 change: 0 additions & 1 deletion Utilities/JIT.cpp
Expand Up @@ -244,7 +244,6 @@ struct MemoryManager : llvm::RTDyldMemoryManager

if (!is_ro)
{
LOG_ERROR(GENERAL, "LLVM: Writeable data section not supported!");
}

utils::memory_commit(s_next, size);
Expand Down
137 changes: 87 additions & 50 deletions Utilities/Thread.cpp
Expand Up @@ -35,10 +35,12 @@
#endif

#include "sync.h"
#include "Log.h"

thread_local u64 g_tls_fault_all = 0;
thread_local u64 g_tls_fault_rsx = 0;
thread_local u64 g_tls_fault_spu = 0;
extern thread_local std::string(*g_tls_log_prefix)();

[[noreturn]] void catch_all_exceptions()
{
Expand All @@ -48,11 +50,11 @@ thread_local u64 g_tls_fault_spu = 0;
}
catch (const std::exception& e)
{
report_fatal_error("Unhandled exception of type '"s + typeid(e).name() + "': "s + e.what());
report_fatal_error("{" + g_tls_log_prefix() + "} Unhandled exception of type '"s + typeid(e).name() + "': "s + e.what());
}
catch (...)
{
report_fatal_error("Unhandled exception (unknown)");
report_fatal_error("{" + g_tls_log_prefix() + "} Unhandled exception (unknown)");
}
}

Expand Down Expand Up @@ -1567,25 +1569,21 @@ extern atomic_t<u32> g_thread_count(0);

thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr;

extern thread_local std::string(*g_tls_log_prefix)();

DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };

void thread_ctrl::start(const std::shared_ptr<thread_ctrl>& ctrl, task_stack task)
void thread_base::start(const std::shared_ptr<thread_base>& ctrl, task_stack task)
{
#ifdef _WIN32
using thread_result = uint;
using thread_type = thread_result(__stdcall*)(void* arg);
#else
using thread_result = void*;
using thread_type = thread_result(*)(void* arg);
#endif

// Thread entry point
const thread_type entry = [](void* arg) -> thread_result
const native_entry entry = [](void* arg) -> thread_result
{
// Recover shared_ptr from short-circuited thread_ctrl object pointer
const std::shared_ptr<thread_ctrl> ctrl = static_cast<thread_ctrl*>(arg)->m_self;
// Recover shared_ptr from short-circuited thread_base object pointer
std::shared_ptr<thread_base> ctrl = static_cast<thread_base*>(arg)->m_self;

try
{
Expand All @@ -1596,17 +1594,18 @@ void thread_ctrl::start(const std::shared_ptr<thread_ctrl>& ctrl, task_stack tas
{
// Capture exception
ctrl->finalize(std::current_exception());
finalize();
return 0;
}

ctrl->finalize(nullptr);
finalize();
return 0;
};

ctrl->m_self = ctrl;
ctrl->m_task = std::move(task);

// TODO: implement simple thread pool
#ifdef _WIN32
std::uintptr_t thread = _beginthreadex(nullptr, 0, entry, ctrl.get(), 0, nullptr);
verify("thread_ctrl::start" HERE), thread != 0;
Expand All @@ -1619,14 +1618,24 @@ void thread_ctrl::start(const std::shared_ptr<thread_ctrl>& ctrl, task_stack tas
ctrl->m_thread = (uintptr_t)thread;
}

void thread_ctrl::initialize()
void thread_base::start(native_entry entry)
{
#ifdef _WIN32
m_thread = ::_beginthreadex(nullptr, 0, entry, this, CREATE_SUSPENDED, nullptr);
verify("thread_ctrl::start" HERE), m_thread, ::ResumeThread(reinterpret_cast<HANDLE>(+m_thread)) != -1;
#else
verify("thread_ctrl::start" HERE), pthread_create(reinterpret_cast<pthread_t*>(&m_thread.raw()), nullptr, entry, this) == 0;
#endif
}

void thread_base::initialize()
{
// Initialize TLS variable
g_tls_this_thread = this;
thread_ctrl::g_tls_this_thread = this;

g_tls_log_prefix = []
{
return g_tls_this_thread->m_name;
return thread_ctrl::g_tls_this_thread->m_name.get();
};

++g_thread_count;
Expand All @@ -1645,7 +1654,7 @@ void thread_ctrl::initialize()
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = m_name.c_str();
info.szName = m_name.get().c_str();
info.dwThreadID = -1;
info.dwFlags = 0;

Expand All @@ -1660,17 +1669,17 @@ void thread_ctrl::initialize()
#endif

#if defined(__APPLE__)
pthread_setname_np(m_name.substr(0, 15).c_str());
pthread_setname_np(m_name.get().substr(0, 15).c_str());
#elif defined(__DragonFly__) || defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(pthread_self(), m_name.c_str());
pthread_set_name_np(pthread_self(), m_name.get().c_str());
#elif defined(__NetBSD__)
pthread_setname_np(pthread_self(), "%s", (void*)m_name.c_str());
pthread_setname_np(pthread_self(), "%s", (void*)m_name.get().c_str());
#elif !defined(_WIN32)
pthread_setname_np(pthread_self(), m_name.substr(0, 15).c_str());
pthread_setname_np(pthread_self(), m_name.get().substr(0, 15).c_str());
#endif
}

void thread_ctrl::finalize(std::exception_ptr eptr) noexcept
std::shared_ptr<thread_base> thread_base::finalize(std::exception_ptr eptr) noexcept
{
// Report pending errors
error_code::error_report(0, 0, 0, 0);
Expand All @@ -1693,7 +1702,7 @@ void thread_ctrl::finalize(std::exception_ptr eptr) noexcept

g_tls_log_prefix = []
{
return g_tls_this_thread->m_name;
return thread_ctrl::g_tls_this_thread->m_name.get();
};

LOG_NOTICE(GENERAL, "Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u];",
Expand All @@ -1703,13 +1712,24 @@ void thread_ctrl::finalize(std::exception_ptr eptr) noexcept
g_tls_fault_rsx,
g_tls_fault_spu);

--g_thread_count;

// Untangle circular reference, set exception
std::lock_guard{m_mutex}, m_self.reset(), m_exception = eptr;
std::unique_lock lock(m_mutex);

// Signal joining waiters
m_jcv.notify_all();
// Possibly last reference to the thread object
std::shared_ptr<thread_base> self = std::move(m_self);
m_state = thread_state::finished;
m_exception = eptr;

// Signal waiting threads
lock.unlock(), m_jcv.notify_all();
return self;
}

void thread_base::finalize() noexcept
{
g_tls_log_prefix = []() -> std::string { return {}; };
thread_ctrl::g_tls_this_thread = nullptr;
--g_thread_count;
}

bool thread_ctrl::_wait_for(u64 usec)
Expand All @@ -1718,7 +1738,7 @@ bool thread_ctrl::_wait_for(u64 usec)

struct half_lock
{
semaphore<>& ref;
shared_mutex& ref;

void lock()
{
Expand Down Expand Up @@ -1777,18 +1797,18 @@ bool thread_ctrl::_wait_for(u64 usec)
return false;
}

[[noreturn]] void thread_ctrl::_throw()
[[noreturn]] void thread_base::_throw()
{
std::exception_ptr ex = std::exchange(m_exception, std::exception_ptr{});
m_signal &= ~3;
m_mutex.unlock();
std::rethrow_exception(std::move(ex));
}

void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr)
void thread_base::_notify(cond_variable thread_base::* ptr)
{
// Optimized lock + unlock
if (!m_mutex.get())
if (!m_mutex.is_free())
{
m_mutex.lock();
m_mutex.unlock();
Expand All @@ -1797,12 +1817,12 @@ void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr)
(this->*ptr).notify_one();
}

thread_ctrl::thread_ctrl(std::string&& name)
: m_name(std::move(name))
thread_base::thread_base(std::string_view name)
: m_name(name)
{
}

thread_ctrl::~thread_ctrl()
thread_base::~thread_base()
{
if (m_thread)
{
Expand All @@ -1814,13 +1834,13 @@ thread_ctrl::~thread_ctrl()
}
}

std::exception_ptr thread_ctrl::get_exception() const
std::exception_ptr thread_base::get_exception() const
{
std::lock_guard lock(m_mutex);
return m_exception;
}

void thread_ctrl::set_exception(std::exception_ptr ptr)
void thread_base::set_exception(std::exception_ptr ptr)
{
std::lock_guard lock(m_mutex);
m_exception = ptr;
Expand All @@ -1836,35 +1856,52 @@ void thread_ctrl::set_exception(std::exception_ptr ptr)
}
}

void thread_ctrl::join()
void thread_base::join() const
{
#ifdef _WIN32
//verify("thread_ctrl::join" HERE), WaitForSingleObjectEx((HANDLE)m_thread.load(), -1, false) == WAIT_OBJECT_0;
#endif
if (m_state == thread_state::finished)
{
return;
}

std::unique_lock lock(m_mutex);

while (m_self)
while (m_state != thread_state::finished)
{
m_jcv.wait(lock);
}
}

void thread_base::detach()
{
auto self = weak_from_this().lock();

if (!self)
{
LOG_FATAL(GENERAL, "Cannot detach thread '%s'", get_name());
return;
}

if (UNLIKELY(m_exception && !std::uncaught_exceptions()))
if (self->m_state.compare_and_swap_test(thread_state::created, thread_state::detached))
{
std::rethrow_exception(m_exception);
std::lock_guard lock(m_mutex);

if (m_state == thread_state::detached)
{
m_self = std::move(self);
}
}
}

void thread_ctrl::notify()
void thread_base::notify()
{
if (!(m_signal & 1))
{
m_signal |= 1;
_notify(&thread_ctrl::m_cond);
_notify(&thread_base::m_cond);
}
}

u64 thread_ctrl::get_cycles()
u64 thread_base::get_cycles()
{
u64 cycles;

Expand Down Expand Up @@ -2059,23 +2096,23 @@ void thread_ctrl::set_thread_affinity_mask(u16 mask)
#endif
}

named_thread::named_thread()
old_thread::old_thread()
{
}

named_thread::~named_thread()
old_thread::~old_thread()
{
}

std::string named_thread::get_name() const
std::string old_thread::get_name() const
{
return fmt::format("('%s') Unnamed Thread", typeid(*this).name());
}

void named_thread::start_thread(const std::shared_ptr<void>& _this)
void old_thread::start_thread(const std::shared_ptr<void>& _this)
{
// Ensure it's not called from the constructor and the correct object is passed
verify("named_thread::start_thread" HERE), _this.get() == this;
verify("old_thread::start_thread" HERE), _this.get() == this;

// Run thread
thread_ctrl::spawn(m_thread, get_name(), [this, _this]()
Expand Down