Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
PPU thread scheduler
  • Loading branch information
Nekotekina committed Feb 13, 2017
1 parent e496205 commit 598c90f
Show file tree
Hide file tree
Showing 41 changed files with 694 additions and 254 deletions.
20 changes: 14 additions & 6 deletions rpcs3/Emu/CPU/CPUThread.cpp
Expand Up @@ -89,18 +89,32 @@ cpu_thread::cpu_thread(u32 id)

bool cpu_thread::check_state()
{
bool cpu_sleep_called = false;

while (true)
{
if (test(state & cpu_flag::exit))
{
return true;
}

if (test(state & cpu_flag::signal) && state.test_and_reset(cpu_flag::signal))
{
cpu_sleep_called = false;
}

if (!test(state & (cpu_state_pause + cpu_flag::dbg_global_stop)))
{
break;
}

if (test(state & cpu_flag::suspend) && !cpu_sleep_called)
{
cpu_sleep();
cpu_sleep_called = true;
continue;
}

thread_ctrl::wait();
}

Expand All @@ -126,12 +140,6 @@ void cpu_thread::run()
notify();
}

void cpu_thread::set_signal()
{
verify("cpu_flag::signal" HERE), !state.test_and_set(cpu_flag::signal);
notify();
}

std::string cpu_thread::dump() const
{
return fmt::format("Type: %s\n" "State: %s\n", typeid(*this).name(), state.load());
Expand Down
11 changes: 4 additions & 7 deletions rpcs3/Emu/CPU/CPUThread.h
Expand Up @@ -8,7 +8,7 @@ enum class cpu_flag : u32
{
stop, // Thread not running (HLE, initial state)
exit, // Irreversible exit
suspend, // Thread paused
suspend, // Thread suspended
ret, // Callback return requested
signal, // Thread received a signal (HLE)

Expand Down Expand Up @@ -38,18 +38,12 @@ class cpu_thread : public named_thread
// Public thread state
atomic_t<bs_t<cpu_flag>> state{+cpu_flag::stop};

// Object associated with sleep state, possibly synchronization primitive (mutex, semaphore, etc.)
atomic_t<void*> owner{};

// Process thread state, return true if the checker must return
bool check_state();

// Run thread
void run();

// Set cpu_flag::signal
void set_signal();

// Check thread type
u32 id_type()
{
Expand All @@ -64,6 +58,9 @@ class cpu_thread : public named_thread

// Thread entry point function
virtual void cpu_task() = 0;

// Callback for cpu_flag::suspend
virtual void cpu_sleep() {}
};

inline cpu_thread* get_current_cpu_thread() noexcept
Expand Down
4 changes: 4 additions & 0 deletions rpcs3/Emu/Cell/Modules/cellAdec.cpp
Expand Up @@ -2,6 +2,7 @@
#include "Emu/System.h"
#include "Emu/IdManager.h"
#include "Emu/Cell/PPUModule.h"
#include "Emu/Cell/lv2/sys_sync.h"

extern "C"
{
Expand Down Expand Up @@ -199,6 +200,7 @@ class AudioDecoder : public ppu_thread
// TODO: finalize
cellAdec.warning("adecEndSeq:");
cbFunc(*this, id, CELL_ADEC_MSG_TYPE_SEQDONE, CELL_OK, cbArg);
lv2_obj::sleep(*this, -1);

just_finished = true;
break;
Expand Down Expand Up @@ -375,11 +377,13 @@ class AudioDecoder : public ppu_thread
{
frame.data = nullptr; // to prevent destruction
cbFunc(*this, id, CELL_ADEC_MSG_TYPE_PCMOUT, CELL_OK, cbArg);
lv2_obj::sleep(*this, -1);
}
}
}

cbFunc(*this, id, CELL_ADEC_MSG_TYPE_AUDONE, task.au.auInfo_addr, cbArg);
lv2_obj::sleep(*this, -1);
break;
}

Expand Down
7 changes: 7 additions & 0 deletions rpcs3/Emu/Cell/Modules/cellDmux.cpp
Expand Up @@ -2,6 +2,7 @@
#include "Emu/System.h"
#include "Emu/IdManager.h"
#include "Emu/Cell/PPUModule.h"
#include "Emu/Cell/lv2/sys_sync.h"

#include "cellPamf.h"
#include "cellDmux.h"
Expand Down Expand Up @@ -242,6 +243,7 @@ class Demuxer : public ppu_thread
dmuxMsg->msgType = CELL_DMUX_MSG_TYPE_DEMUX_DONE;
dmuxMsg->supplementalInfo = stream.userdata;
cbFunc(*this, id, dmuxMsg, cbArg);
lv2_obj::sleep(*this, -1);

