Skip to content

Commit

Permalink
Linux: use futex_waitv syscall for atomic waiting
Browse files Browse the repository at this point in the history
In order to make this possible, some unnecessary features were removed.
  • Loading branch information
Nekotekina committed Aug 2, 2023
1 parent 380e991 commit ba3a16d
Show file tree
Hide file tree
Showing 51 changed files with 439 additions and 572 deletions.
73 changes: 47 additions & 26 deletions Utilities/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2172,14 +2172,14 @@ u64 thread_base::finalize(thread_state result_state) noexcept
const u64 _self = m_thread;

// Set result state (errored or finalized)
m_sync.fetch_op([&](u64& v)
m_sync.fetch_op([&](u32& v)
{
v &= -4;
v |= static_cast<u32>(result_state);
});

// Signal waiting threads
m_sync.notify_all(2);
m_sync.notify_all();

return _self;
}
Expand Down Expand Up @@ -2266,8 +2266,18 @@ thread_state thread_ctrl::state()

void thread_ctrl::wait_for(u64 usec, [[maybe_unused]] bool alert /* true */)
{
if (!usec)
{
return;
}

auto _this = g_tls_this_thread;

if (!alert && usec > 50000)
{
usec = 50000;
}

#ifdef __linux__
static thread_local struct linux_timer_handle_t
{
Expand Down Expand Up @@ -2296,13 +2306,13 @@ void thread_ctrl::wait_for(u64 usec, [[maybe_unused]] bool alert /* true */)
}
} fd_timer;

if (!alert && usec > 0 && usec <= 1000 && fd_timer != -1)
if (!alert && fd_timer != -1)
{
struct itimerspec timeout;
u64 missed;

timeout.it_value.tv_nsec = usec * 1'000ull;
timeout.it_value.tv_sec = 0;
timeout.it_value.tv_nsec = usec % 1'000'000 * 1'000ull;
timeout.it_value.tv_sec = usec / 1'000'000;
timeout.it_interval.tv_sec = 0;
timeout.it_interval.tv_nsec = 0;
timerfd_settime(fd_timer, 0, &timeout, NULL);
Expand All @@ -2312,15 +2322,27 @@ void thread_ctrl::wait_for(u64 usec, [[maybe_unused]] bool alert /* true */)
}
#endif

if (_this->m_sync.bit_test_reset(2) || _this->m_taskq)
if (alert)
{
return;
if (_this->m_sync.bit_test_reset(2) || _this->m_taskq)
{
return;
}
}

// Wait for signal and thread state abort
atomic_wait::list<2> list{};
list.set<0>(_this->m_sync, 0, 4 + 1);
list.set<1>(_this->m_taskq, nullptr);

if (alert)
{
list.set<0>(_this->m_sync, 0);
list.set<1>(utils::bless<atomic_t<u32>>(&_this->m_taskq)[1], 0);
}
else
{
list.set<0>(_this->m_dummy, 0);
}

list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff});
}

Expand All @@ -2331,29 +2353,27 @@ void thread_ctrl::wait_for_accurate(u64 usec)
return;
}

if (usec > 50000)
{
fmt::throw_exception("thread_ctrl::wait_for_accurate: unsupported amount");
}

#ifdef __linux__
return wait_for(usec, false);
#else
using namespace std::chrono_literals;

const auto until = std::chrono::steady_clock::now() + 1us * usec;

while (true)
{
#ifdef __linux__
// NOTE: Assumption that timer initialization has succeeded
u64 host_min_quantum = usec <= 1000 ? 10 : 50;
#else
// Host scheduler quantum for windows (worst case)
// NOTE: On ps3 this function has very high accuracy
constexpr u64 host_min_quantum = 500;
#endif

if (usec >= host_min_quantum)
{
#ifdef __linux__
// Do not wait for the last quantum to avoid loss of accuracy
wait_for(usec - ((usec % host_min_quantum) + host_min_quantum), false);
#else
// Wait on multiple of min quantum for large durations to avoid overloading low thread cpus
wait_for(usec - (usec % host_min_quantum), false);
#endif
}
// TODO: Determine best value for yield delay
else if (usec >= host_min_quantum / 2)
Expand All @@ -2374,6 +2394,7 @@ void thread_ctrl::wait_for_accurate(u64 usec)

usec = (until - current).count();
}
#endif
}

std::string thread_ctrl::get_name_cached()
Expand Down Expand Up @@ -2440,7 +2461,7 @@ bool thread_base::join(bool dtor) const

for (u64 i = 0; (m_sync & 3) <= 1; i++)
{
m_sync.wait(0, 2, timeout);
m_sync.wait(m_sync & ~2, timeout);

if (m_sync & 2)
{
Expand All @@ -2460,7 +2481,7 @@ void thread_base::notify()
{
// Set notification
m_sync |= 4;
m_sync.notify_one(4);
m_sync.notify_all();
}

u64 thread_base::get_native_id() const
Expand Down Expand Up @@ -2497,7 +2518,7 @@ u64 thread_base::get_cycles()
{
cycles = static_cast<u64>(thread_time.tv_sec) * 1'000'000'000 + thread_time.tv_nsec;
#endif
if (const u64 old_cycles = m_sync.fetch_op([&](u64& v){ v &= 7; v |= (cycles << 3); }) >> 3)
if (const u64 old_cycles = m_cycles.exchange(cycles))
{
return cycles - old_cycles;
}
Expand All @@ -2507,7 +2528,7 @@ u64 thread_base::get_cycles()
}
else
{
return m_sync >> 3;
return m_cycles;
}
}

Expand Down Expand Up @@ -2560,8 +2581,8 @@ void thread_base::exec()
}

