Skip to content

Commit

Permalink
Implement simple thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Nekotekina committed Nov 12, 2020
1 parent 50d80c6 commit dd2b777
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 14 deletions.
100 changes: 89 additions & 11 deletions Utilities/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1842,8 +1842,68 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;

DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };

static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;

static shared_mutex s_pool_lock;

void thread_base::start(native_entry entry)
{
while (u32 size = s_thread_pool.size())
{
u32 pos = s_thread_pool.peek();
thread_base** tls = nullptr;

for (u32 i = pos; i < pos + size; i++)
{
auto val = s_thread_pool[i].load();

if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
{
tls = val;
pos = i;
break;
}
}

if (tls)
{
// Send "this" and entry point
m_thread = reinterpret_cast<u64>(entry);
*tls = this;
s_thread_pool[pos].notify_one();

{
// Using it in MPMC manner is a bit tricky, since it's originally MPSC
std::lock_guard lock(s_pool_lock);

u32 count = 0;

while (!s_thread_pool[s_thread_pool.peek() + count])
{
count++;

if (count >= s_thread_pool.size())
{
break;
}
}

if (count)
{
s_thread_pool.pop_end(count);
}
}

// Wait for actual "m_thread" in return
while (m_thread == reinterpret_cast<u64>(entry))
{
busy_wait(300);
}

return;
}
}

#ifdef _WIN32
m_thread = ::_beginthreadex(nullptr, 0, entry, this, CREATE_SUSPENDED, nullptr);
verify("thread_ctrl::start" HERE), m_thread, ::ResumeThread(reinterpret_cast<HANDLE>(+m_thread)) != -1;
Expand All @@ -1867,8 +1927,11 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*))
return thread_ctrl::get_name_cached();
};

std::string name = thread_ctrl::get_name_cached();
set_name(thread_ctrl::get_name_cached());
}

void thread_base::set_name(std::string name)
{
#ifdef _MSC_VER
struct THREADNAME_INFO
{
Expand Down Expand Up @@ -1983,11 +2046,34 @@ bool thread_base::finalize(thread_state result_state) noexcept
return !ok;
}

void thread_base::finalize() noexcept
void thread_base::finalize(u64 _self) noexcept
{
if (!_self)
{
// Don't even need to clean these values for detached threads
return;
}

atomic_wait_engine::set_wait_callback(nullptr);
g_tls_log_prefix = []() -> std::string { return {}; };
set_name("..pool");
thread_ctrl::g_tls_this_thread = nullptr;

// Try to add self to thread pool
const u32 pos = s_thread_pool.push_begin();
const auto tls = &thread_ctrl::g_tls_this_thread;
s_thread_pool[pos] = tls;

while (s_thread_pool[pos] == tls)
{
s_thread_pool[pos].wait(tls);
}

// Restore thread id
const auto _this = thread_ctrl::g_tls_this_thread;
const auto entry = _this->m_thread.exchange(_self);
_this->m_thread.notify_one();
reinterpret_cast<native_entry>(entry)(_this);
}

void thread_ctrl::_wait_for(u64 usec, bool alert /* true */)
Expand Down Expand Up @@ -2073,14 +2159,6 @@ thread_base::thread_base(std::string_view name)

thread_base::~thread_base()
{
if (m_thread)
{
#ifdef _WIN32
CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
#else
pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
#endif
}
}

bool thread_base::join() const
Expand Down Expand Up @@ -2169,7 +2247,7 @@ void thread_ctrl::emergency_exit(std::string_view reason)
delete _this;
}

thread_base::finalize();
thread_base::finalize(0);

#ifdef _WIN32
_endthreadex(0);
Expand Down
11 changes: 9 additions & 2 deletions Utilities/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ struct thread_thread_name<T, std::void_t<decltype(named_thread<T>::thread_name)>
// Thread base class
class thread_base
{
public:
// Native thread entry point function type
#ifdef _WIN32
using native_entry = uint(__stdcall*)(void* arg);
#else
using native_entry = void*(*)(void* arg);
#endif

private:
// Thread handle (platform-specific)
atomic_t<std::uintptr_t> m_thread{0};

Expand Down Expand Up @@ -129,7 +131,10 @@ class thread_base
bool finalize(thread_state result) noexcept;

// Cleanup after possibly deleting the thread instance
static void finalize() noexcept;
static void finalize(u64 _self) noexcept;

// Set name for debugger
static void set_name(std::string);

friend class thread_ctrl;

Expand Down Expand Up @@ -287,9 +292,11 @@ class named_thread final : public Context, result_storage_t<Context>, thread_bas
if (_this->entry_point())
{
delete _this;
thread::finalize(0);
return 0;
}

thread::finalize();
thread::finalize(_this->thread::m_thread);
return 0;
}

Expand Down
10 changes: 9 additions & 1 deletion rpcs3/Emu/CPU/CPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ struct cpu_counter
// Unregister and wait if necessary
_this->state += cpu_flag::wait;

if (slot >= std::size(cpu_array))
{
sys_log.fatal("Index out of bounds (%u)." HERE, slot);
return;
}

std::lock_guard lock(cpu_suspend_lock);

if (!cpu_array[slot].compare_and_swap_test(_this, nullptr))
Expand All @@ -344,7 +350,7 @@ struct cpu_counter

if (index >= std::size(cpu_array))
{
sys_log.fatal("Index out of bounds (%u).", index);
sys_log.fatal("Index out of bounds (%u)." HERE, index);
return;
}

Expand Down Expand Up @@ -520,6 +526,8 @@ void cpu_thread::operator()()

g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);

s_tls_thread_slot = -1;

_this = nullptr;
}

Expand Down

0 comments on commit dd2b777

Please sign in to comment.