Skip to content
87 changes: 87 additions & 0 deletions src/agents/pi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,50 @@ export const createPiAgentAdapter: AgentAdapterFactory = (config) => {
return enqueue(threadId, () => doPrompt(threadId, formatMessage(message)));
},

async promptWithModel(threadId: string, message: AgentMessage, modelId: string): Promise<AgentResponse> {
return enqueue(threadId, async () => {
const entry = await getOrCreate(threadId);
const currentModel = entry.session.model;

// Resolve the target model (format: "provider/model-id")
let targetModel;
const [provider, ...rest] = modelId.split("/");
const id = rest.join("/");
if (provider && id) {
targetModel = modelRegistry.find(provider, id);
}

if (!targetModel) {
console.warn(`[pi-agent] flush model "${modelId}" not found, using default`);
return doPrompt(threadId, formatMessage(message));
}

// Verify auth is available for the target model
if (!modelRegistry.hasConfiguredAuth(targetModel)) {
console.warn(`[pi-agent] no auth for flush model "${modelId}", using default`);
return doPrompt(threadId, formatMessage(message));
}

// Swap model in-memory only (no persistence to settings.json or session log).
// This avoids a crash-window where settings could be left on the flush model.
const agentState = (entry.session as any).agent?.state;
if (!agentState) {
console.warn(`[pi-agent] cannot access agent state for model swap, using default`);
return doPrompt(threadId, formatMessage(message));
}

agentState.model = targetModel;
console.log(`[pi-agent] switched to flush model (in-memory): ${modelId}`);

try {
return await doPrompt(threadId, formatMessage(message));
} finally {
// Restore original model (in-memory only) — even if undefined
agentState.model = currentModel;
}
});
},

promptStream(threadId: string, message: AgentMessage): AsyncIterable<AgentStreamEvent> {
const text = formatMessage(message);
// Return an async iterable that is single-use by design.
Expand Down Expand Up @@ -465,6 +509,49 @@ export const createPiAgentAdapter: AgentAdapterFactory = (config) => {
});
},

async compactWithModel(threadId: string, modelId: string): Promise<{ tokensBefore: number; tokensAfter: number | null } | null> {
return enqueue(threadId, async () => {
const entry = sessions.get(threadId);
if (!entry) return null;

const agentState = (entry.session as any).agent?.state;
let currentModel: any;
let modelSwapped = false;

// Resolve and swap model for compact
if (!agentState) {
console.warn(`[pi-agent] cannot access agent state for compact model swap, using default`);
} else {
const [provider, ...rest] = modelId.split("/");
const id = rest.join("/");
const targetModel = (provider && id) ? modelRegistry.find(provider, id) : null;
if (!targetModel) {
console.warn(`[pi-agent] compact model "${modelId}" not found, using default`);
} else if (!modelRegistry.hasConfiguredAuth(targetModel)) {
console.warn(`[pi-agent] no auth for compact model "${modelId}", using default`);
} else {
currentModel = agentState.model;
agentState.model = targetModel;
modelSwapped = true;
console.log(`[pi-agent] compact using model (in-memory): ${modelId}`);
}
}

try {
const result = await entry.session.compact();
const usage = entry.session.getContextUsage();
return {
tokensBefore: result.tokensBefore,
tokensAfter: usage?.tokens ?? null,
};
} finally {
if (modelSwapped) {
agentState.model = currentModel;
}
}
});
},

