diff --git a/.changeset/fix-code-context-isolation.md b/.changeset/fix-code-context-isolation.md new file mode 100644 index 00000000..bd212d99 --- /dev/null +++ b/.changeset/fix-code-context-isolation.md @@ -0,0 +1,5 @@ +--- +'@cloudflare/sandbox': patch +--- + +Fix code context isolation bug where contexts leaked state after 10 executions. Each code context now gets a dedicated executor process from creation to deletion, ensuring complete isolation between contexts. Removed maximum pool size limits to allow organic scaling. diff --git a/package-lock.json b/package-lock.json index 4b4a715e..7d333084 100644 --- a/package-lock.json +++ b/package-lock.json @@ -401,7 +401,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -423,7 +422,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-6.4.1.tgz", "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==", "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", @@ -563,7 +561,6 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.28.5.tgz", "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -1509,8 +1506,7 @@ "resolved": "https://registry.npmjs.org/@cloudflare/workers-types/-/workers-types-4.20251126.0.tgz", "integrity": "sha512-DSeI1Q7JYmh5/D/tw5eZCjrKY34v69rwj63hHt60nSQW5QLwWCbj/lLtNz9f2EPa+JCACwpLXHgCXfzJ29x66w==", "devOptional": true, - "license": "MIT OR Apache-2.0", - "peer": true + "license": "MIT OR Apache-2.0" }, "node_modules/@cspotcode/source-map-support": { "version": "0.8.1", @@ -2824,7 +2820,6 @@ "integrity": "sha512-/g2d4sW9nUDJOMz3mabVQvOGhVa4e/BN/Um7yca9Bb2XTzPPnfTWHWQg+IsEYO7M3Vx+EXvaM/I2pJWIMun1bg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@octokit/auth-token": "^4.0.0", "@octokit/graphql": "^7.1.0", @@ -4294,7 +4289,6 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.1.tgz", "integrity": "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==", "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -4310,7 +4304,6 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-19.2.7.tgz", "integrity": "sha512-MWtvHrGZLFttgeEj28VXHxpmwYbor/ATPYbBfSFZEIRK0ecCFLl2Qo55z52Hss+UV9CRN7trSeq1zbgx7YDWWg==", "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -4320,7 +4313,6 @@ "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-19.2.3.tgz", "integrity": "sha512-jp2L/eY6fn+KgVVQAOqYItbF0VY/YApe5Mz2F0aykSO8gx31bYCZyvSeYxCHKvzHG5eZjc+zyaS5BrBWya2+kQ==", "license": "MIT", - "peer": true, "peerDependencies": { "@types/react": "^19.2.0" } @@ -4447,7 +4439,6 @@ "integrity": "sha512-oukfKT9Mk41LreEW09vt45f8wx7DordoWUZMYdY/cyAk7w5TWkTRCNZYF7sX7n2wB7jyGAl74OxgwhPgKaqDMQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/utils": "3.2.4", "pathe": "^2.0.3", @@ -4463,7 +4454,6 @@ "integrity": "sha512-dEYtS7qQP2CjU27QBC5oUOxLE/v5eLkGqPE0ZKEIDGMs4vKWe7IjgLOeauHsR0D5YuuycGRO5oSRXnwnmA78fQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/pretty-format": "3.2.4", "magic-string": "^0.30.17", @@ -4492,7 +4482,6 @@ "integrity": "sha512-hGISOaP18plkzbWEcP/QvtRW1xDXF2+96HbEX6byqQhAUbiS5oH6/9JwW+QsQCIYON2bI6QZBF+2PvOmrRZ9wA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/utils": "3.2.4", "fflate": "^0.8.2", @@ -4640,7 +4629,6 @@ "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", "integrity": "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==", "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", @@ -5386,7 +5374,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -5513,6 +5500,15 @@ } } }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "license": "MIT", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/axobject-query": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/axobject-query/-/axobject-query-4.1.0.tgz", @@ -5671,7 +5667,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.8.25", "caniuse-lite": "^1.0.30001754", @@ -7606,7 +7601,6 @@ "integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==", "devOptional": true, "license": "MIT", - "peer": true, "bin": { "jiti": "lib/jiti-cli.mjs" } @@ -7871,7 +7865,6 @@ "integrity": "sha512-utfs7Pr5uJyyvDETitgsaqSyjCb2qNRAtuqUeWIAKztsOYdcACf2KtARYXg2pSvhkt+9NfoaNY7fxjl6nuMjIQ==", "devOptional": true, "license": "MPL-2.0", - "peer": true, "dependencies": { "detect-libc": "^2.0.3" }, @@ -7908,6 +7901,7 @@ "os": [ "android" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -7928,6 +7922,7 @@ "os": [ "darwin" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -7948,6 +7943,7 @@ "os": [ "darwin" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -7968,6 +7964,7 @@ "os": [ "freebsd" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -7988,6 +7985,7 @@ "os": [ "linux" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8008,6 +8006,7 @@ "os": [ "linux" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8028,6 +8027,7 @@ "os": [ "linux" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8048,6 +8048,7 @@ "os": [ "linux" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8068,6 +8069,7 @@ "os": [ "linux" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8088,6 +8090,7 @@ "os": [ "win32" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8108,6 +8111,7 @@ "os": [ "win32" ], + "peer": true, "engines": { "node": ">= 12.0.0" }, @@ -8157,6 +8161,7 @@ "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", "integrity": "sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==", "license": "MIT", + "peer": true, "dependencies": { "js-tokens": "^3.0.0 || ^4.0.0" }, @@ -9725,7 +9730,6 @@ "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.6.2.tgz", "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", "license": "MIT", - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -9942,7 +9946,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.0.tgz", "integrity": "sha512-tmbWg6W31tQLeB5cdIBOicJDJRR2KzXsV7uSK9iNfLWQ5bIZfxuPEHp7M8wiHyHnn0DD1i7w3Zmin0FtkrwoCQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -9952,7 +9955,6 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.0.tgz", "integrity": "sha512-UlbRu4cAiGaIewkPyiRGJk0imDN2T3JjieT6spoL2UeSf5od4n5LB/mQ4ejmxhCFT1tYe8IvaFulzynWovsEFQ==", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -9964,7 +9966,8 @@ "version": "16.13.1", "resolved": "https://registry.npmjs.org/react-is/-/react-is-16.13.1.tgz", "integrity": "sha512-24e6ynE2H+OKt4kqsOvNd8kBpV65zoxbA4BVsEOB3ARVWQki/DHzaUoC5KuON/BiccDaCCTZBuOcfZs70kR8bQ==", - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/react-katex": { "version": "3.1.0", @@ -10338,7 +10341,6 @@ "integrity": "sha512-ZRLgPlS91l4JztLYEZnmMcd3Umcla1hkXJgiEiR4HloRJBBoeaX8qogTu5Jfu36rRMVLndzqYv0h+M5gJAkUfg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@oxc-project/types": "=0.98.0", "@rolldown/pluginutils": "1.0.0-beta.51" @@ -11154,7 +11156,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -11354,7 +11355,6 @@ "integrity": "sha512-ytQKuwgmrrkDTFP4LjR0ToE2nqgy886GpvRSpU0JAnrdBYppuY5rLkRUYPU1yCryb24SsKBTL/hlDQAEFVwtZg==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "~0.25.0", "get-tsconfig": "^4.7.5" @@ -11514,7 +11514,6 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -11584,7 +11583,6 @@ "resolved": "https://registry.npmjs.org/unenv/-/unenv-2.0.0-rc.24.tgz", "integrity": "sha512-i7qRCmY42zmCwnYlh9H2SvLEypEFGye5iRmEMKjcGi7zk9UquigRjFtTLz0TYqr0ZGLZhaMHl/foy1bZR+Cwlw==", "license": "MIT", - "peer": true, "dependencies": { "pathe": "^2.0.3" } @@ -12026,7 +12024,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-7.2.4.tgz", "integrity": "sha512-NL8jTlbo0Tn4dUEXEsUg8KeyG/Lkmc4Fnzb8JXN/Ykm9G4HNImjtABMJgkQoVjOBN/j2WAwDTRytdqJbZsah7w==", "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -12141,7 +12138,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -12174,7 +12170,6 @@ "integrity": "sha512-LUCP5ev3GURDysTWiP47wRRUpLKMOfPh+yKTx3kVIEiu5KOMeqzpnYNsKyOoVrULivR8tLcks4+lga33Whn90A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/chai": "^5.2.2", "@vitest/expect": "3.2.4", @@ -12600,7 +12595,6 @@ "integrity": "sha512-Om5ns0Lyx/LKtYI04IV0bjIrkBgoFNg0p6urzr2asekJlfP18RqFzyqMFZKf0i9Gnjtz/JfAS/Ol6tjCe5JJsQ==", "hasInstallScript": true, "license": "Apache-2.0", - "peer": true, "bin": { "workerd": "bin/workerd" }, @@ -13364,7 +13358,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } @@ -13435,6 +13428,7 @@ "version": "0.0.2", "dependencies": { "@repo/shared": "*", + "async-mutex": "^0.5.0", "esbuild": "^0.27.0", "zod": "^3.22.3" }, diff --git a/packages/sandbox-container/package.json b/packages/sandbox-container/package.json index 6bf42f95..e3ebef48 100644 --- a/packages/sandbox-container/package.json +++ b/packages/sandbox-container/package.json @@ -13,6 +13,7 @@ }, "dependencies": { "@repo/shared": "*", + "async-mutex": "^0.5.0", "esbuild": "^0.27.0", "zod": "^3.22.3" }, diff --git a/packages/sandbox-container/src/handlers/interpreter-handler.ts b/packages/sandbox-container/src/handlers/interpreter-handler.ts index bc4ff395..cb9d4151 100644 --- a/packages/sandbox-container/src/handlers/interpreter-handler.ts +++ b/packages/sandbox-container/src/handlers/interpreter-handler.ts @@ -9,8 +9,10 @@ import type { import { ErrorCode } from '@repo/shared/errors'; import type { RequestContext } from '../core/types'; -import type { CreateContextRequest } from '../interpreter-service'; -import type { InterpreterService } from '../services/interpreter-service'; +import type { + CreateContextRequest, + InterpreterService +} from '../services/interpreter-service'; import { BaseHandler } from './base-handler'; export class InterpreterHandler extends BaseHandler { diff --git a/packages/sandbox-container/src/interpreter-service.ts b/packages/sandbox-container/src/interpreter-service.ts deleted file mode 100644 index 3d36a1ec..00000000 --- a/packages/sandbox-container/src/interpreter-service.ts +++ /dev/null @@ -1,299 +0,0 @@ -import { randomUUID } from 'node:crypto'; -import type { Logger } from '@repo/shared'; -import { - type InterpreterLanguage, - processPool, - type RichOutput -} from './runtime/process-pool'; - -export interface CreateContextRequest { - language?: string; - cwd?: string; -} - -export interface Context { - id: string; - language: string; - cwd: string; - createdAt: string; - lastUsed: string; -} - -export interface HealthStatus { - ready: boolean; - initializing: boolean; - progress: number; -} - -export class InterpreterNotReadyError extends Error { - progress: number; - retryAfter: number; - - constructor(message: string, progress: number = 100, retryAfter: number = 1) { - super(message); - this.progress = progress; - this.retryAfter = retryAfter; - this.name = 'InterpreterNotReadyError'; - } -} - -export class InterpreterService { - private contexts: Map = new Map(); - private logger: Logger; - - constructor(logger: Logger) { - this.logger = logger; - } - - async getHealthStatus(): Promise { - return { - ready: true, - initializing: false, - progress: 100 - }; - } - - async createContext(request: CreateContextRequest): Promise { - const id = randomUUID(); - const language = this.mapLanguage(request.language || 'python'); - - const context: Context = { - id, - language, - cwd: request.cwd || '/workspace', - createdAt: new Date().toISOString(), - lastUsed: new Date().toISOString() - }; - - this.contexts.set(id, context); - return context; - } - - async listContexts(): Promise { - return Array.from(this.contexts.values()); - } - - async deleteContext(contextId: string): Promise { - if (!this.contexts.has(contextId)) { - throw new Error(`Context ${contextId} not found`); - } - - this.contexts.delete(contextId); - } - - async executeCode( - contextId: string, - code: string, - language?: string, - timeoutMs?: number - ): Promise { - const context = this.contexts.get(contextId); - if (!context) { - return new Response( - JSON.stringify({ - error: `Context ${contextId} not found` - }), - { - status: 404, - headers: { 'Content-Type': 'application/json' } - } - ); - } - - context.lastUsed = new Date().toISOString(); - - const execLanguage = this.mapLanguage(language || context.language); - - // Store reference to this for use in async function - const self = this; - - const stream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder(); - - try { - // Pass through user-provided timeout (undefined = unlimited) - const result = await processPool.execute( - execLanguage, - code, - contextId, - timeoutMs - ); - - if (result.stdout) { - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'stdout', - text: result.stdout - }) - ) - ); - } - - if (result.stderr) { - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'stderr', - text: result.stderr - }) - ) - ); - } - - if (result.outputs && result.outputs.length > 0) { - for (const output of result.outputs) { - const outputData = self.formatOutputData(output); - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'result', - ...outputData, - metadata: output.metadata || {} - }) - ) - ); - } - } - - if (result.success) { - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'execution_complete', - execution_count: 1 - }) - ) - ); - } else if (result.error) { - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'error', - ename: result.error.type || 'ExecutionError', - evalue: result.error.message || 'Code execution failed', - traceback: result.error.traceback - ? result.error.traceback.split('\n') - : [] - }) - ) - ); - } else { - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'error', - ename: 'ExecutionError', - evalue: result.stderr || 'Code execution failed', - traceback: [] - }) - ) - ); - } - - controller.close(); - } catch (error) { - self.logger.error('Code execution failed', error as Error, { - contextId, - language: execLanguage - }); - - controller.enqueue( - encoder.encode( - self.formatSSE({ - type: 'error', - ename: 'InternalError', - evalue: error instanceof Error ? error.message : String(error), - traceback: [] - }) - ) - ); - - controller.close(); - } - } - }); - - return new Response(stream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive' - } - }); - } - - private mapLanguage(language: string): InterpreterLanguage { - const normalized = language.toLowerCase(); - - switch (normalized) { - case 'python': - case 'python3': - return 'python'; - case 'javascript': - case 'js': - case 'node': - return 'javascript'; - case 'typescript': - case 'ts': - return 'typescript'; - default: - this.logger.warn('Unknown language, defaulting to python', { - requestedLanguage: language - }); - return 'python'; - } - } - - private formatOutputData(output: RichOutput): Record { - const result: Record = {}; - - switch (output.type) { - case 'image': - result.png = output.data; - break; - case 'jpeg': - result.jpeg = output.data; - break; - case 'svg': - result.svg = output.data; - break; - case 'html': - result.html = output.data; - break; - case 'json': - result.json = - typeof output.data === 'string' - ? JSON.parse(output.data) - : output.data; - break; - case 'latex': - result.latex = output.data; - break; - case 'markdown': - result.markdown = output.data; - break; - case 'javascript': - result.javascript = output.data; - break; - case 'text': - result.text = output.data; - break; - default: - result.text = output.data || ''; - } - - return result; - } - - /** - * Format event as SSE (Server-Sent Events) - * SSE format requires "data: " prefix and double newline separator - * @param event - Event object to send - * @returns SSE-formatted string - */ - private formatSSE(event: Record): string { - return `data: ${JSON.stringify(event)}\n\n`; - } -} diff --git a/packages/sandbox-container/src/mime-processor.ts b/packages/sandbox-container/src/mime-processor.ts deleted file mode 100644 index c9ad4d48..00000000 --- a/packages/sandbox-container/src/mime-processor.ts +++ /dev/null @@ -1,280 +0,0 @@ -export interface ExecutionResult { - type: 'result' | 'stdout' | 'stderr' | 'error' | 'execution_complete'; - text?: string; - html?: string; - png?: string; // base64 - jpeg?: string; // base64 - svg?: string; - latex?: string; - markdown?: string; - javascript?: string; - json?: any; - chart?: ChartData; - data?: any; - metadata?: any; - execution_count?: number; - ename?: string; - evalue?: string; - traceback?: string[]; - timestamp: number; -} - -export interface ChartData { - type: - | 'line' - | 'bar' - | 'scatter' - | 'pie' - | 'histogram' - | 'heatmap' - | 'unknown'; - title?: string; - data: any; - layout?: any; - config?: any; - library?: 'matplotlib' | 'plotly' | 'altair' | 'seaborn' | 'unknown'; -} - -export function processMessage(msg: any): ExecutionResult | null { - const msgType = msg.header?.msg_type || msg.msg_type; - - switch (msgType) { - case 'execute_result': - case 'display_data': - return processDisplayData(msg.content.data, msg.content.metadata); - - case 'stream': - return { - type: msg.content.name === 'stdout' ? 'stdout' : 'stderr', - text: msg.content.text, - timestamp: Date.now() - }; - - case 'error': - return { - type: 'error', - ename: msg.content.ename, - evalue: msg.content.evalue, - traceback: msg.content.traceback, - timestamp: Date.now() - }; - - default: - return null; - } -} - -function processDisplayData(data: any, metadata?: any): ExecutionResult { - const result: ExecutionResult = { - type: 'result', - timestamp: Date.now(), - metadata - }; - - // Process different MIME types in order of preference - - // Interactive/Rich formats - if (data['application/vnd.plotly.v1+json']) { - result.chart = extractPlotlyChart(data['application/vnd.plotly.v1+json']); - result.json = data['application/vnd.plotly.v1+json']; - } - - if (data['application/vnd.vega.v5+json']) { - result.chart = extractVegaChart( - data['application/vnd.vega.v5+json'], - 'vega' - ); - result.json = data['application/vnd.vega.v5+json']; - } - - if ( - data['application/vnd.vegalite.v4+json'] || - data['application/vnd.vegalite.v5+json'] - ) { - const vegaData = - data['application/vnd.vegalite.v4+json'] || - data['application/vnd.vegalite.v5+json']; - result.chart = extractVegaChart(vegaData, 'vega-lite'); - result.json = vegaData; - } - - // HTML content (tables, formatted output) - if (data['text/html']) { - result.html = data['text/html']; - - // Check if it's a pandas DataFrame - if (isPandasDataFrame(data['text/html'])) { - result.data = { type: 'dataframe', html: data['text/html'] }; - } - } - - // Images - if (data['image/png']) { - result.png = data['image/png']; - - // Try to detect if it's a chart - if (isLikelyChart(data, metadata)) { - result.chart = { - type: 'unknown', - library: 'matplotlib', - data: { image: data['image/png'] } - }; - } - } - - if (data['image/jpeg']) { - result.jpeg = data['image/jpeg']; - } - - if (data['image/svg+xml']) { - result.svg = data['image/svg+xml']; - } - - // Mathematical content - if (data['text/latex']) { - result.latex = data['text/latex']; - } - - // Code - if (data['application/javascript']) { - result.javascript = data['application/javascript']; - } - - // Structured data - if (data['application/json']) { - result.json = data['application/json']; - } - - // Markdown - if (data['text/markdown']) { - result.markdown = data['text/markdown']; - } - - // Plain text (fallback) - if (data['text/plain']) { - result.text = data['text/plain']; - } - - return result; -} - -function extractPlotlyChart(plotlyData: any): ChartData { - const data = plotlyData.data || plotlyData; - const layout = plotlyData.layout || {}; - - // Try to detect chart type from traces - let chartType: ChartData['type'] = 'unknown'; - if (data && data.length > 0) { - const firstTrace = data[0]; - if (firstTrace.type === 'scatter') { - chartType = firstTrace.mode?.includes('lines') ? 'line' : 'scatter'; - } else if (firstTrace.type === 'bar') { - chartType = 'bar'; - } else if (firstTrace.type === 'pie') { - chartType = 'pie'; - } else if (firstTrace.type === 'histogram') { - chartType = 'histogram'; - } else if (firstTrace.type === 'heatmap') { - chartType = 'heatmap'; - } - } - - return { - type: chartType, - title: layout.title?.text || layout.title, - data: data, - layout: layout, - config: plotlyData.config, - library: 'plotly' - }; -} - -function extractVegaChart( - vegaData: any, - format: 'vega' | 'vega-lite' -): ChartData { - // Try to detect chart type from mark or encoding - let chartType: ChartData['type'] = 'unknown'; - - if (format === 'vega-lite' && vegaData.mark) { - const mark = - typeof vegaData.mark === 'string' ? vegaData.mark : vegaData.mark.type; - switch (mark) { - case 'line': - chartType = 'line'; - break; - case 'bar': - chartType = 'bar'; - break; - case 'point': - case 'circle': - chartType = 'scatter'; - break; - case 'arc': - chartType = 'pie'; - break; - case 'rect': - if (vegaData.encoding?.color) { - chartType = 'heatmap'; - } - break; - } - } - - return { - type: chartType, - title: vegaData.title, - data: vegaData, - library: 'altair' // Altair outputs Vega-Lite - }; -} - -function isPandasDataFrame(html: string): boolean { - // Simple heuristic to detect pandas DataFrame HTML - return ( - html.includes('dataframe') || - (html.includes(' void; } export interface ExecutionResult { @@ -45,7 +46,7 @@ export interface RichOutput { } export interface PoolConfig { - maxProcesses: number; + maxProcesses?: number; idleTimeout: number; // milliseconds minSize: number; } @@ -61,19 +62,19 @@ const DEFAULT_EXECUTOR_CONFIGS: Record< python: { executor: 'python', minSize: 3, - maxProcesses: 15, + maxProcesses: undefined, // unlimited by default idleTimeout: 5 * 60 * 1000 // 5 minutes }, javascript: { executor: 'javascript', minSize: 3, - maxProcesses: 10, + maxProcesses: undefined, // unlimited by default idleTimeout: 5 * 60 * 1000 }, typescript: { executor: 'typescript', minSize: 3, - maxProcesses: 10, + maxProcesses: undefined, // unlimited by default idleTimeout: 5 * 60 * 1000 } }; @@ -84,6 +85,19 @@ export class ProcessPoolManager { private cleanupInterval?: NodeJS.Timeout; private logger: Logger; + // Track which executor belongs to which context + private contextExecutors: Map = new Map(); + + // Track unassigned executors available for new contexts + private availableExecutors: Map = + new Map(); + + // Per-language mutexes for atomic pool operations + private poolLocks: Map = new Map(); + + // Per-executor mutexes for serializing execution + private executorLocks: Map = new Map(); + constructor( customConfigs: Partial< Record> @@ -110,11 +124,15 @@ export class ProcessPoolManager { : userConfig.minSize || defaultConfig.minSize, maxProcesses: envMaxSize ? parseInt(envMaxSize, 10) - : userConfig.maxProcesses || defaultConfig.maxProcesses + : userConfig.maxProcesses !== undefined + ? userConfig.maxProcesses + : defaultConfig.maxProcesses }; this.poolConfigs.set(executor, config); this.pools.set(executor, []); + this.availableExecutors.set(executor, []); + this.poolLocks.set(executor, new Mutex()); } const pythonConfig = this.poolConfigs.get('python'); @@ -132,6 +150,44 @@ export class ProcessPoolManager { }); } + private getExecutorLock(executorId: string): Mutex { + let mutex = this.executorLocks.get(executorId); + if (!mutex) { + mutex = new Mutex(); + this.executorLocks.set(executorId, mutex); + } + return mutex; + } + + private async borrowExecutor( + language: InterpreterLanguage + ): Promise { + const mutex = this.poolLocks.get(language)!; + return await mutex.runExclusive(async () => { + const available = this.availableExecutors.get(language) || []; + if (available.length > 0) { + return available.shift()!; + } + // Create temporary executor if none available + const executor = await this.createProcess(language, undefined); + const pool = this.pools.get(language)!; + pool.push(executor); + return executor; + }); + } + + private async returnExecutor( + language: InterpreterLanguage, + executor: InterpreterProcess + ): Promise { + const mutex = this.poolLocks.get(language)!; + await mutex.runExclusive(async () => { + const available = this.availableExecutors.get(language) || []; + available.push(executor); + this.availableExecutors.set(language, available); + }); + } + async execute( language: InterpreterLanguage, code: string, @@ -139,89 +195,92 @@ export class ProcessPoolManager { timeout?: number ): Promise { const totalStartTime = Date.now(); - const process = await this.getProcess(language, sessionId); - const processAcquireTime = Date.now() - totalStartTime; - const executionId = randomUUID(); + if (sessionId) { + // Context execution: Get dedicated executor and lock on it + const contextExecutor = this.contextExecutors.get(sessionId); - try { - const execStartTime = Date.now(); - // Use provided timeout, or fall back to config (which may be undefined = unlimited) - const effectiveTimeout = - timeout ?? CONFIG.INTERPRETER_EXECUTION_TIMEOUT_MS; - const result = await this.executeCode( - process, - code, - executionId, - effectiveTimeout - ); - const execTime = Date.now() - execStartTime; - const totalTime = Date.now() - totalStartTime; - - this.logger.debug('Code execution complete', { - processAcquireTime, - execTime, - totalTime, - language - }); - return result; - } finally { - this.releaseProcess(process, sessionId); - } - } + if (!contextExecutor || contextExecutor.process.killed) { + if (contextExecutor) { + this.contextExecutors.delete(sessionId); + } + throw new Error( + `Context ${sessionId} not found or executor process terminated` + ); + } - private async getProcess( - language: InterpreterLanguage, - sessionId?: string - ): Promise { - const pool = this.pools.get(language)!; + if (contextExecutor.language !== language) { + throw new Error( + `Context ${sessionId} was created for ${contextExecutor.language}, cannot execute ${language} code` + ); + } - if (sessionId) { - const existingProcess = pool.find( - (p) => p.sessionId === sessionId && p.isAvailable + // Lock on the executor to serialize execution + const mutex = this.getExecutorLock(contextExecutor.id); + return await mutex.runExclusive(() => + this.executeInProcess(contextExecutor, code, totalStartTime, timeout) ); - if (existingProcess) { - existingProcess.isAvailable = false; - existingProcess.lastUsed = new Date(); - return existingProcess; + } else { + // Stateless execution: Borrow executor, execute, return + const executor = await this.borrowExecutor(language); + try { + const mutex = this.getExecutorLock(executor.id); + return await mutex.runExclusive(() => + this.executeInProcess(executor, code, totalStartTime, timeout) + ); + } finally { + await this.returnExecutor(language, executor); } } + } - const availableProcess = pool.find((p) => p.isAvailable && !p.sessionId); - if (availableProcess) { - availableProcess.isAvailable = false; - availableProcess.sessionId = sessionId; - availableProcess.lastUsed = new Date(); - return availableProcess; - } - - const config = this.poolConfigs.get(language)!; - if (pool.length < config.maxProcesses) { - const newProcess = await this.createProcess(language, sessionId); - pool.push(newProcess); - return newProcess; - } + private async executeInProcess( + process: InterpreterProcess, + code: string, + totalStartTime: number, + timeout?: number + ): Promise { + const processAcquireTime = Date.now() - totalStartTime; + const executionId = randomUUID(); - return new Promise((resolve) => { - const checkForAvailable = () => { - const available = pool.find((p) => p.isAvailable); - if (available) { - available.isAvailable = false; - available.sessionId = sessionId; - available.lastUsed = new Date(); - resolve(available); - } else { - setTimeout(checkForAvailable, 100); - } - }; - checkForAvailable(); + const execStartTime = Date.now(); + const effectiveTimeout = timeout ?? CONFIG.INTERPRETER_EXECUTION_TIMEOUT_MS; + const result = await this.executeCode( + process, + code, + executionId, + effectiveTimeout + ); + const execTime = Date.now() - execStartTime; + const totalTime = Date.now() - totalStartTime; + + this.logger.debug('Code execution complete', { + processAcquireTime, + execTime, + totalTime, + language: process.language }); + + return result; } private async createProcess( language: InterpreterLanguage, sessionId?: string ): Promise { + // Enforce per-language process limit if configured + const config = this.poolConfigs.get(language)!; + const pool = this.pools.get(language)!; + + if ( + config.maxProcesses !== undefined && + pool.length >= config.maxProcesses + ) { + throw new Error( + `Maximum ${language} executor limit reached (${config.maxProcesses}). Cannot create new executor.` + ); + } + const startTime = Date.now(); const id = randomUUID(); let command: string; @@ -270,10 +329,43 @@ export class ProcessPoolManager { language, process: childProcess, sessionId, - lastUsed: new Date(), - isAvailable: false + lastUsed: new Date() }; + // Register exit handler for cleanup (prevents memory leaks) + const exitHandler = ( + code: number | null, + signal: NodeJS.Signals | null + ) => { + this.logger.warn('Executor process exited unexpectedly', { + language, + processId: id, + sessionId, + exitCode: code, + signal + }); + + // Clean up from all tracking structures + if (sessionId) { + this.contextExecutors.delete(sessionId); + } + + const pool = this.pools.get(language); + if (pool) { + const index = pool.indexOf(interpreterProcess); + if (index > -1) pool.splice(index, 1); + } + + const available = this.availableExecutors.get(language); + if (available) { + const index = available.indexOf(interpreterProcess); + if (index > -1) available.splice(index, 1); + } + }; + + interpreterProcess.exitHandler = exitHandler; + childProcess.once('exit', exitHandler); + return new Promise((resolve, reject) => { let readyBuffer = ''; let errorBuffer = ''; @@ -401,16 +493,100 @@ export class ProcessPoolManager { }); } - private releaseProcess( - process: InterpreterProcess, - sessionId?: string - ): void { - if (!sessionId) { - process.sessionId = undefined; - process.isAvailable = true; - } else { - process.isAvailable = true; + async reserveExecutorForContext( + contextId: string, + language: InterpreterLanguage + ): Promise { + const mutex = this.poolLocks.get(language)!; + await mutex.runExclusive(async () => { + const available = this.availableExecutors.get(language) || []; + + let executor: InterpreterProcess; + + if (available.length > 0) { + // Use an available executor + executor = available.shift()!; + this.availableExecutors.set(language, available); + + this.logger.debug('Assigned available executor to context', { + contextId, + language, + executorId: executor.id + }); + } else { + // No available executors, create a new one + executor = await this.createProcess(language, contextId); + + // Add to main pool for tracking + const pool = this.pools.get(language)!; + pool.push(executor); + + this.logger.debug('Created new executor for context', { + contextId, + language, + executorId: executor.id + }); + } + + // Assign executor to context + executor.sessionId = contextId; + + // Track in contextExecutors map + this.contextExecutors.set(contextId, executor); + }); + } + + async releaseExecutorForContext( + contextId: string, + language: InterpreterLanguage + ): Promise { + const executor = this.contextExecutors.get(contextId); + if (!executor) { + this.logger.debug('Context already released or never existed', { + contextId + }); + return; + } + + this.logger.debug('Releasing executor for context', { + contextId, + language, + executorId: executor.id + }); + + // Remove from context ownership map + this.contextExecutors.delete(contextId); + + // Remove exit handler to prevent memory leak + if (executor.exitHandler) { + executor.process.removeListener('exit', executor.exitHandler); + } + + // Clean up executor lock + this.executorLocks.delete(executor.id); + + // Terminate the executor process immediately + executor.process.kill(); + + // Remove from main pool + const pool = this.pools.get(language); + if (pool) { + const index = pool.indexOf(executor); + if (index > -1) { + pool.splice(index, 1); + } + } + + // Ensure minimum pool is maintained + await this.ensureMinimumPool(language); + } + + isContextExecutorHealthy(contextId: string): boolean { + const executor = this.contextExecutors.get(contextId); + if (!executor) { + return false; } + return !executor.process.killed && executor.process.exitCode === null; } private async startPreWarming(): Promise { @@ -448,20 +624,10 @@ export class ProcessPoolManager { targetCount: config.minSize }); - const pool = this.pools.get(executor); - if (!pool) { - this.logger.debug('No pool found for executor', { executor }); - return; - } - + // Use the dedicated method for creating unassigned executors for (let i = 0; i < config.minSize; i++) { try { - const sessionId = `pre-warm-${executor}-${i}-${Date.now()}`; - const process = await this.createProcess(executor, sessionId); - - process.isAvailable = true; - process.sessionId = undefined; - pool.push(process); + await this.createUnassignedExecutor(executor); } catch (error) { this.logger.debug('Failed to pre-warm process', { executor, @@ -472,7 +638,7 @@ export class ProcessPoolManager { } const warmupTime = Date.now() - startTime; - const actualCount = pool.filter((p) => p.isAvailable).length; + const actualCount = this.availableExecutors.get(executor)?.length || 0; this.logger.debug('Pre-warming executor complete', { executor, actualCount, @@ -484,36 +650,94 @@ export class ProcessPoolManager { private cleanupIdleProcesses(): void { const now = new Date(); - const executors = Array.from(this.pools.keys()); - for (const executor of executors) { - const pool = this.pools.get(executor); - const config = this.poolConfigs.get(executor); - - if (!pool || !config) { - continue; - } + // Only clean up unassigned executors from availableExecutors pool + for (const [language, available] of this.availableExecutors.entries()) { + const config = this.poolConfigs.get(language); + if (!config) continue; - for (let i = pool.length - 1; i >= 0; i--) { - const process = pool[i]; + // Iterate backwards to safely remove during iteration + for (let i = available.length - 1; i >= 0; i--) { + const process = available[i]; const idleTime = now.getTime() - process.lastUsed.getTime(); - // Only clean up excess processes beyond minimum pool size + // Keep minimum pool size, clean up idle executors beyond that if ( - process.isAvailable && idleTime > config.idleTimeout && - pool.filter((p) => p.isAvailable).length > config.minSize + available.length > config.minSize ) { process.process.kill(); - pool.splice(i, 1); - this.logger.debug('Cleaned up idle process', { - executor, - remainingCount: pool.length + available.splice(i, 1); + + // Also remove from main pool + const pool = this.pools.get(language); + if (pool) { + const poolIndex = pool.indexOf(process); + if (poolIndex > -1) pool.splice(poolIndex, 1); + } + + this.logger.debug('Cleaned up idle unassigned executor', { + language, + remainingAvailable: available.length }); } } } } + async ensureMinimumPool(language: InterpreterLanguage): Promise { + const config = this.poolConfigs.get(language); + if (!config) return; + + const available = this.availableExecutors.get(language) || []; + const currentAvailable = available.length; + const needed = config.minSize - currentAvailable; + + if (needed > 0) { + this.logger.debug('Replenishing minimum pool', { + language, + currentAvailable, + needed, + targetMinimum: config.minSize + }); + + const spawnPromises = []; + for (let i = 0; i < needed; i++) { + spawnPromises.push(this.createUnassignedExecutor(language)); + } + await Promise.all(spawnPromises); + } + } + + private async createUnassignedExecutor( + language: InterpreterLanguage + ): Promise { + const executor = await this.createProcess(language, undefined); + + // Add to available pool + const available = this.availableExecutors.get(language) || []; + available.push(executor); + this.availableExecutors.set(language, available); + + // Add to main pool for tracking + const pool = this.pools.get(language)!; + pool.push(executor); + + this.logger.debug('Created unassigned executor', { + language, + executorId: executor.id + }); + } + + // For testing: get executor assigned to context + getExecutorForContext(contextId: string): InterpreterProcess | undefined { + return this.contextExecutors.get(contextId); + } + + // For testing: get available executors for language + getAvailableExecutors(language: InterpreterLanguage): InterpreterProcess[] { + return this.availableExecutors.get(language) || []; + } + async shutdown(): Promise { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); diff --git a/packages/sandbox-container/src/services/interpreter-service.ts b/packages/sandbox-container/src/services/interpreter-service.ts index c187042e..31c926a0 100644 --- a/packages/sandbox-container/src/services/interpreter-service.ts +++ b/packages/sandbox-container/src/services/interpreter-service.ts @@ -1,5 +1,4 @@ -// Wrapper service following init-testing's ServiceResult pattern - +import { randomUUID } from 'node:crypto'; import type { Logger } from '@repo/shared'; import type { CodeExecutionContext, @@ -10,22 +9,51 @@ import type { import { ErrorCode } from '@repo/shared/errors'; import type { ServiceResult } from '../core/types'; import { - type Context, - InterpreterService as CoreInterpreterService, - type CreateContextRequest, - type HealthStatus, - InterpreterNotReadyError -} from '../interpreter-service'; + type InterpreterLanguage, + processPool, + type RichOutput +} from '../runtime/process-pool'; + +export interface CreateContextRequest { + language?: string; + cwd?: string; +} + +export interface Context { + id: string; + language: string; + cwd: string; + createdAt: string; + lastUsed: string; +} + +export interface HealthStatus { + ready: boolean; + initializing: boolean; + progress: number; +} + +export class InterpreterNotReadyError extends Error { + progress: number; + retryAfter: number; + + constructor(message: string, progress: number = 100, retryAfter: number = 1) { + super(message); + this.progress = progress; + this.retryAfter = retryAfter; + this.name = 'InterpreterNotReadyError'; + } +} /** - * Interpreter service wrapper that follows the ServiceResult pattern - * Wraps the core InterpreterService to maintain consistency with other services + * Interpreter service for managing code execution contexts */ export class InterpreterService { - private coreService: CoreInterpreterService; + private contexts: Map = new Map(); + private logger: Logger; - constructor(private logger: Logger) { - this.coreService = new CoreInterpreterService(logger); + constructor(logger: Logger) { + this.logger = logger; } /** @@ -33,11 +61,13 @@ export class InterpreterService { */ async getHealthStatus(): Promise> { try { - const status = await this.coreService.getHealthStatus(); - return { success: true, - data: status + data: { + ready: true, + initializing: false, + progress: 100 + } }; } catch (error) { const errorMessage = @@ -66,14 +96,47 @@ export class InterpreterService { async createContext( request: CreateContextRequest ): Promise> { + let executorReserved = false; + let contextId: string | undefined; + let language: InterpreterLanguage | undefined; + try { - const context = await this.coreService.createContext(request); + contextId = randomUUID(); + language = this.mapLanguage( + request.language || 'python' + ) as InterpreterLanguage; + + const context: Context = { + id: contextId, + language, + cwd: request.cwd || '/workspace', + createdAt: new Date().toISOString(), + lastUsed: new Date().toISOString() + }; + + await processPool.reserveExecutorForContext(contextId, language); + executorReserved = true; + + this.contexts.set(contextId, context); return { success: true, data: context }; } catch (error) { + // If executor was reserved but context creation failed, release it + if (executorReserved && contextId && language) { + try { + await processPool.releaseExecutorForContext(contextId, language); + } catch (releaseError) { + this.logger.error( + 'Failed to release executor after context creation failure', + releaseError as Error, + { contextId, language } + ); + } + } + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; this.logger.error( @@ -114,7 +177,7 @@ export class InterpreterService { */ async listContexts(): Promise> { try { - const contexts = await this.coreService.listContexts(); + const contexts = Array.from(this.contexts.values()); return { success: true, @@ -146,7 +209,39 @@ export class InterpreterService { */ async deleteContext(contextId: string): Promise> { try { - await this.coreService.deleteContext(contextId); + const context = this.contexts.get(contextId); + if (!context) { + return { + success: false, + error: { + message: `Code context '${contextId}' not found`, + code: ErrorCode.CONTEXT_NOT_FOUND, + details: { + contextId + } satisfies ContextNotFoundContext + } + }; + } + + try { + await processPool.releaseExecutorForContext( + contextId, + context.language as InterpreterLanguage + ); + } catch (error) { + this.logger.error( + 'Failed to release executor for context', + error as Error, + { + contextId, + language: context.language + } + ); + throw error; + } finally { + // Always remove context from map, even if release fails + this.contexts.delete(contextId); + } return { success: true @@ -160,20 +255,6 @@ export class InterpreterService { { contextId } ); - // Check if it's a "not found" error - if (errorMessage.includes('not found')) { - return { - success: false, - error: { - message: `Code context '${contextId}' not found`, - code: ErrorCode.CONTEXT_NOT_FOUND, - details: { - contextId - } satisfies ContextNotFoundContext - } - }; - } - return { success: false, error: { @@ -202,14 +283,156 @@ export class InterpreterService { language?: string ): Promise { try { - // The core service returns a Response directly for streaming - const response = await this.coreService.executeCode( - contextId, + const context = this.contexts.get(contextId); + if (!context) { + return new Response( + JSON.stringify({ + error: `Context ${contextId} not found` + }), + { + status: 404, + headers: { 'Content-Type': 'application/json' } + } + ); + } + + context.lastUsed = new Date().toISOString(); + + // Check if executor is healthy + if (!processPool.isContextExecutorHealthy(contextId)) { + return new Response( + JSON.stringify({ + error: `Context executor has terminated. Please delete and recreate the context.`, + code: ErrorCode.INTERNAL_ERROR, + details: { + contextId + } + }), + { + status: 410, + headers: { 'Content-Type': 'application/json' } + } + ); + } + + const execLanguage = this.mapLanguage(language || context.language); + const self = this; + + const result = await processPool.execute( + execLanguage, code, - language + contextId, + undefined ); - return response; + const stream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + + try { + if (result.stdout) { + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'stdout', + text: result.stdout + }) + ) + ); + } + + if (result.stderr) { + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'stderr', + text: result.stderr + }) + ) + ); + } + + if (result.outputs && result.outputs.length > 0) { + for (const output of result.outputs) { + const outputData = self.formatOutputData(output); + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'result', + ...outputData, + metadata: output.metadata || {} + }) + ) + ); + } + } + + if (result.success) { + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'execution_complete', + execution_count: 1 + }) + ) + ); + } else if (result.error) { + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'error', + ename: result.error.type || 'ExecutionError', + evalue: result.error.message || 'Code execution failed', + traceback: result.error.traceback + ? result.error.traceback.split('\n') + : [] + }) + ) + ); + } else { + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'error', + ename: 'ExecutionError', + evalue: result.stderr || 'Code execution failed', + traceback: [] + }) + ) + ); + } + + controller.close(); + } catch (error) { + self.logger.error('Code execution failed', error as Error, { + contextId, + language: execLanguage + }); + + controller.enqueue( + encoder.encode( + self.formatSSE({ + type: 'error', + ename: 'InternalError', + evalue: + error instanceof Error ? error.message : String(error), + traceback: [] + }) + ) + ); + + controller.close(); + } + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive' + } + }); } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; @@ -238,4 +461,77 @@ export class InterpreterService { ); } } + + private mapLanguage(language: string): InterpreterLanguage { + const normalized = language.toLowerCase(); + + switch (normalized) { + case 'python': + case 'python3': + return 'python'; + case 'javascript': + case 'js': + case 'node': + return 'javascript'; + case 'typescript': + case 'ts': + return 'typescript'; + default: + this.logger.warn('Unknown language, defaulting to python', { + requestedLanguage: language + }); + return 'python'; + } + } + + private formatOutputData(output: RichOutput): Record { + const result: Record = {}; + + switch (output.type) { + case 'image': + result.png = output.data; + break; + case 'jpeg': + result.jpeg = output.data; + break; + case 'svg': + result.svg = output.data; + break; + case 'html': + result.html = output.data; + break; + case 'json': + result.json = + typeof output.data === 'string' + ? JSON.parse(output.data) + : output.data; + break; + case 'latex': + result.latex = output.data; + break; + case 'markdown': + result.markdown = output.data; + break; + case 'javascript': + result.javascript = output.data; + break; + case 'text': + result.text = output.data; + break; + default: + result.text = output.data || ''; + } + + return result; + } + + /** + * Format event as SSE (Server-Sent Events) + * SSE format requires "data: " prefix and double newline separator + * @param event - Event object to send + * @returns SSE-formatted string + */ + private formatSSE(event: Record): string { + return `data: ${JSON.stringify(event)}\n\n`; + } } diff --git a/packages/sandbox-container/tests/handlers/interpreter-handler.test.ts b/packages/sandbox-container/tests/handlers/interpreter-handler.test.ts index 52b52a4d..9a7a87d7 100644 --- a/packages/sandbox-container/tests/handlers/interpreter-handler.test.ts +++ b/packages/sandbox-container/tests/handlers/interpreter-handler.test.ts @@ -16,9 +16,9 @@ import { InterpreterHandler } from '@sandbox-container/handlers/interpreter-hand import type { Context, CreateContextRequest, - HealthStatus -} from '@sandbox-container/interpreter-service'; -import type { InterpreterService } from '@sandbox-container/services/interpreter-service'; + HealthStatus, + InterpreterService +} from '@sandbox-container/services/interpreter-service'; import { mocked } from '../test-utils'; // Mock the service dependencies diff --git a/packages/sandbox/Dockerfile b/packages/sandbox/Dockerfile index d377646f..767dec8c 100644 --- a/packages/sandbox/Dockerfile +++ b/packages/sandbox/Dockerfile @@ -172,13 +172,10 @@ COPY --from=prod-deps /app/node_modules ./node_modules COPY --from=prod-deps /app/packages/shared/dist ./packages/shared/dist COPY --from=prod-deps /app/packages/shared/package.json ./packages/shared/package.json -# Configure process pool sizes (can be overridden at runtime) +# Configure process pool sizes (minimum only) ENV PYTHON_POOL_MIN_SIZE=3 -ENV PYTHON_POOL_MAX_SIZE=15 ENV JAVASCRIPT_POOL_MIN_SIZE=3 -ENV JAVASCRIPT_POOL_MAX_SIZE=10 ENV TYPESCRIPT_POOL_MIN_SIZE=3 -ENV TYPESCRIPT_POOL_MAX_SIZE=10 # Create clean workspace directory for user code RUN mkdir -p /workspace diff --git a/tests/e2e/code-interpreter-workflow.test.ts b/tests/e2e/code-interpreter-workflow.test.ts index efdf3851..8e5da7ea 100644 --- a/tests/e2e/code-interpreter-workflow.test.ts +++ b/tests/e2e/code-interpreter-workflow.test.ts @@ -579,6 +579,219 @@ console.log('Sum:', sum); ); }, 120000); + test('should maintain isolation across many contexts (12+)', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + // Create 12 contexts and execute same code that declares a variable + for (let i = 0; i < 12; i++) { + const ctxResponse = await fetch(`${workerUrl}/api/code/context/create`, { + method: 'POST', + headers, + body: JSON.stringify({ language: 'javascript' }) + }); + + expect(ctxResponse.status).toBe(200); + const context = (await ctxResponse.json()) as CodeContext; + + const execResponse = await fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: 'const value = 2;', + options: { context } + }) + }); + + expect(execResponse.status).toBe(200); + const execution = (await execResponse.json()) as ExecutionResult; + expect( + execution.error, + `Context ${i + 1} should not have error` + ).toBeUndefined(); + + // Clean up immediately + await fetch(`${workerUrl}/api/code/context/${context.id}`, { + method: 'DELETE', + headers + }); + } + }, 120000); + + test('should maintain state isolation with concurrent context execution', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + // Create contexts sequentially + const contexts: CodeContext[] = []; + for (let i = 0; i < 4; i++) { + const response = await fetch(`${workerUrl}/api/code/context/create`, { + method: 'POST', + headers, + body: JSON.stringify({ language: 'javascript' }) + }); + expect(response.status).toBe(200); + const context = (await response.json()) as CodeContext; + contexts.push(context); + } + + // Execute different code in each context concurrently + // Each context sets its own unique value + const executionPromises = contexts.map((context, i) => + fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: `const value = ${i}; value;`, + options: { context } + }) + }).then((r) => r.json()) + ); + + const execResults = (await Promise.all( + executionPromises + )) as ExecutionResult[]; + + // Verify each execution succeeded and returned the correct value + for (let i = 0; i < contexts.length; i++) { + expect( + execResults[i].error, + `Context ${i} should not have error` + ).toBeUndefined(); + expect(execResults[i].results).toBeDefined(); + expect(execResults[i].results![0].text).toContain(String(i)); + } + + // Now verify each context maintained its isolated state by reading the value back + const verificationPromises = contexts.map((context) => + fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: 'value;', + options: { context } + }) + }).then((r) => r.json()) + ); + + const verifyResults = (await Promise.all( + verificationPromises + )) as ExecutionResult[]; + + // Each context should still have its own unique value + for (let i = 0; i < contexts.length; i++) { + expect( + verifyResults[i].error, + `Context ${i} verification should not have error` + ).toBeUndefined(); + expect(verifyResults[i].results).toBeDefined(); + expect(verifyResults[i].results![0].text).toContain(String(i)); + } + + // Clean up all contexts + await Promise.all( + contexts.map((context) => + fetch(`${workerUrl}/api/code/context/${context.id}`, { + method: 'DELETE', + headers + }) + ) + ); + }, 120000); + + test('should prevent concurrent execution on same context', async () => { + currentSandboxId = createSandboxId(); + const headers = createTestHeaders(currentSandboxId); + + // Create single context + const ctxResponse = await fetch(`${workerUrl}/api/code/context/create`, { + method: 'POST', + headers, + body: JSON.stringify({ language: 'javascript' }) + }); + + expect(ctxResponse.status).toBe(200); + const context = (await ctxResponse.json()) as CodeContext; + + // Set up initial state with a counter variable + const setupResponse = await fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: 'let counter = 0;', + options: { context } + }) + }); + expect(setupResponse.status).toBe(200); + + // Launch 20 concurrent executions that all try to increment the counter + // The mutex will queue these requests and execute them serially + const concurrentRequests = 20; + const requests = Array.from({ length: concurrentRequests }, () => + fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: 'counter++; counter;', + options: { context } + }) + }).then((r) => r.json()) + ); + + const results = await Promise.allSettled(requests); + + // Analyze results - collect all counter values + const counterValues: number[] = []; + + for (const result of results) { + if (result.status === 'fulfilled') { + const execution = result.value as ExecutionResult; + expect(execution.error).toBeUndefined(); + + // Extract the counter value from successful execution results + if (execution.results && execution.results.length > 0) { + const resultText = execution.results[0].text; + if (resultText) { + const match = resultText.match(/\d+/); + if (match) { + counterValues.push(parseInt(match[0], 10)); + } + } + } + } + } + + // Verify: all 20 requests succeeded + expect(counterValues.length).toBe(concurrentRequests); + + // Verify serial execution: counter values should be 1 through 20 + // Sort to handle out-of-order responses (execution was still serial) + counterValues.sort((a, b) => a - b); + expect(counterValues).toEqual(Array.from({ length: 20 }, (_, i) => i + 1)); + + // Verify final state: counter should be 20 (proving all executed serially) + const finalCheck = await fetch(`${workerUrl}/api/code/execute`, { + method: 'POST', + headers, + body: JSON.stringify({ + code: 'counter;', + options: { context } + }) + }); + const finalResult = (await finalCheck.json()) as ExecutionResult; + const finalCounterText = finalResult.results?.[0]?.text; + const finalCounterValue = finalCounterText + ? parseInt(finalCounterText.match(/\d+/)?.[0] || '0', 10) + : 0; + expect(finalCounterValue).toBe(20); + + // Clean up + await fetch(`${workerUrl}/api/code/context/${context.id}`, { + method: 'DELETE', + headers + }); + }, 120000); + // ============================================================================ // Error Handling // ============================================================================