New atomic fixups #9259

Merged: 3 commits, Nov 13, 2020
124 changes: 71 additions & 53 deletions Utilities/Thread.cpp
@@ -75,6 +75,7 @@
 #include "sync.h"
 #include "util/vm.hpp"
 #include "util/logs.hpp"
+#include "util/asm.hpp"
 #include "Emu/Memory/vm_locking.h"
 
 LOG_CHANNEL(sig_log, "SIG");
@@ -1842,66 +1843,40 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;
 
 DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };
 
-static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;
+static atomic_t<u128, 64> s_thread_bits{0};
 
-static shared_mutex s_pool_lock;
+static atomic_t<thread_base**> s_thread_pool[128]{};
 
 void thread_base::start(native_entry entry)
 {
-    while (u32 size = s_thread_pool.size())
+    for (u128 bits = s_thread_bits.load(); bits; bits &= bits - 1)
     {
-        u32 pos = s_thread_pool.peek();
-        thread_base** tls = nullptr;
+        const u32 pos = utils::ctz128(bits);
 
-        for (u32 i = pos; i < pos + size; i++)
+        if (!s_thread_pool[pos])
         {
-            auto val = s_thread_pool[i].load();
-
-            if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
-            {
-                tls = val;
-                pos = i;
-                break;
-            }
+            continue;
         }
 
-        if (tls)
-        {
-            // Send "this" and entry point
-            m_thread = reinterpret_cast<u64>(entry);
-            atomic_storage<thread_base*>::store(*tls, this);
-            s_thread_pool[pos].notify_one();
-
-            {
-                // Using it in MPMC manner is a bit tricky, since it's originally MPSC
-                std::lock_guard lock(s_pool_lock);
-
-                u32 count = 0;
-
-                while (!s_thread_pool[s_thread_pool.peek() + count])
-                {
-                    count++;
-
-                    if (count >= s_thread_pool.size())
-                    {
-                        break;
-                    }
-                }
-
-                if (count)
-                {
-                    s_thread_pool.pop_end(count);
-                }
-            }
+        thread_base** tls = s_thread_pool[pos].exchange(nullptr);
+
+        if (!tls)
+        {
+            continue;
+        }
+
+        // Send "this" and entry point
+        m_thread = reinterpret_cast<u64>(entry);
+        atomic_storage<thread_base*>::release(*tls, this);
+        s_thread_pool[pos].notify_all();
 
-            // Wait for actual "m_thread" in return
-            while (m_thread == reinterpret_cast<u64>(entry))
-            {
-                busy_wait(300);
-            }
+        // Wait for actual "m_thread" in return
+        while (m_thread == reinterpret_cast<u64>(entry))
+        {
+            busy_wait(300);
+        }
 
-            return;
-        }
+        return;
     }
 
 #ifdef _WIN32
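Note (illustration): the hunk above replaces the lf_fifo-based pool with a u128 occupancy mask (s_thread_bits) plus a flat slot array (s_thread_pool), and start() walks only the set bits, claiming a slot with an atomic exchange. A minimal standalone sketch of that claim loop, assuming GCC/clang's __uint128_t and plain std::atomic slots (all names below are illustrative, not rpcs3's):

    // Sketch: claim a pool slot via a u128 occupancy mask (illustrative names).
    #include <atomic>
    #include <bit>
    #include <cstdint>

    using u64 = std::uint64_t;
    using u128 = unsigned __int128;

    static std::atomic<void*> pool[128];

    static std::uint32_t ctz128(u128 v)
    {
        const u64 lo = static_cast<u64>(v);
        return lo ? std::countr_zero(lo) : std::countr_zero(static_cast<u64>(v >> 64)) + 64;
    }

    // Walk only the set bits: bits &= bits - 1 clears the lowest set bit,
    // so the loop runs once per marked slot, not once per array element.
    static void* claim_one(u128 bits)
    {
        for (; bits; bits &= bits - 1)
        {
            // exchange(nullptr) atomically claims the slot; a racing
            // claimer observes nullptr and simply moves on to the next bit.
            if (void* tls = pool[ctz128(bits)].exchange(nullptr))
            {
                return tls;
            }
        }

        return nullptr; // every marked slot was taken concurrently
    }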
@@ -1990,7 +1965,7 @@ void thread_base::notify_abort() noexcept
     }
 }
 
-bool thread_base::finalize(thread_state result_state) noexcept
+u64 thread_base::finalize(thread_state result_state) noexcept
 {
     // Report pending errors
     error_code::error_report(0, 0, 0, 0);
@@ -2036,14 +2011,17 @@ bool thread_base::finalize(thread_state result_state) noexcept
         g_tls_fault_spu,
         fsoft, fhard, ctxvol, ctxinv);
 
+    const u64 _self = m_thread;
+    m_thread.release(0);
+
     // Return true if need to delete thread object
     const bool ok = m_state.exchange(result_state) <= thread_state::aborting;
 
     // Signal waiting threads
     m_state.notify_all();
 
     // No detached thread supported atm
-    return !ok;
+    return _self;
 }
 
 void thread_base::finalize(u64 _self) noexcept
@@ -2056,23 +2034,54 @@ void thread_base::finalize(u64 _self) noexcept
 
     atomic_wait_engine::set_wait_callback(nullptr);
     g_tls_log_prefix = []() -> std::string { return {}; };
-    set_name("..pool");
     thread_ctrl::g_tls_this_thread = nullptr;
 
     // Try to add self to thread pool
-    const u32 pos = s_thread_pool.push_begin();
+    const auto [bits, ok] = s_thread_bits.fetch_op([](u128& bits)
+    {
+        if (~bits) [[likely]]
+        {
+            // Set lowest clear bit
+            bits |= bits + 1;
+            return true;
+        }
+
+        return false;
+    });
+
+    if (!ok)
+    {
+#ifdef _WIN32
+        CloseHandle(reinterpret_cast<HANDLE>(_self));
+#else
+        pthread_detach(reinterpret_cast<pthread_t>(_self));
+#endif
+        return;
+    }
+
+    set_name("..pool");
+
+    // Obtain id from atomic op
+    const u32 pos = utils::ctz128(~bits);
     const auto tls = &thread_ctrl::g_tls_this_thread;
     s_thread_pool[pos] = tls;
 
-    while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(thread_ctrl::g_tls_this_thread))
+    while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(*tls))
     {
         s_thread_pool[pos].wait(tls);
     }
 
+    // Free thread pool slot
+    s_thread_bits.atomic_op([pos](u128& val)
+    {
+        val &= ~(u128(1) << pos);
+    });
+
     // Restore thread id
-    const auto _this = thread_ctrl::g_tls_this_thread;
+    const auto _this = atomic_storage<thread_base*>::load(*tls);
     const auto entry = _this->m_thread.exchange(_self);
     _this->m_thread.notify_one();
 
     reinterpret_cast<native_entry>(entry)(_this);
 }
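A note on the fetch_op above: bits |= bits + 1 sets exactly the lowest clear bit, because the +1 carries through the run of trailing ones; the claimed slot index is then recovered as ctz128 of the complement of the old value. If the mask is already all ones, bits + 1 wraps to zero and nothing changes, which is what the if (~bits) guard rejects. A small self-contained sketch of the idiom on u64:

    // Sketch: the "set lowest clear bit" idiom behind the pool registration.
    #include <bit>
    #include <cassert>
    #include <cstdint>

    int main()
    {
        std::uint64_t bits = 0b0111; // slots 0..2 occupied

        const std::uint64_t old = bits;
        bits |= bits + 1; // 0b0111 + 1 = 0b1000; OR-ing it in claims slot 3

        assert(bits == 0b1111);
        assert(std::countr_zero(~old) == 3); // claimed index, as ctz128(~bits) in the diff

        bits &= ~(std::uint64_t{1} << 3); // releasing the slot clears the bit again
        assert(bits == old);
    }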

@@ -2159,6 +2168,15 @@ thread_base::thread_base(std::string_view name)
 
 thread_base::~thread_base()
 {
+    if (m_thread)
+    {
+#ifdef _WIN32
+        CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
+#else
+        pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
+#endif
+    }
+
 }
 
 bool thread_base::join() const
15 changes: 11 additions & 4 deletions Utilities/Thread.h
@@ -128,7 +128,7 @@ class thread_base
     void notify_abort() noexcept;
 
     // Called at the thread end, returns true if needs destruction
-    bool finalize(thread_state result) noexcept;
+    u64 finalize(thread_state result) noexcept;
 
     // Cleanup after possibly deleting the thread instance
     static void finalize(u64 _self) noexcept;
@@ -286,21 +286,28 @@ class named_thread final : public Context, result_storage_t<Context>, thread_base
     static inline void* entry_point(void* arg)
 #endif
     {
+        if (auto _this = thread_ctrl::get_current())
+        {
+            arg = _this;
+        }
+
         const auto _this = static_cast<named_thread*>(static_cast<thread*>(arg));
 
         // Perform self-cleanup if necessary
-        if (_this->entry_point())
+        u64 _self = _this->entry_point();
+
+        if (!_self)
         {
             delete _this;
+            thread::finalize(0);
             return 0;
         }
 
-        thread::finalize(_this->thread::m_thread);
+        thread::finalize(_self);
         return 0;
     }
 
-    bool entry_point()
+    u64 entry_point()
     {
         auto tls_error_cb = []()
         {
2 changes: 2 additions & 0 deletions rpcs3/Emu/CPU/CPUThread.cpp
@@ -522,6 +522,8 @@ void cpu_thread::operator()()
 
     s_tls_thread_slot = -1;
 
+    g_tls_current_cpu_thread = nullptr;
+
     _this = nullptr;
 }
 
48 changes: 48 additions & 0 deletions rpcs3/util/asm.hpp
@@ -206,6 +206,30 @@ namespace utils
         return r;
     }
 
+    inline u32 ctz128(u128 arg)
+    {
+        if (u64 lo = static_cast<u64>(arg))
+        {
+            return std::countr_zero<u64>(lo);
+        }
+        else
+        {
+            return std::countr_zero<u64>(arg >> 64) + 64;
+        }
+    }
+
+    inline u32 clz128(u128 arg)
+    {
+        if (u64 hi = static_cast<u64>(arg >> 64))
+        {
+            return std::countl_zero<u64>(hi);
+        }
+        else
+        {
+            return std::countl_zero<u64>(arg) + 64;
+        }
+    }
+
 #elif defined(_MSC_VER)
     inline void prefetch_read(const void* ptr)
     {
@@ -287,5 +311,29 @@
 
         return r;
     }
+
+    inline u32 ctz128(u128 arg)
+    {
+        if (!arg.lo)
+        {
+            return std::countr_zero(arg.hi) + 64u;
+        }
+        else
+        {
+            return std::countr_zero(arg.lo);
+        }
+    }
+
+    inline u32 clz128(u128 arg)
+    {
+        if (arg.hi)
+        {
+            return std::countl_zero(arg.hi);
+        }
+        else
+        {
+            return std::countl_zero(arg.lo) + 64;
+        }
+    }
 #endif
 } // namespace utils
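A quick sanity check of the two new helpers (a hedged sketch; it assumes rpcs3's u128 alias and util/asm.hpp are visible, and that, per the branches above, a zero argument falls through to the high-half branch and yields 128):

    // Expected results for the new 128-bit bit-scan helpers.
    #include "util/asm.hpp"
    #include <cassert>

    int main()
    {
        const u128 one = 1;

        assert(utils::ctz128(one) == 0);        // lowest bit set
        assert(utils::ctz128(one << 64) == 64); // first set bit is in the high half
        assert(utils::ctz128(0) == 128);        // no set bit at all
        assert(utils::clz128(one << 127) == 0); // top bit set
        assert(utils::clz128(one) == 127);
    }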
54 changes: 42 additions & 12 deletions rpcs3/util/atomic.cpp
@@ -532,7 +532,10 @@ static atomic_wait::cond_handle s_cond_list[UINT16_MAX + 1]{};
 static atomic_t<u64, 64> s_cond_bits[(UINT16_MAX + 1) / 64]{};
 
 // Allocation semaphore
-static atomic_t<u32, 64> s_cond_sema{0};
+static atomic_t<u32> s_cond_sema{0};
+
+// Max possible search distance (max i in loop)
+static atomic_t<u32> s_cond_max{0};
 
 static u32
 #ifdef _WIN32
@@ -548,16 +551,7 @@ cond_alloc(std::uintptr_t iptr, __m128i mask)
         return 0;
     }
 
-    // Diversify search start points to reduce contention and increase immediate success chance
-#ifdef _WIN32
-    const u32 start = GetCurrentProcessorNumber();
-#elif __linux__
-    const u32 start = sched_getcpu();
-#else
-    const u32 start = __rdtsc();
-#endif
-
-    for (u32 i = start;; i++)
+    for (u32 i = 0;; i++)
     {
         const u32 group = i % ::size32(s_cond_bits);
 
@@ -588,6 +582,18 @@ cond_alloc(std::uintptr_t iptr, __m128i mask)
             s_cond_list[id].mask = mask;
             s_cond_list[id].init(iptr);
 
+            // Update some stats
+            s_cond_max.fetch_op([i](u32& val)
+            {
+                if (val < i)
+                {
+                    val = i;
+                    return true;
+                }
+
+                return false;
+            });
+
             return id;
         }
     }
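The fetch_op above is a lock-free monotonic maximum: the lambda commits only when the candidate would raise the stored value. An equivalent sketch using a plain std::atomic CAS loop (illustrative; rpcs3's atomic_t::fetch_op wraps the same retry logic):

    // Sketch: lock-free monotonic max, mirroring the s_cond_max update.
    #include <atomic>
    #include <cstdint>

    static void update_max(std::atomic<std::uint32_t>& max, std::uint32_t i)
    {
        std::uint32_t cur = max.load();

        // compare_exchange_weak reloads cur on failure, so the loop also
        // exits as soon as another thread has published a value >= i.
        while (cur < i && !max.compare_exchange_weak(cur, i))
        {
        }
    }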
@@ -1373,9 +1379,33 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
     // Special operation mode. Note that this is not atomic.
     if (!data)
     {
+        if (!s_cond_sema)
+        {
+            return false;
+        }
+
         // Special path: search thread_id without pointer information
-        for (u32 i = 1; i <= UINT16_MAX; i++)
+        for (u32 i = 1; i < (s_cond_max + 1) * 64; i++)
         {
+            if ((i & 63) == 0)
+            {
+                for (u64 bits = s_cond_bits[i / 64]; bits; bits &= bits - 1)
+                {
+                    utils::prefetch_read(s_cond_list + i + std::countr_zero(bits));
+                }
+            }
+
+            if (!s_cond_bits[i / 64])
+            {
+                i |= 63;
+                continue;
+            }
+
+            if (~s_cond_bits[i / 64] & (1ull << i))
+            {
+                continue;
+            }
+
             const auto cond = s_cond_list + i;
 
             const auto [old, ok] = cond->ptr_ref.fetch_op([&](u64& val)
Expand Down