202 changes: 202 additions & 0 deletions src/common/utils/ai/cacheStrategy.test.ts
@@ -0,0 +1,202 @@
import { describe, it, expect } from "bun:test";
import type { ModelMessage, Tool } from "ai";
import { tool } from "ai";
import { z } from "zod";
import {
supportsAnthropicCache,
applyCacheControl,
createCachedSystemMessage,
applyCacheControlToTools,
} from "./cacheStrategy";

describe("cacheStrategy", () => {
describe("supportsAnthropicCache", () => {
it("should return true for Anthropic models", () => {
expect(supportsAnthropicCache("anthropic:claude-3-5-sonnet-20241022")).toBe(true);
expect(supportsAnthropicCache("anthropic:claude-3-5-haiku-20241022")).toBe(true);
});

it("should return false for non-Anthropic models", () => {
expect(supportsAnthropicCache("openai:gpt-4")).toBe(false);
expect(supportsAnthropicCache("google:gemini-2.0")).toBe(false);
expect(supportsAnthropicCache("openrouter:meta-llama/llama-3.1")).toBe(false);
});
});

describe("applyCacheControl", () => {
it("should not modify messages for non-Anthropic models", () => {
const messages: ModelMessage[] = [
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
{ role: "user", content: "How are you?" },
];
const result = applyCacheControl(messages, "openai:gpt-4");
expect(result).toEqual(messages);
});

it("should not modify messages if less than 2 messages", () => {
const messages: ModelMessage[] = [{ role: "user", content: "Hello" }];
const result = applyCacheControl(messages, "anthropic:claude-3-5-sonnet");
expect(result).toEqual(messages);
});

it("should add cache control to second-to-last message for Anthropic models", () => {
const messages: ModelMessage[] = [
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
{ role: "user", content: "How are you?" },
];
const result = applyCacheControl(messages, "anthropic:claude-3-5-sonnet");

expect(result[0]).toEqual(messages[0]); // First message unchanged
expect(result[1]).toEqual({
// Second message has cache control
...messages[1],
providerOptions: {
anthropic: {
cacheControl: {
type: "ephemeral",
},
},
},
});
expect(result[2]).toEqual(messages[2]); // Last message unchanged
});

it("should work with exactly 2 messages", () => {
const messages: ModelMessage[] = [
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
];
const result = applyCacheControl(messages, "anthropic:claude-3-5-sonnet");

expect(result[0]).toEqual({
// First message gets cache control
...messages[0],
providerOptions: {
anthropic: {
cacheControl: {
type: "ephemeral",
},
},
},
});
expect(result[1]).toEqual(messages[1]); // Last message unchanged
});
});

describe("createCachedSystemMessage", () => {
describe("integration with streamText parameters", () => {
it("should handle empty system message correctly", () => {
// When system message is converted to cached message, the system parameter
// should be undefined, not empty string, to avoid Anthropic API error
const systemContent = "You are a helpful assistant";
const cachedMessage = createCachedSystemMessage(
systemContent,
"anthropic:claude-3-5-sonnet"
);

expect(cachedMessage).toBeDefined();
expect(cachedMessage?.role).toBe("system");
expect(cachedMessage?.content).toBe(systemContent);

// When using this cached message, system parameter should be set to undefined
// Example: system: cachedMessage ? undefined : originalSystem
});
});

it("should return null for non-Anthropic models", () => {
const result = createCachedSystemMessage("You are a helpful assistant", "openai:gpt-4");
expect(result).toBeNull();
});

it("should return null for empty system content", () => {
const result = createCachedSystemMessage("", "anthropic:claude-3-5-sonnet");
expect(result).toBeNull();
});

it("should create cached system message for Anthropic models", () => {
const systemContent = "You are a helpful assistant";
const result = createCachedSystemMessage(systemContent, "anthropic:claude-3-5-sonnet");

expect(result).toEqual({
role: "system",
content: systemContent,
providerOptions: {
anthropic: {
cacheControl: {
type: "ephemeral",
},
},
},
});
});
});

describe("applyCacheControlToTools", () => {
const mockTools: Record<string, Tool> = {
readFile: tool({
description: "Read a file",
inputSchema: z.object({
path: z.string(),
}),
execute: () => Promise.resolve({ success: true }),
}),
writeFile: tool({
description: "Write a file",
inputSchema: z.object({
path: z.string(),
content: z.string(),
}),
execute: () => Promise.resolve({ success: true }),
}),
};

it("should not modify tools for non-Anthropic models", () => {
const result = applyCacheControlToTools(mockTools, "openai:gpt-4");
expect(result).toEqual(mockTools);
});

it("should return empty object for empty tools", () => {
const result = applyCacheControlToTools({}, "anthropic:claude-3-5-sonnet");
expect(result).toEqual({});
});

it("should add cache control only to the last tool for Anthropic models", () => {
const result = applyCacheControlToTools(mockTools, "anthropic:claude-3-5-sonnet");

// Get the keys to identify first and last tools
const keys = Object.keys(mockTools);
const lastKey = keys[keys.length - 1];

// Check that only the last tool has cache control
for (const [key, tool] of Object.entries(result)) {
if (key === lastKey) {
// Last tool should have cache control
expect(tool).toEqual({
...mockTools[key],
providerOptions: {
anthropic: {
cacheControl: {
type: "ephemeral",
},
},
},
});
} else {
// Other tools should be unchanged
expect(tool).toEqual(mockTools[key]);
}
}

// Verify all tools are present
expect(Object.keys(result)).toEqual(Object.keys(mockTools));
});

it("should not modify original tools object", () => {
const originalTools = { ...mockTools };
applyCacheControlToTools(mockTools, "anthropic:claude-3-5-sonnet");
expect(mockTools).toEqual(originalTools);
});
});
});
91 changes: 86 additions & 5 deletions src/common/utils/ai/cacheStrategy.ts
@@ -1,12 +1,19 @@
-import type { ModelMessage } from "ai";
+import type { ModelMessage, Tool } from "ai";
 
 /**
- * Apply cache control to messages for Anthropic models
- * MVP: Single cache breakpoint before the last message
+ * Check if a model supports Anthropic cache control
+ */
+export function supportsAnthropicCache(modelString: string): boolean {
+  return modelString.startsWith("anthropic:");
+}
+
+/**
+ * Apply cache control to messages for Anthropic models.
+ * Caches all messages except the last user message for optimal cache hits.
  */
 export function applyCacheControl(messages: ModelMessage[], modelString: string): ModelMessage[] {
   // Only apply cache control for Anthropic models
-  if (!modelString.startsWith("anthropic:")) {
+  if (!supportsAnthropicCache(modelString)) {
     return messages;
   }

@@ -27,7 +34,6 @@ export function applyCacheControl(messages: ModelMessage[], modelString: string)
           anthropic: {
             cacheControl: {
               type: "ephemeral" as const,
-              ttl: "5m",
             },
           },
         },
@@ -36,3 +42,78 @@ export function applyCacheControl(messages: ModelMessage[], modelString: string)
     return msg;
   });
 }
+
+/**
+ * Create a system message with cache control for Anthropic models.
+ * System messages rarely change and should always be cached.
+ */
+export function createCachedSystemMessage(
+  systemContent: string,
+  modelString: string
+): ModelMessage | null {
+  if (!systemContent || !supportsAnthropicCache(modelString)) {
+    return null;
+  }
+
+  return {
+    role: "system" as const,
+    content: systemContent,
+    providerOptions: {
+      anthropic: {
+        cacheControl: {
+          type: "ephemeral" as const,
+        },
+      },
+    },
+  };
+}
+
+/**
+ * Apply cache control to tool definitions for Anthropic models.
+ * Tools are static per model and should always be cached.
+ *
+ * IMPORTANT: Anthropic has a 4 cache breakpoint limit. We use:
+ * 1. System message (1 breakpoint)
+ * 2. Conversation history (1 breakpoint)
+ * 3. Last tool only (1 breakpoint) - caches all tools up to and including this one
+ * = 3 total, leaving 1 for future use
+ */
+export function applyCacheControlToTools<T extends Record<string, Tool>>(
+  tools: T,
+  modelString: string
+): T {
+  // Only apply cache control for Anthropic models
+  if (!supportsAnthropicCache(modelString) || !tools || Object.keys(tools).length === 0) {
+    return tools;
+  }
+
+  // Get the last tool key (tools are ordered, last one gets cached)
+  const toolKeys = Object.keys(tools);
+  const lastToolKey = toolKeys[toolKeys.length - 1];
+
+  // Clone tools and add cache control ONLY to the last tool
+  // Anthropic caches everything up to the cache breakpoint, so marking
+  // only the last tool will cache all tools
+  const cachedTools = {} as unknown as T;
+  for (const [key, tool] of Object.entries(tools)) {
+    if (key === lastToolKey) {
+      // Last tool gets cache control
+      const cachedTool = {
+        ...tool,
+        providerOptions: {
+          anthropic: {
+            cacheControl: {
+              type: "ephemeral" as const,
+            },
+          },
+        },
+      };
+      cachedTools[key as keyof T] = cachedTool as unknown as T[keyof T];
+    } else {
+      // Other tools are copied as-is (use unknown for type safety)
+      cachedTools[key as keyof T] = tool as unknown as T[keyof T];
+    }
+  }
+
+  return cachedTools;
+}
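
Taken together, the helpers in cacheStrategy.ts might compose around streamText roughly as sketched below. This is an illustrative sketch, not code from the PR: the @ai-sdk/anthropic provider import, the model instance, and the sample conversation and tool set are assumptions, and the PR's own integration (shown in streamManager.ts below) wires up only the system message and the tools.

import { streamText, tool, type ModelMessage } from "ai";
import { anthropic } from "@ai-sdk/anthropic"; // assumed provider import, not part of this PR
import { z } from "zod";
import {
  applyCacheControl,
  createCachedSystemMessage,
  applyCacheControlToTools,
} from "@/common/utils/ai/cacheStrategy";

const modelString = "anthropic:claude-3-5-sonnet-20241022";
const system = "You are a helpful assistant";

// Sample conversation and tool set, for illustration only
const messages: ModelMessage[] = [
  { role: "user", content: "Hello" },
  { role: "assistant", content: "Hi there!" },
  { role: "user", content: "How are you?" },
];
const tools = {
  readFile: tool({
    description: "Read a file",
    inputSchema: z.object({ path: z.string() }),
    execute: ({ path }) => Promise.resolve({ success: true, path }),
  }),
};

// Breakpoint 1: system prompt. Breakpoint 2: conversation history
// (cache control on the second-to-last message). Breakpoint 3: tools
// (cache control on the last tool definition).
const cachedSystem = createCachedSystemMessage(system, modelString);
const result = streamText({
  model: anthropic("claude-3-5-sonnet-20241022"),
  // When the system prompt becomes a cached message, `system` must be
  // undefined (not ""), or the Anthropic API rejects the request.
  messages: cachedSystem
    ? [cachedSystem, ...applyCacheControl(messages, modelString)]
    : applyCacheControl(messages, modelString),
  system: cachedSystem ? undefined : system,
  tools: applyCacheControlToTools(tools, modelString),
});
// result.textStream can then be consumed as usual

The AI SDK translates providerOptions.anthropic.cacheControl on messages and tools into the cache_control field of the corresponding blocks in the Anthropic Messages API request, which is what actually creates the cache breakpoints.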
7 changes: 7 additions & 0 deletions src/node/services/streamManager.test.ts
@@ -510,3 +510,10 @@ describe("StreamManager - previousResponseId recovery", () => {
expect(streamManager.isResponseIdLost("resp_cafebabe")).toBe(true);
});
});

// Note: Comprehensive Anthropic cache control tests are in cacheStrategy.test.ts
// Those unit tests cover all cache control functionality without requiring
// complex setup. StreamManager integrates those functions directly.
29 changes: 26 additions & 3 deletions src/node/services/streamManager.ts
@@ -32,6 +32,10 @@ import type { ToolPolicy } from "@/common/utils/tools/toolPolicy";
 import { StreamingTokenTracker } from "@/node/utils/main/StreamingTokenTracker";
 import type { Runtime } from "@/node/runtime/Runtime";
 import { execBuffered } from "@/node/utils/runtime/helpers";
+import {
+  createCachedSystemMessage,
+  applyCacheControlToTools,
+} from "@/common/utils/ai/cacheStrategy";
 
 // Type definitions for stream parts with extended properties
 interface ReasoningDeltaPart {
@@ -485,15 +489,34 @@ export class StreamManager extends EventEmitter {
       }
     }
 
+    // Apply cache control for Anthropic models
+    let finalMessages = messages;
+    let finalTools = tools;
+    let finalSystem: string | undefined = system;
+
+    // For Anthropic models, convert system message to a cached message at the start
+    const cachedSystemMessage = createCachedSystemMessage(system, modelString);
+    if (cachedSystemMessage) {
+      // Prepend cached system message and set system parameter to undefined
+      // Note: Must be undefined, not empty string, to avoid Anthropic API error
+      finalMessages = [cachedSystemMessage, ...messages];
+      finalSystem = undefined;
+    }
+
+    // Apply cache control to tools for Anthropic models
+    if (tools) {
+      finalTools = applyCacheControlToTools(tools, modelString);
+    }
+
     // Start streaming - this can throw immediately if API key is missing
     let streamResult;
     try {
       streamResult = streamText({
         model,
-        messages,
-        system,
+        messages: finalMessages,
+        system: finalSystem,
         abortSignal: abortController.signal,
-        tools,
+        tools: finalTools,
         // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-assignment
         toolChoice: toolChoice as any, // Force tool use when required by policy
         // When toolChoice is set (required tool), limit to 1 step to prevent infinite loops