From aed1ab38e70d35afbfd93ce3bde0977fee733aa6 Mon Sep 17 00:00:00 2001
From: Claude
Date: Sat, 24 Jan 2026 10:22:55 +0000
Subject: [PATCH] feat: add CPU limits per agent via cgroups v2

Implement per-agent CPU isolation using Linux cgroups v2 to prevent
one agent (e.g., running npm install or build) from starving other
agents of CPU resources.

Features:
- CgroupManager utility in @agent-relay/resiliency package
- Auto-detects cgroups v2 availability and gracefully degrades
- Per-agent CPU limits with configurable percentage
- Automatic cleanup on agent exit

Configuration:
- cpuLimitPercent option in RelayPtyOrchestratorConfig
- AGENT_CPU_LIMIT env var to set default in cloud (default: 100%)
- Only enabled when WORKSPACE_ID is set (cloud environment)

Files:
- packages/resiliency/src/cgroup-manager.ts: New cgroup manager
- packages/resiliency/src/index.ts: Export cgroup manager
- packages/wrapper/src/relay-pty-orchestrator.ts: Integrate cgroups
- packages/bridge/src/spawner.ts: Pass cpuLimitPercent config

Example cgroup structure:
  /sys/fs/cgroup/agent-relay/{agentName}/
    cpu.max: "100000 100000" (100% of one core)
    cgroup.procs: {pid}
---
 packages/bridge/src/spawner.ts                |   6 +
 packages/resiliency/src/cgroup-manager.ts     | 532 ++++++++++++++++++
 packages/resiliency/src/index.ts              |   8 +
 .../wrapper/src/relay-pty-orchestrator.ts     |  54 ++
 4 files changed, 600 insertions(+)
 create mode 100644 packages/resiliency/src/cgroup-manager.ts

diff --git a/packages/bridge/src/spawner.ts b/packages/bridge/src/spawner.ts
index c3f019bc1..69adc09eb 100644
--- a/packages/bridge/src/spawner.ts
+++ b/packages/bridge/src/spawner.ts
@@ -1007,6 +1007,12 @@ export class AgentSpawner {
         onRelease: onReleaseHandler,
         onExit: onExitHandler,
         headless: true, // Force headless mode for spawned agents to enable task injection via stdin
+        // In cloud environments (WORKSPACE_ID set), limit CPU per agent to prevent
+        // one agent (e.g., running npm install) from starving others
+        // Default: 100% of one core per agent. Set AGENT_CPU_LIMIT to override.
+        cpuLimitPercent: process.env.WORKSPACE_ID
+          ? parseInt(process.env.AGENT_CPU_LIMIT || '100', 10)
+          : undefined,
       };
       const pty = new RelayPtyOrchestrator(ptyConfig);
       if (debug) log.debug(`Using RelayPtyOrchestrator for ${name}`);
