Skip to content

Commit

Permalink
Fixed possible spin lock on scheduler thread. Added fixed sample rate…
Browse files Browse the repository at this point in the history
… processing.
  • Loading branch information
StefanoLusardi committed Feb 17, 2021
1 parent c4bc949 commit 16e3084
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ target_include_directories(ssts INTERFACE
option(SSTS_ENABLE_CODE_COVERAGE "Enable code coverage (requires installing library)" OFF)
option(SSTS_BUILD_DOCS "Build library documentation" OFF)
option(SSTS_BUILD_TESTS "Build library tests" ON)
option(SSTS_BUILD_EXAMPLES "Build library examples" OFF)
option(SSTS_BUILD_EXAMPLES "Build library examples" ON)
option(SSTS_INSTALL_LIBRARY "Install library" OFF)
option(SSTS_INSTALL_EXAMPLES "Install examples (requires installing library and building examples)" OFF)
option(SSTS_ENABLE_SANITIZERS "Run unit tests with Thread Sanitizer support" OFF)
Expand Down
6 changes: 4 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ function(add_example EXAMPLE_TARGET)
endif()
endfunction()

# add_example(example_pool)
# add_example(example_test)
add_example(example_duplicate)
add_example(example_pool)
add_example(example_test)
add_example(example_recursive)

add_example(example_scheduler)
add_example(example_nested)
Expand Down
101 changes: 101 additions & 0 deletions examples/example_duplicate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#include <ssts/task_scheduler.hpp>

#include "utils/utils.hpp"

void main_loop()
{
ssts::utils::log(ssts::version());

ssts::task_scheduler s(8);
s.set_duplicate_allowed(true);

ssts::utils::timer t;

/**
* A: Loop Time > Processing Time
*
* Task A is looping slower (1 second) than its processing time (400 millisseconds).
* The execution rate is once per second even if the scheduler could satisfy smaller loop time (up to 400 milliseconds).
*/
std::string idA = " ==:A:== ";
std::atomic_int n_task_runA = 0;
s.every(std::string(idA), 1s, [idA, &n_task_runA]
{
std::cout << idA << std::endl;
std::this_thread::sleep_for(400ms);
++n_task_runA;
});

/**
* B: Loop Time < Processing Time
*
* Task B is looping faster (10 milliseconds) than its processing time (5 seconds).
* The execution is scheduled only when previous task is completed, so once every second.
*/
std::string idB = " ==:B:== ";
std::atomic_int n_task_runB = 0;
s.every(std::string(idB), 1000ms, [idB, &n_task_runB]
{
std::cout << idB << std::endl;
std::this_thread::sleep_for(5s);
++n_task_runB;
});

std::string idC = " ==:C:== ";
std::atomic_int n_task_runC = 0;
s.every(std::string(idC), 250ms, [idC, &n_task_runC]
{
std::cout << idC << std::endl;
std::this_thread::sleep_for(5ms);
++n_task_runC;
});

std::string idD = " ==:D:== ";
std::atomic_int n_task_runD = 0;
s.every(std::string(idD), 250ms, [idD, &n_task_runD]
{
std::cout << idD << std::endl;
std::this_thread::sleep_for(400ms);
++n_task_runD;
});

// if(!s.set_enabled(idD, false))
// std::cout << "ERROR: set_enabled(idD, false) failed" << std::endl;

// if(s.is_enabled(idD))
// std::cout << "ERROR: is_enabled(idD) failed" << std::endl;

// std::this_thread::sleep_for(2s);
// if(!s.set_enabled(idD, true))
// std::cout << "ERROR: set_enabled(idD, true) failed" << std::endl;

// if(!s.is_enabled(idD))
// std::cout << "ERROR: is_enabled(idD) failed" << std::endl;

// if(!s.is_scheduled(idA)) std::cout << "ERROR: is_scheduled(idA) failed" << std::endl;
// if(!s.is_scheduled(idB)) std::cout << "ERROR: is_scheduled(idB) failed" << std::endl;
// if(!s.is_scheduled(idC)) std::cout << "ERROR: is_scheduled(idC) failed" << std::endl;
// if(!s.is_scheduled(idD)) std::cout << "ERROR: is_scheduled(idD) failed" << std::endl;
// if(s.size() != 4) std::cout << "ERROR: size() != 4" << std::endl;

std::this_thread::sleep_for(10s);

std::cout << "task size: " << s.size() << std::endl;
s.stop();

std::cout << "\n\n" << std::endl;
std::cout << idA << "tasks run: " << n_task_runA << std::endl;
std::cout << idB << "tasks run: " << n_task_runB << std::endl;
std::cout << idC << "tasks run: " << n_task_runC << std::endl;
std::cout << idD << "tasks run: " << n_task_runD << std::endl;

ssts::utils::log_test("Scheduler finished succesfully!");
}

int main()
{
// for(int i=0; i<1'000; ++i)
main_loop();

return 0;
}
33 changes: 33 additions & 0 deletions examples/example_recursive.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <ssts/task_scheduler.hpp>
#include "utils/utils.hpp"

ssts::task_scheduler s(4);

void t_recursive();

void recursive(const std::string& str)
{
std::cout << str << std::endl;
t_recursive();
}

void t_recursive()
{
s.in("RecursiveTask", 1s, std::bind(&recursive, "I'm a Recursive Task!"));
// while(!s.is_scheduled("RecursiveTask"))
// std::this_thread::sleep_for(10ms);

// std::cout << std::boolalpha << s.is_scheduled("RecursiveTask") << std::endl;
}

int main()
{
ssts::utils::log(ssts::version());
ssts::utils::timer t;

t_recursive();

std::this_thread::sleep_for(20s);
ssts::utils::log("Task Scheduler finished");
return 0;
}
47 changes: 42 additions & 5 deletions include/ssts/task_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <thread>
#include <future>
#include <condition_variable>
#include <unordered_set>

#include "task.hpp"

Expand All @@ -34,6 +35,7 @@ class task_pool
*/
explicit task_pool(const unsigned int num_threads = std::thread::hardware_concurrency())
: _is_running{ true }
, _is_duplicate_allowed{ true }
{
const auto thread_count = std::clamp(num_threads, 1u, std::thread::hardware_concurrency());
_threads.reserve(thread_count);
Expand Down Expand Up @@ -100,27 +102,39 @@ class task_pool
* Returns the result of the asynchronous computation.
*/
template<typename FunctionType>
auto run(FunctionType&& f)
auto run(FunctionType&& f, const std::optional<size_t>& task_hash = std::nullopt)
{
using result_type = std::invoke_result_t<std::decay_t<FunctionType>>;

std::packaged_task<result_type()> task(std::forward<FunctionType>(f));
std::future<result_type> future = task.get_future();

std::unique_lock lock(_task_mtx);
_task_queue.emplace(std::move(task));

if(!_is_duplicate_allowed && is_already_running(task_hash))
return future;

auto hash = task_hash.value_or(0);
_task_queue.emplace(hash, std::move(task));
lock.unlock();
_task_cv.notify_one();

return future;
}

void set_duplicate_allowed(bool is_allowed)
{
_is_duplicate_allowed = is_allowed;
}

private:
std::atomic_bool _is_running;
std::atomic_bool _is_duplicate_allowed;
std::vector<std::thread> _threads;
std::queue<ssts::task> _task_queue;
std::queue<std::pair<size_t, ssts::task>> _task_queue;
std::unordered_set<size_t> _active_hash_set;
std::condition_variable _task_cv;
std::mutex _task_mtx;
std::mutex _hash_mtx;

void worker_thread()
{
Expand All @@ -132,13 +146,36 @@ class task_pool
if (!_is_running)
return;

auto task = std::move(_task_queue.front());
auto task = std::move(_task_queue.front().second);
auto hash = _task_queue.front().first;

if(!_is_duplicate_allowed)
{
std::scoped_lock hash_lock(_hash_mtx);
_active_hash_set.insert(hash);
}

_task_queue.pop();

lock.unlock();
task();

if(!_is_duplicate_allowed)
{
std::scoped_lock hash_lock(_hash_mtx);
if (_active_hash_set.find(hash) != _active_hash_set.end())
_active_hash_set.erase(hash);
}
}
}

bool is_already_running(const std::optional<size_t>& opt_hash)
{
if (!opt_hash.has_value())
return false;

return _active_hash_set.find(opt_hash.value()) != _active_hash_set.end();
}
};

}
77 changes: 67 additions & 10 deletions include/ssts/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,26 @@ class task_scheduler
explicit task_scheduler(const unsigned int num_threads = std::thread::hardware_concurrency())
: _tp{num_threads}
, _is_running{true}
, _is_duplicate_allowed{ false }
, _is_duplicate_allowed{ true }
{
_scheduler_thread = std::thread([this] {
while (_is_running)
{
std::this_thread::yield();
std::unique_lock lock(_update_tasks_mtx);

if (_tasks.empty())
{
_update_tasks_cv.wait(lock, [this] { return !_tasks.empty() || !_is_running; });
// _update_tasks_cv.wait(lock, [this] { return !_is_running || (!_tasks.empty() && ssts::clock::now() >= _tasks.begin()->first); });
_update_tasks_cv.wait(lock, [this] { return !_is_running || !_tasks.empty(); });
}
else
{
_update_tasks_cv.wait_until(lock, _tasks.begin()->first, [this] { return !_is_running || ssts::clock::now() >= _tasks.begin()->first; });
if (!_update_tasks_cv.wait_until(
lock, _tasks.begin()->first, [this] { return !_is_running || (!_tasks.empty() && ssts::clock::now() >= _tasks.begin()->first); }))
{
std::this_thread::yield();
}
}

if (!_is_running)
Expand Down Expand Up @@ -220,6 +226,7 @@ class task_scheduler
void set_duplicate_allowed(bool is_allowed)
{
_is_duplicate_allowed = is_allowed;
_tp.set_duplicate_allowed(is_allowed);
}

/*!
Expand Down Expand Up @@ -465,6 +472,41 @@ class task_scheduler
_update_tasks_cv.notify_one();
}

/*
void update_tasks()
{
// All the tasks whose start time is before ssts::clock::now(),
// (which are the ones from _tasks.begin(), up to last_task_to_process)
// can be enqueued in the TaskPool.
auto last_task_to_process = _tasks.upper_bound(ssts::clock::now());
for (auto it = _tasks.begin(); it != last_task_to_process; it++)
{
_tp.run([t = it->second.clone(), start_time = it->first, this]
{
if(t->is_enabled())
t->invoke();
if (t->interval().has_value())
{
// Make sure that next_start_time is greater than ssts::clock::now(),
// otherwise the task is scheduled in the past.
// Increment next_start_time starting from current start_time with a step equal to t->interval().value()
// in order to keep the scheduling with a fixed sample rate.
it->second.set_enabled(false);
auto interval = it->second.interval().value();
auto next_start_time = it->first + interval;
while(ssts::clock::now() > next_start_time)
next_start_time += interval;
add_task(std::move(next_start_time), std::move(*t));
}
});
}
// Erase tasks already processed.
_tasks.erase(_tasks.begin(), last_task_to_process);
}
*/

void update_tasks()
{
decltype(_tasks) recursive_tasks;
Expand All @@ -474,19 +516,34 @@ class task_scheduler
// can be enqueued in the TaskPool.
auto last_task_to_process = _tasks.upper_bound(ssts::clock::now());
for (auto it = _tasks.begin(); it != last_task_to_process; it++)
{
// Add task to the TaskPool if enabled to run.
if (it->second.is_enabled())
_tp.run([t = it->second.clone(), this] { t->invoke(); });

{
if(it->second.is_enabled())
{
_tp.run([t = it->second.clone()]
{
t->invoke();
}, it->second.hash());
}

// Keep track of recursive tasks if task has a valid interval value.
if (it->second.interval().has_value())
recursive_tasks.emplace(ssts::clock::now() + it->second.interval().value(), std::move(it->second));
{
// Make sure that next_start_time is greater than ssts::clock::now(),
// otherwise the task is scheduled in the past.
// Increment next_start_time starting from current start_time with a step equal to t->interval().value()
// in order to keep the scheduling with a fixed sample rate.
auto interval = it->second.interval().value();
auto next_start_time = it->first + interval;
while(ssts::clock::now() > next_start_time)
next_start_time += interval;

recursive_tasks.emplace(std::move(next_start_time), std::move(it->second));
}
}

// Erase tasks already processed.
_tasks.erase(_tasks.begin(), last_task_to_process);

// Re-schedule recursive tasks.
_tasks.merge(recursive_tasks);
}
Expand Down

0 comments on commit 16e3084

Please sign in to comment.