// Notify waiters
ptr->exec.release(nullptr);
ptr->exec.notify_all();
ptr->done.release(1);
ptr->done.notify_all();
}

if (ptr->next)
Expand Down
33 changes: 23 additions & 10 deletions Utilities/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,19 @@ class thread_future
protected:
atomic_t<void(*)(thread_base*, thread_future*)> exec{};

atomic_t<u32> done{0};

public:
// Get a const reference to the atomic variable, for inspection and for waiting on it
const auto& get_wait() const
{
return exec;
return done;
}

// Wait (preset): blocks on the member atomic with a fixed comparison value.
// NOTE(review): `done` is released to 1 and notified in thread_base::exec() (Thread.cpp);
// presumably wait(0) sleeps while the value is still 0 — confirm against atomic_t::wait.
void wait() const
{
exec.wait<atomic_wait::op_ne>(nullptr);
done.wait(0);
}
};

Expand All @@ -131,8 +133,13 @@ class thread_base
// Thread handle (platform-specific)
atomic_t<u64> m_thread{0};

// Thread state and cycles
atomic_t<u64> m_sync{0};
// Thread cycles
atomic_t<u64> m_cycles{0};

atomic_t<u32> m_dummy{0};

// Thread state
atomic_t<u32> m_sync{0};

// Thread name
atomic_ptr<std::string> m_tname;
Expand Down Expand Up @@ -284,16 +291,22 @@ class thread_ctrl final
}

atomic_wait::list<Max + 2> list{};
list.template set<Max>(_this->m_sync, 0, 4 + 1);
list.template set<Max + 1>(_this->m_taskq, nullptr);
list.template set<Max>(_this->m_sync, 0);
list.template set<Max + 1>(_this->m_taskq);
setter(list);
list.wait(atomic_wait_timeout{usec <= 0xffff'ffff'ffff'ffff / 1000 ? usec * 1000 : 0xffff'ffff'ffff'ffff});
}

// Waits on a single user-supplied atomic: forwards to wait_on_custom<1> with a setter that
// registers (wait, old) in slot 0 of the list.
// wait: object to wait on.
// old:  value registered together with it (presumably the value that keeps the caller
//       asleep — confirm against atomic_wait::list::set).
// usec: timeout forwarded to wait_on_custom; the default -1 wraps to the largest u64
//       (assuming u64 is an unsigned 64-bit type).
template <atomic_wait::op Op = atomic_wait::op::eq, typename T, typename U>
template <typename T, typename U>
static inline void wait_on(T& wait, U old, u64 usec = -1)
{
wait_on_custom<1>([&](atomic_wait::list<3>& list){ list.set<0, Op>(wait, old); }, usec);
wait_on_custom<1>([&](atomic_wait::list<3>& list) { list.template set<0>(wait, old); }, usec);
}

// Overload without a comparison value: registers only `wait` in slot 0 of the list.
// No usec is passed, so the timeout is whatever wait_on_custom uses when it is omitted.
template <typename T>
static inline void wait_on(T& wait)
{
wait_on_custom<1>([&](atomic_wait::list<3>& list) { list.template set<0>(wait); });
}