diff --git a/packages/resiliency/src/cgroup-manager.ts b/packages/resiliency/src/cgroup-manager.ts
new file mode 100644
index 000000000..4ff7ae5e4
--- /dev/null
+++ b/packages/resiliency/src/cgroup-manager.ts
@@ -0,0 +1,532 @@
+/**
+ * CgroupManager - Manage CPU limits for agents using Linux cgroups v2
+ *
+ * Provides per-agent CPU isolation to prevent one agent (e.g., running npm install)
+ * from starving other agents of CPU resources.
+ *
+ * Features:
+ * - Auto-detects cgroups v2 availability
+ * - Creates per-agent cgroups with CPU limits
+ * - Gracefully degrades when cgroups unavailable
+ * - Cleans up cgroups when agents exit
+ *
+ * Usage:
+ * ```typescript
+ * const manager = getCgroupManager();
+ * await manager.createAgentCgroup('worker1', { cpuPercent: 50 });
+ * await manager.addProcess('worker1', pid);
+ * // ... agent runs with CPU limit ...
+ * await manager.removeAgentCgroup('worker1');
+ * ```
+ *
+ * Requirements:
+ * - Linux with cgroups v2 (unified hierarchy)
+ * - Write access to cgroup directory (delegated or root)
+ * - cpu controller enabled in cgroup
+ */
+
+import { existsSync, mkdirSync, writeFileSync, readFileSync, rmdirSync, readdirSync } from 'node:fs';
+import { join } from 'node:path';
+import { EventEmitter } from 'node:events';
+
+/**
+ * CPU limit configuration for an agent
+ */
+export interface CpuLimitConfig {
+  /**
+   * CPU limit as a percentage of one core. Values above 100 span multiple
+   * cores (e.g., 50 = half a core, 200 = 2 full cores). Default: 100
+   */
+  cpuPercent?: number;
+  /** CPU period in microseconds. Default: 100000 (100ms) */
+  cpuPeriodUs?: number;
+}
+
+/**
+ * Cgroup info for an agent
+ */
+export interface AgentCgroupInfo {
+  name: string;
+  path: string;
+  pids: number[];
+  cpuLimit: CpuLimitConfig;
+  createdAt: number;
+}
+
+/**
+ * Events emitted by CgroupManager
+ */
+export interface CgroupManagerEvents {
+  'cgroup-created': (info: { agentName: string; path: string; cpuPercent: number }) => void;
+  'cgroup-removed': (info: { agentName: string }) => void;
+  'process-added': (info: { agentName: string; pid: number }) => void;
+  'error': (error: Error) => void;
+}
+
+/**
+ * Default cgroup base path for agent-relay
+ */
+const DEFAULT_CGROUP_BASE = '/sys/fs/cgroup/agent-relay';
+
+/**
+ * Default CPU settings
+ */
+const DEFAULT_CPU_PERCENT = 100; // 100% of one core
+const DEFAULT_CPU_PERIOD_US = 100000; // 100ms period
+
+/**
+ * Smallest quota (in microseconds) written to cpu.max; the kernel rejects
+ * CFS bandwidth quotas below 1ms.
+ */
+const MIN_CPU_QUOTA_US = 1000;
+
+/**
+ * Extract a printable message from an unknown thrown value
+ */
+function errorMessage(err: unknown): string {
+  return err instanceof Error ? err.message : String(err);
+}
+
+/**
+ * Build the cpu.max value ("$MAX $PERIOD", both in microseconds)
+ *
+ * @throws Error if cpuPercent or cpuPeriodUs is not a finite positive number
+ */
+function formatCpuMax(cpuPercent: number, cpuPeriodUs: number): string {
+  if (!Number.isFinite(cpuPercent) || cpuPercent <= 0) {
+    throw new Error(`Invalid CPU percent: ${cpuPercent}`);
+  }
+  if (!Number.isFinite(cpuPeriodUs) || cpuPeriodUs <= 0) {
+    throw new Error(`Invalid CPU period: ${cpuPeriodUs}`);
+  }
+  // For 50% of one CPU: "50000 100000" (50ms max per 100ms period)
+  const cpuMaxUs = Math.max(MIN_CPU_QUOTA_US, Math.floor((cpuPercent / 100) * cpuPeriodUs));
+  return `${cpuMaxUs} ${cpuPeriodUs}`;
+}
+
+/**
+ * CgroupManager singleton for managing agent CPU limits
+ */
+export class CgroupManager extends EventEmitter {
+  private cgroupBase: string;
+  private available: boolean;
+  private agentCgroups: Map<string, AgentCgroupInfo> = new Map();
+  private initialized = false;
+
+  constructor(cgroupBase: string = DEFAULT_CGROUP_BASE) {
+    super();
+    this.cgroupBase = cgroupBase;
+    this.available = false;
+  }
+
+  /**
+   * Initialize the cgroup manager and detect availability
+   */
+  async initialize(): Promise<boolean> {
+    if (this.initialized) {
+      return this.available;
+    }
+
+    this.initialized = true;
+    this.available = await this.detectCgroupsV2();
+
+    if (this.available) {
+      // Ensure base directory exists
+      try {
+        await this.ensureBaseCgroup();
+      } catch (err: unknown) {
+        console.warn(`[cgroup-manager] Failed to create base cgroup: ${errorMessage(err)}`);
+        this.available = false;
+      }
+    }
+
+    return this.available;
+  }
+
+  /**
+   * Check if cgroups v2 is available and we have write access
+   */
+  private async detectCgroupsV2(): Promise<boolean> {
+    // Check for cgroups v2 unified hierarchy
+    const cgroupRoot = '/sys/fs/cgroup';
+
+    // Check if cgroup2 is mounted (unified hierarchy)
+    if (!existsSync(join(cgroupRoot, 'cgroup.controllers'))) {
+      console.info('[cgroup-manager] cgroups v2 not detected (no unified hierarchy)');
+      return false;
+    }
+
+    // Check if cpu controller is available
+    try {
+      // cgroup.controllers is a space-separated list; match the exact name so
+      // that "cpuset" alone is not mistaken for "cpu"
+      const controllers = readFileSync(join(cgroupRoot, 'cgroup.controllers'), 'utf-8')
+        .trim()
+        .split(/\s+/);
+      if (!controllers.includes('cpu')) {
+        console.info('[cgroup-manager] CPU controller not available in cgroups');
+        return false;
+      }
+    } catch (err: unknown) {
+      console.info(`[cgroup-manager] Cannot read cgroup controllers: ${errorMessage(err)}`);
+      return false;
+    }
+
+    // Check if we can write to cgroup directory
+    // In production, agent-relay cgroup should be pre-created with proper delegation
+    try {
+      const testPath = join(cgroupRoot, 'agent-relay-test-' + process.pid);
+      mkdirSync(testPath, { recursive: true });
+      // cgroup directories must be removed with rmdir: the kernel-owned
+      // control files inside them cannot be unlinked, so a recursive rm fails
+      rmdirSync(testPath);
+      console.info('[cgroup-manager] cgroups v2 available with write access');
+      return true;
+    } catch (err: unknown) {
+      // Try delegated cgroup path
+      if (existsSync(this.cgroupBase)) {
+        console.info('[cgroup-manager] Using delegated cgroup at ' + this.cgroupBase);
+        return true;
+      }
+      console.info(`[cgroup-manager] No write access to cgroups: ${errorMessage(err)}`);
+      return false;
+    }
+  }
+
+  /**
+   * Ensure base cgroup directory exists with proper controllers
+   */
+  private async ensureBaseCgroup(): Promise<void> {
+    if (!existsSync(this.cgroupBase)) {
+      mkdirSync(this.cgroupBase, { recursive: true });
+    }
+
+    // Enable cpu controller in subtree
+    const subtreeControlPath = join(this.cgroupBase, 'cgroup.subtree_control');
+    if (existsSync(subtreeControlPath)) {
+      try {
+        writeFileSync(subtreeControlPath, '+cpu');
+      } catch {
+        // Controller might already be enabled or not available
+      }
+    }
+  }
+
+  /**
+   * Check if cgroups are available
+   */
+  isAvailable(): boolean {
+    return this.available;
+  }
+
+  /**
+   * Create a cgroup for an agent with CPU limits
+   *
+   * @param agentName - Unique agent identifier
+   * @param config - CPU limit configuration
+   * @returns true if cgroup was created, false if not available
+   * @throws Error if agentName is empty or could escape the base cgroup
+   */
+  async createAgentCgroup(agentName: string, config: CpuLimitConfig = {}): Promise<boolean> {
+    if (!this.initialized) {
+      await this.initialize();
+    }
+
+    if (!this.available) {
+      return false;
+    }
+
+    // Validate agent name (no path traversal, must name a child of the base)
+    if (agentName === '' || agentName === '.' || agentName.includes('/') || agentName.includes('..')) {
+      throw new Error(`Invalid agent name: ${agentName}`);
+    }
+
+    const cgroupPath = join(this.cgroupBase, agentName);
+
+    try {
+      // Create cgroup directory
+      if (!existsSync(cgroupPath)) {
+        mkdirSync(cgroupPath, { recursive: true });
+      }
+
+      // Configure CPU limit
+      const cpuPercent = config.cpuPercent ?? DEFAULT_CPU_PERCENT;
+      const cpuPeriodUs = config.cpuPeriodUs ?? DEFAULT_CPU_PERIOD_US;
+
+      writeFileSync(join(cgroupPath, 'cpu.max'), formatCpuMax(cpuPercent, cpuPeriodUs));
+
+      // Track the cgroup
+      const info: AgentCgroupInfo = {
+        name: agentName,
+        path: cgroupPath,
+        pids: [],
+        cpuLimit: { cpuPercent, cpuPeriodUs },
+        createdAt: Date.now(),
+      };
+      this.agentCgroups.set(agentName, info);
+
+      this.emit('cgroup-created', { agentName, path: cgroupPath, cpuPercent });
+      console.info(`[cgroup-manager] Created cgroup for ${agentName} with ${cpuPercent}% CPU limit`);
+
+      return true;
+    } catch (err: unknown) {
+      const error = new Error(`Failed to create cgroup for ${agentName}: ${errorMessage(err)}`);
+      // EventEmitter throws when 'error' is emitted without a listener, which
+      // would turn this graceful "return false" into an exception
+      if (this.listenerCount('error') > 0) {
+        this.emit('error', error);
+      }
+      console.warn(`[cgroup-manager] ${error.message}`);
+      return false;
+    }
+  }
+
+  /**
+   * Add a process to an agent's cgroup
+   *
+   * @param agentName - Agent name
+   * @param pid - Process ID to add
+   * @returns true if process was added
+   */
+  async addProcess(agentName: string, pid: number): Promise<boolean> {
+    if (!this.available) {
+      return false;
+    }
+
+    const info = this.agentCgroups.get(agentName);
+    if (!info) {
+      console.warn(`[cgroup-manager] No cgroup found for agent ${agentName}`);
+      return false;
+    }
+
+    try {
+      // Write PID to cgroup.procs
+      writeFileSync(join(info.path, 'cgroup.procs'), String(pid));
+      if (!info.pids.includes(pid)) {
+        info.pids.push(pid);
+      }
+
+      this.emit('process-added', { agentName, pid });
+      console.info(`[cgroup-manager] Added process ${pid} to cgroup ${agentName}`);
+
+      return true;
+    } catch (err: unknown) {
+      console.warn(`[cgroup-manager] Failed to add process ${pid} to cgroup ${agentName}: ${errorMessage(err)}`);
+      return false;
+    }
+  }
+
+  /**
+   * Remove an agent's cgroup
+   *
+   * @param agentName - Agent name
+   * @returns true if cgroup was removed
+   */
+  async removeAgentCgroup(agentName: string): Promise<boolean> {
+    if (!this.available) {
+      return false;
+    }
+
+    const info = this.agentCgroups.get(agentName);
+    if (!info) {
+      return false;
+    }
+
+    try {
+      // Move processes to parent cgroup first (required before removal).
+      // Read the live member list so children forked by the agent are moved
+      // too; fall back to the PIDs we added if the file cannot be read.
+      const parentProcs = join(this.cgroupBase, 'cgroup.procs');
+      for (const pid of this.readMemberPids(info)) {
+        try {
+          writeFileSync(parentProcs, String(pid));
+        } catch {
+          // Process might have exited
+        }
+      }
+
+      // Remove the cgroup directory (rmdir, since control files cannot be unlinked)
+      if (existsSync(info.path)) {
+        rmdirSync(info.path);
+      }
+      this.agentCgroups.delete(agentName);
+
+      this.emit('cgroup-removed', { agentName });
+      console.info(`[cgroup-manager] Removed cgroup for ${agentName}`);
+
+      return true;
+    } catch (err: unknown) {
+      console.warn(`[cgroup-manager] Failed to remove cgroup ${agentName}: ${errorMessage(err)}`);
+      return false;
+    }
+  }
+
+  /**
+   * List PIDs currently in an agent's cgroup, falling back to the tracked PIDs
+   */
+  private readMemberPids(info: AgentCgroupInfo): number[] {
+    try {
+      return readFileSync(join(info.path, 'cgroup.procs'), 'utf-8')
+        .split('\n')
+        .map((line) => parseInt(line, 10))
+        .filter((pid) => Number.isInteger(pid));
+    } catch {
+      return info.pids;
+    }
+  }
+
+  /**
+   * Update CPU limit for an existing agent cgroup
+   *
+   * @param agentName - Agent name
+   * @param cpuPercent - New CPU percentage limit
+   * @returns true if the limit was updated
+   */
+  async updateCpuLimit(agentName: string, cpuPercent: number): Promise<boolean> {
+    if (!this.available) {
+      return false;
+    }
+
+    const info = this.agentCgroups.get(agentName);
+    if (!info) {
+      return false;
+    }
+
+    try {
+      const cpuPeriodUs = info.cpuLimit.cpuPeriodUs ?? DEFAULT_CPU_PERIOD_US;
+
+      writeFileSync(join(info.path, 'cpu.max'), formatCpuMax(cpuPercent, cpuPeriodUs));
+      info.cpuLimit.cpuPercent = cpuPercent;
+
+      console.info(`[cgroup-manager] Updated CPU limit for ${agentName} to ${cpuPercent}%`);
+      return true;
+    } catch (err: unknown) {
+      console.warn(`[cgroup-manager] Failed to update CPU limit for ${agentName}: ${errorMessage(err)}`);
+      return false;
+    }
+  }
+
+  /**
+   * Get current CPU usage for an agent (if available)
+   *
+   * @param agentName - Agent name
+   * @returns CPU usage stats or null
+   */
+  getCpuStats(agentName: string): { usageUsec: number; throttledUsec: number; periods: number } | null {
+    if (!this.available) {
+      return null;
+    }
+
+    const info = this.agentCgroups.get(agentName);
+    if (!info) {
+      return null;
+    }
+
+    try {
+      const statPath = join(info.path, 'cpu.stat');
+      if (!existsSync(statPath)) {
+        return null;
+      }
+
+      const stat = readFileSync(statPath, 'utf-8');
+      const lines = stat.trim().split('\n');
+      const stats: Record<string, number> = {};
+
+      for (const line of lines) {
+        const [key, value] = line.split(' ');
+        if (key === undefined || value === undefined) continue;
+        stats[key] = parseInt(value, 10);
+      }
+
+      return {
+        usageUsec: stats['usage_usec'] ?? 0,
+        throttledUsec: stats['throttled_usec'] ?? 0,
+        periods: stats['nr_periods'] ?? 0,
+      };
+    } catch {
+      return null;
+    }
+  }
+
+  /**
+   * Get info about all agent cgroups
+   */
+  getAllAgentCgroups(): AgentCgroupInfo[] {
+    return Array.from(this.agentCgroups.values());
+  }
+
+  /**
+   * Clean up orphaned cgroups (e.g., after crash)
+   *
+   * @returns number of cgroups removed
+   */
+  async cleanupOrphanedCgroups(): Promise<number> {
+    if (!this.available || !existsSync(this.cgroupBase)) {
+      return 0;
+    }
+
+    let cleaned = 0;
+
+    try {
+      const entries = readdirSync(this.cgroupBase, { withFileTypes: true });
+      for (const entry of entries) {
+        if (!entry.isDirectory()) continue;
+
+        // Skip if we're tracking this cgroup
+        if (this.agentCgroups.has(entry.name)) continue;
+
+        // Try to remove orphaned cgroup
+        try {
+          const cgroupPath = join(this.cgroupBase, entry.name);
+          rmdirSync(cgroupPath);
+          cleaned++;
+          console.info(`[cgroup-manager] Cleaned up orphaned cgroup: ${entry.name}`);
+        } catch {
+          // Might still have processes
+        }
+      }
+    } catch {
+      // Ignore errors during cleanup
+    }
+
+    return cleaned;
+  }
+
+  /**
+   * Shutdown and clean up all cgroups
+   */
+  async shutdown(): Promise<void> {
+    // Iterate over a snapshot: removeAgentCgroup deletes from the map
+    for (const agentName of Array.from(this.agentCgroups.keys())) {
+      await this.removeAgentCgroup(agentName);
+    }
+  }
+}
+
+// Singleton instance
+let cgroupManagerInstance: CgroupManager | null = null;
+
+/**
+ * Get the singleton CgroupManager instance
+ *
+ * The base path is only honored by the call that creates the instance.
+ */
+export function getCgroupManager(cgroupBase?: string): CgroupManager {
+  if (!cgroupManagerInstance) {
+    cgroupManagerInstance = new CgroupManager(cgroupBase);
+  }
+  return cgroupManagerInstance;
+}
+
+/**
+ * Format a CPU time given in microseconds for display
+ */
+export function formatCpuTime(usec: number): string {
+  if (usec < 1000) {
+    return `${usec}µs`;
+  } else if (usec < 1000000) {
+    return `${(usec / 1000).toFixed(2)}ms`;
+  } else {
+    return `${(usec / 1000000).toFixed(2)}s`;
+  }
+}
diff --git a/packages/resiliency/src/index.ts b/packages/resiliency/src/index.ts
index 6665cb001..a00911da6 100644
--- a/packages/resiliency/src/index.ts
+++ b/packages/resiliency/src/index.ts
@@ -147,3 +147,11 @@ export {
   type PeerHealth,
   type GossipHealthConfig,
 } from './gossip-health.js';