async abort(threadId: string): Promise<void> {
const entry = sessions.get(threadId);
if (entry) {
Expand Down
51 changes: 36 additions & 15 deletions src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import { formatSchedule, formatRunCounts, jobEnabledIcon } from "./cron/format";
import { BOT_COMMANDS } from "./commands";
import { prepareMemoryForTurn, finalizeMemoryForTurn, flushMemoryThenCompact, determineMemoryMode } from "./memory/lifecycle";
import { maxPressure } from "./memory/policy";
import type { PressureLevel } from "./memory/types";
import type { PressureLevel, CompactResult } from "./memory/types";
import { READ_ONLY_TOOLS } from "./memory/types";
import { readPendingPairing, completePendingPairing, isStartForNonce } from "./pairing";
import { createProgressMessage } from "./telegram-progress";

/** Match a Telegram command, handling optional @botname suffix */
/** Bot username for command suffix validation (set during gateway init) */
Expand Down Expand Up @@ -493,7 +495,7 @@ export class Gateway {
threadLocks.set(agentThreadId, lockPromise);
if (prevLock) await prevLock;

await thread.post("📝 Saving memory and compacting...");
const progress = await createProgressMessage(thread, "📝 Saving memory and compacting...");
const stopTyping = startTypingLoop(thread);
try {
const agentCwd = (agent.getInfo?.()?.cwd as string) ?? process.cwd();
Expand All @@ -502,23 +504,28 @@ export class Gateway {
if (this.config.memory?.enabled === false) {
const result = await agent.compact(agentThreadId);
if (!result) {
await thread.post("⚠️ No active session to compact. Send a message first.");
await progress.update("⚠️ No active session to compact. Send a message first.");
} else {
const beforeK = (result.tokensBefore / 1000).toFixed(1);
await thread.post(`✅ Compaction complete\n\nCompacted ${beforeK}K tokens down to a summary.\nContext usage will update after your next message.`);
await progress.update(`✅ Compaction complete\n\nCompacted ${beforeK}K tokens down to a summary.\nContext usage will update after your next message.`);
}
} else {
const result = await flushMemoryThenCompact(agentThreadId, agent, memoryRoot, "manual", this.config.memory);
const result = await flushMemoryThenCompact(
agentThreadId, agent, memoryRoot, "manual", this.config.memory,
(step) => progress.update(step),
);
if (!result) {
await thread.post("⚠️ No active session to compact. Send a message first.");
await progress.update("⚠️ No active session to compact. Send a message first.");
} else {
const beforeK = (result.tokensBefore / 1000).toFixed(1);
await thread.post(`✅ Memory saved & compacted\n\nCompacted ${beforeK}K tokens down to a summary.\nContext usage will update after your next message.`);
const timing = result.timing;
const timingLine = timing ? `\nTiming: flush ${(timing.flushMs / 1000).toFixed(1)}s, compact ${(timing.compactMs / 1000).toFixed(1)}s, total ${(timing.totalMs / 1000).toFixed(1)}s\nModel: ${timing.model}` : "";
await progress.update(`✅ Memory saved & compacted\n\nCompacted ${beforeK}K tokens down to a summary.\nContext usage will update after your next message.${timingLine}`);
}
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await thread.post(`⚠️ Compaction failed: ${msg.slice(0, 200)}`);
await progress.update(`⚠️ Compaction failed: ${msg.slice(0, 200)}`);
} finally {
stopTyping();
releaseLock!();
Expand Down Expand Up @@ -687,27 +694,31 @@ export class Gateway {
const stopTyping = startTypingLoop(thread);

try {
let turnUsedTools = false;
if (agent.promptStream) {
const ac = new AbortController();
abortControllers.set(agentThreadId, ac);
try {
await this.handleStreaming(thread, agent.promptStream(agentThreadId, agentMessage), verboseThreads.has(agentThreadId), ac.signal);
const streamResult = await this.handleStreaming(thread, agent.promptStream(agentThreadId, agentMessage), verboseThreads.has(agentThreadId), ac.signal);
turnUsedTools = streamResult.usedTools;
} finally {
abortControllers.delete(agentThreadId);
}
} else {
// Fallback: non-streaming prompt
// Fallback: non-streaming prompt (assume tools may have been used)
const reply = await agent.prompt(agentThreadId, agentMessage);
turnUsedTools = true;
if (reply.text) {
await this.postWithFallback(thread, reply.text);
}
}

// ── Memory: post-turn finalize + pressure check ───
try {
if (memoryPrepared) memoryPrepared.turnUsedTools = turnUsedTools;
const pressure = await finalizeMemoryForTurn(
agentThreadId,
memoryPrepared?.beforeDigest ?? null,
memoryPrepared ?? { message: agentMessage, beforeDigest: null, injected: false },
agent, memoryRoot, this.config.memory,
);
// Use higher severity between pending compact and current pressure
Expand Down Expand Up @@ -907,11 +918,17 @@ export class Gateway {

// Hard or emergency: flush + compact
try {
await thread.post(`📝 ${pressure === "emergency" ? "⚠️ Context nearly full! " : ""}Saving memory and compacting...`);
const result = await flushMemoryThenCompact(agentThreadId, agent, memoryRoot, pressure, this.config.memory);
const prefix = pressure === "emergency" ? "⚠️ Context nearly full! " : "";
const progress = await createProgressMessage(thread, `📝 ${prefix}Saving memory and compacting...`);
const result = await flushMemoryThenCompact(
agentThreadId, agent, memoryRoot, pressure, this.config.memory,
(step) => progress.update(step),
);
if (result) {
const beforeK = (result.tokensBefore / 1000).toFixed(1);
await thread.post(`✅ Auto-compacted: ${beforeK}K tokens → summary.`);
const timing = result.timing;
const timingLine = timing ? ` (${(timing.totalMs / 1000).toFixed(1)}s: flush ${(timing.flushMs / 1000).toFixed(1)}s + compact ${(timing.compactMs / 1000).toFixed(1)}s)` : "";
await progress.update(`✅ Auto-compacted: ${beforeK}K tokens → summary.${timingLine}`);
}
} catch (err) {
console.error(`[roundhouse] ${pressure} compact error:`, (err as Error).message);
Expand All @@ -927,8 +944,9 @@ export class Gateway {
* - Tool starts/ends are sent as compact status messages.
* - Turn boundaries trigger a new message for the next turn's text.
*/
private async handleStreaming(thread: any, stream: AsyncIterable<AgentStreamEvent>, verbose: boolean, signal?: AbortSignal) {
private async handleStreaming(thread: any, stream: AsyncIterable<AgentStreamEvent>, verbose: boolean, signal?: AbortSignal): Promise<{ usedTools: boolean }> {
let activeTools = new Map<string, string>(); // toolCallId -> toolName
let usedFileModifyingTools = false;

// Per-turn streaming state — each turn gets a fresh iterable + promise
let currentPush: ((text: string) => void) | null = null;
Expand Down Expand Up @@ -1032,6 +1050,7 @@ export class Gateway {

case "tool_start": {
activeTools.set(event.toolCallId, event.toolName);
if (!READ_ONLY_TOOLS.has(event.toolName)) usedFileModifyingTools = true;
if (verbose) {
try {
await thread.post(`${toolIcon(event.toolName)} Running \`${event.toolName}\`…`);
Expand Down Expand Up @@ -1102,6 +1121,8 @@ export class Gateway {
if (currentPromise) {
await flushCurrentStream();
}

return { usedTools: usedFileModifyingTools };
}

/** Post text with markdown, falling back to plain text */
Expand Down
Loading