From cfda4d0ade2e7c1732aec00efe719d3b762ae690 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Fri, 13 Nov 2020 05:32:50 +0300
Subject: [PATCH 1/3] atomic.cpp: optimize raw_notify() for unspecified pointer mode

Remove an unnecessary optimization from cond_alloc(): the optimistic
case was absolutely dominating anyway, although the whole function is a
dirty hack. Scanning through all threads is now faster.
---
 rpcs3/util/atomic.cpp | 54 +++++++++++++++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index a3bb98350793..8005c97f5172 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -532,7 +532,10 @@ static atomic_wait::cond_handle s_cond_list[UINT16_MAX + 1]{};
 static atomic_t<u64> s_cond_bits[(UINT16_MAX + 1) / 64]{};
 
 // Allocation semaphore
-static atomic_t<u32> s_cond_sema{0};
+static atomic_t<u32, 64> s_cond_sema{0};
+
+// Max possible search distance (max i in loop)
+static atomic_t<u32, 64> s_cond_max{0};
 
 static u32
 #ifdef _WIN32
@@ -548,16 +551,7 @@ cond_alloc(std::uintptr_t iptr, __m128i mask)
 		return 0;
 	}
 
-	// Diversify search start points to reduce contention and increase immediate success chance
-#ifdef _WIN32
-	const u32 start = GetCurrentProcessorNumber();
-#elif __linux__
-	const u32 start = sched_getcpu();
-#else
-	const u32 start = __rdtsc();
-#endif
-
-	for (u32 i = start;; i++)
+	for (u32 i = 0;; i++)
 	{
 		const u32 group = i % ::size32(s_cond_bits);
 
@@ -588,6 +582,18 @@ cond_alloc(std::uintptr_t iptr, __m128i mask)
 		s_cond_list[id].mask = mask;
 		s_cond_list[id].init(iptr);
 
+		// Update some stats
+		s_cond_max.fetch_op([i](u32& val)
+		{
+			if (val < i)
+			{
+				val = i;
+				return true;
+			}
+
+			return false;
+		});
+
 		return id;
 	}
 }
@@ -1373,9 +1379,33 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 	// Special operation mode. Note that this is not atomic.
 	if (!data)
 	{
+		if (!s_cond_sema)
+		{
+			return false;
+		}
+
 		// Special path: search thread_id without pointer information
-		for (u32 i = 1; i <= UINT16_MAX; i++)
+		for (u32 i = 1; i < (s_cond_max + 1) * 64; i++)
 		{
+			if ((i & 63) == 0)
+			{
+				for (u64 bits = s_cond_bits[i / 64]; bits; bits &= bits - 1)
+				{
+					utils::prefetch_read(s_cond_list + i + std::countl_zero(bits));
+				}
+			}
+
+			if (!s_cond_bits[i / 64])
+			{
+				i |= 63;
+				continue;
+			}
+
+			if (~s_cond_bits[i / 64] & (1ull << i))
+			{
+				continue;
+			}
+
 			const auto cond = s_cond_list + i;
 
 			const auto [old, ok] = cond->ptr_ref.fetch_op([&](u64& val)
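Note (illustrative, not part of the patch): the change above has two halves. cond_alloc() now records a high-water mark (s_cond_max) with a compare-and-raise fetch_op, and raw_notify() uses it to bound the slot scan, skipping empty 64-slot groups via the occupancy bits. The standalone C++20 sketch below shows the same pattern with hypothetical names (g_bits, g_max_group, mark_allocated, for_each_allocated) and plain std::atomic standing in for rpcs3's atomic_t:

    #include <atomic>
    #include <bit>
    #include <cstdint>

    // One occupancy bit per slot, plus a monotonic high-water mark so a full
    // scan can stop early instead of visiting every possible slot.
    std::atomic<uint64_t> g_bits[1024]{};
    std::atomic<uint32_t> g_max_group{0};

    void mark_allocated(uint32_t id)
    {
        const uint32_t group = id / 64;
        g_bits[group].fetch_or(1ull << (id % 64));

        // Same effect as the fetch_op in the patch: raise the mark, never lower it
        uint32_t prev = g_max_group.load();
        while (prev < group && !g_max_group.compare_exchange_weak(prev, group)) {}
    }

    template <typename F>
    void for_each_allocated(F&& f)
    {
        for (uint32_t g = 0; g <= g_max_group.load(); g++)
        {
            // bits &= bits - 1 clears the lowest set bit each iteration
            for (uint64_t bits = g_bits[g].load(); bits; bits &= bits - 1)
            {
                f(g * 64 + std::countr_zero(bits));
            }
        }
    }

Since the mark only ever grows, a stale read merely lengthens the scan slightly; it never skips a live slot.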
From e48f160a2973e8c466dcd0e5128d7327cef780f8 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Fri, 13 Nov 2020 05:36:39 +0300
Subject: [PATCH 2/3] atomic.cpp: continuation of fixing all-ones masks

Also add the missing const noexcept.
---
 rpcs3/util/atomic.cpp |  2 +-
 rpcs3/util/atomic.hpp | 35 +++++++++++++++++++++++++----------
 2 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index 8005c97f5172..b3bbc7bb1531 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -1391,7 +1391,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 		{
 			for (u64 bits = s_cond_bits[i / 64]; bits; bits &= bits - 1)
 			{
-				utils::prefetch_read(s_cond_list + i + std::countl_zero(bits));
+				utils::prefetch_read(s_cond_list + i + std::countr_zero(bits));
 			}
 		}
 
diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp
index 1a874be975a5..c1746214be63 100644
--- a/rpcs3/util/atomic.hpp
+++ b/rpcs3/util/atomic.hpp
@@ -72,24 +72,25 @@ namespace atomic_wait
 		__m128i mask;
 
 		template <typename T>
-		constexpr void set_value(T value)
+		static constexpr __m128i get_value(T value = T{})
 		{
 			static_assert((sizeof(T) & (sizeof(T) - 1)) == 0);
 			static_assert(sizeof(T) <= 16);
 
 			if constexpr (sizeof(T) <= 8)
 			{
-				old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>, T>(value));
+				return _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>, T>(value));
 			}
 			else if constexpr (sizeof(T) == 16)
 			{
-				old = std::bit_cast<__m128i>(value);
+				return std::bit_cast<__m128i>(value);
 			}
 		}
 
-		void set_value()
+		template <typename T>
+		constexpr void set_value(T value = T{})
 		{
-			old = _mm_setzero_si128();
+			old = get_value(value);
 		}
 
 		template <typename T>
@@ -108,9 +109,23 @@
 			}
 		}
 
-		void set_mask()
+		template <typename T>
+		constexpr void set_mask()
+		{
+			mask = get_mask<T>();
+		}
+
+		template <typename T>
+		static constexpr __m128i get_mask()
 		{
-			mask = _mm_set1_epi64x(-1);
+			if constexpr (sizeof(T) <= 8)
+			{
+				return _mm_cvtsi64_si128(UINT64_MAX >> ((64 - sizeof(T) * 8) & 63));
+			}
+			else
+			{
+				return _mm_set1_epi64x(-1);
+			}
 		}
 	};
@@ -131,7 +146,7 @@
 
 		template <typename... U>
 		constexpr list(atomic_t<U>&... vars)
-			: m_info{{&vars.raw(), sizeof(U), _mm_setzero_si128(), _mm_set1_epi64x(-1)}...}
+			: m_info{{&vars.raw(), sizeof(U), info::get_value<U>(), info::get_mask<U>()}...}
 		{
 			static_assert(sizeof...(U) <= Max);
 		}
@@ -164,7 +179,7 @@
 			m_info[Index].data = &var.raw();
 			m_info[Index].size = sizeof(T2) | (static_cast<u8>(Flags) << 8);
 			m_info[Index].template set_value<T2>(value);
-			m_info[Index].mask = _mm_set1_epi64x(-1);
+			m_info[Index].template set_mask<T2>();
 		}
 
 		template
@@ -1447,7 +1462,7 @@ class atomic_t
 
 	// Overload with mask (only selected bits are checked), timeout is discouraged
 	template <atomic_wait::op Flags = atomic_wait::op::eq>
-	void wait(type old_value, type mask_value, atomic_wait_timeout timeout = atomic_wait_timeout::inf)
+	void wait(type old_value, type mask_value, atomic_wait_timeout timeout = atomic_wait_timeout::inf) const noexcept
 	{
 		if constexpr (sizeof(T) <= 8)
 		{
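Note (illustrative, not part of the patch): the point of get_mask() above is that the wait comparison mask is now sized to the waited-on type rather than always being 128 set bits, so bytes adjacent to a small value can no longer produce spurious mismatches. A self-contained look at the mask expression, using the hypothetical name low_mask (the patch computes the same value and then widens it with _mm_cvtsi64_si128):

    #include <cstdint>

    // For an N-byte type (N <= 8) this yields exactly N*8 low one-bits.
    template <typename T>
    constexpr uint64_t low_mask()
    {
        static_assert(sizeof(T) <= 8);
        // "& 63" keeps the shift count legal when sizeof(T) == 8 (shifting a
        // 64-bit value by 64 would be undefined behavior)
        return UINT64_MAX >> ((64 - sizeof(T) * 8) & 63);
    }

    static_assert(low_mask<uint8_t>() == 0xff);
    static_assert(low_mask<uint16_t>() == 0xffff);
    static_assert(low_mask<uint32_t>() == 0xffff'ffff);
    static_assert(low_mask<uint64_t>() == UINT64_MAX);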
From ab365fe494a67000bfb97c97d688c90137868e5c Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Fri, 13 Nov 2020 11:32:47 +0300
Subject: [PATCH 3/3] Fixed thread pool a bit

Use a 128-bit allocator instead of a queue. When the pool is full
(128 threads), exiting threads just terminate, as before.
---
 Utilities/Thread.cpp        | 124 +++++++++++++++++++++----------
 Utilities/Thread.h          |  15 +++--
 rpcs3/Emu/CPU/CPUThread.cpp |   2 +
 rpcs3/util/asm.hpp          |  48 ++++++++++++++
 4 files changed, 132 insertions(+), 57 deletions(-)

diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp
index 59f69664dc18..510c53acc811 100644
--- a/Utilities/Thread.cpp
+++ b/Utilities/Thread.cpp
@@ -75,6 +75,7 @@
 #include "sync.h"
 #include "util/vm.hpp"
 #include "util/logs.hpp"
+#include "util/asm.hpp"
 #include "Emu/Memory/vm_locking.h"
 
 LOG_CHANNEL(sig_log, "SIG");
@@ -1842,66 +1843,40 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;
 
 DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };
 
-static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;
+static atomic_t<u128> s_thread_bits{0};
 
-static shared_mutex s_pool_lock;
+static atomic_t<thread_base**> s_thread_pool[128]{};
 
 void thread_base::start(native_entry entry)
 {
-	while (u32 size = s_thread_pool.size())
+	for (u128 bits = s_thread_bits.load(); bits; bits &= bits - 1)
 	{
-		u32 pos = s_thread_pool.peek();
-		thread_base** tls = nullptr;
+		const u32 pos = utils::ctz128(bits);
 
-		for (u32 i = pos; i < pos + size; i++)
+		if (!s_thread_pool[pos])
 		{
-			auto val = s_thread_pool[i].load();
-
-			if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
-			{
-				tls = val;
-				pos = i;
-				break;
-			}
+			continue;
 		}
 
-		if (tls)
-		{
-			// Send "this" and entry point
-			m_thread = reinterpret_cast<u64>(entry);
-			atomic_storage<thread_base*>::store(*tls, this);
-			s_thread_pool[pos].notify_one();
-
-			{
-				// Using it in MPMC manner is a bit tricky, since it's originally MPSC
-				std::lock_guard lock(s_pool_lock);
+		thread_base** tls = s_thread_pool[pos].exchange(nullptr);
 
-				u32 count = 0;
-
-				while (!s_thread_pool[s_thread_pool.peek() + count])
-				{
-					count++;
-
-					if (count >= s_thread_pool.size())
-					{
-						break;
-					}
-				}
-
-				if (count)
-				{
-					s_thread_pool.pop_end(count);
-				}
-			}
+		if (!tls)
+		{
+			continue;
+		}
 
-			// Wait for actual "m_thread" in return
-			while (m_thread == reinterpret_cast<u64>(entry))
-			{
-				busy_wait(300);
-			}
+		// Send "this" and entry point
+		m_thread = reinterpret_cast<u64>(entry);
+		atomic_storage<thread_base*>::release(*tls, this);
+		s_thread_pool[pos].notify_all();
 
-			return;
-		}
+		// Wait for actual "m_thread" in return
+		while (m_thread == reinterpret_cast<u64>(entry))
+		{
+			busy_wait(300);
 		}
+
+		return;
 	}
 
 #ifdef _WIN32
@@ -1990,7 +1965,7 @@ void thread_base::notify_abort() noexcept
 	}
 }
 
-bool thread_base::finalize(thread_state result_state) noexcept
+u64 thread_base::finalize(thread_state result_state) noexcept
 {
 	// Report pending errors
 	error_code::error_report(0, 0, 0, 0);
@@ -2036,6 +2011,9 @@ bool thread_base::finalize(thread_state result_state) noexcept
 		g_tls_fault_spu,
 		fsoft, fhard, ctxvol, ctxinv);
 
+	const u64 _self = m_thread;
+	m_thread.release(0);
+
 	// Return true if need to delete thread object
 	const bool ok = m_state.exchange(result_state) <= thread_state::aborting;
 
@@ -2043,7 +2021,7 @@ bool thread_base::finalize(thread_state result_state) noexcept
 	m_state.notify_all();
 
 	// No detached thread supported atm
-	return !ok;
+	return _self;
 }
 
 void thread_base::finalize(u64 _self) noexcept
 {
@@ -2056,23 +2034,54 @@ void thread_base::finalize(u64 _self) noexcept
 	atomic_wait_engine::set_wait_callback(nullptr);
 	g_tls_log_prefix = []() -> std::string { return {}; };
-	set_name("..pool");
 	thread_ctrl::g_tls_this_thread = nullptr;
 
 	// Try to add self to thread pool
-	const u32 pos = s_thread_pool.push_begin();
+	const auto [bits, ok] = s_thread_bits.fetch_op([](u128& bits)
+	{
+		if (~bits) [[likely]]
+		{
+			// Set lowest clear bit
+			bits |= bits + 1;
+			return true;
+		}
+
+		return false;
+	});
+
+	if (!ok)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(_self));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(_self));
+#endif
+		return;
+	}
+
+	set_name("..pool");
+
+	// Obtain id from atomic op
+	const u32 pos = utils::ctz128(~bits);
 	const auto tls = &thread_ctrl::g_tls_this_thread;
 	s_thread_pool[pos] = tls;
 
-	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(thread_ctrl::g_tls_this_thread))
+	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(*tls))
 	{
 		s_thread_pool[pos].wait(tls);
 	}
 
+	// Free thread pool slot
+	s_thread_bits.atomic_op([pos](u128& val)
+	{
+		val &= ~(u128(1) << pos);
+	});
+
 	// Restore thread id
-	const auto _this = thread_ctrl::g_tls_this_thread;
+	const auto _this = atomic_storage<thread_base*>::load(*tls);
 	const auto entry = _this->m_thread.exchange(_self);
 	_this->m_thread.notify_one();
+	reinterpret_cast<native_entry>(entry)(_this);
 }
@@ -2159,6 +2168,15 @@ thread_base::thread_base(std::string_view name)
 
 thread_base::~thread_base()
 {
+	if (m_thread)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
+#endif
+	}
+
 }
 
 bool thread_base::join() const
diff --git a/Utilities/Thread.h b/Utilities/Thread.h
index 6e290cb6acf3..5b1d5cc51557 100644
--- a/Utilities/Thread.h
+++ b/Utilities/Thread.h
@@ -128,7 +128,7 @@ class thread_base
 	void notify_abort() noexcept;
 
 	// Called at the thread end, returns true if needs destruction
-	bool finalize(thread_state result) noexcept;
+	u64 finalize(thread_state result) noexcept;
 
 	// Cleanup after possibly deleting the thread instance
 	static void finalize(u64 _self) noexcept;
@@ -286,21 +286,28 @@ class named_thread final : public Context, result_storage_t<Context>, thread_base
 	static inline void* entry_point(void* arg)
 #endif
 	{
+		if (auto _this = thread_ctrl::get_current())
+		{
+			arg = _this;
+		}
+
 		const auto _this = static_cast<named_thread*>(static_cast<thread*>(arg));
 
 		// Perform self-cleanup if necessary
-		if (_this->entry_point())
+		u64 _self = _this->entry_point();
+
+		if (!_self)
 		{
 			delete _this;
 			thread::finalize(0);
 			return 0;
 		}
 
-		thread::finalize(_this->thread::m_thread);
+		thread::finalize(_self);
 		return 0;
 	}
 
-	bool entry_point()
+	u64 entry_point()
 	{
 		auto tls_error_cb = []()
 		{
diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp
index 256be6c015ad..7a4c86eae1e8 100644
--- a/rpcs3/Emu/CPU/CPUThread.cpp
+++ b/rpcs3/Emu/CPU/CPUThread.cpp
@@ -522,6 +522,8 @@ void cpu_thread::operator()()
 
 	s_tls_thread_slot = -1;
 
+	g_tls_current_cpu_thread = nullptr;
+
 	_this = nullptr;
 }
diff --git a/rpcs3/util/asm.hpp b/rpcs3/util/asm.hpp
index 86f7f363cd2a..a2291137ea24 100644
--- a/rpcs3/util/asm.hpp
+++ b/rpcs3/util/asm.hpp
@@ -206,6 +206,30 @@ namespace utils
 		return r;
 	}
 
+	inline u32 ctz128(u128 arg)
+	{
+		if (u64 lo = static_cast<u64>(arg))
+		{
+			return std::countr_zero(lo);
+		}
+		else
+		{
+			return std::countr_zero<u64>(arg >> 64) + 64;
+		}
+	}
+
+	inline u32 clz128(u128 arg)
+	{
+		if (u64 hi = static_cast<u64>(arg >> 64))
+		{
+			return std::countl_zero(hi);
+		}
+		else
+		{
+			return std::countl_zero<u64>(arg) + 64;
+		}
+	}
+
 #elif defined(_MSC_VER)
 	inline void prefetch_read(const void* ptr)
 	{
@@ -287,5 +311,29 @@ namespace utils
 		return r;
 	}
+
+	inline u32 ctz128(u128 arg)
+	{
+		if (!arg.lo)
+		{
+			return std::countr_zero(arg.hi) + 64u;
+		}
+		else
+		{
+			return std::countr_zero(arg.lo);
+		}
+	}
+
+	inline u32 clz128(u128 arg)
+	{
+		if (arg.hi)
+		{
+			return std::countl_zero(arg.hi);
+		}
+		else
+		{
+			return std::countl_zero(arg.lo) + 64;
+		}
+	}
 #endif
 } // namespace utils
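Note (illustrative, not part of the patch): the thread pool rework in PATCH 3/3 boils down to a 128-slot bitmap allocator over a u128. The single-threaded sketch below exercises its two primitives in isolation; it assumes a compiler with unsigned __int128 (the _MSC_VER path above packs two u64 halves instead), and acquire_slot/release_slot are hypothetical names — in the patch the same steps run atomically through fetch_op/atomic_op on atomic_t<u128>:

    #include <bit>
    #include <cstdint>

    using u128 = unsigned __int128;

    inline uint32_t ctz128(u128 arg)
    {
        const auto lo = static_cast<uint64_t>(arg);
        return lo ? std::countr_zero(lo)
                  : std::countr_zero(static_cast<uint64_t>(arg >> 64)) + 64;
    }

    // Returns a free slot index, or 128 when all slots are taken.
    inline uint32_t acquire_slot(u128& bits)
    {
        if (!~bits)
        {
            return 128; // pool full: the exiting thread simply terminates
        }

        const u128 prev = bits;
        bits |= bits + 1;     // set the lowest clear bit
        return ctz128(~prev); // its index
    }

    inline void release_slot(u128& bits, uint32_t pos)
    {
        bits &= ~(u128(1) << pos);
    }

The trick mirrors the scan loops in the earlier patches: bits + 1 carries through the trailing one-bits and stops at the first zero, so bits |= bits + 1 sets exactly the lowest clear bit, just as bits &= bits - 1 clears exactly the lowest set bit.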