// Exit.
Expand Down Expand Up @@ -637,7 +650,7 @@ class named_thread final : public Context, result_storage<Context>, thread_base
{
bool notify_sync = false;

if (s >= thread_state::aborting && thread::m_sync.fetch_op([](u64& v){ return !(v & 3) && (v |= 1); }).second)
if (s >= thread_state::aborting && thread::m_sync.fetch_op([](u32& v) { return !(v & 3) && (v |= 1); }).second)
{
notify_sync = true;
}
Expand All @@ -650,7 +663,7 @@ class named_thread final : public Context, result_storage<Context>, thread_base
if (notify_sync)
{
// Notify after context abortion has been made so that all conditions for wake-up are satisfied by the time of notification
thread::m_sync.notify_one(1);
thread::m_sync.notify_all();
}

if (s == thread_state::finished)
Expand Down
6 changes: 3 additions & 3 deletions Utilities/cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
ensure(_old);

// Wait with timeout
m_value.wait(_old, c_signal_mask, atomic_wait_timeout{_timeout > max_timeout ? umax : _timeout * 1000});
m_value.wait(_old, atomic_wait_timeout{_timeout > max_timeout ? umax : _timeout * 1000});

// Cleanup
m_value.atomic_op([](u32& value)
Expand Down Expand Up @@ -47,10 +47,10 @@ void cond_variable::imp_wake(u32 _count) noexcept
if (_count > 1 || ((_old + (c_signal_mask & (0 - c_signal_mask))) & c_signal_mask) == c_signal_mask)
{
// Resort to notify_all if signal count reached max
m_value.notify_all(c_signal_mask);
m_value.notify_all();
}
else
{
m_value.notify_one(c_signal_mask);
m_value.notify_one();
}
}
30 changes: 18 additions & 12 deletions Utilities/lockless.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "util/types.hpp"
#include "util/atomic.hpp"
#include "util/asm.hpp"

//! Simple unshrinkable array base for concurrent access. Only grows automatically.
//! There is no way to know the current size. The smaller index is, the faster it's accessed.
Expand Down Expand Up @@ -280,12 +281,17 @@ class lf_queue_slice
template <typename T>
class lf_queue final
{
atomic_t<lf_queue_item<T>*> m_head{nullptr};
atomic_t<u64> m_head{0};

// Recover the item pointer packed into a raw head value: the pointer bits sit above the
// low 16 bits (push() stores the pointer shifted left by 16).
lf_queue_item<T>* load(u64 value) const noexcept
{
const u64 address = value >> 16;
return reinterpret_cast<lf_queue_item<T>*>(address);
}

// Extract all elements and reverse element order (FILO to FIFO)
lf_queue_item<T>* reverse() noexcept
{
if (auto* head = m_head.load() ? m_head.exchange(nullptr) : nullptr)
if (auto* head = load(m_head) ? load(m_head.exchange(0)) : nullptr)
{
if (auto* prev = head->m_link)
{
Expand All @@ -311,43 +317,43 @@ class lf_queue final

// Deletes the item the head still refers to (deleting a null pointer is a no-op).
// NOTE(review): presumably lf_queue_item's destructor releases the rest of the chain — confirm.
~lf_queue()
{
delete m_head.load();
delete load(m_head);
}

// Blocks the caller only when the head currently compares equal to the empty value;
// returns immediately otherwise. The nullptr parameter is unnamed and unused.
// NOTE(review): utils::bless<atomic_t<u32>>(&m_head)[1] appears to address the second
// 32-bit word of the 64-bit head (the upper half on little-endian hosts) so that the wait
// is performed on a u32 — confirm against utils::bless and the target endianness.
template <atomic_wait::op Flags = atomic_wait::op::eq>
void wait(std::nullptr_t /*null*/ = nullptr) noexcept
{
if (m_head == nullptr)
if (m_head == 0)
{
m_head.template wait<Flags>(nullptr);
utils::bless<atomic_t<u32>>(&m_head)[1].wait(0);
}
}

// Returns the current head as an opaque pointer (const volatile void*), suitable for
// inspection only; nothing is removed from the queue.
const volatile void* observe() const noexcept
{
return m_head.load();
return load(m_head);
}

// True when the head is not the empty value, i.e. the queue currently holds an item.
explicit operator bool() const noexcept
{
return m_head != nullptr;
return m_head != 0;
}

// Allocates a new item from the forwarded arguments and publishes it as the new head with
// a compare-exchange retry loop; the previously observed head is handed to the item as its link.
// One waiter is notified only when the observed previous head was empty.
//
// NOTE(review): in the loop that retries on `oldv`, a failed compare_exchange presumably
// refreshes `oldv` (the `_old`-based loop relies on that same behavior), but `_old` is not
// recomputed from it. The relink inside the loop and the emptiness check after it would then
// use the head observed before the first attempt, which could drop concurrently pushed items
// or skip/duplicate the notification. Likely should be `item->m_link = load(oldv);` and
// `if (!oldv)` — confirm against atomic_t::compare_exchange.
template <typename... Args>
void push(Args&&... args)
{
auto _old = m_head.load();
auto oldv = m_head.load();
auto _old = load(oldv);
auto item = new lf_queue_item<T>(_old, std::forward<Args>(args)...);

while (!m_head.compare_exchange(_old, item))
while (!m_head.compare_exchange(oldv, reinterpret_cast<u64>(item) << 16))
{
// Another thread changed the head in between: relink and try again
item->m_link = _old;
}

if (!_old)
{
// Notify only if queue was empty
m_head.notify_one();
utils::bless<atomic_t<u32>>(&m_head)[1].notify_one();
}
}

Expand All @@ -363,7 +369,7 @@ class lf_queue final
// Swaps the head with the empty value in one exchange and hands the detached chain to the
// returned lf_queue_slice as-is, without the order reversal that reverse() performs.
lf_queue_slice<T> pop_all_reversed()
{
lf_queue_slice<T> result;
result.m_head = m_head.exchange(nullptr);
result.m_head = load(m_head.exchange(0));
return result;
}

Expand Down
4 changes: 2 additions & 2 deletions Utilities/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ void shared_mutex::imp_wait()
break;
}

m_value.wait(old, c_sig);
m_value.wait(old);
}
}

// Adds c_sig to the mutex word, then notifies one thread waiting on it.
void shared_mutex::imp_signal()
{
m_value += c_sig;
m_value.notify_one(c_sig);
m_value.notify_one();
}

void shared_mutex::imp_lock(u32 val)
Expand Down
Loading

0 comments on commit ba3a16d

Please sign in to comment.