Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CommandQueueMT: Optimize & fix handling of sync/ret commands #90760

Merged
merged 1 commit into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 0 additions & 29 deletions core/templates/command_queue_mt.cpp
Expand Up @@ -41,35 +41,6 @@ void CommandQueueMT::unlock() {
mutex.unlock();
}

void CommandQueueMT::wait_for_flush() {
// wait one millisecond for a flush to happen
OS::get_singleton()->delay_usec(1000);
}

CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
int idx = -1;

while (true) {
lock();
for (int i = 0; i < SYNC_SEMAPHORES; i++) {
if (!sync_sems[i].in_use) {
sync_sems[i].in_use = true;
idx = i;
break;
}
}
unlock();

if (idx == -1) {
wait_for_flush();
} else {
break;
}
}

return &sync_sems[idx];
}

CommandQueueMT::CommandQueueMT() {
}

Expand Down
73 changes: 30 additions & 43 deletions core/templates/command_queue_mt.h
Expand Up @@ -32,9 +32,9 @@
#define COMMAND_QUEUE_MT_H

#include "core/object/worker_thread_pool.h"
#include "core/os/condition_variable.h"
#include "core/os/memory.h"
#include "core/os/mutex.h"
#include "core/os/semaphore.h"
#include "core/string/print_string.h"
#include "core/templates/local_vector.h"
#include "core/templates/simple_type.h"
Expand Down Expand Up @@ -251,74 +251,64 @@
#define DECL_PUSH(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
MutexLock mlock(mutex); \
CMD_TYPE(N) *cmd = allocate<CMD_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
}

#define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R>

#define DECL_PUSH_AND_RET(N) \
template <typename T, typename M, COMMA_SEP_LIST(TYPE_PARAM, N) COMMA(N) typename R> \
void push_and_ret(T *p_instance, M p_method, COMMA_SEP_LIST(PARAM, N) COMMA(N) R *r_ret) { \
SyncSemaphore *ss = _alloc_sync_sem(); \
CMD_RET_TYPE(N) *cmd = allocate_and_lock<CMD_RET_TYPE(N)>(); \
MutexLock mlock(mutex); \
CMD_RET_TYPE(N) *cmd = allocate<CMD_RET_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->ret = r_ret; \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
ss->sem.wait(); \
ss->in_use = false; \
sync_tail++; \
_wait_for_sync(mlock); \
}

#define CMD_SYNC_TYPE(N) CommandSync##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)>

#define DECL_PUSH_AND_SYNC(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push_and_sync(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
SyncSemaphore *ss = _alloc_sync_sem(); \
CMD_SYNC_TYPE(N) *cmd = allocate_and_lock<CMD_SYNC_TYPE(N)>(); \
MutexLock mlock(mutex); \
CMD_SYNC_TYPE(N) *cmd = allocate<CMD_SYNC_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
ss->sem.wait(); \
ss->in_use = false; \
sync_tail++; \
_wait_for_sync(mlock); \
}

#define MAX_CMD_PARAMS 15

class CommandQueueMT {
struct SyncSemaphore {
Semaphore sem;
bool in_use = false;
};

struct CommandBase {
bool sync = false;
virtual void call() = 0;
virtual SyncSemaphore *get_sync_semaphore() { return nullptr; }
virtual ~CommandBase() = default; // Won't be called.
};

struct SyncCommand : public CommandBase {
SyncSemaphore *sync_sem = nullptr;

virtual SyncSemaphore *get_sync_semaphore() override {
return sync_sem;
virtual void call() override {}
SyncCommand() {
sync = true;
}
};

Expand All @@ -340,9 +330,11 @@ class CommandQueueMT {
SYNC_SEMAPHORES = 8
};

BinaryMutex mutex;
LocalVector<uint8_t> command_mem;
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex;
ConditionVariable sync_cond_var;
uint32_t sync_head = 0;
uint32_t sync_tail = 0;
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
uint64_t flush_read_ptr = 0;

Expand All @@ -357,32 +349,23 @@ class CommandQueueMT {
return cmd;
}

template <typename T>
T *allocate_and_lock() {
lock();
T *ret = allocate<T>();
return ret;
}

void _flush() {
lock();

if (unlikely(flush_read_ptr)) {
// Re-entrant call.
unlock();
return;
}

lock();

WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
while (flush_read_ptr < command_mem.size()) {
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
flush_read_ptr += 8;
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);

SyncSemaphore *sync_sem = cmd->get_sync_semaphore();
cmd->call();
if (sync_sem) {
sync_sem->sem.post(); // Release in case it needs sync/ret.
if (unlikely(cmd->sync)) {
sync_head++;
sync_cond_var.notify_all();
}

flush_read_ptr += size;
Expand All @@ -394,8 +377,12 @@ class CommandQueueMT {
unlock();
}

void wait_for_flush();
SyncSemaphore *_alloc_sync_sem();
_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
uint32_t sync_head_goal = sync_tail;
do {
sync_cond_var.wait(p_lock);
} while (sync_head != sync_head_goal); // Can't use lower-than because of wraparound.
}

public:
void lock();
Expand Down