Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"dev": "nodemon index.js",
"lint": "eslint src index.js",
"test": "npm run test:unit && npm run test:performance",
"test:unit": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/routing.test.js test/hybrid-routing-integration.test.js test/web-tools.test.js test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js test/azure-openai-config.test.js test/azure-openai-format-conversion.test.js test/azure-openai-routing.test.js test/azure-openai-streaming.test.js test/azure-openai-error-resilience.test.js test/azure-openai-integration.test.js test/openai-integration.test.js test/toon-compression.test.js test/llamacpp-integration.test.js test/resilience.test.js test/telemetry-routing.test.js test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js test/distill.test.js test/large-payload.test.js test/code-mode.test.js test/prompt-cache-injection.test.js test/risk-analyzer.test.js test/interaction-block.test.js test/preflight.test.js test/token-reduction.test.js test/session-affinity.test.js test/model-registry-cost.test.js test/task-decomposition.test.js test/output-format-guard.test.js test/tier-fallback.test.js test/wrap.test.js test/init.test.js",
"test:unit": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/routing.test.js test/hybrid-routing-integration.test.js test/web-tools.test.js test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js test/azure-openai-config.test.js test/azure-openai-format-conversion.test.js test/azure-openai-routing.test.js test/azure-openai-streaming.test.js test/azure-openai-error-resilience.test.js test/azure-openai-integration.test.js test/openai-integration.test.js test/toon-compression.test.js test/llamacpp-integration.test.js test/resilience.test.js test/telemetry-routing.test.js test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js test/distill.test.js test/large-payload.test.js test/code-mode.test.js test/prompt-cache-injection.test.js test/risk-analyzer.test.js test/interaction-block.test.js test/preflight.test.js test/token-reduction.test.js test/session-affinity.test.js test/model-registry-cost.test.js test/task-decomposition.test.js test/output-format-guard.test.js test/tier-fallback.test.js test/wrap.test.js test/init.test.js test/agent-learning.test.js test/tool-call-response-metadata.test.js",
"test:memory": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/memory/store.test.js test/memory/surprise.test.js test/memory/extractor.test.js test/memory/search.test.js test/memory/retriever.test.js",
"test:new-features": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node --test test/passthrough-mode.test.js test/openrouter-error-resilience.test.js test/format-conversion.test.js",
"test:performance": "DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node test/hybrid-routing-performance.test.js && DATABRICKS_API_KEY=test-key DATABRICKS_API_BASE=http://test.com node test/performance-tests.js",
Expand Down
20 changes: 18 additions & 2 deletions src/agents/context-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class ContextManager {
allowedTools: agentDef.allowedTools,
startTime: Date.now(),

// Original task prompt — consumed by the Reflector to infer task type.
taskPrompt,

// In-memory transcript mirror of the JSONL file — consumed by the
// Reflector for tool-usage / error-recovery analysis. The JSONL file
// remains the durable record; this array is the hot-path copy.
transcript: [],

// Token tracking
inputTokens: 0,
outputTokens: 0,
Expand Down Expand Up @@ -102,7 +110,7 @@ class ContextManager {
* Record tool execution in transcript
*/
recordToolCall(context, toolName, input, output, error = null) {
this.writeTranscriptEntry(context.transcriptPath, {
const entry = {
type: "tool_call",
agentId: context.agentId,
step: context.steps,
Expand All @@ -111,7 +119,15 @@ class ContextManager {
output: error ? null : output,
error: error ? error.message : null,
timestamp: Date.now()
});
};

// Keep the in-memory transcript in sync so the Reflector (which reads
// context.transcript) sees tool calls without re-parsing the JSONL file.
if (Array.isArray(context.transcript)) {
context.transcript.push(entry);
}

this.writeTranscriptEntry(context.transcriptPath, entry);
}

/**
Expand Down
90 changes: 90 additions & 0 deletions src/agents/definitions/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ const yaml = require("js-yaml");
const logger = require("../../logger");
const Skillbook = require("../skillbook");

// How often to prune low-quality learned skills. 6 hours is frequent enough to
// keep skillbooks tidy on a long-running process but rare enough to be
// negligible overhead. Hardcoded on purpose — not worth a config knob.
const SKILL_PRUNE_INTERVAL_MS = 6 * 60 * 60 * 1000;

class AgentDefinitionLoader {
constructor() {
this.agents = new Map();
this.skillbooks = new Map(); // agentType → Skillbook
this.initialized = false;
this.pruneTimer = null;

// Initialize synchronously for compatibility
this.loadBuiltInAgentsSync();
Expand Down Expand Up @@ -82,13 +88,97 @@ class AgentDefinitionLoader {
return this.skillbooks.get(agentType);
}

/**
* Replace an agent's skillbook with a freshly-learned instance and re-inject
* its skills into the (in-memory) system prompt. Called by the executor after
* a run persists new skills, so learning takes effect within the running
* process instead of only after a restart. Injection rebuilds from
* originalSystemPrompt, so repeated calls do not duplicate the skills block.
*/
setSkillbook(agentType, skillbook) {
if (!agentType || !skillbook) return;
// Match the case-insensitive key an agent is actually stored under.
const key = this.agents.has(agentType)
? agentType
: Array.from(this.agents.keys()).find(
k => k.toLowerCase() === String(agentType).toLowerCase()
);
if (!key) return;

this.skillbooks.set(key, skillbook);
this._injectSkillsIntoPrompt(key);
}

/**
* Reload skillbooks and update prompts (call after learning)
*/
async reloadSkillbooks() {
await this._loadSkillbooksAsync();
}

/**
* Prune low-quality skills from every loaded skillbook. Skillbook.prune()
* only drops skills that have been tried enough times to prove they don't
* help (default: useCount >= 3 && confidence < 0.2), so fresh skills are
* never removed. Persists and re-injects only the skillbooks that changed.
* @returns {Promise<number>} total skills pruned across all agents
*/
async pruneSkillbooks() {
let totalPruned = 0;

for (const [agentType, skillbook] of this.skillbooks.entries()) {
try {
const pruned = skillbook.prune();
if (pruned > 0) {
await skillbook.save();
this._injectSkillsIntoPrompt(agentType);
totalPruned += pruned;
}
} catch (error) {
logger.warn(
{ agentType, error: error.message },
"Failed to prune skillbook"
);
}
}

if (totalPruned > 0) {
logger.info({ totalPruned }, "Pruned low-quality agent skills");
}
return totalPruned;
}

/**
* Start periodic skillbook pruning. Idempotent; a non-positive interval
* disables pruning. The timer is unref'd so it never keeps the process alive.
*/
startSkillPruning(intervalMs = SKILL_PRUNE_INTERVAL_MS) {
if (this.pruneTimer) return; // already running
if (!Number.isInteger(intervalMs) || intervalMs <= 0) {
logger.debug({ intervalMs }, "Skillbook pruning disabled");
return;
}

this.pruneTimer = setInterval(() => {
this.pruneSkillbooks().catch((error) => {
logger.warn({ error: error.message }, "Skillbook pruning failed");
});
}, intervalMs);
this.pruneTimer.unref();

logger.info({ intervalMs }, "Skillbook pruning started");
}

/**
* Stop periodic skillbook pruning (for shutdown / tests).
*/
stopSkillPruning() {
if (this.pruneTimer) {
clearInterval(this.pruneTimer);
this.pruneTimer = null;
}
}

/**
* Load built-in agents (Explore, Plan, General)
*/
Expand Down
26 changes: 24 additions & 2 deletions src/agents/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ const Reflector = require("./reflector");
const contextManager = new ContextManager();

class SubagentExecutor {
/**
* @param {Object} [options]
* @param {Object} [options.definitionLoader] - Agent definition loader. When
* provided, newly-learned skills are re-injected into agent prompts live
* (within the running process). Optional so the executor still works
* standalone (e.g. in tests); without it, learning still persists to disk
* and is picked up on the next process start.
*/
constructor({ definitionLoader = null } = {}) {
this.definitionLoader = definitionLoader;
}

/**
* Execute a single subagent
* @param {Object} agentDef - Agent definition
Expand Down Expand Up @@ -403,8 +415,12 @@ class SubagentExecutor {
return; // Nothing to learn
}

// Load skillbook for this agent type
const skillbook = await Skillbook.load(context.agentName);
// Prefer the loader's live skillbook instance (so use-counts/confidence
// accumulate in one place); fall back to loading from disk when running
// standalone.
const skillbook =
this.definitionLoader?.getSkillbook(context.agentName) ||
(await Skillbook.load(context.agentName));

// Add each learned pattern
for (const pattern of patterns) {
Expand All @@ -414,6 +430,12 @@ class SubagentExecutor {
// Save skillbook (persists learning)
await skillbook.save();

// Re-inject into the agent's prompt so the next run benefits without a
// restart. No-op when no loader was provided.
if (this.definitionLoader) {
this.definitionLoader.setSkillbook(context.agentName, skillbook);
}

logger.info({
agentType: context.agentName,
patternsLearned: patterns.length,
Expand Down
10 changes: 9 additions & 1 deletion src/agents/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@ const ParallelCoordinator = require("./parallel-coordinator");
const agentStore = require("./store");

const definitionLoader = new AgentDefinitionLoader();
const coordinator = new ParallelCoordinator(config.agents?.maxConcurrent || 10);
const coordinator = new ParallelCoordinator(config.agents?.maxConcurrent || 10, {
definitionLoader,
});

// Periodically drop learned skills that have proven unhelpful. Only when the
// agents subsystem is active; the timer is unref'd so it never blocks exit.
if (config.agents?.enabled) {
definitionLoader.startSkillPruning();
}

/**
* Spawn and execute subagent(s)
Expand Down
4 changes: 2 additions & 2 deletions src/agents/parallel-coordinator.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ const logger = require("../logger");
const SubagentExecutor = require("./executor");

class ParallelCoordinator {
constructor(maxConcurrent = 10) {
constructor(maxConcurrent = 10, { definitionLoader = null } = {}) {
this.maxConcurrent = maxConcurrent;
this.executor = new SubagentExecutor();
this.executor = new SubagentExecutor({ definitionLoader });
}

/**
Expand Down
12 changes: 11 additions & 1 deletion src/agents/reflector.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ class Reflector {
// Pattern 1: Recovered from errors
if (successful) {
const failedTools = errorEntries.map(e => e.toolName);
// Tools invoked *after* the last error entry are the recovery path.
// Use the entry's position in the transcript, not its timestamp
// (slicing by a ~1.7e12 timestamp always yielded []).
const lastErrorEntry = errorEntries[errorEntries.length - 1];
const lastErrorIndex = transcript.indexOf(lastErrorEntry);
const recoveryTools = transcript
.slice(errorEntries[errorEntries.length - 1].timestamp)
.slice(lastErrorIndex + 1)
.filter(e => e.type === "tool_call" && !e.error)
.map(e => e.toolName);

Expand Down Expand Up @@ -248,6 +253,11 @@ class Reflector {
* Infer task type from prompt
*/
static _inferTaskType(prompt) {
// Guard against a missing/non-string prompt so reflection never throws
// (a throw here previously aborted all learning silently).
if (typeof prompt !== "string" || prompt.length === 0) {
return null;
}
const lower = prompt.toLowerCase();

const taskTypes = [
Expand Down
27 changes: 24 additions & 3 deletions src/orchestrator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,13 @@ function toAnthropicResponse(openai, requestedModel, wantsThinking) {
id: openai.id ?? `msg_${Date.now()}`,
type: "message",
role: "assistant",
model: requestedModel,
// Prefer the model the provider actually served with; fall back to the
// requested model only when the provider omits it. Mirrors the direct
// (non-tool) path at `databricksResponse.json.model || requestedModel`, so
// tool-call responses no longer report a stale/aliased client-request model.
model: (typeof openai?.model === "string" && openai.model.trim())
? openai.model
: requestedModel,
content: contentItems,
stop_reason:
choice?.finish_reason === "stop"
Expand All @@ -1123,8 +1129,11 @@ function toAnthropicResponse(openai, requestedModel, wantsThinking) {
: choice?.finish_reason ?? "end_turn",
stop_sequence: null,
usage: {
input_tokens: usage.prompt_tokens ?? 0,
output_tokens: usage.completion_tokens ?? 0,
// Accept both OpenAI (prompt_tokens/completion_tokens) and
// already-Anthropic (input_tokens/output_tokens) usage shapes so token
// counts survive regardless of which provider/converter produced them.
input_tokens: usage.prompt_tokens ?? usage.input_tokens ?? 0,
output_tokens: usage.completion_tokens ?? usage.output_tokens ?? 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
},
Expand Down Expand Up @@ -4053,6 +4062,16 @@ async function processMessage({ payload, headers, session, cwd, options = {} })
anthropicPayload.usage.cache_read_input_tokens = promptTokens;
anthropicPayload.usage.cache_creation_input_tokens = 0;

// Carry routing metadata on the cache-hit path too, so downstream model
// name resolution (OpenClaw) behaves the same as the live loop path.
if (cachedResponse.routingDecision) {
anthropicPayload._routingMeta = {
provider: cachedResponse.routingDecision.provider,
model: cachedResponse.routingDecision.model,
tier: cachedResponse.routingDecision.tier,
};
}

appendTurnToSession(session, {
role: "assistant",
type: "message",
Expand Down Expand Up @@ -4147,4 +4166,6 @@ async function processMessage({ payload, headers, session, cwd, options = {} })

module.exports = {
processMessage,
// Exported for unit testing of response-metadata conversion.
toAnthropicResponse,
};
Loading
Loading