Skip to content

Commit

Permalink
[free_list] fixed a silly bug. [job_system] added some spinlocking in…
Browse files Browse the repository at this point in the history
… job worker threads. this is because moodycamel::concurrentqueue only spins for a very small amount of time before ultimately giving up and doing a proper sleep. a windows sleep timeslice will take at least 15-16ms, which is the entire frame boundary. therefore, its not really reasonable to spin. however, if you spin forever and never sleep, you waste all cpu resources. the new impl is basically trying to be a compromise
  • Loading branch information
harrand committed Jan 29, 2024
1 parent 80b70b2 commit 1e2da79
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/tz/core/data/free_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace tz
while(this->internal_handle > 0 && this->l->is_in_free_list(--this->internal_handle));
return *this;
}
free_list_iterator operator++(int){auto tmp = *this; --tmp; return tmp;}
free_list_iterator operator++(int){auto tmp = *this; ++tmp; return tmp;}
free_list_iterator operator--(int){auto tmp = *this; --tmp; return tmp;}
free_list_iterator operator+(const std::integral auto dst) const{auto old = *this; return old += dst;}
free_list_iterator operator-(const std::integral auto dst) const{auto old = *this; return old -= dst;}
Expand Down
31 changes: 27 additions & 4 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

namespace tz::impl
{
job_system_blockingcurrentqueue::job_system_blockingcurrentqueue()
job_system_blockingcurrentqueue::job_system_blockingcurrentqueue():
ptok(this->global_job_queue)
{
TZ_PROFZONE("job_system - initialise", 0xFFAA0000);
this->running_job_ids.reserve(128);
Expand All @@ -18,6 +19,7 @@ namespace tz::impl
auto& worker = this->thread_pool.emplace_back();
worker.thread = std::thread([this, i](){this->worker_thread_entrypoint(i);});
worker.local_tid = i;
worker.ctok = moodycamel::ConsumerToken{this->global_job_queue};
}
}

Expand Down Expand Up @@ -58,6 +60,7 @@ namespace tz::impl
}
if(einfo.maybe_worker_affinity.has_value())
{
TZ_PROFZONE("execute - affine job enqueue", 0xFFAA8888);
// add to list of affine jobs instead.
auto val = einfo.maybe_worker_affinity.value();
tz::assert(this->thread_pool[val].local_tid == val);
Expand All @@ -67,7 +70,8 @@ namespace tz::impl
}
else
{
this->global_job_queue.enqueue(jinfo);
TZ_PROFZONE("execute - global job enqueue", 0xFFAA8888);
this->global_job_queue.enqueue(this->ptok, jinfo);
}
//std::osyncstream(std::cout) << "added new job " << jinfo.job_id << "\n";
return ret;
Expand Down Expand Up @@ -212,15 +216,34 @@ namespace tz::impl
std::string thread_name = "Topaz Job Thread " + std::to_string(local_tid);
TZ_THREAD(thread_name.c_str());
worker_t& worker = this->thread_pool[local_tid];
constexpr std::int64_t queue_wait_timer_micros = 1000; // 1 millis
constexpr std::int64_t queue_wait_timer_micros = 1000; // 1 millis. this is the minimum amount of time we're willing to wait if our spins yield nothing. in practice it will wait way longer.
while(!this->close_requested.load())
{
job_info_t job;

// lets try to retrieve an affine job, if thats empty then get a job from the global queue.
// this `if statement` could not happen if we hit the timeout without getting a job.
// in which case we simply recurse.
if(worker.affine_jobs.try_dequeue(job) || this->global_job_queue.wait_dequeue_timed(job, queue_wait_timer_micros))

// note: on e.g windows sleeps suck asshole. 15-16ms is each quantum. moodycamel::concurrentqueue will spin for a maximum amount before actually sleeping - under which case we should expect to wait at least 16ms - too long.
// what we really want to do is spin for a certain amount of time, not until a maximum number of spins:
// spin for 10 micros.
auto deadline = std::chrono::steady_clock::now() + std::chrono::microseconds(2000);
bool found = false;
{
while(!found)
{
if(std::chrono::steady_clock::now() >= deadline)
{
// 10 micros of spinning has passed, and nothing. time to take the big bullet and sleep.
TZ_PROFZONE("job worker - go to sleep", 0xFFAAFFEE);
found = this->global_job_queue.wait_dequeue_timed(worker.ctok.value(), job, queue_wait_timer_micros);
break;
}
found = worker.affine_jobs.try_dequeue(job) || this->global_job_queue.try_dequeue(worker.ctok.value(), job);
}
}
if(found)
{
TZ_PROFZONE("job worker - do collected job", 0xFFAA0000);
// we have a job to do
Expand Down
2 changes: 2 additions & 0 deletions src/tz/core/job/impl/concurrentqueue_blocking/job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace tz::impl
std::size_t local_tid;
std::atomic<std::size_t> currently_running_job_id = std::numeric_limits<std::size_t>::max();
moodycamel::ConcurrentQueue<job_info_t> affine_jobs;
std::optional<moodycamel::ConsumerToken> ctok = std::nullopt;
std::vector<std::size_t> completed_job_cache;
std::optional<std::size_t> get_running_job() const;
void clear_job_cache(job_system_blockingcurrentqueue& js);
Expand All @@ -48,6 +49,7 @@ namespace tz::impl

std::deque<worker_t> thread_pool;
moodycamel::BlockingConcurrentQueue<job_info_t> global_job_queue;
moodycamel::ProducerToken ptok;
mutable std::mutex done_job_list_mutex;
std::vector<std::size_t> running_job_ids = {};
mutable std::mutex wake_me_on_a_job_done_mutex;
Expand Down

0 comments on commit 1e2da79

Please sign in to comment.