Skip to content

Commit

Permalink
Atomic waiting refactoring (#9208)
Browse files Browse the repository at this point in the history
* Use atomic waitables instead instead of global thread wait as often as possible.
* Add ::is_stopped() and and ::is_paued() which can be used in atomic loops and with atomic wait. (constexpr cpu flags test functions)
* Fix notification bug of sys_spu_thread_group_exit/terminate. (old bug, enhanced by #9117)
* Function time statistics at Emu.Stop() restored. (instead of current "X syscall failed with 0x00000000 : 0")
  • Loading branch information
elad335 committed Feb 13, 2021
1 parent cf38479 commit f43260b
Show file tree
Hide file tree
Showing 40 changed files with 371 additions and 230 deletions.
6 changes: 3 additions & 3 deletions Utilities/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1529,14 +1529,14 @@ bool handle_access_violation(u32 addr, bool is_writing, x64_context* context) no
else
{
// Wait until the thread is recovered
while (!cpu->state.test_and_reset(cpu_flag::signal))
while (auto state = cpu->state.fetch_sub(cpu_flag::signal))
{
if (cpu->is_stopped())
if (is_stopped(state) || state & cpu_flag::signal)
{
break;
}

thread_ctrl::wait();
thread_ctrl::wait_on(cpu->state, state);
}
}

Expand Down
33 changes: 25 additions & 8 deletions rpcs3/Emu/CPU/CPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ struct cpu_prof
if (threads.empty())
{
// Wait for messages if no work (don't waste CPU)
atomic_wait::list(registered).wait();
thread_ctrl::wait_on(registered, nullptr);
continue;
}

Expand Down Expand Up @@ -557,7 +557,14 @@ void cpu_thread::operator()()
while (!(state & cpu_flag::exit) && thread_ctrl::state() != thread_state::aborting)
{
// Check stop status
if (!(state & cpu_flag::stop))
const auto state0 = +state;

if (is_stopped(state0 - cpu_flag::stop))
{
break;
}

if (!(state0 & cpu_flag::stop))
{
cpu_task();

Expand All @@ -569,7 +576,7 @@ void cpu_thread::operator()()
continue;
}

thread_ctrl::wait();
thread_ctrl::wait_on(state, state0);

if (state & cpu_flag::ret && state.test_and_reset(cpu_flag::ret))
{
Expand All @@ -593,9 +600,9 @@ cpu_thread::cpu_thread(u32 id)
g_threads_created++;
}

void cpu_thread::cpu_wait()
void cpu_thread::cpu_wait(bs_t<cpu_flag> old)
{
thread_ctrl::wait();
thread_ctrl::wait_on(state, old);
}

bool cpu_thread::check_state() noexcept
Expand All @@ -607,6 +614,7 @@ bool cpu_thread::check_state() noexcept
while (true)
{
// Process all flags in a single atomic op
bs_t<cpu_flag> state1;
const auto state0 = state.fetch_op([&](bs_t<cpu_flag>& flags)
{
bool store = false;
Expand Down Expand Up @@ -660,7 +668,7 @@ bool cpu_thread::check_state() noexcept
}

// Atomically clean wait flag and escape
if (!(flags & (cpu_flag::exit + cpu_flag::ret + cpu_flag::stop)))
if (!(flags & (cpu_flag::exit + cpu_flag::ret + cpu_flag::stop)))
{
// Check pause flags which hold thread inside check_state (ignore suspend on cpu_flag::temp)
if (flags & (cpu_flag::pause + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause + cpu_flag::memory + (cpu_can_stop ? cpu_flag::suspend : cpu_flag::pause)))
Expand All @@ -672,6 +680,7 @@ bool cpu_thread::check_state() noexcept
}

escape = false;
state1 = flags;
return store;
}

Expand Down Expand Up @@ -703,6 +712,7 @@ bool cpu_thread::check_state() noexcept
}

escape = true;
state1 = flags;
return store;
}).first;

Expand All @@ -714,6 +724,11 @@ bool cpu_thread::check_state() noexcept
cpu_counter::add(this);
}

if (retval)
{
cpu_on_stop();
}

