Skip to content

Commit

Permalink
atomic.cpp: remove load() from notify functions
Browse files Browse the repository at this point in the history
Only compare masks for overlap in the second overload (the one with a mask provided).
Explicit "new value" can be provided in new 3-arg overloads.
Also rename atomic_storage_futex -> atomic_wait_engine.
  • Loading branch information
Nekotekina committed Nov 6, 2020
1 parent 9fb8d44 commit 87614e8
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 48 deletions.
6 changes: 3 additions & 3 deletions Utilities/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1848,7 +1848,7 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*))
thread_ctrl::g_tls_error_callback = error_cb;

// Initialize atomic wait callback
atomic_storage_futex::set_wait_callback(wait_cb);
atomic_wait_engine::set_wait_callback(wait_cb);

g_tls_log_prefix = []
{
Expand Down Expand Up @@ -1908,7 +1908,7 @@ void thread_base::notify_abort() noexcept
while (auto ptr = m_state_notifier.load())
{
// Since this function is not perfectly implemented, run it in a loop
if (atomic_storage_futex::raw_notify(ptr, tid))
if (atomic_wait_engine::raw_notify(ptr, tid))
{
break;
}
Expand Down Expand Up @@ -1973,7 +1973,7 @@ bool thread_base::finalize(thread_state result_state) noexcept

void thread_base::finalize() noexcept
{
atomic_storage_futex::set_wait_callback(nullptr);
atomic_wait_engine::set_wait_callback(nullptr);
g_tls_log_prefix = []() -> std::string { return {}; };
thread_ctrl::g_tls_this_thread = nullptr;
}
Expand Down
4 changes: 2 additions & 2 deletions rpcs3/Emu/CPU/CPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void cpu_thread::operator()()
return;
}

atomic_storage_futex::set_notify_callback([](const void*, u64 progress)
atomic_wait_engine::set_notify_callback([](const void*, u64 progress)
{
static thread_local bool wait_set = false;

Expand Down Expand Up @@ -514,7 +514,7 @@ void cpu_thread::operator()()
ptr->compare_and_swap(_this, nullptr);
}

atomic_storage_futex::set_notify_callback(nullptr);
atomic_wait_engine::set_notify_callback(nullptr);

g_tls_log_control = [](const char*, u64){};

Expand Down
49 changes: 29 additions & 20 deletions rpcs3/util/atomic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
return true;
}

// Compare only masks, new value is not available in this mode
if ((size1 | size2) == umax)
{
// Simple mask overlap
const auto v0 = _mm_and_si128(mask1, mask2);
const auto v1 = _mm_packs_epi16(v0, v0);
return _mm_cvtsi128_si64(v1) != 0;
}

// Generate masked value inequality bits
const auto v0 = _mm_and_si128(_mm_and_si128(mask1, mask2), _mm_xor_si128(val1, val2));

Expand Down Expand Up @@ -137,7 +146,7 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
return true;
}

namespace
namespace atomic_wait
{
// Essentially a fat semaphore
struct alignas(64) cond_handle
Expand Down Expand Up @@ -184,7 +193,7 @@ namespace
}

// Max allowed thread number is chosen to fit in 16 bits
static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[UINT16_MAX]{};
static std::aligned_storage_t<sizeof(atomic_wait::cond_handle), alignof(atomic_wait::cond_handle)> s_cond_list[UINT16_MAX]{};

// Used to allow concurrent notifying
static atomic_t<u16> s_cond_refs[UINT16_MAX + 1]{};
Expand Down Expand Up @@ -234,7 +243,7 @@ static u32 cond_alloc()
const u32 id = group * 64 + std::countr_one(bits);

// Construct inplace before it can be used
new (s_cond_list + id) cond_handle();
new (s_cond_list + id) atomic_wait::cond_handle();

// Add first reference
verify(HERE), !s_cond_refs[id]++;
Expand All @@ -248,11 +257,11 @@ static u32 cond_alloc()
return 0;
}

static cond_handle* cond_get(u32 cond_id)
static atomic_wait::cond_handle* cond_get(u32 cond_id)
{
if (cond_id - 1 < u32{UINT16_MAX}) [[likely]]
{
return std::launder(reinterpret_cast<cond_handle*>(s_cond_list + (cond_id - 1)));
return std::launder(reinterpret_cast<atomic_wait::cond_handle*>(s_cond_list + (cond_id - 1)));
}

return nullptr;
Expand Down Expand Up @@ -322,7 +331,7 @@ static u32 cond_lock(atomic_t<u16>* sema)
return 0;
}

namespace
namespace atomic_wait
{
#define MAX_THREADS (56)

Expand Down Expand Up @@ -407,24 +416,24 @@ namespace
}

// Main hashtable for atomic wait.
alignas(128) static sync_var s_hashtable[s_hashtable_size]{};
alignas(128) static atomic_wait::sync_var s_hashtable[s_hashtable_size]{};

namespace
namespace atomic_wait
{
struct slot_info
{
constexpr slot_info() noexcept = default;

// Branch extension
sync_var branch[48 - s_hashtable_power]{};
atomic_wait::sync_var branch[48 - s_hashtable_power]{};
};
}

// Number of search groups (defines max slot branch count as gcount * 64)
#define MAX_SLOTS (4096)

// Array of slot branch objects
alignas(128) static slot_info s_slot_list[MAX_SLOTS]{};
alignas(128) static atomic_wait::slot_info s_slot_list[MAX_SLOTS]{};

