Skip to content

Commit

Permalink
xrCore: removed task finish callback from the task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Xottab-DUTY committed May 15, 2024
1 parent 85f7a8f commit 343d5cf
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 100 deletions.
28 changes: 0 additions & 28 deletions src/xrCore/Threading/ParallelFor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,6 @@ class ParallelForTask
return task;
}

// Doesn't wait until done
static decltype(auto) Run(const Range& range, bool wait, const Task::OnFinishFunc& callback, const Function& function)
{
TaskData taskData{ range, function };

auto& task = TaskManager::AddTask(callback, task_func, sizeof(TaskData), &taskData);
if (wait)
{
VERIFY2(TaskScheduler, "Task scheduler is not yet created. "
"You should explicitly state that you know this by setting 'wait' param to false.");
if (TaskScheduler)
TaskScheduler->Wait(task);
}
return task;
}

private:
static void task_func(Task& thisTask, void* data_ptr)
{
Expand Down Expand Up @@ -204,15 +188,3 @@ decltype(auto) xr_parallel_for(const Range& range, const Function& function)
return details::ParallelForTask<Range, Function>::Run(range, true, function);
}

template <typename Range, typename Function>
decltype(auto) xr_parallel_for(const Range& range, bool wait, const Task::OnFinishFunc& callback, const Function& function)
{
return details::ParallelForTask<Range, Function>::Run(range, wait, callback, function);
}

