Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ dotnet test PatternKit.slnx -c Release
* **Commerce Backends for Frontends:** `CommerceBackendsForFrontendsDemo` (+ `CommerceBackendsForFrontendsDemoTests`) — fluent and generated client-specific facade shaping with DI and ASP.NET Core mapping.
* **Inventory Ambassador:** `InventoryAmbassadorDemo` (+ `InventoryAmbassadorDemoTests`) — fluent and generated outbound connectivity wrapper with DI and ASP.NET Core mapping.
* **Warehouse Leader Election:** `WarehouseLeaderElectionDemo` (+ `WarehouseLeaderElectionDemoTests`) — fluent and generated active worker lease coordination with DI and Generic Host mapping.
* **Warehouse Scheduler Agent Supervisor:** `WarehouseSchedulerAgentSupervisorDemo` (+ `WarehouseSchedulerAgentSupervisorDemoTests`) — fluent and generated scheduled worker supervision with DI and Generic Host mapping.
* **Production-Ready Example Catalog:** `PatternKitExampleCatalog` (+ `PatternKitExampleCatalogTests`) — DI registration, generic host validation, ASP.NET Core endpoint mapping, and source/test/docs manifest checks.
* **Tests:** `PatternKit.Examples.Tests/*` use TinyBDD scenarios that read like specs.

Expand Down
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,6 @@

- name: Warehouse Leader Election
href: warehouse-leader-election.md

- name: Warehouse Scheduler Agent Supervisor
href: warehouse-scheduler-agent-supervisor.md
12 changes: 12 additions & 0 deletions docs/examples/warehouse-scheduler-agent-supervisor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Warehouse Scheduler Agent Supervisor

The warehouse scheduler demo shows scheduled replenishment work imported through Microsoft dependency injection and Generic Host.

```csharp
services.AddWarehouseSchedulerAgentSupervisorDemo();

var runner = provider.GetRequiredService<WarehouseSchedulerDemoRunner>();
var results = runner.RunGenerated();
```

The example includes fluent and source-generated construction, an `IServiceCollection` extension, retry supervision, result capture, and a hosted service that schedules and dispatches work during host startup.
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Backends for Frontends**](backends-for-frontends.md) | Client-specific facade factories | `[GenerateBackendsForFrontends]` |
| [**Ambassador**](ambassador.md) | Outbound connectivity wrapper factories | `[GenerateAmbassador]` |
| [**Leader Election**](leader-election.md) | Lease-backed active worker factories | `[GenerateLeaderElection]` |
| [**Scheduler Agent Supervisor**](scheduler-agent-supervisor.md) | Scheduled worker supervision factories | `[GenerateSchedulerAgentSupervisor]` |

## Quick Reference

Expand Down
19 changes: 19 additions & 0 deletions docs/generators/scheduler-agent-supervisor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Scheduler Agent Supervisor Generator

`[GenerateSchedulerAgentSupervisor]` creates a typed `SchedulerAgentSupervisor<TWork, TResult>` factory from agent methods and an optional retry predicate.

```csharp
[GenerateSchedulerAgentSupervisor(typeof(WarehouseReplenishmentWork), typeof(WarehouseReplenishmentSummary), SupervisorName = "warehouse-replenishment-scheduler")]
public static partial class GeneratedWarehouseScheduler
{
[SchedulerAgent("release-replenishment")]
private static WarehouseReplenishmentSummary Release(SchedulerAgentContext<WarehouseReplenishmentWork> context)
=> new(context.Work.BatchId, context.Attempt);

[SchedulerRetryWhen]
private static bool Retry(Exception exception, SchedulerAgentContext<WarehouseReplenishmentWork> context)
=> exception is InvalidOperationException;
}
```

Generated factories configure `MaxAttempts`, `RetryDelayMilliseconds`, all declared agents, and the retry predicate. Agent methods must be static and return the configured result type from `SchedulerAgentContext<TWork>`.
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@
- name: Leader Election
href: leader-election.md

- name: Scheduler Agent Supervisor
href: scheduler-agent-supervisor.md

- name: Queue Load Leveling
href: queue-load-leveling.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Cloud Architecture | Backends for Frontends | `BackendsForFrontends<TRequest,TResponse>` | Backends for Frontends generator |
| Cloud Architecture | Ambassador | `Ambassador<TRequest,TResponse>` | Ambassador generator |
| Cloud Architecture | Leader Election | `LeaderElection<TContext>` | Leader Election generator |
| Cloud Architecture | Scheduler Agent Supervisor | `SchedulerAgentSupervisor<TWork,TResult>` | Scheduler Agent Supervisor generator |
| Application Architecture | CQRS | Mediator/dispatcher command-query split | Dispatcher generator |
| Application Architecture | Specification | `Specification<T>` and named registries | Specification generator |
| Application Architecture | Repository | `IRepository<TEntity,TKey>` and `InMemoryRepository<TEntity,TKey>` | Repository generator |
Expand Down
21 changes: 21 additions & 0 deletions docs/patterns/cloud/scheduler-agent-supervisor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Scheduler Agent Supervisor

