Skip to content

Commit 934adee

Browse files
committed
feat: multithreading implementation
1 parent 98c1cad commit 934adee

5 files changed

Lines changed: 392 additions & 77 deletions

File tree

CONTRIBUTING.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ This doesn't mean *ignoring* these concerns — it just means extending the API
104104
The project uses NX for task orchestration. Common commands:
105105

106106
```bash
107-
nx run flowthru:build # Build the solution
108-
nx run flowthru:test # Run all tests with coverage
109-
nx run flowthru:format:csharp # Format code with CSharpier
107+
dotnet build # Confirm solution builds fully
108+
nx run affected -t test # Run all test projects affected by current changes
109+
dotnet test # Run all tests for the project.
110110
```
111111

112112
To run a subset of tests by category:

src/core/Flowthru.Core/Flows/ExecutionOptions.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
using Flowthru.Core.Results;
21
using Flowthru.Core.Graph;
2+
using Flowthru.Core.Results;
33

44
namespace Flowthru.Core.Flows;
55

@@ -34,13 +34,23 @@ public class ExecutionOptions
3434
public bool StopOnFirstError { get; set; } = true;
3535

3636
/// <summary>
37-
/// Whether to enable parallel execution of nodes within the same layer.
37+
/// Maximum number of steps that may execute concurrently.
3838
/// </summary>
3939
/// <remarks>
40-
/// Phase 2 feature - currently not implemented.
41-
/// When true, nodes in the same execution layer run concurrently.
40+
/// <para>
41+
/// Controls the degree of parallelism in the task-graph scheduler. Steps whose
42+
/// dependencies are all satisfied are dispatched immediately, up to this limit.
43+
/// </para>
44+
/// <para>
45+
/// <list type="bullet">
46+
/// <item><c>1</c> (default) — sequential execution; one step at a time in topological order.</item>
47+
/// <item><c>N &gt; 1</c> — up to N independent steps run concurrently.</item>
48+
/// <item><c>-1</c> or <see cref="int.MaxValue"/> — unbounded parallelism; all ready steps
49+
/// are dispatched immediately.</item>
50+
/// </list>
51+
/// </para>
4252
/// </remarks>
43-
public bool EnableParallelExecution { get; set; } = false;
53+
public int MaxDegreeOfParallelism { get; set; } = 1;
4454

4555
/// <summary>
4656
/// The result formatter to use for displaying execution results.

src/core/Flowthru.Core/Flows/Flow.cs

Lines changed: 67 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -553,20 +553,40 @@ public DagMetadata ExportDag()
553553
}
554554