// Caller thread will wait on the task finish
template <typename Range, typename Function>
decltype(auto) xr_parallel_for(const Range& range, const Task::OnFinishFunc& callback, const Function& function)
{
return details::ParallelForTask<Range, Function>::Run(range, true, callback, function);
}
13 changes: 3 additions & 10 deletions src/xrCore/Threading/ParallelForEach.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ namespace details
class ParallelForEachTask
{
public:
template <typename Iterator, typename Function, typename ThirdArgument>
static decltype(auto) Run(Iterator begin, Iterator end, ThirdArgument thirdArgument, const Function& function)
template <typename Iterator, typename Function>
static decltype(auto) Run(Iterator begin, Iterator end, bool wait, const Function& function)
{
return xr_parallel_for(TaskRange(begin, end), thirdArgument, [&](TaskRange<Iterator>& range)
return xr_parallel_for(TaskRange(begin, end), wait, [&](TaskRange<Iterator>& range)
{
for (auto& it : range)
{
Expand All @@ -49,10 +49,3 @@ decltype(auto) xr_parallel_for_each(Range& range, const Function& function)
{
return details::ParallelForEachTask::Run(std::begin(range), std::end(range), true, function);
}

// User has a callback, he is responsible for waiting on the task finish (due to task management system limitation)
template <typename Range, typename Function>
decltype(auto) xr_parallel_for_each(Range& range, const Task::OnFinishFunc& callback, const Function& function)
{
return details::ParallelForEachTask::Run(std::begin(range), std::end(range), callback, function);
}
21 changes: 1 addition & 20 deletions src/xrCore/Threading/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
#include "Task.hpp"

Task::Data::Data(const TaskFunc& task, Task* parent)
: task_func(task), on_finish_callback(nullptr), parent(parent), jobs(1) {}

Task::Data::Data(const TaskFunc& task, const OnFinishFunc& onFinishCallback, Task* parent)
: task_func(task), on_finish_callback(onFinishCallback), parent(parent), jobs(1) {}
: task_func(task), parent(parent), jobs(1) {}

Task::Task(const TaskFunc& task, void* data, size_t dataSize, Task* parent /*= nullptr*/)
: m_data(task, parent)
Expand All @@ -33,23 +30,7 @@ Task::Task(const TaskFunc& task, void* data, size_t dataSize, Task* parent /*= n
}
}

Task::Task(const TaskFunc& task, const OnFinishFunc& onFinishCallback, void* data, size_t dataSize, Task* parent /*= nullptr*/)
: m_data(task, onFinishCallback, parent)
{
VERIFY2(dataSize <= sizeof(m_user_data), "Cannot fit your data in the task");
if (data && dataSize)
{
CopyMemory(m_user_data, data, std::min(dataSize, sizeof(m_user_data)));
}
}

void Task::Execute()
{
m_data.task_func(*this, m_user_data);
}

void Task::Finish()
{
if (m_data.on_finish_callback)
m_data.on_finish_callback(*this, m_user_data);
}
7 changes: 0 additions & 7 deletions src/xrCore/Threading/Task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,17 @@ class XRCORE_API Task final : Noncopyable

public:
using TaskFunc = fastdelegate::FastDelegate<void(Task&, void*)>;
using OnFinishFunc = fastdelegate::FastDelegate<void(const Task&, void*)>;

private:
// ordered from biggest to smallest
struct Data
{
TaskFunc task_func{};
OnFinishFunc on_finish_callback{};
Task* parent{};
std::atomic_int16_t jobs{}; // at least 1 (task itself), zero means task is done.

Data() = default;
Data(const TaskFunc& task, Task* parent);
Data(const TaskFunc& task, const OnFinishFunc& onFinishCallback, Task* parent);
} m_data;

static constexpr size_t USER_DATA_SIZE = TASK_SIZE - sizeof(m_data);
Expand All @@ -69,9 +66,6 @@ class XRCORE_API Task final : Noncopyable
// Will just execute
Task(const TaskFunc& task, void* data, size_t dataSize, Task* parent = nullptr);

// Will execute and call back
Task(const TaskFunc& task, const OnFinishFunc& onFinishCallback, void* data, size_t dataSize, Task* parent = nullptr);

public:
static constexpr size_t GetAvailableDataStorageSize()
{
Expand All @@ -96,5 +90,4 @@ class XRCORE_API Task final : Noncopyable
private:
// Called by TaskManager
void Execute();
void Finish();
};
31 changes: 1 addition & 30 deletions src/xrCore/Threading/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,12 @@ Task* TaskManager::TryToSteal() const
void TaskManager::ExecuteTask(Task& task)
{
task.Execute();
FinalizeTask(task);
}

void TaskManager::FinalizeTask(Task& task)
{
// Finalize
for (Task* it = &task; ; it = it->m_data.parent)
{
const auto unfinishedJobs = it->m_data.jobs.fetch_sub(1, std::memory_order_acq_rel) - 1; // fetch_sub returns previous value
VERIFY2(unfinishedJobs >= 0, "The same task was executed two times.");
it->Finish();
if (unfinishedJobs || !it->m_data.parent)
break;
}
Expand Down Expand Up @@ -361,51 +357,26 @@ Task& TaskManager::CreateTask(const Task::TaskFunc& taskFunc, size_t dataSize /*
return *new (AllocateTask()) Task(taskFunc, data, dataSize);
}

Task& TaskManager::CreateTask(const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
return *new (AllocateTask()) Task(taskFunc, onFinishCallback, data, dataSize);
}

Task& TaskManager::CreateTask(Task& parent, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
IncrementTaskJobsCounter(parent);
return *new (AllocateTask()) Task(taskFunc, data, dataSize, &parent);
}

Task& TaskManager::CreateTask(Task& parent, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
IncrementTaskJobsCounter(parent);
return *new (AllocateTask()) Task(taskFunc, onFinishCallback, data, dataSize, &parent);
}

Task& TaskManager::AddTask(const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
auto& task = CreateTask(taskFunc, dataSize, data);
PushTask(task);
return task;
}

Task& TaskManager::AddTask(const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
auto& task = CreateTask(onFinishCallback, taskFunc, dataSize, data);
PushTask(task);
return task;
}

Task& TaskManager::AddTask(Task& parent, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
auto& task = CreateTask(parent, taskFunc, dataSize, data);
PushTask(task);
return task;
}

Task& TaskManager::AddTask(Task& parent, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize /*= 0*/, void* data /*= nullptr*/)
{
auto& task = CreateTask(parent, onFinishCallback, taskFunc, dataSize, data);
PushTask(task);
return task;
}

size_t TaskManager::GetWorkersCount() const
{
return workers.size();
Expand Down
5 changes: 0 additions & 5 deletions src/xrCore/Threading/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class XRCORE_API TaskManager final
[[nodiscard]] Task* TryToSteal() const;

static void ExecuteTask(Task& task);
static void FinalizeTask(Task& task);

[[nodiscard]] ICF static Task* AllocateTask();
static void ICF IncrementTaskJobsCounter(Task& parent);
Expand All @@ -57,11 +56,9 @@ class XRCORE_API TaskManager final
// TaskFunc is at the end for fancy in-place lambdas
// Create a task, but don't run it yet
[[nodiscard]] static Task& CreateTask(const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

// Create a task as child, but don't run it yet
[[nodiscard]] static Task& CreateTask(Task& parent, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(Task& parent, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

// Run task in parallel
static void PushTask(Task& task);
Expand All @@ -71,11 +68,9 @@ class XRCORE_API TaskManager final

// Shortcut: create a task and run it immediately
static Task& AddTask(const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
static Task& AddTask(const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

// Shortcut: create task and run it immediately
static Task& AddTask(Task& parent, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
static Task& AddTask(Task& parent, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

public:
void RegisterThisThreadAsWorker();
Expand Down

0 comments on commit 343d5cf

Please sign in to comment.