is_working = false;

Expand Down Expand Up @@ -395,6 +397,7 @@ class Demuxer : public ppu_thread
esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND;
esMsg->supplementalInfo = stream.userdata;
es.cbFunc(*this, id, es.id, esMsg, es.cbArg);
lv2_obj::sleep(*this, -1);
}
}
else
Expand Down Expand Up @@ -460,6 +463,7 @@ class Demuxer : public ppu_thread
esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND;
esMsg->supplementalInfo = stream.userdata;
es.cbFunc(*this, id, es.id, esMsg, es.cbArg);
lv2_obj::sleep(*this, -1);
}

if (pes.has_ts)
Expand Down Expand Up @@ -536,6 +540,7 @@ class Demuxer : public ppu_thread
dmuxMsg->msgType = CELL_DMUX_MSG_TYPE_DEMUX_DONE;
dmuxMsg->supplementalInfo = stream.userdata;
cbFunc(*this, id, dmuxMsg, cbArg);
lv2_obj::sleep(*this, -1);

stream = {};

Expand Down Expand Up @@ -624,6 +629,7 @@ class Demuxer : public ppu_thread
esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_AU_FOUND;
esMsg->supplementalInfo = stream.userdata;
es.cbFunc(*this, id, es.id, esMsg, es.cbArg);
lv2_obj::sleep(*this, -1);
}

if (es.raw_data.size())
Expand All @@ -636,6 +642,7 @@ class Demuxer : public ppu_thread
esMsg->msgType = CELL_DMUX_ES_MSG_TYPE_FLUSH_DONE;
esMsg->supplementalInfo = stream.userdata;
es.cbFunc(*this, id, es.id, esMsg, es.cbArg);
lv2_obj::sleep(*this, -1);
break;
}

Expand Down
2 changes: 2 additions & 0 deletions rpcs3/Emu/Cell/Modules/cellFs.cpp
Expand Up @@ -4,6 +4,7 @@
#include "Emu/Cell/PPUModule.h"

#include "Emu/Cell/lv2/sys_fs.h"
#include "Emu/Cell/lv2/sys_sync.h"
#include "cellFs.h"

#include "Utilities/StrUtil.h"
Expand Down Expand Up @@ -742,6 +743,7 @@ struct fs_aio_thread : ppu_thread
}

func(*this, aio, error, xid, result);
lv2_obj::sleep(*this, -1);
}
}
};
Expand Down
48 changes: 24 additions & 24 deletions rpcs3/Emu/Cell/Modules/cellSpurs.cpp
Expand Up @@ -84,7 +84,7 @@ namespace _spurs
s32 add_default_syswkl(vm::ptr<CellSpurs> spurs, vm::cptr<u8> swlPriority, u32 swlMaxSpu, u32 swlIsPreem);

// Destroy the SPURS SPU threads and thread group
s32 finalize_spu(vm::ptr<CellSpurs> spurs);
s32 finalize_spu(ppu_thread&, vm::ptr<CellSpurs> spurs);