+
+export {
+  CgroupManager,
+  getCgroupManager,
+  formatCpuTime,
+  type CpuLimitConfig,
+  type AgentCgroupInfo,
+} from './cgroup-manager.js';
diff --git a/packages/wrapper/src/relay-pty-orchestrator.ts b/packages/wrapper/src/relay-pty-orchestrator.ts
index 99413d341..5a90022df 100644
--- a/packages/wrapper/src/relay-pty-orchestrator.ts
+++ b/packages/wrapper/src/relay-pty-orchestrator.ts
@@ -46,6 +46,8 @@ import {
   type AgentMemoryMonitor,
   type MemoryAlert,
   formatBytes,
+  getCgroupManager,
+  type CgroupManager,
 } from '@agent-relay/resiliency';
 
 // ============================================================================
@@ -138,6 +140,8 @@ export interface RelayPtyOrchestratorConfig extends BaseWrapperConfig {
   debug?: boolean;
   /** Force headless mode (use pipes instead of inheriting TTY) */
   headless?: boolean;
+  /** CPU limit percentage per agent (1-100 per core, e.g., 50 = 50% of one core). Requires cgroups v2. */
+  cpuLimitPercent?: number;
 }
 
 /**
@@ -227,6 +231,10 @@ export class RelayPtyOrchestrator extends BaseWrapper {
   private memoryMonitor: AgentMemoryMonitor;
   private memoryAlertHandler: ((alert: MemoryAlert) => void) | null = null;
 
+  // CPU limiting via cgroups (optional, Linux only)
+  private cgroupManager: CgroupManager;
+  private hasCgroupSetup = false;
+
   // Note: sessionEndProcessed and lastSummaryRawContent are inherited from BaseWrapper
 
   constructor(config: RelayPtyOrchestratorConfig) {
@@ -314,6 +322,9 @@ export class RelayPtyOrchestrator extends BaseWrapper {
 
     // Initialize memory monitor (shared singleton, 10s polling interval)
     this.memoryMonitor = getMemoryMonitor({ checkIntervalMs: 10_000 });
+
+    // Initialize cgroup manager for CPU limiting (shared singleton)
+    this.cgroupManager = getCgroupManager();
   }
 
   /**
@@ -491,6 +502,12 @@ export class RelayPtyOrchestrator extends BaseWrapper {
       this.memoryAlertHandler = null;
     }
 
+    // Clean up cgroup if we set one up
+    if (this.hasCgroupSetup) {
+      await this.cgroupManager.removeAgentCgroup(this.config.name);
+      this.hasCgroupSetup = false;
+    }
+
     this.log(` Stopping...`);
 
     // Send shutdown command via socket
@@ -673,6 +690,12 @@ export class RelayPtyOrchestrator extends BaseWrapper {
       this.memoryAlertHandler = null;
     }
 
+    // Clean up cgroup (fire and forget - process already exited)
+    if (this.hasCgroupSetup) {
+      this.cgroupManager.removeAgentCgroup(this.config.name).catch(() => {});
+      this.hasCgroupSetup = false;
+    }
+
     // Broadcast crash notification if not a graceful stop
     if (!this.isGracefulStop && this.client.state === 'READY') {
       const canBroadcast = typeof (this.client as any).broadcast === 'function';
@@ -734,6 +757,14 @@ export class RelayPtyOrchestrator extends BaseWrapper {
       this.memoryMonitor.register(this.config.name, proc.pid);
       this.memoryMonitor.start(); // Idempotent - starts if not already running
 
+      // Set up CPU limiting via cgroups (if configured and available)
+      // This prevents one agent from starving others during npm install/build
+      if (this.config.cpuLimitPercent && this.config.cpuLimitPercent > 0) {
+        this.setupCgroupLimit(proc.pid, this.config.cpuLimitPercent).catch((err) => {
+          this.log(` Failed to set up cgroup CPU limit: ${err.message}`);
+        });
+      }
+
       // Set up alert handler to send resource alerts to dashboard only (not other agents)
       this.memoryAlertHandler = (alert: MemoryAlert) => {
         if (alert.agentName !== this.config.name) return;
@@ -758,6 +789,29 @@ export class RelayPtyOrchestrator extends BaseWrapper {
     }
   }
 
+  /**
+   * Set up cgroup CPU limit for this agent
+   */
+  private async setupCgroupLimit(pid: number, cpuPercent: number): Promise<void> {
+    await this.cgroupManager.initialize();
+
+    if (!this.cgroupManager.isAvailable()) {
+      this.log(` cgroups not available, skipping CPU limit`);
+      return;
+    }
+
+    const created = await this.cgroupManager.createAgentCgroup(this.config.name, { cpuPercent });
+    if (!created) {
+      return;
+    }
+
+    const added = await this.cgroupManager.addProcess(this.config.name, pid);
+    if (added) {
+      this.hasCgroupSetup = true;
+      this.log(` CPU limit set to ${cpuPercent}% for agent ${this.config.name}`);
+    }
+  }
+
   /**
    * Handle output from relay-pty stdout (headless mode only)
    * In interactive mode, stdout goes directly to terminal via inherited stdio