Skip to content

Commit

Permalink
atomic.cpp: add std primitive fallback for other platforms
Browse files Browse the repository at this point in the history
Other platforms = not Windows or Linux.
  • Loading branch information
Nekotekina committed Oct 25, 2020
1 parent eec2dfa commit af6fcb1
Showing 1 changed file with 195 additions and 7 deletions.
202 changes: 195 additions & 7 deletions rpcs3/util/atomic.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#include "atomic.hpp"

// TODO: something for other platforms
#if defined(__linux__) || !defined(_WIN32)
#if defined(__linux__)
#define USE_FUTEX
#elif !defined(_WIN32)
#define USE_STD
#endif

#include "Utilities/sync.h"

#include <utility>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <iterator>
#include <memory>
#include <cstdlib>
Expand Down Expand Up @@ -70,6 +72,110 @@ static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u6
return false;
}

#ifdef USE_STD
namespace
{
// Standard CV/mutex pair
struct cond_handle
{
std::condition_variable cond;
std::mutex mtx;

cond_handle() noexcept
{
mtx.lock();
}
};
}

// Arbitrary max allowed thread number
static constexpr u32 s_max_conds = 512 * 64;

static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[s_max_conds]{};

alignas(64) atomic_t<u64> s_cond_bits[s_max_conds / 64];

alignas(64) atomic_t<u32> s_cond_sema{0};

static u32 cond_alloc()
{
// Determine whether there is a free slot or not
if (!s_cond_sema.try_inc(s_max_conds + 1))
{
return 0;
}

// Diversify search start points to reduce contention and increase immediate success chance
#ifdef _WIN32
const u32 start = GetCurrentProcessorNumber();
#elif __linux__
const u32 start = sched_getcpu();
#else
const u32 start = __rdtsc();
#endif

for (u32 i = start * 8;; i++)
{
const u32 group = i % (s_max_conds / 64);

const auto [bits, ok] = s_cond_bits[group].fetch_op([](u64& bits)
{
if (~bits)
{
// Set lowest clear bit
bits |= bits + 1;
return true;
}

return false;
});

if (ok)
{
// Find lowest clear bit
const u32 id = group * 64 + std::countr_one(bits);

// Construct inplace before it can be used
new (s_cond_list + id) cond_handle();

return id + 1;
}
}

// TODO: unreachable
std::abort();
return 0;
}

static cond_handle* cond_get(u32 cond_id)
{
if (cond_id - 1 < s_max_conds) [[likely]]
{
return std::launder(reinterpret_cast<cond_handle*>(s_cond_list + (cond_id - 1)));
}

return nullptr;
}

static void cond_free(u32 cond_id)
{
if (cond_id - 1 >= s_max_conds)
{
// Ignore bad id because it may contain notifier lock
return;
}

// Call the destructor
cond_get(cond_id)->~cond_handle();

// Remove the allocation bit
s_cond_bits[(cond_id - 1) / 64] &= ~(1ull << ((cond_id - 1) % 64));

// Release the semaphore
s_cond_sema--;
}
#endif

namespace
{
struct sync_var
Expand All @@ -87,6 +193,16 @@ namespace

atomic_t<u32>* sema_alloc()
{
#ifdef USE_STD
const u32 cond_id = cond_alloc();

if (cond_id == 0)
{
// Too many threads
return nullptr;
}
#endif

const auto [bits, ok] = sema_bits.fetch_op([](u64& bits)
{
if (bits + 1 < (1ull << 60))
Expand All @@ -104,7 +220,9 @@ namespace
// Find lowest clear bit
const auto sema = &sema_data[std::countr_one(bits)];

#if defined(USE_FUTEX)
#if defined(USE_STD)
sema->release(cond_id);
#elif defined(USE_FUTEX)
sema->release(1);
#elif defined(_WIN32)
if (NtWaitForAlertByThreadId)
Expand All @@ -131,8 +249,11 @@ namespace
}

// Clear sema
#ifdef USE_STD
cond_free(sema->exchange(0));
#else
sema->release(0);

#endif
// Clear sema bit
sema_bits &= ~(1ull << (sema - sema_data));
}
Expand Down Expand Up @@ -404,10 +525,15 @@ SAFE_BUFFERS void atomic_storage_futex::wait(const void* data, std::size_t size,
sema = slot->sema_alloc();
}

#ifdef USE_STD
// Create mutex for condition variable (already locked)
std::unique_lock lock(cond_get(sema->load() & 0x7fffffff)->mtx, std::adopt_lock);
#endif

// Can skip unqueue process if true
#if defined(USE_FUTEX)
const bool fallback = true;
#elif defined(_WIN32)
#if defined(USE_FUTEX) || defined(USE_STD)
constexpr bool fallback = true;
#else
bool fallback = false;
#endif

Expand All @@ -427,6 +553,25 @@ SAFE_BUFFERS void atomic_storage_futex::wait(const void* data, std::size_t size,
{
futex(sema, FUTEX_WAIT_PRIVATE, 1, timeout + 1 ? &ts : nullptr);
}
#elif defined(USE_STD)
const u32 val = sema->load();

if (val >> 31)
{
// Locked by notifier
if (!ptr_cmp(data, size, old_value, mask))
{
break;
}
}
else if (timeout + 1)
{
cond_get(val)->cond.wait_for(lock, std::chrono::nanoseconds(timeout));
}
else
{
cond_get(val)->cond.wait(lock);
}
#elif defined(_WIN32)
LARGE_INTEGER qw;
qw.QuadPart = -static_cast<s64>(timeout / 100);
Expand Down Expand Up @@ -525,6 +670,10 @@ SAFE_BUFFERS void atomic_storage_futex::wait(const void* data, std::size_t size,
verify(HERE), thread_id[0] == GetCurrentThreadId();
#endif

#ifdef USE_STD
lock.unlock();
#endif

slot->sema_free(sema);

slot_free(iptr, &s_hashtable[iptr % s_hashtable_size]);
Expand All @@ -542,6 +691,45 @@ static inline bool alert_sema(atomic_t<u32>* sema)
futex(sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
return true;
}
#elif defined(USE_STD)
// Check if not zero and not locked
u32 old_val = sema->load();

if (((old_val - 1) >> 31) == 0)
{
const auto [cond_id, ok] = sema->fetch_op([](u32& id)
{
if ((id - 1) >> 31)
{
return false;
}

// Set notify lock
id |= 1u << 31;
return true;
});

if (ok)
{
if (auto cond = cond_get(cond_id))
{
// Not super efficient: locking is required to avoid lost notifications
cond->mtx.lock();
cond->mtx.unlock();
cond->cond.notify_all();

// Try to remove notifier lock gracefully
if (!sema->compare_and_swap_test(cond_id | (1u << 31), cond_id)) [[unlikely]]
{
// Cleanup helping
cond_free(cond_id);
return false;
}

return true;
}
}
}
#elif defined(_WIN32)
if (NtWaitForAlertByThreadId)
{
Expand Down

0 comments on commit af6fcb1

Please sign in to comment.