From 045fdb276dd49fad279df7acf8f9219adfe8183a Mon Sep 17 00:00:00 2001
From: harrand
Date: Tue, 30 Jan 2024 00:04:11 +0000
Subject: [PATCH] [job] the unshittening of the job system

use a windows syscall (timeBeginPeriod) to drastically reduce sleep latency.
by default the windows timer resolution is ~16ms, which necessitated a pile of
hacks to try to balance responsiveness against minimising spinning. now there
is essentially no spinning at all - job workers are free to go to sleep if
concurrentqueue's minimal internal spinning doesn't catch anything, because we
can be sure they won't oversleep by a ridiculously long time: expect 1-2ms,
not 16.

also removed the concept of job system aggression, and the factoring of
aggression into lua thread-local states. all simplified now.
---
 cmake/platform.cmake                          |  2 +-
 .../job/impl/concurrentqueue_blocking/job.cpp | 78 ++++---------
 .../job/impl/concurrentqueue_blocking/job.hpp |  3 -
 src/tz/lua/state.cpp                          |  7 +-
 src/tz/lua/state.hpp                          |  2 +-
 5 files changed, 17 insertions(+), 75 deletions(-)

diff --git a/cmake/platform.cmake b/cmake/platform.cmake
index 21f94be538..9400170f22 100644
--- a/cmake/platform.cmake
+++ b/cmake/platform.cmake
@@ -1,5 +1,5 @@
 function(configure_windows target)
-	target_link_libraries(${target} PUBLIC OpenGL32 dwmapi)
+	target_link_libraries(${target} PUBLIC OpenGL32 dwmapi winmm)
 endfunction()
 
 function(configure_linux target)
diff --git a/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp b/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
index feca70264d..47413d2bed 100644
--- a/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
+++ b/src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
@@ -8,12 +8,21 @@
 #include
 #include
 
