Load-balanced thread pool #5
Conversation
```cpp
while ((envIdx = nextTaskQueue.fetch_add(
            1, std::memory_order_acq_rel)) <
       numEnvs) {
  taskFunc(task, envIdx);
```
The formatter is super aggressive. Probably set to 80 chars per line?
Fantastic! Thanks a lot.
Maybe it does not help much right now, but it may well help in the future when other types of envs are added.
```cpp
if (renderer.isVulkan()) {
  renderer.reset(*envs[envIdx], envIdx);
  renderer.preDraw(*envs[envIdx], envIdx);
}
```
That's good, I guess I never bothered to check. This actually means that we don't even have to fix this, because who cares about the OpenGL renderer.
```cpp
envs[envIdx]->reset();

// The vulkan renderer is fine with being reset in parallel
if (renderer.isVulkan()) {
```
I'd prefer if this were called supportsParallelReset, but there's a 99.99% chance we will never have another renderer :D
```cpp
numReady.store(0, std::memory_order_relaxed);
nextTaskQueue.store(0, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
```
Is this to use one fence for two instructions?
Yeah, and the fence will also synchronize other writes that aren't part of the atomics, if needed.
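To illustrate the pattern being discussed: both counters are reset with cheap relaxed stores, and one release fence publishes them together, instead of paying for two release stores. The fence takes effect through whichever atomic store a consumer later acquire-loads; the `published` flag below is a hypothetical stand-in for that store, and this is an illustrative sketch, not the actual pool code.

```cpp
#include <atomic>

std::atomic<int> numReady{1};
std::atomic<int> nextTaskQueue{1};
std::atomic<bool> published{false};  // hypothetical flag a consumer acquire-loads

void resetCounters() {
  // Cheap relaxed stores; ordering is provided by the fence below.
  numReady.store(0, std::memory_order_relaxed);
  nextTaskQueue.store(0, std::memory_order_relaxed);
  // One fence orders both relaxed stores (and any preceding plain writes)
  // before any subsequent atomic store the consumer synchronizes with.
  std::atomic_thread_fence(std::memory_order_release);
  published.store(true, std::memory_order_relaxed);
}
```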
```cpp
    if (envs[envIdx]->isDone()) {
      done[envIdx] = true;
      for (int agentIdx = 0; agentIdx < envs[envIdx]->getNumAgents();
           ++agentIdx)
        trueObjectives[envIdx][agentIdx] =
            envs[envIdx]->trueObjective(agentIdx);

      resetEnv(envIdx);
    } else {
      done[envIdx] = false;
    }
  }
}
```
I think this should be in stepEnv()
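A minimal sketch of that refactor, moving the done/objective/reset bookkeeping into a `stepEnv()` helper so callers don't repeat it. `Env` here is a stub standing in for the real environment class; only the members needed for the sketch are assumed, and the episode-end condition is invented for illustration.

```cpp
#include <memory>
#include <vector>

// Stub environment: done after 3 steps, 2 agents (assumed values for the sketch).
struct Env {
  int steps = 0;
  void step() { ++steps; }
  bool isDone() const { return steps >= 3; }
  int getNumAgents() const { return 2; }
  float trueObjective(int) const { return float(steps); }
  void reset() { steps = 0; }
};

struct Pool {
  std::vector<std::unique_ptr<Env>> envs;
  std::vector<bool> done;
  std::vector<std::vector<float>> trueObjectives;

  void resetEnv(int envIdx) { envs[envIdx]->reset(); }

  // All end-of-episode handling lives in one place, per the suggestion above.
  void stepEnv(int envIdx) {
    envs[envIdx]->step();
    if (envs[envIdx]->isDone()) {
      done[envIdx] = true;
      for (int agentIdx = 0; agentIdx < envs[envIdx]->getNumAgents();
           ++agentIdx)
        trueObjectives[envIdx][agentIdx] =
            envs[envIdx]->trueObjective(agentIdx);
      resetEnv(envIdx);
    } else {
      done[envIdx] = false;
    }
  }
};
```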
```cpp
      cvTask.wait(lock, [this, &threadIdx] {
        return currTasks[threadIdx] != Task::IDLE;
      });
      task = currTasks[threadIdx];

      currTasks[threadIdx] = Task::IDLE;
    }

    int envIdx = 0;
    while ((envIdx = nextTaskQueue.fetch_add(
                1, std::memory_order_acq_rel)) <
           numEnvs) {
```
That's pretty cool.
So we're using the expensive mutex/cond only once per cycle, to wake up the thread, and the rest is handled in the while loop. I quite like this.
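A condensed sketch of the pattern being praised, assuming a single-worker pool for brevity: the mutex/condvar pair is touched exactly once per cycle, to hand the worker a task, and all per-env work is then claimed lock-free through the shared atomic counter. Names (`cvTask`, `nextTaskQueue`, `Task`) mirror the diff; the `Worker` struct and `runOnce` are invented scaffolding.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

enum class Task { IDLE, STEP, RESET };

struct Worker {
  std::mutex mtx;
  std::condition_variable cvTask;
  Task currTask = Task::IDLE;
  std::atomic<int> nextTaskQueue{0};

  template <typename F>
  void runOnce(int numEnvs, F&& taskFunc) {
    Task task;
    {
      // Expensive part, paid once per cycle: block until a task is posted.
      std::unique_lock<std::mutex> lock(mtx);
      cvTask.wait(lock, [this] { return currTask != Task::IDLE; });
      task = currTask;
      currTask = Task::IDLE;
    }
    // Cheap part: claim env indices with one atomic fetch_add per env.
    int envIdx = 0;
    while ((envIdx = nextTaskQueue.fetch_add(
                1, std::memory_order_acq_rel)) < numEnvs) {
      taskFunc(task, envIdx);
    }
  }
};
```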
```cpp
while (numReady.load(std::memory_order_acquire) < numThreads - 1)
  asm volatile("pause" ::: "memory");
```
Oh yeah, that's fair.
You can thank Brennan for this one! He taught me this.
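For context, a sketch of that busy-wait as a standalone function: spin until the other workers report ready, issuing the x86 `pause` hint each iteration to ease pressure on the pipeline and sibling hyperthread. The `#if` guard is an addition for portability; on non-x86 targets you'd substitute the platform's equivalent hint. The function name is invented.

```cpp
#include <atomic>

void waitForWorkers(const std::atomic<int>& numReady, int numThreads) {
  // Acquire load pairs with the release publication done by each worker.
  while (numReady.load(std::memory_order_acquire) < numThreads - 1) {
#if defined(__x86_64__) || defined(__i386__)
    // Spin-wait hint: cheaper than a syscall, friendlier than a raw loop.
    asm volatile("pause" ::: "memory");
#endif
  }
}
```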
```cpp
int envIdx = 0;
while ((envIdx = nextTaskQueue.fetch_add(1, std::memory_order_acq_rel)) <
       int(envs.size())) {
  taskFunc(task, envIdx);
}
```
Same loop as in the thread func; perhaps it could be factored into a function.
Use a work queue to better load-balance the thread pool. I also added the ability to reset the envs in parallel and to do the Vulkan rendering in parallel, since it seems to be just OpenGL that doesn't like that. This doesn't really matter for the async RL setting, where the extra latency due to bad load balancing is irrelevant. I made a kind of sync setting by configuring SF with 1 worker and 1 env per worker; there, FPS improves from 4.5k to 4.7k, so this helps, but not much.