Skip to content

Commit ee7d58c

Browse files
committed
fix: testing for parallelized pre-flight checks
1 parent 90dba94 commit ee7d58c

2 files changed

Lines changed: 316 additions & 1 deletion

File tree

src/core/Flowthru.Core/Flows/Flow.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ public DagMetadata ExportDag()
335335
/// <summary>
336336
/// Validates all external inputs before Flow execution.
337337
/// </summary>
338-
/// <param name="cancellationToken">Cancellation token for validation I/O operations</param>
339338
/// <returns>ValidationResult containing any errors found</returns>
340339
/// <exception cref="InvalidOperationException">Thrown if Flow has not been built</exception>
341340
/// <remarks>
@@ -383,6 +382,12 @@ public DagMetadata ExportDag()
383382
/// await flow.RunAsync();
384383
/// </code>
385384
/// </remarks>
385+
/// <param name="maxDegreeOfParallelism">
386+
/// Maximum number of external inputs inspected concurrently. Defaults to 1 (sequential).
387+
/// Pass the resolved <c>ExecutionOptions.MaxDegreeOfParallelism</c> to fan out I/O-bound
388+
/// inspections in parallel.
389+
/// </param>
390+
/// <param name="cancellationToken">Cancellation token for async operations.</param>
386391
public async Task<Data.Validation.ValidationResult> ValidateExternalInputsAsync(
387392
int maxDegreeOfParallelism = 1,
388393
CancellationToken cancellationToken = default
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
using System.Collections.Concurrent;
2+
using Flowthru.Core.Data;
3+
using Flowthru.Core.Data.Capabilities;
4+
using Flowthru.Core.Data.Storage;
5+
using Flowthru.Core.Data.Validation;
6+
using Flowthru.Core.Effects;
7+
using Flowthru.Core.Flows;
8+
using Flowthru.Tests.Fixtures.TestCatalogs;
9+
using Flowthru.Tests.Fixtures.TestSteps;
10+
11+
namespace Flowthru.Tests.Validation.PreFlightInspection;
12+
13+
/// <summary>
14+
/// Tests verifying the parallel pre-flight inspection behaviour of
15+
/// <see cref="Flow.ValidateExternalInputsAsync"/>.
16+
/// </summary>
17+
/// <remarks>
18+
/// <para>
19+
/// These tests exercise the three contracts introduced by the parallel inspection path:
20+
/// </para>
21+
/// <list type="number">
22+
/// <item>
23+
/// <strong>Error aggregation</strong> — all per-entry errors are collected regardless of
24+
/// concurrency; none are dropped by the <c>ConcurrentBag</c> → sequential-merge pattern.
25+
/// </item>
26+
/// <item>
27+
/// <strong>Concurrency</strong> — with <c>maxDegreeOfParallelism &gt; 1</c>, independent
28+
/// I/O-bound inspections actually execute concurrently (overlapping windows).
29+
/// </item>
30+
/// <item>
31+
/// <strong>Sequential default</strong> — with <c>maxDegreeOfParallelism = 1</c>, inspections
32+
/// run serially (no overlapping windows), preserving the pre-existing behavior.
33+
/// </item>
34+
/// </list>
35+
/// <para>
36+
/// Concurrency is verified structurally (overlapping execution windows) rather than by
37+
/// elapsed time — wall-clock assertions are inherently flaky on loaded CI machines.
38+
/// </para>
39+
/// </remarks>
40+
[TestFixture]
41+
[Category("Validation")]
42+
[Category("PreFlight")]
43+
public class ParallelPreFlightTests
44+
{
45+
private static readonly TimeSpan InspectionDelay = TimeSpan.FromMilliseconds(200);
46+
47+
/// <summary>Returns true when two inspection windows overlap in time.</summary>
48+
private static bool Overlaps(
49+
(string Label, DateTime Start, DateTime End) a,
50+
(string Label, DateTime Start, DateTime End) b
51+
) => a.Start < b.End && b.Start < a.End;
52+
53+
// ─────────────────────────────────────────────────────────────────────────
54+
// Error aggregation
55+
// ─────────────────────────────────────────────────────────────────────────
56+
57+
[Test]
58+
public async Task ValidateExternalInputsAsync_WithParallelism_AggregatesAllErrors()
59+
{
60+
// Arrange: 3 independent external inputs, each backed by a failing adapter.
61+
// All 3 errors must survive the ConcurrentBag → sequential-merge path.
62+
var input1 = new Item<IEnumerable<TestData>>(
63+
"input_1",
64+
new FailingInspectionAdapter("input_1")
65+
);
66+
var input2 = new Item<IEnumerable<TestData>>(
67+
"input_2",
68+
new FailingInspectionAdapter("input_2")
69+
);
70+
var input3 = new Item<IEnumerable<TestData>>(
71+
"input_3",
72+
new FailingInspectionAdapter("input_3")
73+
);
74+
75+
var output1 = ItemFactory.Enumerable.Memory<TestData>("output_1");
76+
var output2 = ItemFactory.Enumerable.Memory<TestData>("output_2");
77+
var output3 = ItemFactory.Enumerable.Memory<TestData>("output_3");
78+
79+
var flow = FlowBuilder.CreateFlow(builder =>
80+
{
81+
builder.AddStep("Step1", PassthroughStep.Create(), input1, output1);
82+
builder.AddStep("Step2", PassthroughStep.Create(), input2, output2);
83+
builder.AddStep("Step3", PassthroughStep.Create(), input3, output3);
84+
});
85+
86+
flow.Build();
87+
88+
flow.ValidationOptions.Inspect(input1, InspectionLevel.Shallow);
89+
flow.ValidationOptions.Inspect(input2, InspectionLevel.Shallow);
90+
flow.ValidationOptions.Inspect(input3, InspectionLevel.Shallow);
91+
92+
// Act
93+
var result = await flow.ValidateExternalInputsAsync(
94+
maxDegreeOfParallelism: 3,
95+
cancellationToken: CancellationToken.None
96+
);
97+
98+
// Assert
99+
Assert.That(result.IsValid, Is.False);
100+
Assert.That(result.Errors, Has.Count.EqualTo(3), "All 3 per-entry errors must be aggregated");
101+
}
102+
103+
// ─────────────────────────────────────────────────────────────────────────
104+
// Concurrency — independent inputs overlap when parallelism > 1
105+
// ─────────────────────────────────────────────────────────────────────────
106+
107+
[Test]
108+
public async Task ValidateExternalInputsAsync_WithParallelism_IndependentInputsOverlapInTime()
109+
{
110+
// Arrange
111+
var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>();
112+
113+
var input1 = new Item<IEnumerable<TestData>>(
114+
"input_1",
115+
new RecordingInspectionAdapter(log, "input_1", InspectionDelay)
116+
);
117+
var input2 = new Item<IEnumerable<TestData>>(
118+
"input_2",
119+
new RecordingInspectionAdapter(log, "input_2", InspectionDelay)
120+
);
121+
var input3 = new Item<IEnumerable<TestData>>(
122+
"input_3",
123+
new RecordingInspectionAdapter(log, "input_3", InspectionDelay)
124+
);
125+
126+
var output1 = ItemFactory.Enumerable.Memory<TestData>("output_1");
127+
var output2 = ItemFactory.Enumerable.Memory<TestData>("output_2");
128+
var output3 = ItemFactory.Enumerable.Memory<TestData>("output_3");
129+
130+
var flow = FlowBuilder.CreateFlow(builder =>
131+
{
132+
builder.AddStep("Step1", PassthroughStep.Create(), input1, output1);
133+
builder.AddStep("Step2", PassthroughStep.Create(), input2, output2);
134+
builder.AddStep("Step3", PassthroughStep.Create(), input3, output3);
135+
});
136+
137+
flow.Build();
138+
139+
flow.ValidationOptions.Inspect(input1, InspectionLevel.Shallow);
140+
flow.ValidationOptions.Inspect(input2, InspectionLevel.Shallow);
141+
flow.ValidationOptions.Inspect(input3, InspectionLevel.Shallow);
142+
143+
// Act
144+
await flow.ValidateExternalInputsAsync(
145+
maxDegreeOfParallelism: 3,
146+
cancellationToken: CancellationToken.None
147+
);
148+
149+
// Assert: at least two inspection windows must overlap
150+
var entries = log.ToList();
151+
Assert.That(entries, Has.Count.EqualTo(3));
152+
153+
var anyOverlap = entries
154+
.SelectMany(a => entries, (a, b) => (a, b))
155+
.Where(pair => pair.a.Label != pair.b.Label)
156+
.Any(pair => Overlaps(pair.a, pair.b));
157+
158+
Assert.That(
159+
anyOverlap,
160+
Is.True,
161+
"With maxDegreeOfParallelism = 3, at least two inspection windows should overlap"
162+
);
163+
}
164+
165+
// ─────────────────────────────────────────────────────────────────────────
166+
// Sequential default — no overlap with maxDegreeOfParallelism = 1
167+
// ─────────────────────────────────────────────────────────────────────────
168+
169+
[Test]
170+
public async Task ValidateExternalInputsAsync_WithSequential_InspectionsDoNotOverlap()
171+
{
172+
// Arrange: same topology, but maxDegreeOfParallelism = 1 (default/sequential).
173+
var log = new ConcurrentBag<(string Label, DateTime Start, DateTime End)>();
174+
175+
var input1 = new Item<IEnumerable<TestData>>(
176+
"input_1",
177+
new RecordingInspectionAdapter(log, "input_1", InspectionDelay)
178+
);
179+
var input2 = new Item<IEnumerable<TestData>>(
180+
"input_2",
181+
new RecordingInspectionAdapter(log, "input_2", InspectionDelay)
182+
);
183+
var input3 = new Item<IEnumerable<TestData>>(
184+
"input_3",
185+
new RecordingInspectionAdapter(log, "input_3", InspectionDelay)
186+
);
187+
188+
var output1 = ItemFactory.Enumerable.Memory<TestData>("output_1");
189+
var output2 = ItemFactory.Enumerable.Memory<TestData>("output_2");
190+
var output3 = ItemFactory.Enumerable.Memory<TestData>("output_3");
191+
192+
var flow = FlowBuilder.CreateFlow(builder =>
193+
{
194+
builder.AddStep("Step1", PassthroughStep.Create(), input1, output1);
195+
builder.AddStep("Step2", PassthroughStep.Create(), input2, output2);
196+
builder.AddStep("Step3", PassthroughStep.Create(), input3, output3);
197+
});
198+
199+
flow.Build();
200+
201+
flow.ValidationOptions.Inspect(input1, InspectionLevel.Shallow);
202+
flow.ValidationOptions.Inspect(input2, InspectionLevel.Shallow);
203+
flow.ValidationOptions.Inspect(input3, InspectionLevel.Shallow);
204+
205+
// Act
206+
await flow.ValidateExternalInputsAsync(
207+
maxDegreeOfParallelism: 1,
208+
cancellationToken: CancellationToken.None
209+
);
210+
211+
// Assert: no windows should overlap when running sequentially
212+
var entries = log.ToList();
213+
Assert.That(entries, Has.Count.EqualTo(3));
214+
215+
var anyOverlap = entries
216+
.SelectMany(a => entries, (a, b) => (a, b))
217+
.Where(pair => pair.a.Label != pair.b.Label)
218+
.Any(pair => Overlaps(pair.a, pair.b));
219+
220+
Assert.That(
221+
anyOverlap,
222+
Is.False,
223+
"With maxDegreeOfParallelism = 1, inspection windows should never overlap"
224+
);
225+
}
226+
227+
// ─────────────────────────────────────────────────────────────────────────
228+
// Test doubles
229+
// ─────────────────────────────────────────────────────────────────────────
230+
231+
/// <summary>
232+
/// Storage adapter whose <see cref="InspectShallow"/> always returns a failing
233+
/// <see cref="ValidationResult"/> — used to verify error aggregation.
234+
/// </summary>
235+
private sealed class FailingInspectionAdapter : IStorageAdapter<IEnumerable<TestData>>
236+
{
237+
private readonly string _label;
238+
239+
public FailingInspectionAdapter(string label) => _label = label;
240+
241+
public StorageTraits Traits => new StorageTraits();
242+
243+
public FlowIO<IEnumerable<TestData>> Load() => FlowIO.Lift(() => Enumerable.Empty<TestData>());
244+
245+
public FlowIO<FlowUnit> Save(IEnumerable<TestData> data) => FlowIO.Pure(FlowUnit.Default);
246+
247+
public FlowIO<bool> Exists() => FlowIO.Pure(true);
248+
249+
public FlowIO<ValidationResult> InspectShallow(int sampleSize) =>
250+
FlowIO.Pure(
251+
new ValidationResult(
252+
new[]
253+
{
254+
new ValidationError(
255+
_label,
256+
ValidationErrorType.InspectionFailure,
257+
$"Simulated inspection failure for '{_label}'",
258+
null
259+
),
260+
}
261+
)
262+
);
263+
264+
public FlowIO<ValidationResult> InspectDeep() => InspectShallow(0);
265+
}
266+
267+
/// <summary>
268+
/// Storage adapter whose <see cref="InspectShallow"/> records a timestamped execution
269+
/// window to a shared log and then delays — used to verify concurrent vs. serial dispatch.
270+
/// </summary>
271+
private sealed class RecordingInspectionAdapter : IStorageAdapter<IEnumerable<TestData>>
272+
{
273+
private readonly ConcurrentBag<(string Label, DateTime Start, DateTime End)> _log;
274+
private readonly string _label;
275+
private readonly TimeSpan _delay;
276+
277+
public RecordingInspectionAdapter(
278+
ConcurrentBag<(string Label, DateTime Start, DateTime End)> log,
279+
string label,
280+
TimeSpan delay
281+
)
282+
{
283+
_log = log;
284+
_label = label;
285+
_delay = delay;
286+
}
287+
288+
public StorageTraits Traits => new StorageTraits();
289+
290+
public FlowIO<IEnumerable<TestData>> Load() => FlowIO.Lift(() => Enumerable.Empty<TestData>());
291+
292+
public FlowIO<FlowUnit> Save(IEnumerable<TestData> data) => FlowIO.Pure(FlowUnit.Default);
293+
294+
public FlowIO<bool> Exists() => FlowIO.Pure(true);
295+
296+
public FlowIO<ValidationResult> InspectShallow(int sampleSize)
297+
{
298+
Func<CancellationToken, ValueTask<ValidationResult>> inspector = async (ct) =>
299+
{
300+
var start = DateTime.UtcNow;
301+
await Task.Delay(_delay, ct);
302+
_log.Add((_label, start, DateTime.UtcNow));
303+
return ValidationResult.Success();
304+
};
305+
return FlowIO.LiftAsync(inspector);
306+
}
307+
308+
public FlowIO<ValidationResult> InspectDeep() => InspectShallow(0);
309+
}
310+
}

0 commit comments

Comments
 (0)