[job] job workers now become inactive depending on the aggression.

by default, the aggression is 0.0f, meaning only one worker is enabled. if the application is going to need more firepower, it should set the aggression via tz::job_system().set_aggression(a), where a is between 0.0 and 1.0. 0.0 means very few jobs flying around (low throughput, low cpu usage), and 1.0 means send everything (maximum throughput, 100% cpu utilisation). it is now up to the user to tune the aggression based on their load. in addition, the value returned by tz::job_system().worker_count() may now be smaller than the true hardware thread count, depending on the aggression. with 1.0 aggression, though, it's all the threads.
harrand committed Jan 29, 2024
1 parent d32762c commit a2ac882
Showing 3 changed files with 54 additions and 6 deletions.
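A minimal usage sketch of the knob this commit adds (not part of the diff below; the include path and the 0.25f value are illustrative assumptions):

#include "tz/core/job/job.hpp" // assumed header for tz::job_system()
#include <cstddef>

void tune_job_system(bool expecting_heavy_load)
{
	// 0.0f keeps a single active worker (low throughput, low cpu usage);
	// 1.0f enables every hardware thread (maximum throughput, ~100% cpu).
	tz::job_system().set_aggression(expecting_heavy_load ? 1.0f : 0.25f);

	// worker_count() now scales with the aggression instead of reporting the
	// raw hardware thread count, so query it whenever splitting up work.
	const std::size_t workers = tz::job_system().worker_count();
	(void)workers;
}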
53 changes: 49 additions & 4 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
@@ -1,6 +1,7 @@
#include "tz/core/job/impl/concurrentqueue_blocking/job.hpp"
#include "tz/core/debug.hpp"
#include "tz/core/profile.hpp"
#include <atomic>
#include <limits>
#include <chrono>

@@ -174,7 +175,7 @@ namespace tz::impl

std::size_t job_system_blockingcurrentqueue::worker_count() const
{
return this->thread_pool.size();
return std::max(static_cast<unsigned int>(this->thread_pool.size() * this->aggression), 1u);
}
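For illustration (pool size assumed, not taken from the diff): with a 16-thread pool, an aggression of 0.5 gives max(16 * 0.5, 1) = 8 workers, while the default aggression of 0.0 gives max(0, 1) = 1, matching the "only one worker is enabled" default from the commit message.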

//--------------------------------------------------------------------------------------------------
@@ -197,6 +198,20 @@
return this->jobs_created_this_frame.load();
}

//--------------------------------------------------------------------------------------------------

float job_system_blockingcurrentqueue::get_aggression() const
{
return this->aggression.load();
}

//--------------------------------------------------------------------------------------------------

void job_system_blockingcurrentqueue::set_aggression(float aggression)
{
this->aggression.store(std::clamp(aggression, 0.0f, 1.0f), std::memory_order_relaxed);
}

//--------------------------------------------------------------------------------------------------

std::optional<std::size_t> job_system_blockingcurrentqueue::worker_t::get_running_job() const
@@ -221,15 +236,45 @@
{
job_info_t job;

bool found = false;
// some workers are disabled depending on the aggression of the job system.
// aggression 0.0 means 1 worker.
// aggression 1.0 means all workers.

const float aggro = this->get_aggression();
const float aggro_step = 1.0f / std::thread::hardware_concurrency();
if(worker.local_tid > 0)
{
if(worker.local_tid > aggro / aggro_step)
{
// naptime. unless we have an affine job
found = worker.affine_jobs.try_dequeue(job);
if(!found)
{
//tz::report("worker %zu is naptime coz aggression is only %.2f", worker.local_tid, aggro);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
continue;
}
}
}
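As a concrete reading of the gating above (hardware thread count assumed): with 8 hardware threads and an aggression of 0.25, aggro / aggro_step = 2, so workers with local_tid greater than 2 just nap in 5ms slices unless an affine job shows up, while tids 0-2 keep pulling work.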

// let's try to retrieve an affine job; if that's empty, then get a job from the global queue.
// this branch might not be taken at all if we hit the timeout without getting a job,
// in which case we simply recurse.

// note: on e.g. windows, sleeps suck - each scheduler quantum is 15-16ms. moodycamel::concurrentqueue will spin for a maximum number of iterations before actually sleeping, in which case we should expect to wait at least 16ms - too long.
// what we really want to do is spin for a certain amount of time, not until a maximum number of spins:
// spin for 10 micros.
// what we really want to do is spin for a certain amount of time, not until a maximum number of spins.
// however, how much we spin for *drastically* affects performance. spin for too short a time? you sleep almost instantly and kill perf when load is high.
// spin too long? you're maxing out cpu resources even when the application isn't doing anything.
// the solution here is something PGO-aligned.
// I'm going to clamp between a very low spin time and a very high spin time, depending on the aggression.
// this means if tons of jobs are requested, we spin for a long time to keep up.
// if very few are submitted, we chill way out.

// the low end is super tiny - a spin that short will basically never catch anything.
// 2000us is incredibly long - highly likely to catch everything, but it will also max out cpu usage.
long long spin_duration = std::lerp(2, 2000, aggro);
auto deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(spin_duration);
bool found = false;
{
while(!found)
{
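The comment block in the hunk above argues for spinning for an amount of *time* scaled by aggression rather than a fixed iteration count. A minimal standalone sketch of that spin-then-give-up pattern (hypothetical helper, not from the repository; `Queue` stands in for anything with a moodycamel-style try_dequeue, and the caller supplies the lerped spin budget and handles the blocking fallback):

#include <chrono>

template<typename Queue, typename Job>
bool spin_dequeue(Queue& queue, Job& out, std::chrono::microseconds spin_budget)
{
	// spin until a time deadline, not until a maximum number of spins.
	const auto deadline = std::chrono::steady_clock::now() + spin_budget;
	while(std::chrono::steady_clock::now() < deadline)
	{
		if(queue.try_dequeue(out))
		{
			return true; // caught a job while spinning; no sleep needed.
		}
	}
	return false; // budget exhausted; the caller can now sleep or block.
}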
3 changes: 3 additions & 0 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
@@ -31,6 +31,8 @@ namespace tz::impl
virtual std::size_t worker_count() const override;
virtual std::vector<worker_id_t> get_worker_ids() const override;
unsigned int jobs_started_this_frame() const;
float get_aggression() const;
void set_aggression(float aggression);
private:
struct job_info_t
{
@@ -64,6 +66,7 @@ namespace tz::impl
std::atomic<std::uint64_t> lifetime_jobs_created = 0u;
std::atomic<std::size_t> jobs_created_this_frame = 0u;
std::atomic<bool> close_requested = false;
std::atomic<float> aggression = 0.0f;
};
}

4 changes: 2 additions & 2 deletions src/tz/ren/animation.cpp
@@ -397,7 +397,7 @@ namespace tz::ren
{
TZ_PROFZONE("animation_renderer - animation advance", 0xFFE54550);

std::size_t job_count = std::thread::hardware_concurrency();
std::size_t job_count = tz::job_system().worker_count();
std::size_t objects_per_job = this->animated_objects.size() / job_count;
std::size_t remainder_objects = this->animated_objects.size() % job_count;
tz::assert((objects_per_job * job_count) + remainder_objects == this->animated_objects.size());
@@ -685,7 +685,7 @@ namespace tz::ren
{
TZ_PROFZONE("load textures - execute jobs", 0xFF44DD44);
// we should split this into threads.
std::size_t job_count = std::thread::hardware_concurrency();
std::size_t job_count = tz::job_system().worker_count();
std::size_t imgs_per_job = gltf.data.get_images().size() / job_count;
std::size_t remainder_imgs = gltf.data.get_images().size() % job_count;
std::vector<tz::job_handle> jobs(job_count);
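Both animation.cpp hunks now divide work by tz::job_system().worker_count(), which shrinks when the aggression is low. A standalone sketch of that splitting arithmetic (hypothetical helper, not from the renderer, whose own remainder handling is only partly visible in the hunks):

#include <cstddef>
#include <utility>
#include <vector>

// one [begin, end) index range per job; the first `count % job_count` jobs
// take one extra item so that every item is covered exactly once.
// job_count is assumed >= 1, which worker_count() guarantees.
std::vector<std::pair<std::size_t, std::size_t>> split_ranges(std::size_t count, std::size_t job_count)
{
	std::vector<std::pair<std::size_t, std::size_t>> ranges;
	const std::size_t per_job = count / job_count;
	const std::size_t remainder = count % job_count;
	std::size_t begin = 0;
	for(std::size_t i = 0; i < job_count; i++)
	{
		const std::size_t len = per_job + (i < remainder ? 1 : 0);
		ranges.emplace_back(begin, begin + len);
		begin += len;
	}
	return ranges;
}

Each returned range would then be submitted as one job, so fewer active workers simply means fewer, larger batches.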
