[job] the unshittening of the job system: use a windows syscall to drastically reduce the sleep latency.

by default it's 16ms, which necessitated millions of stupid hacks to try to maintain a balance of responsiveness and minimising spinning. now there is no spinning really at all - job workers are free to go to sleep if concurrentqueue's minimal spinning doesn't catch anything. this is because we can be sure that they don't oversleep by a ridiculously long time - we expect 1-2ms, not 16. removed the concept of job system aggression, along with the factoring of aggression into lua thread-local states. all simplified now.
harrand committed Jan 30, 2024
1 parent 84b98a2 commit 045fdb2
Showing 5 changed files with 17 additions and 75 deletions.
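
For readers unfamiliar with the Windows timer API the commit message refers to: the default timer resolution is roughly 15.6ms, so any timed wait (a Sleep, a condition variable timeout, moodycamel's blocking dequeue) can overshoot by that much. timeBeginPeriod(1), declared in timeapi.h and exported by winmm (hence the cmake change below), requests ~1ms resolution until a matching timeEndPeriod(1) restores the default. A minimal standalone sketch, not part of this commit, that demonstrates the effect; exact numbers vary by machine and Windows version:

// Minimal standalone demo (assumed setup, not code from this commit): measure how
// long Sleep(1) actually takes before and after timeBeginPeriod(1).
// Build on Windows and link against winmm, the same library the cmake change adds.
#include <windows.h>
#include <timeapi.h>
#include <chrono>
#include <cstdio>

static double measure_sleep_ms()
{
	auto start = std::chrono::steady_clock::now();
	Sleep(1); // asks for 1ms, but rounds up to the current timer resolution
	auto end = std::chrono::steady_clock::now();
	return std::chrono::duration<double, std::milli>(end - start).count();
}

int main()
{
	std::printf("default resolution:       Sleep(1) took %.2fms\n", measure_sleep_ms());

	timeBeginPeriod(1); // request ~1ms timer resolution, as the job system now does at startup
	std::printf("after timeBeginPeriod(1): Sleep(1) took %.2fms\n", measure_sleep_ms());
	timeEndPeriod(1);   // always paired, mirroring the destructor change in the diff

	return 0;
}

On a typical desktop the first line prints something in the 15-16ms region and the second 1-2ms, which is the gap the commit message is talking about.
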
2 changes: 1 addition & 1 deletion cmake/platform.cmake
@@ -1,5 +1,5 @@
function(configure_windows target)
target_link_libraries(${target} PUBLIC OpenGL32 dwmapi)
target_link_libraries(${target} PUBLIC OpenGL32 dwmapi winmm)
endfunction()

function(configure_linux target)
78 changes: 14 additions & 64 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
@@ -8,12 +8,21 @@
#include <iostream>
#include <syncstream>

// really need winapi here.
#ifdef _WIN32
#include "tz/wsi/impl/windows/detail/winapi.hpp"
#include <timeapi.h>
#endif

namespace tz::impl
{
job_system_blockingcurrentqueue::job_system_blockingcurrentqueue():
ptok(this->global_job_queue)
{
TZ_PROFZONE("job_system - initialise", 0xFFAA0000);
#ifdef _WIN32
timeBeginPeriod(1);
#endif
this->running_job_ids.reserve(128);
for(std::size_t i = 0; i < std::thread::hardware_concurrency() - 1; i++)
{
@@ -34,6 +43,9 @@ namespace tz::impl
{
worker.thread.join();
}
#ifdef _WIN32
timeEndPeriod(1);
#endif
tz::assert(!this->any_work_remaining());
}

@@ -175,7 +187,7 @@ namespace tz::impl

std::size_t job_system_blockingcurrentqueue::worker_count() const
{
return std::max(1u + static_cast<std::size_t>(this->thread_pool.size() * this->aggression), this->thread_pool.size());
return this->thread_pool.size();
}

//--------------------------------------------------------------------------------------------------
@@ -198,20 +210,6 @@
return this->jobs_created_this_frame.load();
}

//--------------------------------------------------------------------------------------------------

float job_system_blockingcurrentqueue::get_aggression() const
{
return this->aggression.load();
}

//--------------------------------------------------------------------------------------------------

void job_system_blockingcurrentqueue::set_aggression(float aggression)
{
this->aggression.store(std::clamp(aggression, 0.0f, 1.0f), std::memory_order_relaxed);
}

//--------------------------------------------------------------------------------------------------

std::optional<std::size_t> job_system_blockingcurrentqueue::worker_t::get_running_job() const
@@ -236,59 +234,11 @@
{
job_info_t job;

bool found = false;
// some workers are disabled depending on the aggressiveness of the job system.
// aggressiveness 0 means 1 worker.
// aggressiveness 1.0 means all workers

const float aggro = this->get_aggression();
const float aggro_step = 1.0f / std::thread::hardware_concurrency();
if(worker.local_tid > 0)
{
if(worker.local_tid > aggro / aggro_step)
{
// naptime. unless we have an affine job
found = worker.affine_jobs.try_dequeue(job);
if(!found)
{
//tz::report("worker %zu is naptime coz aggression is only %.2f", worker.local_tid, aggro);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
continue;
}
}
}

// lets try to retrieve an affine job, if thats empty then get a job from the global queue.
// this `if statement` could not happen if we hit the timeout without getting a job.
// in which case we simply recurse.

// note: on e.g windows sleeps suck asshole. 15-16ms is each quantum. moodycamel::concurrentqueue will spin for a maximum amount before actually sleeping - under which case we should expect to wait at least 16ms - too long.
// what we really want to do is spin for a certain amount of time, not until a maximum number of spins.
// however, how much we spin for *drastically* affects performance. spin for too short a time? you sleep almost instantly and kill perf when load is high
// spin too long? you're maxing out cpu resources even when the application isnt doing anything.
// the solution here is something PGO aligned.
// im going to clamp between a very low spin time and a very high spin time, depending on how many jobs have been requested this frame.
// this means if tons of jobs are requested, we spin for a long time to keep up.
// if very few are submitted, we chill way the fuck out.

// 10us is super tiny, will basically never catch anything.
// 2000us is incredibly long. highly likely to catch everything. will also definitely max out the cpu usage.
long long spin_duration = std::lerp(2, 3000, aggro);
auto deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(spin_duration);
{
while(!found)
{
if(std::chrono::steady_clock::now() >= deadline)
{
// 10 micros of spinning has passed, and nothing. time to take the big bullet and sleep.
TZ_PROFZONE("job worker - go to sleep", 0xFFAAFFEE);
found = this->global_job_queue.wait_dequeue_timed(worker.ctok.value(), job, queue_wait_timer_micros);
break;
}
found = worker.affine_jobs.try_dequeue(job) || this->global_job_queue.try_dequeue(worker.ctok.value(), job);
}
}
if(found)
if(worker.affine_jobs.try_dequeue(job) || this->global_job_queue.wait_dequeue_timed(worker.ctok.value(), job, queue_wait_timer_micros))
{
TZ_PROFZONE("job worker - do collected job", 0xFFAA0000);
// we have a job to do
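
With the aggression machinery and the hand-rolled spin loop gone, the worker loop in job.cpp reduces to: try the worker's affinity queue without blocking, otherwise do a timed blocking dequeue on the global queue and let moodycamel's own brief spinning plus the now ~1ms sleep granularity handle the rest. A rough sketch of that pattern, assuming one plain ConcurrentQueue per worker and a shared BlockingConcurrentQueue as in the diff (the real code also passes a consumer token and does per-job bookkeeping, omitted here):

// Sketch of the simplified dequeue pattern; names loosely mirror the diff but this
// is an illustration, not the engine's actual worker loop.
#include <blockingconcurrentqueue.h>
#include <concurrentqueue.h>
#include <atomic>
#include <cstdint>
#include <functional>

using job_t = std::function<void()>;
constexpr std::int64_t queue_wait_timer_micros = 1000; // assumed timeout value

void worker_main(moodycamel::ConcurrentQueue<job_t>& affine_jobs,
                 moodycamel::BlockingConcurrentQueue<job_t>& global_job_queue,
                 std::atomic<bool>& close_requested)
{
	while(!close_requested.load(std::memory_order_relaxed))
	{
		job_t job;
		// affine jobs never block; the global queue spins briefly, then sleeps.
		// with timeBeginPeriod(1) active, an expired wait costs ~1-2ms rather than ~16ms.
		if(affine_jobs.try_dequeue(job)
			|| global_job_queue.wait_dequeue_timed(job, queue_wait_timer_micros))
		{
			job();
		}
	}
}
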
3 changes: 0 additions & 3 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
@@ -31,8 +31,6 @@ namespace tz::impl
virtual std::size_t worker_count() const override;
virtual std::vector<worker_id_t> get_worker_ids() const override;
unsigned int jobs_started_this_frame() const;
float get_aggression() const;
void set_aggression(float aggression);
private:
struct job_info_t
{
@@ -66,7 +64,6 @@ namespace tz::impl
std::atomic<std::uint64_t> lifetime_jobs_created = 0u;
std::atomic<std::size_t> jobs_created_this_frame = 0u;
std::atomic<bool> close_requested = false;
std::atomic<float> aggression = 0.0f;
};
}

7 changes: 1 addition & 6 deletions src/tz/lua/state.cpp
@@ -569,17 +569,12 @@ namespace tz::lua
return defstate;
}

void for_all_states(state_applicator fn, bool ignore_aggression)
void for_all_states(state_applicator fn)
{
// for each worker, execute a new job to register the function with the necessary affinity.
std::vector<tz::job_handle> handles = {};
const float aggro = tz::job_system().get_aggression();
for(tz::worker_id_t wid : tz::job_system().get_worker_ids())
{
if(!ignore_aggression && wid > aggro / (1.0f / std::thread::hardware_concurrency()))
{
continue;
}
handles.push_back(tz::job_system().execute([fn]()
{
fn(get_state());
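
On the lua side, each job worker owns a thread-local lua state, so for_all_states cannot simply loop over states on the calling thread: it has to run fn as a job on every worker, and while aggression existed, workers it had parked would never pick that job up (hence the old ignore_aggression escape hatch). A minimal standard-library illustration of why the fan-out is per-worker, using hypothetical names rather than the engine's API:

// Each thread gets its own copy of a thread_local, so touching every worker's state
// requires code that runs on every worker thread. Hypothetical stand-ins, not engine code.
#include <cstdio>
#include <thread>
#include <vector>

thread_local int lua_state_stub = 0; // stands in for a per-worker tz::lua::state

int main()
{
	std::vector<std::jthread> workers;
	for(int i = 0; i < 4; i++)
	{
		workers.emplace_back([i]
		{
			lua_state_stub = i + 1; // mutates this worker's copy only
			std::printf("worker %d sees state %d\n", i, lua_state_stub);
		});
	}
	workers.clear(); // jthreads join on destruction

	// the main thread's copy is untouched - which is why for_all_states dispatches
	// one job per worker instead of looping here.
	std::printf("main thread sees state %d\n", lua_state_stub);
}
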
2 changes: 1 addition & 1 deletion src/tz/lua/state.hpp
@@ -148,7 +148,7 @@ namespace tz::lua
state& get_state();

using state_applicator = std::function<void(state&)>;
void for_all_states(state_applicator fn, bool ignore_aggression = true);
void for_all_states(state_applicator fn);

}