+// really need winapi here.
+#ifdef _WIN32
+#include "tz/wsi/impl/windows/detail/winapi.hpp"
+#include
+#endif
+
 namespace tz::impl
 {
 	job_system_blockingcurrentqueue::job_system_blockingcurrentqueue():
 	ptok(this->global_job_queue)
 	{
 		TZ_PROFZONE("job_system - initialise", 0xFFAA0000);
+		#ifdef _WIN32
+		timeBeginPeriod(1);
+		#endif
 		this->running_job_ids.reserve(128);
 		for(std::size_t i = 0; i < std::thread::hardware_concurrency() - 1; i++)
 		{
@@ -34,6 +43,9 @@
 		{
 			worker.thread.join();
 		}
+		#ifdef _WIN32
+		timeEndPeriod(1);
+		#endif
 		tz::assert(!this->any_work_remaining());
 	}
 
@@ -175,7 +187,7 @@
 
 	std::size_t job_system_blockingcurrentqueue::worker_count() const
 	{
-		return std::max(1u + static_cast<std::size_t>(this->thread_pool.size() * this->aggression), this->thread_pool.size());
+		return this->thread_pool.size();
 	}
 
 //--------------------------------------------------------------------------------------------------
@@ -198,20 +210,6 @@
 		return this->jobs_created_this_frame.load();
 	}
 
-//--------------------------------------------------------------------------------------------------
-
-	float job_system_blockingcurrentqueue::get_aggression() const
-	{
-		return this->aggression.load();
-	}
-
-//--------------------------------------------------------------------------------------------------
-
-	void job_system_blockingcurrentqueue::set_aggression(float aggression)
-	{
-		this->aggression.store(std::clamp(aggression, 0.0f, 1.0f), std::memory_order_relaxed);
-	}
-
 //--------------------------------------------------------------------------------------------------
 
 	std::optional job_system_blockingcurrentqueue::worker_t::get_running_job() const
@@ -236,59 +234,11 @@
 		{
 			job_info_t job;
 
-			bool found = false;
-			// some workers are disabled depending on the aggressiveness of the job system.
-			// aggressiveness 0 means 1 worker.
-			// aggressiveness 1.0 means all workers
-
-			const float aggro = this->get_aggression();
-			const float aggro_step = 1.0f / std::thread::hardware_concurrency();
-			if(worker.local_tid > 0)
-			{
-				if(worker.local_tid > aggro / aggro_step)
-				{
-					// naptime. unless we have an affine job
-					found = worker.affine_jobs.try_dequeue(job);
-					if(!found)
-					{
-						//tz::report("worker %zu is naptime coz aggression is only %.2f", worker.local_tid, aggro);
-						std::this_thread::sleep_for(std::chrono::milliseconds(5));
-						continue;
-					}
-				}
-			}
-
 			// lets try to retrieve an affine job, if thats empty then get a job from the global queue.
 			// this `if statement` could not happen if we hit the timeout without getting a job.
 			// in which case we simply recurse.
-			// note: on e.g windows sleeps suck asshole. 15-16ms is each quantum. moodycamel::concurrentqueue will spin for a maximum amount before actually sleeping - under which case we should expect to wait at least 16ms - too long.
-			// what we really want to do is spin for a certain amount of time, not until a maximum number of spins.
-			// however, how much we spin for *drastically* affects performance. spin for too short a time? you sleep almost instantly and kill perf when load is high
-			// spin too long? you're maxing out cpu resources even when the application isnt doing anything.
-			// the solution here is something PGO aligned.
-			// im going to clamp between a very low spin time and a very high spin time, depending on how many jobs have been requested this frame.
-			// this means if tons of jobs are requested, we spin for a long time to keep up.
-			// if very few are submitted, we chill way the fuck out.
-
-			// 10us is super tiny, will basically never catch anything.
-			// 2000us is incredibly long. highly likely to catch everything. will also definitely max out the cpu usage.
-			long long spin_duration = std::lerp(2, 3000, aggro);
-			auto deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(spin_duration);
-			{
-				while(!found)
-				{
-					if(std::chrono::steady_clock::now() >= deadline)
-					{
-						// 10 micros of spinning has passed, and nothing. time to take the big bullet and sleep.
- TZ_PROFZONE("job worker - go to sleep", 0xFFAAFFEE); - found = this->global_job_queue.wait_dequeue_timed(worker.ctok.value(), job, queue_wait_timer_micros); - break; - } - found = worker.affine_jobs.try_dequeue(job) || this->global_job_queue.try_dequeue(worker.ctok.value(), job); - } - } - if(found) + if(worker.affine_jobs.try_dequeue(job) || this->global_job_queue.wait_dequeue_timed(worker.ctok.value(), job, queue_wait_timer_micros)) { TZ_PROFZONE("job worker - do collected job", 0xFFAA0000); // we have a job to do diff --git a/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp b/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp index 7176e7ddfa..e1648561f1 100644 --- a/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp +++ b/src/tz/core/job/impl/concurrentqueue_blocking/job.hpp @@ -31,8 +31,6 @@ namespace tz::impl virtual std::size_t worker_count() const override; virtual std::vector get_worker_ids() const override; unsigned int jobs_started_this_frame() const; - float get_aggression() const; - void set_aggression(float aggression); private: struct job_info_t { @@ -66,7 +64,6 @@ namespace tz::impl std::atomic lifetime_jobs_created = 0u; std::atomic jobs_created_this_frame = 0u; std::atomic close_requested = false; - std::atomic aggression = 0.0f; }; } diff --git a/src/tz/lua/state.cpp b/src/tz/lua/state.cpp index 9df3ee8cbd..8e7431d9b6 100644 --- a/src/tz/lua/state.cpp +++ b/src/tz/lua/state.cpp @@ -569,17 +569,12 @@ namespace tz::lua return defstate; } - void for_all_states(state_applicator fn, bool ignore_aggression) + void for_all_states(state_applicator fn) { // for each worker, execute a new job to register the function with the necessary affinity. std::vector handles = {}; - const float aggro = tz::job_system().get_aggression(); for(tz::worker_id_t wid : tz::job_system().get_worker_ids()) { - if(!ignore_aggression && wid > aggro / (1.0f / std::thread::hardware_concurrency())) - { - continue; - } handles.push_back(tz::job_system().execute([fn]() { fn(get_state()); diff --git a/src/tz/lua/state.hpp b/src/tz/lua/state.hpp index 2fb6143398..90cf113d3c 100644 --- a/src/tz/lua/state.hpp +++ b/src/tz/lua/state.hpp @@ -148,7 +148,7 @@ namespace tz::lua state& get_state(); using state_applicator = std::function; - void for_all_states(state_applicator fn, bool ignore_aggression = true); + void for_all_states(state_applicator fn); }