Skip to content

Commit

Permalink
Rewrite spu_channel_4_t
Browse files Browse the repository at this point in the history
* Make spu_channel_4_t::clear() atomic:
Clear the last value before zeroing count, fixes a rare race introduced by RPCS3#6917.

* Optimize push() to use vectorization and less atomic instructions.

* Optimize try_pop() to use less atomic instructions
  • Loading branch information
elad335 committed Oct 31, 2019
1 parent 03a2d36 commit d062461
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 70 deletions.
4 changes: 2 additions & 2 deletions rpcs3/Emu/Cell/SPUASMJITRecompiler.cpp
Expand Up @@ -1736,8 +1736,8 @@ void spu_recompiler::RCHCNT(spu_opcode_t op)
case SPU_RdInMbox:
{
const XmmLink& vr = XmmAlloc();
c->movdqa(vr, SPU_OFF_128(ch_in_mbox));
c->pslldq(vr, 14);
c->movdqa(vr, SPU_OFF_128(ch_in_mbox, &spu_channel_4_t::control));
c->pslldq(vr, 15);
c->psrldq(vr, 3);
c->movdqa(SPU_OFF_128(gpr, op.rt), vr);
return;
Expand Down
5 changes: 2 additions & 3 deletions rpcs3/Emu/Cell/SPURecompiler.cpp
Expand Up @@ -5574,9 +5574,8 @@ class spu_llvm_recompiler : public spu_recompiler_base, public cpu_translator
}
case SPU_RdInMbox:
{
res.value = m_ir->CreateLoad(spu_ptr<u32>(&spu_thread::ch_in_mbox), true);
res.value = m_ir->CreateLShr(res.value, 8);
res.value = m_ir->CreateAnd(res.value, 7);
res.value = m_ir->CreateLoad(spu_ptr<u32>(&spu_thread::ch_in_mbox, &spu_channel_4_t::control), true);
res.value = m_ir->CreateAnd(res.value, 0xff);
break;
}
case SPU_RdEventStat:
Expand Down
92 changes: 39 additions & 53 deletions rpcs3/Emu/Cell/SPUThread.h
Expand Up @@ -262,49 +262,49 @@ struct spu_channel

struct spu_channel_4_t
{
struct alignas(16) sync_var_t
{
u8 waiting;
u8 count;
u32 value0;
u32 value1;
u32 value2;
};
v128 values;
atomic_t<u32> control;

static constexpr u32 lock_bit = 1u << 31;
static constexpr u32 bit_wait = 1u << 30;

private:

atomic_t<sync_var_t> values;
atomic_t<u32> value3;
[[nodiscard]] u32 lock()
{
return control.fetch_wait_op([](u32& value)
{
const bool res = !(value & lock_bit);
value |= lock_bit;
return res;
});
}

public:
void clear()
{
values.release({});
value3.release(0);
static_cast<void>(lock());
values.clear();
control.release(0);
}

// push unconditionally (overwriting latest value), returns true if needs signaling
void push(cpu_thread& spu, u32 value)
{
value3.store(value);
u32 count = lock();

if (values.atomic_op([=](sync_var_t& data) -> bool
if (LIKELY(count < 4))
{
switch (data.count++)
{
case 0: data.value0 = value; break;
case 1: data.value1 = value; break;
case 2: data.value2 = value; break;
default: data.count = 4;
}
values._u32[count++] = value;
control.release(count);
return;
}

if (data.waiting)
{
data.waiting = 0;
count &= ~bit_wait;
values._u32[count] = value;
control.release(count);

return true;
}

return false;
}))
if (count == 0)
{
spu.notify();
}
Expand All @@ -313,38 +313,24 @@ struct spu_channel_4_t
// returns non-zero value on success: queue size before removal
uint try_pop(u32& out)
{
return values.atomic_op([&](sync_var_t& data)
{
const uint result = data.count;

if (result != 0)
{
data.waiting = 0;
data.count--;
out = data.value0;

data.value0 = data.value1;
data.value1 = data.value2;
data.value2 = this->value3;
}
else
{
data.waiting = 1;
}

return result;
});
const u32 count = lock() & 0xff;
const v128 res = values;
out = res._u32[0];
values.vi = _mm_srli_si128(res.vi, sizeof(u32));
control.release(count ? count - 1 : bit_wait);
return count;
}

u32 get_count()
{
return values.raw().count;
return control.raw() & 0xff;
}

void set_values(u32 count, u32 value0, u32 value1 = 0, u32 value2 = 0, u32 value3 = 0)
{
this->values.raw() = { 0, static_cast<u8>(count), value0, value1, value2 };
this->value3 = value3;
static_cast<void>(lock());
values = v128::from32(value0, value1, value2, value3);
control.release(count);
}
};

Expand Down
57 changes: 45 additions & 12 deletions rpcs3/util/atomic.hpp
Expand Up @@ -728,11 +728,17 @@ class atomic_t
return atomic_storage<type>::compare_exchange(m_data, cmp_and_old, exch);
}

// Atomically read data
type load() const
{
return atomic_storage<type>::load(m_data);
}

// Atomic operation; returns old value, or pair of old value and return value (cancel op if evaluates to false)
template <typename F, typename RT = std::invoke_result_t<F, T&>>
std::conditional_t<std::is_void_v<RT>, type, std::pair<type, RT>> fetch_op(F func)
{
type _new, old = atomic_storage<type>::load(m_data);
type _new, old = load();

while (true)
{
Expand All @@ -742,7 +748,7 @@ class atomic_t
{
std::invoke(func, _new);

if (LIKELY(atomic_storage<type>::compare_exchange(m_data, old, _new)))
if (LIKELY(compare_exchange(old, _new)))
{
return old;
}
Expand All @@ -751,19 +757,52 @@ class atomic_t
{
RT ret = std::invoke(func, _new);

if (LIKELY(!ret || atomic_storage<type>::compare_exchange(m_data, old, _new)))
if (LIKELY(!ret || compare_exchange(old, _new)))
{
return {old, std::move(ret)};
}
}
}
}

// Atomic operation; returns old value (repeat op if evaluates to false)
template <typename F, typename RT = std::invoke_result_t<F, T&>>
RT fetch_wait_op(F func)
{
static_assert(std::is_convertible_v<RT, bool>);

type _new, old = load();

while (true)
{
_new = old;

const RT ret = std::invoke(func, _new);

if (UNLIKELY(!ret))
{
//if constexpr (use_futex_wait)
//{
// wait(old);
//}

_mm_pause();
old = load();
continue;
}

if (LIKELY(compare_exchange(old, _new)))
{
return old;
}
}
}

// Atomic operation; returns function result value, function is the lambda
template <typename F, typename RT = std::invoke_result_t<F, T&>>
RT atomic_op(F func)
{
type _new, old = atomic_storage<type>::load(m_data);
type _new, old = load();

while (true)
{
Expand All @@ -773,7 +812,7 @@ class atomic_t
{
std::invoke(func, _new);

if (LIKELY(atomic_storage<type>::compare_exchange(m_data, old, _new)))
if (LIKELY(compare_exchange(old, _new)))
{
return;
}
Expand All @@ -782,20 +821,14 @@ class atomic_t
{
RT result = std::invoke(func, _new);

if (LIKELY(atomic_storage<type>::compare_exchange(m_data, old, _new)))
if (LIKELY(compare_exchange(old, _new)))
{
return result;
}
}
}
}

// Atomically read data
type load() const
{
return atomic_storage<type>::load(m_data);
}

// Atomically read data
operator simple_type() const
{
Expand Down

0 comments on commit d062461

Please sign in to comment.