ensure(cpu_can_stop || !retval);
return retval;
}
Expand All @@ -739,7 +754,7 @@ bool cpu_thread::check_state() noexcept
g_fxo->get<gdb_server>()->pause_from(this);
}

cpu_wait();
cpu_wait(state1);
}
else
{
Expand Down Expand Up @@ -799,6 +814,9 @@ void cpu_thread::notify()

void cpu_thread::abort()
{
state += cpu_flag::exit;
state.notify_one(cpu_flag::exit);

// Downcast to correct type
if (id_type() == 1)
{
Expand Down Expand Up @@ -1076,7 +1094,6 @@ void cpu_thread::stop_all() noexcept
{
auto on_stop = [](u32, cpu_thread& cpu)
{
cpu.state += cpu_flag::exit;
cpu.abort();
};

Expand Down
34 changes: 29 additions & 5 deletions rpcs3/Emu/CPU/CPUThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ enum class cpu_flag : u32
__bitset_enum_max
};

// Test stopped state
constexpr bool is_stopped(bs_t<cpu_flag> state)
{
return !!(state & (cpu_flag::stop + cpu_flag::exit));
}

// Test paused state
constexpr bool is_paused(bs_t<cpu_flag> state)
{
return !!(state & (cpu_flag::suspend + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause));
}

class cpu_thread
{
public:
Expand Down Expand Up @@ -60,16 +72,25 @@ class cpu_thread
return false;
}

// Test stopped state
// Wrappers
static constexpr bool is_stopped(bs_t<cpu_flag> s)
{
return ::is_stopped(s);
}

static constexpr bool is_paused(bs_t<cpu_flag> s)
{
return ::is_paused(s);
}

bool is_stopped() const
{
return !!(state & (cpu_flag::stop + cpu_flag::exit));
return ::is_stopped(state);
}

// Test paused state
bool is_paused() const
{
return !!(state & (cpu_flag::suspend + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause));
return ::is_paused(state);
}

bool has_pause_flag() const
Expand Down Expand Up @@ -122,7 +143,10 @@ class cpu_thread
virtual void cpu_return() {}

// Callback for thread_ctrl::wait or RSX wait
virtual void cpu_wait();
virtual void cpu_wait(bs_t<cpu_flag> flags);

// Callback for function abortion stats on Emu.Stop()
virtual void cpu_on_stop() {}

// For internal use
struct suspend_work
Expand Down
15 changes: 10 additions & 5 deletions rpcs3/Emu/Cell/Modules/cellMsgDialog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ struct msg_dlg_thread_info

if (new_value == 0)
{
wait_until.wait(0);
thread_ctrl::wait_on(wait_until, 0);
continue;
}

Expand Down Expand Up @@ -217,14 +217,19 @@ error_code open_msg_dialog(bool is_blocking, u32 type, vm::cptr<char> msgString,
lv2_obj::awake(&ppu);
});

while (!ppu.state.test_and_reset(cpu_flag::signal))
while (auto state = ppu.state.fetch_sub(cpu_flag::signal))
{
if (ppu.is_stopped())
if (is_stopped(state))
{
return 0;
return {};
}

thread_ctrl::wait();
if (state & cpu_flag::signal)
{
break;
}

thread_ctrl::wait_on(ppu.state, state);
}

if (is_blocking)
Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/Modules/cellSaveData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ static NEVER_INLINE error_code savedata_op(ppu_thread& ppu, u32 operation, u32 v
// Reschedule after a blocking dialog returns
if (ppu.check_state())
{
return 0;
return {};
}

if (res != CELL_OK)
Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/Modules/cellSysutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ error_code cellSysutilCheckCallback(ppu_thread& ppu)

if (ppu.is_stopped())
{
return 0;
return {};
}
}

Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/Modules/cellVdec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ static error_code vdecOpen(ppu_thread& ppu, T type, U res, vm::cptr<CellVdecCb>
});

thrd->state -= cpu_flag::stop;
thread_ctrl::notify(*thrd);
thrd->state.notify_one(cpu_flag::stop);

return CELL_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions rpcs3/Emu/Cell/Modules/sceNpTrophy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,11 +628,11 @@ error_code sceNpTrophyRegisterContext(ppu_thread& ppu, u32 context, u32 handle,
for (u32 old_value; current < until && (old_value = *queued);
current = get_system_time())
{
queued->wait(old_value, atomic_wait_timeout{(until - current) * 1000});
thread_ctrl::wait_on(*queued, old_value, until - current);

if (ppu.is_stopped())
{
return 0;
return {};
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rpcs3/Emu/Cell/PPUFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ using ppu_function_t = bool(*)(ppu_thread&);
ppu.current_function = #func;\
std::memcpy(ppu.syscall_args, ppu.gpr + 3, sizeof(ppu.syscall_args)); \
ppu_func_detail::do_call(ppu, func);\
static_cast<void>(ppu.test_stopped());\
ppu.current_function = old_f;\
ppu.cia += 4;\
__VA_ARGS__;\
Expand Down
44 changes: 24 additions & 20 deletions rpcs3/Emu/Cell/PPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,23 @@ void ppu_thread::cpu_sleep()
lv2_obj::awake(this);
}

void ppu_thread::cpu_on_stop()
{
if (current_function)
{
if (start_time)
{
ppu_log.warning("'%s' aborted (%fs)", current_function, (get_guest_system_time() - start_time) / 1000000.);
}
else
{
ppu_log.warning("'%s' aborted", current_function);
}

current_function = {};
}
}

void ppu_thread::exec_task()
{
if (g_cfg.core.ppu_decoder == ppu_decoder_type::llvm)
Expand Down Expand Up @@ -1138,20 +1155,18 @@ cmd64 ppu_thread::cmd_wait()
{
while (true)
{
if (state) [[unlikely]]
if (cmd64 result = cmd_queue[cmd_queue.peek()].exchange(cmd64{}))
{
if (is_stopped())
{
return cmd64{};
}
return result;
}

if (cmd64 result = cmd_queue[cmd_queue.peek()].exchange(cmd64{}))
if (is_stopped())
{
return result;
return {};
}

thread_ctrl::wait();
thread_ctrl::wait_on(cmd_notify, 0);
cmd_notify = 0;
}
}

Expand Down Expand Up @@ -1205,18 +1220,7 @@ void ppu_thread::fast_call(u32 addr, u32 rtoc)
{
if (std::uncaught_exceptions())
{
if (current_function)
{
if (start_time)
{
ppu_log.warning("'%s' aborted (%fs)", current_function, (get_guest_system_time() - start_time) / 1000000.);
}
else
{
ppu_log.warning("'%s' aborted", current_function);
}
}

cpu_on_stop();
current_function = old_func;
}
else
Expand Down
2 changes: 2 additions & 0 deletions rpcs3/Emu/Cell/PPUThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class ppu_thread : public cpu_thread
virtual std::string dump_misc() const override;
virtual void cpu_task() override final;
virtual void cpu_sleep() override;
virtual void cpu_on_stop() override;
virtual ~ppu_thread() override;

ppu_thread(const ppu_thread_params&, std::string_view name, u32 prio, int detached = 0);
Expand Down Expand Up @@ -257,6 +258,7 @@ class ppu_thread : public cpu_thread
void cmd_pop(u32 = 0);
cmd64 cmd_wait(); // Empty command means caller must return, like true from cpu_thread::check_status().
cmd64 cmd_get(u32 index) { return cmd_queue[cmd_queue.peek() + index].load(); }
atomic_t<u32> cmd_notify = 0;

const ppu_func_opd_t entry_func;
u64 start_time{0}; // Sleep start timepoint
Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/RawSPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inline void try_start(spu_thread& spu)
}).second)
{
spu.state -= cpu_flag::stop;
thread_ctrl::notify(static_cast<named_thread<spu_thread>&>(spu));
spu.state.notify_one(cpu_flag::stop);
}
};

Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/SPURecompiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9076,7 +9076,7 @@ struct spu_llvm
{
// Interrupt profiler thread and put it to sleep
static_cast<void>(prof_mutex.reset());
atomic_wait::list(registered).wait(); // TODO
thread_ctrl::wait_on(registered, nullptr);
continue;
}

Expand Down

0 comments on commit f43260b

Please sign in to comment.