11using System . Collections . Concurrent ;
22using Flowthru . Core . Flows ;
3+ using Flowthru . Core . Graph . Scheduling ;
34using Microsoft . Extensions . Logging ;
45
56namespace Flowthru . Core . Graph ;
@@ -29,6 +30,7 @@ internal sealed class TaskGraphExecutor
2930{
3031 private readonly IReadOnlyList < FlowStep > _steps ;
3132 private readonly int _maxDegreeOfParallelism ;
33+ private readonly ISchedulingStrategy _strategy ;
3234 private readonly ILogger ? _logger ;
3335 private readonly Func < FlowStep , CancellationToken , Task < StepResult > > _executeStep ;
3436
@@ -42,11 +44,13 @@ internal sealed class TaskGraphExecutor
4244 /// Pass <see cref="int.MaxValue"/> for unbounded parallelism.
4345 /// </param>
4446 /// <param name="executeStep">Per-step execution delegate (matches <c>ExecuteStepWithTrackingAsync</c>).</param>
47+ /// <param name="strategy">Priority strategy used to order ready steps on each dispatch cycle.</param>
4548 /// <param name="logger">Optional logger.</param>
4649 internal TaskGraphExecutor (
4750 IReadOnlyList < FlowStep > steps ,
4851 int maxDegreeOfParallelism ,
4952 Func < FlowStep , CancellationToken , Task < StepResult > > executeStep ,
53+ ISchedulingStrategy strategy ,
5054 ILogger ? logger = null
5155 )
5256 {
@@ -61,6 +65,7 @@ internal TaskGraphExecutor(
6165 _steps = steps ;
6266 _maxDegreeOfParallelism = maxDegreeOfParallelism == - 1 ? int . MaxValue : maxDegreeOfParallelism ;
6367 _executeStep = executeStep ;
68+ _strategy = strategy ;
6469 _logger = logger ;
6570 }
6671
@@ -96,7 +101,10 @@ CancellationToken cancellationToken
96101 ) ;
97102
98103 // Reverse adjacency: for each step, which steps depend on it?
99- var dependents = _steps . ToDictionary ( s => s , _ => new List < FlowStep > ( ) ) ;
104+ var dependents = _steps . ToDictionary (
105+ s => s ,
106+ s => ( IReadOnlyList < FlowStep > ) new List < FlowStep > ( )
107+ ) ;
100108 foreach ( var step in _steps )
101109 {
102110 foreach ( var dep in step . Dependencies )
@@ -105,14 +113,17 @@ CancellationToken cancellationToken
105113 // steps that are in _steps (sliced or full set).
106114 if ( dependents . TryGetValue ( dep , out var list ) )
107115 {
108- list . Add ( step ) ;
116+ ( ( List < FlowStep > ) list ) . Add ( step ) ;
109117 }
110118 }
111119 }
112120
113- // Steps whose upstream is fully satisfied (or has no dependencies).
114- // Channel is unbounded write / bounded dispatch (controlled by the semaphore).
115- var readyQueue = new ConcurrentQueue < FlowStep > ( _steps . Where ( s => s . Dependencies . Count == 0 ) ) ;
121+ var schedulingContext = new SchedulingContext ( dependents ) ;
122+
123+ // Newly-ready steps are added here by concurrent task completions; a ConcurrentBag
124+ // is safe for multi-producer, single-consumer access. On each dispatch cycle the
125+ // main loop drains it into a List and passes it to the strategy for ordering.
126+ var readyBag = new ConcurrentBag < FlowStep > ( _steps . Where ( s => s . Dependencies . Count == 0 ) ) ;
116127
117128 // Tracks which steps were skipped because an upstream dependency failed.
118129 var skipped = new HashSet < FlowStep > ( ) ;
@@ -139,7 +150,16 @@ CancellationToken cancellationToken
139150 while ( results . Count + skipped . Count < totalSteps )
140151 {
141152 // Drain all currently runnable steps into in-flight tasks.
142- while ( readyQueue . TryDequeue ( out var step ) )
153+ // Collect from the concurrent bag into a snapshot, then ask the strategy to order them.
154+ var readySnapshot = new List < FlowStep > ( ) ;
155+ while ( readyBag . TryTake ( out var taken ) )
156+ {
157+ readySnapshot . Add ( taken ) ;
158+ }
159+
160+ var prioritised = _strategy . Prioritize ( readySnapshot , schedulingContext ) ;
161+
162+ foreach ( var step in prioritised )
143163 {
144164 if ( skipped . Contains ( step ) )
145165 {
@@ -208,7 +228,7 @@ CancellationToken cancellationToken
208228 ) ;
209229
210230 // Notify dependents — decrement their pending count.
211- EnqueueReadyDependents ( capturedStep , pendingDeps , dependents , skipped , readyQueue ) ;
231+ EnqueueReadyDependents ( capturedStep , pendingDeps , dependents , skipped , readyBag ) ;
212232 }
213233 }
214234 finally
@@ -223,7 +243,7 @@ CancellationToken cancellationToken
223243 inFlight . Add ( task ) ;
224244 }
225245
226- if ( inFlight . Count == 0 )
246+ if ( inFlight . Count == 0 && readyBag . IsEmpty )
227247 {
228248 // Nothing dispatched and nothing running. If not all steps accounted for,
229249 // the dependency graph has a cycle that AssignLayers should have caught.
@@ -290,9 +310,9 @@ CancellationToken cancellationToken
290310 private static void EnqueueReadyDependents (
291311 FlowStep completedStep ,
292312 ConcurrentDictionary < FlowStep , int > pendingDeps ,
293- Dictionary < FlowStep , List < FlowStep > > dependents ,
313+ IReadOnlyDictionary < FlowStep , IReadOnlyList < FlowStep > > dependents ,
294314 HashSet < FlowStep > skipped ,
295- ConcurrentQueue < FlowStep > readyQueue
315+ ConcurrentBag < FlowStep > readyBag
296316 )
297317 {
298318 foreach ( var dependent in dependents [ completedStep ] )
@@ -305,15 +325,15 @@ ConcurrentQueue<FlowStep> readyQueue
305325 var remaining = pendingDeps . AddOrUpdate ( dependent , 0 , ( _ , current ) => current - 1 ) ;
306326 if ( remaining == 0 )
307327 {
308- readyQueue . Enqueue ( dependent ) ;
328+ readyBag . Add ( dependent ) ;
309329 }
310330 }
311331 }
312332
313333 private static void SkipDownstream (
314334 FlowStep failedStep ,
315335 HashSet < FlowStep > skipped ,
316- Dictionary < FlowStep , List < FlowStep > > dependents
336+ IReadOnlyDictionary < FlowStep , IReadOnlyList < FlowStep > > dependents
317337 )
318338 {
319339 // BFS over the dependents graph.
0 commit comments