555555
/// <summary>
556-
/// /// Builds and executes the flow, returning comprehensive execution results.
556+
/// Builds and executes the flow, returning comprehensive execution results.
557557
/// </summary>
558558
/// <param name="cancellationToken">Cancellation token to signal graceful shutdown</param>
559-
/// <returns>FlowResult containing execution status, timing, and Flow results</returns>
559+
/// <returns>FlowResult containing execution status, timing, and step results</returns>
560560
/// <remarks>
561-
/// <para>
562561
/// This is the primary high-level API for executing flows. It automatically
563-
/// calls Build() if the Flow hasn't been built yet, then executes and tracks results.
562+
/// calls Build() if the Flow hasn't been built yet, then executes via the
563+
/// task-graph scheduler with default options (sequential, stop on first error).
564+
/// </remarks>
565+
public Task<FlowResult> RunAsync(CancellationToken cancellationToken) =>
566+
RunAsync(new ExecutionOptions(), cancellationToken);
567+
568+
/// <summary>
569+
/// Builds and executes the flow with the supplied execution options.
570+
/// </summary>
571+
/// <param name="options">Controls parallelism, error policy, and other execution behaviour.</param>
572+
/// <param name="cancellationToken">Cancellation token to signal graceful shutdown</param>
573+
/// <returns>FlowResult containing execution status, timing, and step results</returns>
574+
/// <remarks>
575+
/// <para>
576+
/// Steps are dispatched by the task-graph scheduler as soon as all their dependencies
577+
/// complete, up to <see cref="ExecutionOptions.MaxDegreeOfParallelism"/> concurrent steps.
578+
/// </para>
579+
/// <para>
580+
/// With <c>MaxDegreeOfParallelism = 1</c> (default) execution is sequential and
581+
/// behaviourally equivalent to the previous layer-by-layer loop.
564582
/// </para>
565583
/// </remarks>
566-
public async Task<FlowResult> RunAsync(CancellationToken cancellationToken)
584+
public async Task<FlowResult> RunAsync(
585+
ExecutionOptions options,
586+
CancellationToken cancellationToken
587+
)
567588
{
568589
var stopwatch = Stopwatch.StartNew();
569-
var stepResults = new Dictionary<string, StepResult>();
570590

571591
try
572592
{
@@ -577,33 +597,34 @@ public async Task<FlowResult> RunAsync(CancellationToken cancellationToken)
577597
Build();
578598
}
579599

580-
Logger?.LogInformation("Starting flow execution via RunAsync()");
600+
var stepList = (IReadOnlyList<FlowStep>)(_slicedSteps ?? _steps);
581601

582-
// Execute all layers
583-
foreach (var layer in ExecutionLayers!)
584-
{
585-
Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count);
602+
Logger?.LogInformation(
603+
"Starting flow execution via RunAsync() ({StepCount} steps, parallelism={Parallelism})",
604+
stepList.Count,
605+
options.MaxDegreeOfParallelism
606+
);
586607

587-
foreach (var flowStep in layer)
588-
{
589-
// Check for cancellation before starting each step
590-
cancellationToken.ThrowIfCancellationRequested();
608+
var executor = new Graph.TaskGraphExecutor(
609+
stepList,
610+
options.MaxDegreeOfParallelism,
611+
ExecuteStepWithTrackingAsync,
612+
Logger
613+
);
591614

592-
var stepResult = await ExecuteStepWithTrackingAsync(flowStep, cancellationToken);
593-
stepResults[flowStep.Label] = stepResult;
615+
var stepResults = await executor.RunAsync(options.StopOnFirstError, cancellationToken);
594616

595-
// If step failed, stop execution
596-
if (!stepResult.Success)
597-
{
598-
stopwatch.Stop();
599-
return FlowResult.CreateFailure(
600-
stopwatch.Elapsed,
601-
stepResult.Exception!,
602-
stepResults,
603-
Name
604-
);
605-
}
606-
}
617+
// Surface the first failure when StopOnFirstError is true.
618+
var firstFailure = stepResults.Values.FirstOrDefault(r => !r.Success);
619+
if (firstFailure != null)
620+
{
621+
stopwatch.Stop();
622+
return FlowResult.CreateFailure(
623+
stopwatch.Elapsed,
624+
firstFailure.Exception!,
625+
stepResults,
626+
Name
627+
);
607628
}
608629

609630
stopwatch.Stop();
@@ -616,40 +637,31 @@ public async Task<FlowResult> RunAsync(CancellationToken cancellationToken)
616637
}
617638
catch (OperationCanceledException)
618639
{
619-
// Re-throw cancellation exceptions so they propagate to the caller
620-
// Cancellation is not a failure but a requested abort
640+
// Re-throw — cancellation is not a failure but a requested abort.
621641
stopwatch.Stop();
622642
throw;
623643
}
624644
catch (Exception ex)
625645
{
626646
stopwatch.Stop();
627647
Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message);
628-
return FlowResult.CreateFailure(stopwatch.Elapsed, ex, stepResults, Name);
648+
return FlowResult.CreateFailure(
649+
stopwatch.Elapsed,
650+
ex,
651+
new Dictionary<string, StepResult>(),
652+
Name
653+
);
629654
}
630655
}
631656

632657
/// <summary>
633-
/// Executes the Flow sequentially, layer by layer.
658+
/// Executes the flow in topological order, throwing on the first step failure.
634659
/// </summary>
635660
/// <param name="cancellationToken">Cancellation token to signal graceful shutdown</param>
636-
/// <returns>Task representing the Flow execution</returns>
637-
/// <exception cref="InvalidOperationException">Thrown if Flow has not been built</exception>
661+
/// <returns>Task representing the flow execution</returns>
662+
/// <exception cref="InvalidOperationException">Thrown if the flow has not been built</exception>
638663
/// <remarks>
639-
/// <para>
640-
/// This method executes Flow in topological order:
641-
/// 1. Execute all Flow in layer 0 sequentially
642-
/// 2. Execute all Flow in layer 1 sequentially
643-
/// 3. Continue until all layers are complete
644-
/// </para>
645-
/// <para>
646-
/// <strong>Note:</strong> This method throws exceptions on failure. For result-based
647-
/// execution with error handling, use RunAsync() instead.
648-
/// </para>
649-
/// <para>
650-
/// In Phase 2, this will be replaced with a parallel executor that can run
651-
/// steps within the same layer concurrently.
652-
/// </para>
664+
/// For structured result-based execution (including parallel), use <see cref="RunAsync(CancellationToken)"/>.
653665
/// </remarks>
654666
public async Task ExecuteAsync(CancellationToken cancellationToken)
655667
{
@@ -664,22 +676,17 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
664676

665677
try
666678
{
667-
foreach (var layer in ExecutionLayers!)
668-
{
669-
Logger?.LogInformation("Executing layer with {StepCount} steps", layer.Count);
679+
var result = await RunAsync(cancellationToken);
670680

671-
foreach (var flowStep in layer)
672-
{
673-
// Check for cancellation before starting each step
674-
cancellationToken.ThrowIfCancellationRequested();
675-
676-
await ExecuteStepAsync(flowStep, cancellationToken);
677-
}
681+
if (!result.Success)
682+
{
683+
throw result.Exception
684+
?? new InvalidOperationException("Flow execution failed with no exception details.");
678685
}
679686

680687
Logger?.LogInformation("Flow execution completed successfully");
681688
}
682-
catch (Exception ex)
689+
catch (Exception ex) when (ex is not OperationCanceledException)
683690
{
684691
Logger?.LogError(ex, "Flow execution failed: {ErrorMessage}", ex.Message);
685692
throw;

0 commit comments

Comments
 (0)