Scheduler Agent Supervisor coordinates due work, dispatches it to an agent, and applies supervision rules for retries and exhaustion.

```csharp
var scheduler = SchedulerAgentSupervisor<WarehouseReplenishmentWork, WarehouseReplenishmentSummary>
.Create("warehouse-replenishment-scheduler")
.Supervision(SchedulerSupervisionPolicy<WarehouseReplenishmentWork>.Create()
.MaxAttempts(2)
.RetryDelay(TimeSpan.FromSeconds(5))
.Build())
.Agent("release-replenishment", ctx => new(ctx.Work.BatchId, ctx.Attempt))
.Build();

scheduler.Schedule("replenish:B-100", work, dueAt);
var results = scheduler.RunDue(now);
```

Use it for scheduled background jobs where the application needs deterministic result capture, retry policy, and a clear supervision boundary. Production integrations should back scheduling state with durable storage and import the supervisor through `IServiceCollection`.

The source-generated path uses `[GenerateSchedulerAgentSupervisor]`, `[SchedulerAgent]`, and `[SchedulerRetryWhen]`. Import the example through `AddWarehouseSchedulerAgentSupervisorDemo()` or `AddPatternKitExamples()`.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,8 @@
href: cloud/ambassador.md
- name: Leader Election
href: cloud/leader-election.md
- name: Scheduler Agent Supervisor
href: cloud/scheduler-agent-supervisor.md
- name: Application Architecture
items:
- name: Anti-Corruption Layer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
namespace PatternKit.Cloud.SchedulerAgentSupervisor;

public sealed class SchedulerAgentContext<TWork>
{
internal SchedulerAgentContext(string supervisorName, string jobName, TWork work, int attempt)
=> (SupervisorName, JobName, Work, Attempt) = (supervisorName, jobName, work, attempt);

public string SupervisorName { get; }

public string JobName { get; }

public TWork Work { get; }

public int Attempt { get; }

public IDictionary<string, object?> Items { get; } = new Dictionary<string, object?>();

public IList<string> Events { get; } = new List<string>();
}

public sealed class SchedulerAgentResult<TResult>
{
private SchedulerAgentResult(
string supervisorName,
string jobName,
string agentName,
int attempt,
TResult? response,
Exception? exception,
IReadOnlyList<string> events,
bool succeeded,
bool retryScheduled,
bool exhausted)
=> (SupervisorName, JobName, AgentName, Attempt, Response, Exception, Events, Succeeded, RetryScheduled, Exhausted) =
(supervisorName, jobName, agentName, attempt, response, exception, events, succeeded, retryScheduled, exhausted);

public string SupervisorName { get; }

public string JobName { get; }

public string AgentName { get; }

public int Attempt { get; }

public TResult? Response { get; }

public Exception? Exception { get; }

public IReadOnlyList<string> Events { get; }

public bool Succeeded { get; }

public bool Failed => !Succeeded;

public bool RetryScheduled { get; }

public bool Exhausted { get; }

public static SchedulerAgentResult<TResult> Success(string supervisorName, string jobName, string agentName, int attempt, TResult response, IReadOnlyList<string> events)
{
if (response is null)
throw new ArgumentNullException(nameof(response));
if (events is null)
throw new ArgumentNullException(nameof(events));
return new(supervisorName, jobName, agentName, attempt, response, null, events, succeeded: true, retryScheduled: false, exhausted: false);
}

public static SchedulerAgentResult<TResult> Failure(string supervisorName, string jobName, string agentName, int attempt, Exception exception, IReadOnlyList<string> events, bool retryScheduled, bool exhausted)
{
if (exception is null)
throw new ArgumentNullException(nameof(exception));
if (events is null)
throw new ArgumentNullException(nameof(events));
return new(supervisorName, jobName, agentName, attempt, default, exception, events, succeeded: false, retryScheduled, exhausted);
}
}

public sealed class SchedulerSupervisionPolicy<TWork>
{
private SchedulerSupervisionPolicy(int maxAttempts, TimeSpan retryDelay, Func<Exception, SchedulerAgentContext<TWork>, bool>? shouldRetry)
=> (MaxAttempts, RetryDelay, ShouldRetry) = (maxAttempts, retryDelay, shouldRetry ?? ((_, _) => true));

public int MaxAttempts { get; }

public TimeSpan RetryDelay { get; }

public Func<Exception, SchedulerAgentContext<TWork>, bool> ShouldRetry { get; }

public static Builder Create() => new();

public sealed class Builder
{
private int _maxAttempts = 3;
private TimeSpan _retryDelay = TimeSpan.FromSeconds(1);
private Func<Exception, SchedulerAgentContext<TWork>, bool>? _shouldRetry;

public Builder MaxAttempts(int maxAttempts)
{
if (maxAttempts <= 0)
throw new ArgumentOutOfRangeException(nameof(maxAttempts));
_maxAttempts = maxAttempts;
return this;
}

public Builder RetryDelay(TimeSpan retryDelay)
{
if (retryDelay < TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(retryDelay));
_retryDelay = retryDelay;
return this;
}

public Builder RetryWhen(Func<Exception, SchedulerAgentContext<TWork>, bool> shouldRetry)
{
_shouldRetry = shouldRetry ?? throw new ArgumentNullException(nameof(shouldRetry));
return this;
}

public SchedulerSupervisionPolicy<TWork> Build() => new(_maxAttempts, _retryDelay, _shouldRetry);
}
}

