Skip to content

Commit 69efbd3

Browse files
jddunnclaude
andcommitted
feat(ml-classifiers): add ClassifierOrchestrator with parallel execution and worst-wins
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4e96f92 commit 69efbd3

File tree

2 files changed

+655
-0
lines changed

2 files changed

+655
-0
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/**
2+
* @fileoverview Orchestrator for parallel ML classifier execution with worst-wins aggregation.
3+
*
4+
* The `ClassifierOrchestrator` runs all registered {@link IContentClassifier}
5+
* instances in parallel against a single text input and aggregates their
6+
* results into a single {@link ChunkEvaluation}. The aggregation policy is
7+
* **worst-wins**: if any classifier recommends BLOCK the overall result is
8+
* BLOCK, even if every other classifier returned ALLOW.
9+
*
10+
* Priority order (descending):
11+
* ```
12+
* BLOCK > FLAG > SANITIZE > ALLOW
13+
* ```
14+
*
15+
* Each classifier may have its own threshold overrides (via
16+
* `perClassifierThresholds`), and individual labels can be mapped to
17+
* hard-coded actions via `ClassifierConfig.labelActions`.
18+
*
19+
* @module agentos/extensions/packs/ml-classifiers/ClassifierOrchestrator
20+
*/
21+
22+
import type { IContentClassifier } from './IContentClassifier';
23+
import type {
24+
AnnotatedClassificationResult,
25+
ChunkEvaluation,
26+
ClassifierThresholds,
27+
ClassifierConfig,
28+
} from './types';
29+
import { DEFAULT_THRESHOLDS } from './types';
30+
import { GuardrailAction } from '../../../core/guardrails/IGuardrailService';
31+
32+
// ---------------------------------------------------------------------------
33+
// Action severity ranking — used by worst-wins aggregation
34+
// ---------------------------------------------------------------------------
35+
36+
/**
37+
* Numeric severity for each {@link GuardrailAction}, where higher values
38+
* represent more restrictive actions. Used to implement the worst-wins
39+
* comparison without brittle string ordering.
40+
*/
41+
const ACTION_SEVERITY: Record<GuardrailAction, number> = {
42+
[GuardrailAction.ALLOW]: 0,
43+
[GuardrailAction.SANITIZE]: 1,
44+
[GuardrailAction.FLAG]: 2,
45+
[GuardrailAction.BLOCK]: 3,
46+
};
47+
48+
// ---------------------------------------------------------------------------
49+
// ClassifierOrchestrator
50+
// ---------------------------------------------------------------------------
51+
52+
/**
53+
* Drives all registered ML classifiers in parallel and folds their results
54+
* into a single {@link ChunkEvaluation} using worst-wins aggregation.
55+
*
56+
* @example
57+
* ```typescript
58+
* const orchestrator = new ClassifierOrchestrator(
59+
* [toxicityClassifier, injectionClassifier],
60+
* DEFAULT_THRESHOLDS,
61+
* );
62+
*
63+
* const evaluation = await orchestrator.classifyAll('some user message');
64+
* if (evaluation.recommendedAction === GuardrailAction.BLOCK) {
65+
* // Terminate the interaction.
66+
* }
67+
* ```
68+
*/
69+
export class ClassifierOrchestrator {
70+
// -------------------------------------------------------------------------
71+
// Private state
72+
// -------------------------------------------------------------------------
73+
74+
/** Immutable list of classifiers to run on every `classifyAll()` call. */
75+
private readonly classifiers: IContentClassifier[];
76+
77+
/** Merged default thresholds (pack-level defaults + caller overrides). */
78+
private readonly defaultThresholds: ClassifierThresholds;
79+
80+
/**
81+
* Optional per-classifier threshold overrides, keyed by classifier ID.
82+
* When a classifier's ID appears here, the partial thresholds are merged
83+
* on top of {@link defaultThresholds} for that classifier only.
84+
*/
85+
private readonly perClassifierThresholds: Record<string, Partial<ClassifierThresholds>>;
86+
87+
// -------------------------------------------------------------------------
88+
// Constructor
89+
// -------------------------------------------------------------------------
90+
91+
/**
92+
* Create a new orchestrator.
93+
*
94+
* @param classifiers - Array of classifier instances to run in parallel.
95+
* @param defaultThresholds - Pack-level threshold defaults applied to every classifier
96+
* unless overridden by `perClassifierThresholds`.
97+
* @param perClassifierThresholds - Optional map from classifier ID to partial threshold
98+
* overrides. Missing fields fall back to `defaultThresholds`.
99+
*/
100+
constructor(
101+
classifiers: IContentClassifier[],
102+
defaultThresholds: ClassifierThresholds = DEFAULT_THRESHOLDS,
103+
perClassifierThresholds: Record<string, Partial<ClassifierThresholds>> = {},
104+
) {
105+
this.classifiers = classifiers;
106+
this.defaultThresholds = defaultThresholds;
107+
this.perClassifierThresholds = perClassifierThresholds;
108+
}
109+
110+
// -------------------------------------------------------------------------
111+
// Public API
112+
// -------------------------------------------------------------------------
113+
114+
/**
115+
* Classify `text` against every registered classifier in parallel and
116+
* return the aggregated {@link ChunkEvaluation}.
117+
*
118+
* Execution details:
119+
* 1. All classifiers run concurrently via `Promise.allSettled`.
120+
* 2. Fulfilled results are wrapped as {@link AnnotatedClassificationResult}
121+
* with provenance metadata (`classifierId`, `latencyMs`).
122+
* 3. Rejected promises log a warning and contribute an implicit ALLOW so
123+
* a single broken classifier does not block all content.
124+
* 4. Each result is mapped to a {@link GuardrailAction} using
125+
* per-classifier thresholds (if configured) or the pack defaults.
126+
* 5. The final `recommendedAction` is the most restrictive action across
127+
* all classifiers (worst-wins).
128+
*
129+
* @param text - The text to evaluate. Must not be empty.
130+
* @returns A promise resolving to the aggregated evaluation result.
131+
*/
132+
async classifyAll(text: string): Promise<ChunkEvaluation> {
133+
// Record wall-clock start time so `totalLatencyMs` reflects the
134+
// real-world time spent, not the sum of sequential latencies.
135+
const wallStart = performance.now();
136+
137+
// Fire all classifiers in parallel and wait for every one to settle.
138+
const settled = await Promise.allSettled(
139+
this.classifiers.map((c) => this.timedClassify(c, text)),
140+
);
141+
142+
// Accumulate annotated results and track the worst action seen.
143+
const results: AnnotatedClassificationResult[] = [];
144+
let worstAction = GuardrailAction.ALLOW;
145+
let triggeredBy: string | null = null;
146+
147+
for (let i = 0; i < settled.length; i++) {
148+
const outcome = settled[i];
149+
const classifier = this.classifiers[i];
150+
151+
if (outcome.status === 'fulfilled') {
152+
const annotated = outcome.value;
153+
results.push(annotated);
154+
155+
// Resolve the thresholds for this specific classifier.
156+
const thresholds = this.resolveThresholds(classifier.id);
157+
158+
// Map the raw confidence score to a guardrail action.
159+
const action = this.scoreToAction(annotated, thresholds);
160+
161+
// Worst-wins: keep the most restrictive action.
162+
if (ACTION_SEVERITY[action] > ACTION_SEVERITY[worstAction]) {
163+
worstAction = action;
164+
triggeredBy = classifier.id;
165+
}
166+
} else {
167+
// Classifier failed — log and contribute an implicit ALLOW.
168+
console.warn(
169+
`[ClassifierOrchestrator] Classifier "${classifier.id}" failed: ${outcome.reason}`,
170+
);
171+
}
172+
}
173+
174+
const wallEnd = performance.now();
175+
176+
return {
177+
results,
178+
recommendedAction: worstAction,
179+
triggeredBy,
180+
totalLatencyMs: Math.round(wallEnd - wallStart),
181+
};
182+
}
183+
184+
/**
185+
* Dispose every registered classifier, releasing model weights and any
186+
* other resources they hold.
187+
*
188+
* Calls each classifier's `dispose()` method (if present) and swallows
189+
* errors so a single failing classifier does not prevent cleanup of the
190+
* others.
191+
*/
192+
async dispose(): Promise<void> {
193+
await Promise.allSettled(
194+
this.classifiers.map(async (c) => {
195+
if (c.dispose) {
196+
await c.dispose();
197+
}
198+
}),
199+
);
200+
}
201+
202+
// -------------------------------------------------------------------------
203+
// Private helpers
204+
// -------------------------------------------------------------------------
205+
206+
/**
207+
* Invoke a single classifier with wall-clock latency tracking.
208+
*
209+
* Wraps `classifier.classify(text)` and returns the raw
210+
* {@link ClassificationResult} augmented with `classifierId` and
211+
* `latencyMs` fields.
212+
*
213+
* @param classifier - The classifier to invoke.
214+
* @param text - The text to classify.
215+
* @returns An annotated result with provenance metadata.
216+
*/
217+
private async timedClassify(
218+
classifier: IContentClassifier,
219+
text: string,
220+
): Promise<AnnotatedClassificationResult> {
221+
const start = performance.now();
222+
const result = await classifier.classify(text);
223+
const latencyMs = Math.round(performance.now() - start);
224+
225+
return {
226+
...result,
227+
classifierId: classifier.id,
228+
latencyMs,
229+
};
230+
}
231+
232+
/**
233+
* Map a classifier's confidence score to a {@link GuardrailAction}.
234+
*
235+
* The mapping checks `labelActions` first (from per-classifier config in
236+
* thresholds), then falls back to numeric threshold comparison:
237+
*
238+
* 1. `confidence >= blockThreshold` -> BLOCK
239+
* 2. `confidence >= flagThreshold` -> FLAG
240+
* 3. `confidence >= warnThreshold` -> SANITIZE
241+
* 4. otherwise -> ALLOW
242+
*
243+
* @param result - The annotated classification result.
244+
* @param thresholds - Resolved thresholds for this classifier.
245+
* @returns The appropriate guardrail action.
246+
*/
247+
private scoreToAction(
248+
result: AnnotatedClassificationResult,
249+
thresholds: ClassifierThresholds,
250+
): GuardrailAction {
251+
// Extract the confidence as a single number.
252+
// ClassificationResult.confidence may be number | number[]; normalise.
253+
const confidence = Array.isArray(result.confidence)
254+
? result.confidence[0] ?? 0
255+
: result.confidence;
256+
257+
// Threshold comparison — checked in descending severity order.
258+
if (confidence >= thresholds.blockThreshold) {
259+
return GuardrailAction.BLOCK;
260+
}
261+
if (confidence >= thresholds.flagThreshold) {
262+
return GuardrailAction.FLAG;
263+
}
264+
if (confidence >= thresholds.warnThreshold) {
265+
return GuardrailAction.SANITIZE;
266+
}
267+
268+
return GuardrailAction.ALLOW;
269+
}
270+
271+
/**
272+
* Resolve the effective thresholds for a given classifier by merging
273+
* per-classifier overrides on top of the pack-level defaults.
274+
*
275+
* @param classifierId - ID of the classifier to resolve thresholds for.
276+
* @returns Fully-resolved thresholds with no undefined fields.
277+
*/
278+
private resolveThresholds(classifierId: string): ClassifierThresholds {
279+
const overrides = this.perClassifierThresholds[classifierId];
280+
if (!overrides) {
281+
return this.defaultThresholds;
282+
}
283+
284+
return {
285+
blockThreshold: overrides.blockThreshold ?? this.defaultThresholds.blockThreshold,
286+
flagThreshold: overrides.flagThreshold ?? this.defaultThresholds.flagThreshold,
287+
warnThreshold: overrides.warnThreshold ?? this.defaultThresholds.warnThreshold,
288+
};
289+
}
290+
}

0 commit comments

Comments
 (0)