
Rejecting a task and adding it back to the queue? #105

Open
BrodyHiggerson opened this issue Nov 21, 2023 · 8 comments

@BrodyHiggerson

Another API / design creator-blessing question for you.

In my existing task system there is the concept of inter-task dependencies, same as in enkiTS, but I also support read/write dependency declarations: the task processing algorithm first attempts to satisfy the task dependencies, and then all jobs whose task dependencies are solved are put into a queue where they 'fight' to grab their read/write deps. It's not possible to know in advance which will win, and in my opinion it's not practical or worth trying to 'solve' that entire tree up front; instead we let it happen at runtime.

An example, called regularly, e.g. when a task completes and pulls in any dependent tasks that are unblocked. It's messy, but it's more of an illustration.

void TickGraph::ReadyContendingJobsIfPossible()
{
    // lock here ...

    // Scratch list of jobs that failed to grab their read/write deps this pass.
    static thread_local std::vector<TickJob*> contendeds;
    contendeds.reserve(128);

    {
        TickJob* contendingJob;
        while (contendingTickQueue_.try_pop(contendingJob))
        {
            // Check against read/write dependencies.
            if (impl_->typeDepTracker_.TryPushNode(contendingJob->node_))
            {
                ExecuteJob(contendingJob);
            }
            else
            {
                // We failed, so prepare to put the job back into the contending queue.
                contendeds.push_back(contendingJob);
            }
        }
    }

    for (TickJob* contended : contendeds)
    {
        MoveToContending(contended); // put it back in for next time
    }
    contendeds.clear();

    // unlock here ...
}

I'm not trying to replicate this exactly, just giving context. I could imagine, say, in TaskScheduler::TryRunTask, in the block where bHaveTask is true, adding a call to some optional callback that checks whether the task is actually allowed to run (defaulting to true); if it isn't, the scheduler calls AddTaskSetToPipe to put the task back into circulation and early-outs. I'm not sure how I feel about the task going to the back of the pipe, but maybe that's fine. I'd also need to figure out a way to do this for pinned tasks.

Interested in any insights you might have from a canonical design/API point of view; I may also have missed something that would help.

@dougbinks
Owner

I have been considering adding dynamic dependencies which would support data access dependencies to enkiTS, but have not yet done so.

A simpler change which I have also been considering is the idea of allowing tasks to delay themselves, which for tasksets would allow a given range (within the calling range) to be delayed (put back in the task queue). This would allow your tasks to perform their checks and delay themselves if their requirements are unfulfilled.

@BrodyHiggerson
Author

BrodyHiggerson commented Nov 21, 2023 via email

@dougbinks
Owner

> Could you elaborate on the "given range" being delayed? As in the entire task set itself?

So ITaskSet is executed (potentially on many threads) with a given TaskSetPartition range_. The idea is that, within ExecuteRange, a TaskSet could call Delay( TaskSetPartition delayRange_ ) with delayRange_.start >= range_.start and delayRange_.end <= range_.end (and delayRange_.start < delayRange_.end).

This could be used in your approach to put the entire range back on the task queue.

For pinned tasks this is simpler as the whole task can be delayed.

> Also, as a stopgap I'll have to implement something quick and dirty to unblock, even if it lives in the darkness of a fork. Does the point in the flow I suggested make sense?

I'm not sure - enkiTS tries to guarantee that it is lock-free (which helps ensure no single task thread blocks all other task threads). This makes the design harder.

My current idea was to implement this in terms of a type of Dependency which the user could derive from. I've not put enough thought into exactly how this would work, nor whether it is possible to make it lock-free.

@BrodyHiggerson
Author

BrodyHiggerson commented Nov 21, 2023 via email

@BrodyHiggerson
Author

Here is my first naive attempt; I wasn't sure whether it made sense to bother adding the task back to the same thread we stole it from, or whether this approach even makes sense: 559a196

But in my mind I'm only using a taskset to represent either a single logical task or a ParallelFor, and in both of those cases I only care about rejecting the entire set wholesale. In the above code I had to reject each subtask until the overall set's interface says it can execute. Not sure if that makes sense.

@dougbinks
Owner

dougbinks commented Nov 22, 2023

My plan was to add this functionality via dynamic dependencies, in which tasks that can't run aren't added to the task queue until they are able to run.

Some comments on your implementation:

ITaskSet implementation

  1. The task pipes m_pPipesPerThread can only be written to by the owning thread, so you cannot use threadReadFrom; you must always write using threadNum_.
  2. returning false from TryRunTask should not occur unless no runnable tasks were found. So the CanRun test needs to be after each bHaveTask check, setting bHaveTask false rather than exiting.
  3. A potential infinite loop or stuck queue could occur if the task was found from the current thread's pipe, as it could be placed on the task queue and then retrieved again. Instead, we need to grab the next head task before replacing the current task. However this could also be a task which can't run, in which case we need to go through the tasks in the queue until we find one which can run. This is tricky without creating a new queue for unrunnable tasks (enkiTS does no allocation outside of initialization).
  4. Finding only an un-runnable task could stall enkiTS, as the task threads could sleep and then there will be no new tasks to wake up the threads. So we need a signal to enkiTS that tasks can run.
  5. When task stealing (reading from another thread's queue), un-runnable tasks are placed on the current thread's queue. This could result in that queue being full, which needs to be handled somehow or that task thread will be stalled.
  6. I would likely add a set of flags to the ITaskSet interface to denote that a given task uses the CanRun interface, though first I would check the performance degradation.
  7. Question: should a task which has already run some of its range use the CanRun check, or should it implicitly be in a runnable state?
  8. How does CanRun lock any resource if it returns true, and how is that resource then unlocked? How does that work with split tasks (ones with m_SetSize > 1)? See the sketch after this list.

IPinnedTask implementation

  1. I think the current loop could run infinitely. Fixing this will likely require going through the entire list until we hit the first task we couldn't run.
  2. We also need to wake any enkiTS task threads waiting for pinned tasks with WaitForNewPinnedTasks when a pinned task can now run (see the sketch after this list).

@dougbinks
Owner

Thanks for this initial implementation by the way, it's a good start towards such an API and has helped uncover some of the issues with delaying / not running tasks.

@dougbinks
Owner

After some thought I have an idea for how this could work, and will try to develop a solution on a branch in the near future. I'll ping this thread once I have a working prototype.