public sealed class SchedulerAgentSupervisor<TWork, TResult>
{
private readonly List<ScheduledJob> _jobs = [];
private readonly List<Agent> _agents = [];
private readonly Func<DateTimeOffset> _clock;
private readonly SchedulerSupervisionPolicy<TWork> _policy;

private SchedulerAgentSupervisor(string name, Func<DateTimeOffset>? clock, SchedulerSupervisionPolicy<TWork>? policy, IReadOnlyList<Agent> agents)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Supervisor name is required.", nameof(name));
if (agents.Count == 0)
throw new InvalidOperationException("Scheduler Agent Supervisor requires at least one agent.");

Name = name;
_clock = clock ?? (() => DateTimeOffset.UtcNow);
_policy = policy ?? SchedulerSupervisionPolicy<TWork>.Create().Build();
_agents.AddRange(agents);
}

public string Name { get; }

public IReadOnlyList<string> PendingJobs => _jobs.Select(static job => job.Name).ToArray();

public static Builder Create(string name = "scheduler-agent-supervisor") => new(name);

public SchedulerAgentSupervisor<TWork, TResult> Schedule(string jobName, TWork work, DateTimeOffset dueAt)
{
if (string.IsNullOrWhiteSpace(jobName))
throw new ArgumentException("Job name is required.", nameof(jobName));
if (work is null)
throw new ArgumentNullException(nameof(work));
_jobs.Add(new ScheduledJob(jobName, work, dueAt, attempt: 0));
return this;
}

public IReadOnlyList<SchedulerAgentResult<TResult>> RunDue() => RunDue(_clock());

public IReadOnlyList<SchedulerAgentResult<TResult>> RunDue(DateTimeOffset now)
{
var due = _jobs.Where(job => job.DueAt <= now).OrderBy(static job => job.DueAt).ToArray();
if (due.Length == 0)
return Array.Empty<SchedulerAgentResult<TResult>>();

foreach (var job in due)
_jobs.Remove(job);

var results = new List<SchedulerAgentResult<TResult>>(due.Length);
foreach (var job in due)
results.Add(Dispatch(job, now));

return results;
}

private SchedulerAgentResult<TResult> Dispatch(ScheduledJob job, DateTimeOffset now)
{
var agent = _agents[0];
var attempt = job.Attempt + 1;
Comment on lines +177 to +180
var context = new SchedulerAgentContext<TWork>(Name, job.Name, job.Work, attempt);
context.Events.Add($"dispatch:{agent.Name}:{attempt}");
try
{
var response = agent.Execute(context);
return SchedulerAgentResult<TResult>.Success(Name, job.Name, agent.Name, attempt, response, context.Events.ToArray());
}
catch (Exception exception)
{
var canRetry = attempt < _policy.MaxAttempts && _policy.ShouldRetry(exception, context);
if (canRetry)
_jobs.Add(new ScheduledJob(job.Name, job.Work, now.Add(_policy.RetryDelay), attempt));
return SchedulerAgentResult<TResult>.Failure(Name, job.Name, agent.Name, attempt, exception, context.Events.ToArray(), canRetry, exhausted: !canRetry);
}
}

public sealed class Builder
{
private readonly string _name;
private readonly List<Agent> _agents = [];
private Func<DateTimeOffset>? _clock;
private SchedulerSupervisionPolicy<TWork>? _policy;

internal Builder(string name) => _name = name;

public Builder Clock(Func<DateTimeOffset> clock)
{
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
return this;
}

public Builder Supervision(SchedulerSupervisionPolicy<TWork> policy)
{
_policy = policy ?? throw new ArgumentNullException(nameof(policy));
return this;
}

public Builder Agent(string name, Func<SchedulerAgentContext<TWork>, TResult> execute)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Agent name is required.", nameof(name));
if (execute is null)
throw new ArgumentNullException(nameof(execute));
if (_agents.Any(agent => agent.Name == name))
throw new InvalidOperationException($"Scheduler agent '{name}' is already registered.");
_agents.Add(new Agent(name, execute));
return this;
}

public SchedulerAgentSupervisor<TWork, TResult> Build() => new(_name, _clock, _policy, _agents);
}

private sealed class Agent
{
public Agent(string name, Func<SchedulerAgentContext<TWork>, TResult> execute)
=> (Name, Execute) = (name, execute);

public string Name { get; }

public Func<SchedulerAgentContext<TWork>, TResult> Execute { get; }
}

private sealed class ScheduledJob
{
public ScheduledJob(string name, TWork work, DateTimeOffset dueAt, int attempt)
=> (Name, Work, DueAt, Attempt) = (name, work, dueAt, attempt);

public string Name { get; }

public TWork Work { get; }

public DateTimeOffset DueAt { get; }

public int Attempt { get; }
}
}
Loading
Loading