// Allocation bits
static atomic_t<u64, 64> s_slot_bits[MAX_SLOTS / 64]{};
Expand Down Expand Up @@ -482,7 +491,7 @@ static u64 slot_alloc()

#undef MAX_SLOTS

static sync_var* slot_get(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
static atomic_wait::sync_var* slot_get(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
{
if (!loc)
{
Expand Down Expand Up @@ -523,7 +532,7 @@ static void slot_free(u64 id)
s_slot_sema--;
}

static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
static void slot_free(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
{
const u64 value = loc->addr_ref.load();

Expand Down Expand Up @@ -568,15 +577,15 @@ SAFE_BUFFERS void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask)
atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);

// Allocated slot index
u64 slot_a = -1;

// Found slot object
sync_var* slot = nullptr;
atomic_wait::sync_var* slot = nullptr;

auto install_op = [&](u64& value) -> u64
{
Expand Down Expand Up @@ -616,7 +625,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
// Search detail
u64 lv = 0;

for (sync_var* ptr = &s_hashtable[iptr % s_hashtable_size];;)
for (atomic_wait::sync_var* ptr = &s_hashtable[iptr % s_hashtable_size];;)
{
auto [_old, ok] = ptr->addr_ref.fetch_op(install_op);

Expand Down Expand Up @@ -899,7 +908,7 @@ alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i ma
return ok;
}

void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
void atomic_wait_engine::set_wait_callback(bool(*cb)(const void* data))
{
if (cb)
{
Expand All @@ -911,7 +920,7 @@ void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
}
}

void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64))
void atomic_wait_engine::set_notify_callback(void(*cb)(const void*, u64))
{
if (cb)
{
Expand All @@ -923,7 +932,7 @@ void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64))
}
}

bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id)
bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);

Expand Down Expand Up @@ -965,7 +974,7 @@ void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value)
atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);

Expand Down Expand Up @@ -998,7 +1007,7 @@ SAFE_BUFFERS void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value)
atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);

Expand Down
68 changes: 45 additions & 23 deletions rpcs3/util/atomic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ enum class atomic_wait_timeout : u64
inf = 0xffffffffffffffff,
};

// Unused externally
namespace atomic_wait
{
struct sync_var;
struct slot_info;
struct sema_handle;
}

// Helper for waitable atomics (as in C++20 std::atomic)
struct atomic_storage_futex
struct atomic_wait_engine
{
private:
template <typename T, std::size_t Align>
Expand Down Expand Up @@ -1242,12 +1250,12 @@ class atomic_t
if constexpr (sizeof(T) <= 8)
{
const __m128i old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(old_value));
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
}
else if constexpr (sizeof(T) == 16)
{
const __m128i old = std::bit_cast<__m128i>(old_value);
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
}
}

Expand All @@ -1258,73 +1266,87 @@ class atomic_t
{
const __m128i old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(old_value));
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i old = std::bit_cast<__m128i>(old_value);
const __m128i mask = std::bit_cast<__m128i>(mask_value);
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
}
}

// Wake one thread waiting on this atomic, regardless of what it waits for.
void notify_one() noexcept
{
	// Size -1 (umax) selects mask-only comparison in the wait engine;
	// the value argument is unused in this mode, so pass zeros.
	const __m128i full_mask = _mm_set1_epi64x(-1);
	const __m128i no_value = _mm_setzero_si128();
	atomic_wait_engine::notify_one(&m_data, -1, full_mask, no_value);
}

// Notify with mask, allowing to not wake up thread which doesn't wait on this mask
void notify_one(type mask_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
atomic_wait_engine::notify_one(&m_data, -1, mask, _mm_setzero_si128());
}
else if constexpr (sizeof(T) == 16)
{
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
const __m128i mask = std::bit_cast<__m128i>(mask_value);
atomic_wait_engine::notify_one(&m_data, -1, mask, _mm_setzero_si128());
}
}

void notify_one(type mask_value) noexcept
// Notify with mask and value, allowing to not wake up thread which doesn't wait on them
void notify_one(type mask_value, type new_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new);
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(new_value));
atomic_wait_engine::notify_one(&m_data, sizeof(T), mask, _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new);
const __m128i _new = std::bit_cast<__m128i>(new_value);
atomic_wait_engine::notify_one(&m_data, sizeof(T), mask, _new);
}
}

// Wake all threads waiting on this atomic, regardless of what they wait for.
void notify_all() noexcept
{
	// Size -1 (umax) selects mask-only comparison in the wait engine;
	// the value argument is unused in this mode, so pass zeros.
	const __m128i full_mask = _mm_set1_epi64x(-1);
	const __m128i no_value = _mm_setzero_si128();
	atomic_wait_engine::notify_all(&m_data, -1, full_mask, no_value);
}

// Notify all threads with mask, allowing to not wake up threads which don't wait on them
void notify_all(type mask_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _mm_setzero_si128());
}
else if constexpr (sizeof(T) == 16)
{
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
const __m128i mask = std::bit_cast<__m128i>(mask_value);
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _mm_setzero_si128());
}
}

void notify_all(type mask_value) noexcept
// Notify all threads with mask and value, allowing to not wake up threads which don't wait on them
void notify_all(type mask_value, type new_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new);
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(new_value));
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new);
const __m128i _new = std::bit_cast<__m128i>(new_value);
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _new);
}
}
};
Expand Down

0 comments on commit 87614e8

Please sign in to comment.