Skip to content

Commit 107ceea

Browse files
committed
fix: corrected log output
1 parent 67a1065 commit 107ceea

3 files changed

Lines changed: 21 additions & 14 deletions

File tree

src/core/Flowthru.Core/Graph/TaskGraphExecutor.cs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ CancellationToken cancellationToken
131131
var inFlight = new List<Task>();
132132
var totalSteps = _steps.Count;
133133

134+
// Pre-populate worker slot IDs (1..N). The semaphore guarantees at most N concurrent
135+
// tasks hold the semaphore — so a slot is always available on a successful WaitAsync.
136+
var slotCount = Math.Max(1, Math.Min(_maxDegreeOfParallelism, totalSteps));
137+
var workerSlots = new ConcurrentQueue<int>(Enumerable.Range(1, slotCount));
138+
134139
while (results.Count + skipped.Count < totalSteps)
135140
{
136141
// Drain all currently runnable steps into in-flight tasks.
@@ -144,20 +149,22 @@ CancellationToken cancellationToken
144149

145150
await semaphore.WaitAsync(dispatchToken).ConfigureAwait(false);
146151

152+
// Slot dequeue is safe here: semaphore ensures at most slotCount concurrent holders.
153+
workerSlots.TryDequeue(out var workerId);
147154
var capturedStep = step;
148155
var task = Task.Run(
149156
async () =>
150157
{
151158
try
152159
{
153-
var threadId = Environment.CurrentManagedThreadId;
154160
var startOrdinal = Interlocked.Increment(ref dispatchedCount);
155161
_logger?.LogInformation(
156-
" → {StepName} executing... ({StartOrdinal} of {Total} steps, thread {ThreadId})",
162+
" → {StepName} executing... ({StartOrdinal} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
157163
capturedStep.Label,
158164
startOrdinal,
159165
totalSteps,
160-
threadId
166+
workerId,
167+
slotCount
161168
);
162169

163170
var result = await _executeStep(capturedStep, dispatchToken).ConfigureAwait(false);
@@ -167,11 +174,12 @@ CancellationToken cancellationToken
167174
if (!result.Success)
168175
{
169176
_logger?.LogWarning(
170-
" ✗ {StepName} failed ({CompletedCount} of {Total} steps, thread {ThreadId})",
177+
" ✗ {StepName} failed ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
171178
capturedStep.Label,
172179
completedCount,
173180
totalSteps,
174-
threadId
181+
workerId,
182+
slotCount
175183
);
176184

177185
if (stopOnFirstError)
@@ -188,14 +196,15 @@ CancellationToken cancellationToken
188196
else
189197
{
190198
_logger?.LogInformation(
191-
" ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, thread {ThreadId})",
199+
" ✓ {StepName,-40} {Duration,6:F2}s ({InputCount,6} → {OutputCount,6} records) ({CompletedCount} of {Total} steps, worker {WorkerId}/{TotalWorkers})",
192200
capturedStep.Label,
193201
result.ExecutionTime.TotalSeconds,
194202
result.InputCount,
195203
result.OutputCount,
196204
completedCount,
197205
totalSteps,
198-
threadId
206+
workerId,
207+
slotCount
199208
);
200209

201210
// Notify dependents — decrement their pending count.
@@ -204,6 +213,7 @@ CancellationToken cancellationToken
204213
}
205214
finally
206215
{
216+
workerSlots.Enqueue(workerId);
207217
semaphore.Release();
208218
}
209219
},

src/core/Flowthru.Core/Services/FlowthruService.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,7 @@ public async Task<FlowResult> ExecuteFlowAsync(
103103

104104
// Resolve MaxDegreeOfParallelism: CLI/caller value wins; service default is fallback; 1 is the floor.
105105
options.MaxDegreeOfParallelism =
106-
options.MaxDegreeOfParallelism
107-
?? _executionDefaults.MaxDegreeOfParallelism
108-
?? 1;
106+
options.MaxDegreeOfParallelism ?? _executionDefaults.MaxDegreeOfParallelism ?? 1;
109107

110108
// ════════════════════════════════════════
111109
// PRE-FLIGHT CHECKS

src/core/Flowthru.Core/Services/FlowthruServiceBuilder.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -651,10 +651,9 @@ internal void RegisterFlowDictionary()
651651
{
652652
// Register the service-level default parallelism so FlowthruService can consume it.
653653
var defaultParallelism = _defaultMaxDegreeOfParallelism;
654-
_services.AddSingleton(new FlowthruExecutionDefaults
655-
{
656-
MaxDegreeOfParallelism = defaultParallelism,
657-
});
654+
_services.AddSingleton(
655+
new FlowthruExecutionDefaults { MaxDegreeOfParallelism = defaultParallelism }
656+
);
658657

659658
// Always register the catalog collection so FlowthruService can inject all catalogs.
660659
// Merges both type-registered catalogs (RegisterCatalog) and dynamically constructed

0 commit comments

Comments
 (0)