// Stop the event helper thread
s32 stop_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs);
Expand Down Expand Up @@ -276,7 +276,7 @@ namespace _spurs
//s32 cellSpursEventFlagWait(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag, vm::ptr<u16> mask, u32 mode);
//s32 cellSpursEventFlagTryWait(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag, vm::ptr<u16> mask, u32 mode);
//s32 cellSpursEventFlagAttachLv2EventQueue(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag);
//s32 cellSpursEventFlagDetachLv2EventQueue(vm::ptr<CellSpursEventFlag> eventFlag);
//s32 cellSpursEventFlagDetachLv2EventQueue(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag);
//s32 cellSpursEventFlagGetDirection(vm::ptr<CellSpursEventFlag> eventFlag, vm::ptr<u32> direction);
//s32 cellSpursEventFlagGetClearMode(vm::ptr<CellSpursEventFlag> eventFlag, vm::ptr<u32> clear_mode);
//s32 cellSpursEventFlagGetTasksetAddress(vm::ptr<CellSpursEventFlag> eventFlag, vm::pptr<CellSpursTaskset> taskset);
Expand Down Expand Up @@ -380,7 +380,7 @@ s32 _spurs::create_lv2_eq(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, vm::ptr<u32

if (s32 rc = _spurs::attach_lv2_eq(ppu, spurs, *queueId, port, 1, true))
{
sys_event_queue_destroy(*queueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
sys_event_queue_destroy(ppu, *queueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
}

return CELL_OK;
Expand Down Expand Up @@ -574,7 +574,7 @@ void _spurs::handler_entry(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)

CHECK_SUCCESS(sys_spu_thread_group_start(spurs->spuTG));

if (s32 rc = sys_spu_thread_group_join(spurs->spuTG, vm::null, vm::null))
if (s32 rc = sys_spu_thread_group_join(ppu, spurs->spuTG, vm::null, vm::null))
{
if (rc == CELL_ESTAT)
{
Expand Down Expand Up @@ -674,7 +674,7 @@ s32 _spurs::wakeup_shutdown_completion_waiter(ppu_thread& ppu, vm::ptr<CellSpurs
if (!wklF->hook || wklEvent->load() & 0x10)
{
verify(HERE), (wklF->x28 == 2);
rc = sys_semaphore_post((u32)wklF->sem, 1);
rc = sys_semaphore_post(ppu, (u32)wklF->sem, 1);
}

return rc;
Expand Down Expand Up @@ -712,11 +712,11 @@ void _spurs::event_helper_entry(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)

for (u32 i = 0; i < CELL_SPURS_MAX_WORKLOAD; i++)
{
sys_semaphore_post((u32)spurs->wklF1[i].sem, 1);
sys_semaphore_post(ppu, (u32)spurs->wklF1[i].sem, 1);

if (spurs->flags1 & SF1_32_WORKLOADS)
{
sys_semaphore_post((u32)spurs->wklF2[i].sem, 1);
sys_semaphore_post(ppu, (u32)spurs->wklF2[i].sem, 1);
}
}
}
Expand Down Expand Up @@ -747,7 +747,7 @@ void _spurs::event_helper_entry(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)
}
else if (data0 == 2)
{
CHECK_SUCCESS(sys_semaphore_post((u32)spurs->semPrv, 1));
CHECK_SUCCESS(sys_semaphore_post(ppu, (u32)spurs->semPrv, 1));
}
else if (data0 == 3)
{
Expand Down Expand Up @@ -775,7 +775,7 @@ s32 _spurs::create_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 p
return CELL_SPURS_CORE_ERROR_AGAIN;
}

sys_event_queue_destroy(spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
sys_event_queue_destroy(ppu, spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
return CELL_SPURS_CORE_ERROR_AGAIN;
}

Expand All @@ -788,7 +788,7 @@ s32 _spurs::create_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 p
return CELL_SPURS_CORE_ERROR_STAT;
}

sys_event_queue_destroy(spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
sys_event_queue_destroy(ppu, spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
return CELL_SPURS_CORE_ERROR_STAT;
}

Expand All @@ -814,7 +814,7 @@ s32 _spurs::create_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 p
return CELL_SPURS_CORE_ERROR_STAT;
}

sys_event_queue_destroy(spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
sys_event_queue_destroy(ppu, spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE);
return CELL_SPURS_CORE_ERROR_STAT;
}

Expand All @@ -839,13 +839,13 @@ s32 _spurs::add_default_syswkl(vm::ptr<CellSpurs> spurs, vm::cptr<u8> swlPriorit
return CELL_OK;
}

s32 _spurs::finalize_spu(vm::ptr<CellSpurs> spurs)
s32 _spurs::finalize_spu(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)
{
if (spurs->flags & SAF_UNKNOWN_FLAG_7 || spurs->flags & SAF_UNKNOWN_FLAG_8)
{
while (true)
{
CHECK_SUCCESS(sys_spu_thread_group_join(spurs->spuTG, vm::null, vm::null));
CHECK_SUCCESS(sys_spu_thread_group_join(ppu, spurs->spuTG, vm::null, vm::null));

if (s32 rc = sys_spu_thread_group_destroy(spurs->spuTG))
{
Expand Down Expand Up @@ -880,7 +880,7 @@ s32 _spurs::stop_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)
return CELL_SPURS_CORE_ERROR_STAT;
}

if (sys_event_port_send(spurs->eventPort, 0, 1, 0) != CELL_OK)
if (sys_event_port_send(ppu, spurs->eventPort, 0, 1, 0) != CELL_OK)
{
return CELL_SPURS_CORE_ERROR_STAT;
}
Expand All @@ -895,7 +895,7 @@ s32 _spurs::stop_event_helper(ppu_thread& ppu, vm::ptr<CellSpurs> spurs)
CHECK_SUCCESS(sys_event_port_disconnect(spurs->eventPort));
CHECK_SUCCESS(sys_event_port_destroy(spurs->eventPort));
CHECK_SUCCESS(_spurs::detach_lv2_eq(spurs, spurs->spuPort, true));
CHECK_SUCCESS(sys_event_queue_destroy(spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE));
CHECK_SUCCESS(sys_event_queue_destroy(ppu, spurs->eventQueue, SYS_EVENT_QUEUE_DESTROY_FORCE));

return CELL_OK;
}
Expand Down Expand Up @@ -1141,15 +1141,15 @@ s32 _spurs::initialize(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 revision,
// Create a mutex to protect access to SPURS handler thread data
if (s32 rc = sys_lwmutex_create(lwMutex, vm::make_var(sys_lwmutex_attribute_t{ SYS_SYNC_PRIORITY, SYS_SYNC_NOT_RECURSIVE, "_spuPrv" })))
{
_spurs::finalize_spu(spurs);
_spurs::finalize_spu(ppu, spurs);
return rollback(), rc;
}

// Create condition variable to signal the SPURS handler thread
if (s32 rc = sys_lwcond_create(lwCond, lwMutex, vm::make_var(sys_lwcond_attribute_t{ "_spuPrv" })))
{
sys_lwmutex_destroy(ppu, lwMutex);
_spurs::finalize_spu(spurs);
_spurs::finalize_spu(ppu, spurs);
return rollback(), rc;
}

Expand All @@ -1166,7 +1166,7 @@ s32 _spurs::initialize(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 revision,
{
sys_lwcond_destroy(lwCond);
sys_lwmutex_destroy(ppu, lwMutex);
_spurs::finalize_spu(spurs);
_spurs::finalize_spu(ppu, spurs);
return rollback(), rc;
}

Expand All @@ -1176,7 +1176,7 @@ s32 _spurs::initialize(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 revision,
_spurs::stop_event_helper(ppu, spurs);
sys_lwcond_destroy(lwCond);
sys_lwmutex_destroy(ppu, lwMutex);
_spurs::finalize_spu(spurs);
_spurs::finalize_spu(ppu, spurs);
return rollback(), rc;
}

Expand All @@ -1188,7 +1188,7 @@ s32 _spurs::initialize(ppu_thread& ppu, vm::ptr<CellSpurs> spurs, u32 revision,
_spurs::stop_event_helper(ppu, spurs);
sys_lwcond_destroy(lwCond);
sys_lwmutex_destroy(ppu, lwMutex);
_spurs::finalize_spu(spurs);
_spurs::finalize_spu(ppu, spurs);
return rollback(), rc;
}

Expand Down Expand Up @@ -2787,7 +2787,7 @@ s32 cellSpursEventFlagSet(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag
// Signal the PPU thread to be woken up
eventFlag->pendingRecvTaskEvents[ppuWaitSlot] = ppuEvents;

CHECK_SUCCESS(sys_event_port_send(eventFlag->eventPortId, 0, 0, 0));
CHECK_SUCCESS(sys_event_port_send(ppu, eventFlag->eventPortId, 0, 0, 0));
}

if (pendingRecv)
Expand Down Expand Up @@ -3067,7 +3067,7 @@ s32 cellSpursEventFlagAttachLv2EventQueue(ppu_thread& ppu, vm::ptr<CellSpursEven

if (_spurs::detach_lv2_eq(spurs, *port, true) == CELL_OK)
{
sys_event_queue_destroy(*eventQueueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
sys_event_queue_destroy(ppu, *eventQueueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
}

return failure(rc);
Expand All @@ -3077,7 +3077,7 @@ s32 cellSpursEventFlagAttachLv2EventQueue(ppu_thread& ppu, vm::ptr<CellSpursEven
}

/// Detach an LV2 event queue from SPURS event flag
s32 cellSpursEventFlagDetachLv2EventQueue(vm::ptr<CellSpursEventFlag> eventFlag)
s32 cellSpursEventFlagDetachLv2EventQueue(ppu_thread& ppu, vm::ptr<CellSpursEventFlag> eventFlag)
{
cellSpurs.warning("cellSpursEventFlagDetachLv2EventQueue(eventFlag=*0x%x)", eventFlag);

Expand Down Expand Up @@ -3131,7 +3131,7 @@ s32 cellSpursEventFlagDetachLv2EventQueue(vm::ptr<CellSpursEventFlag> eventFlag)

if (rc == CELL_OK)
{
rc = sys_event_queue_destroy(eventFlag->eventQueueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
rc = sys_event_queue_destroy(ppu, eventFlag->eventQueueId, SYS_EVENT_QUEUE_DESTROY_FORCE);
}

return CELL_OK;
Expand Down

0 comments on commit 598c90f

Please sign in to comment.