diff --git a/.env.example b/.env.example index 0e1c1f2..243d6c3 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,9 @@ SLACK_HISTORY_API_MAX_LIMIT=50 SLACK_ACTIVE_TURN_RECONCILE_INTERVAL_MS=15000 SLACK_PROGRESS_REMINDER_AFTER_MS=120000 SLACK_PROGRESS_REMINDER_REPEAT_MS=120000 +SESSION_ARTIFACT_INACTIVE_TTL_MS=21600000 +SESSION_ARTIFACT_CLEANUP_INTERVAL_MS=3600000 +SESSION_ARTIFACT_CLEANUP_MAX_PER_SWEEP=20 # Service storage PORT=3000 diff --git a/README.md b/README.md index ab0d332..1bae906 100644 --- a/README.md +++ b/README.md @@ -292,6 +292,8 @@ Disposable runtime state: - `logs/` - `repos/` +The worker also runs a session artifact janitor for inactive sessions. By default it removes rebuildable macOS worktree artifacts like `frontend/macos/.build` after 6 hours of inactivity, while leaving the session workspace and source checkout in place. + The macOS bare-run deploy path only reuses the durable broker-owned subset that defines behavior and identity. It intentionally leaves the disposable runtime state behind and starts the VM with a clean `sessions/`, `jobs/`, `logs/`, and `repos/`. 
## Logging diff --git a/src/config.ts b/src/config.ts index 28ed096..8f4e7ad 100644 --- a/src/config.ts +++ b/src/config.ts @@ -12,6 +12,9 @@ export interface AppConfig { readonly slackMissedThreadRecoveryIntervalMs: number; readonly slackProgressReminderAfterMs: number; readonly slackProgressReminderRepeatMs: number; + readonly sessionArtifactInactiveTtlMs: number; + readonly sessionArtifactCleanupIntervalMs: number; + readonly sessionArtifactCleanupMaxPerSweep: number; readonly stateDir: string; readonly jobsRoot: string; readonly sessionsRoot: string; @@ -171,6 +174,9 @@ export function loadConfig(env = process.env): AppConfig { ), slackProgressReminderAfterMs: getNumber(env, "SLACK_PROGRESS_REMINDER_AFTER_MS", 120_000), slackProgressReminderRepeatMs: getNumber(env, "SLACK_PROGRESS_REMINDER_REPEAT_MS", 120_000), + sessionArtifactInactiveTtlMs: getNumber(env, "SESSION_ARTIFACT_INACTIVE_TTL_MS", 6 * 60 * 60 * 1_000), + sessionArtifactCleanupIntervalMs: getNumber(env, "SESSION_ARTIFACT_CLEANUP_INTERVAL_MS", 60 * 60 * 1_000), + sessionArtifactCleanupMaxPerSweep: getNumber(env, "SESSION_ARTIFACT_CLEANUP_MAX_PER_SWEEP", 20), stateDir, jobsRoot, sessionsRoot, diff --git a/src/index.ts b/src/index.ts index 3a0368b..351eb9a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,7 @@ import { CodexRuntimeControl } from "./services/codex-runtime-control.js"; import { IsolatedMcpService } from "./services/codex/isolated-mcp-service.js"; import { GitHubAuthorMappingService } from "./services/github-author-mapping-service.js"; import { JobManager } from "./services/job-manager.js"; +import { SessionArtifactJanitor } from "./services/session-artifact-janitor.js"; import { SessionManager } from "./services/session-manager.js"; import { SlackCodexBridge } from "./services/slack/slack-codex-bridge.js"; import { StateStore } from "./store/state-store.js"; @@ -71,6 +72,12 @@ export async function startService(): Promise<{ await bridge.acceptBackgroundJobEvent(event); } }); + 
// Session artifact janitor: removes rebuildable macOS build artifacts from the
// workspaces of sessions that have been inactive for longer than a TTL, while
// leaving the workspace directory and the source checkout in place.
//
// Names expected from the module's import block: `fs` (node:fs/promises),
// `path` (node:path), the `Dirent` type (node:fs), `logger`, `SessionManager`,
// and the `SlackSessionRecord` / `PersistedInboundMessage` /
// `PersistedBackgroundJob` types.

/** An inactive session that currently qualifies for artifact cleanup. */
interface SessionArtifactJanitorCandidate {
  readonly session: SlackSessionRecord;
  /** Timestamp string of the most recent activity found for the session. */
  readonly lastActivityAt: string;
  /** `lastActivityAt` as epoch milliseconds; used to order candidates. */
  readonly lastActivityAtMs: number;
}

/** Summary of a single {@link SessionArtifactJanitor.runSweep} call. */
export interface SessionArtifactJanitorSweepResult {
  /** Caller-supplied label for the sweep (e.g. "startup", "periodic"). */
  readonly reason: string;
  /** Number of sessions returned by the session manager for this sweep. */
  readonly checkedCount: number;
  /** Number of sessions that had at least one artifact removed. */
  readonly cleanedCount: number;
  readonly cleanedSessionKeys: readonly string[];
}

/**
 * Periodically deletes disposable build artifacts of inactive sessions.
 *
 * A session is eligible when it has no active turn, no pending/inflight
 * inbound messages, no registered/running background jobs, and its latest
 * recorded activity is at least `inactivityTtlMs` old.
 *
 * Sweeping is disabled entirely when `inactivityTtlMs` or
 * `cleanupMaxPerSweep` is not positive; the periodic timer is additionally
 * skipped when `cleanupIntervalMs` is not positive.
 */
export class SessionArtifactJanitor {
  readonly #sessions: SessionManager;
  readonly #inactivityTtlMs: number;
  readonly #cleanupIntervalMs: number;
  readonly #cleanupMaxPerSweep: number;
  readonly #now: () => number;

  #timer: NodeJS.Timeout | undefined;

  /**
   * @param options.sessions Source of session, inbound-message and background-job records.
   * @param options.inactivityTtlMs Minimum idle time before a session's artifacts may be removed.
   * @param options.cleanupIntervalMs Delay between periodic sweeps.
   * @param options.cleanupMaxPerSweep Maximum number of sessions cleaned in one sweep.
   * @param options.now Clock override returning epoch milliseconds; defaults to `Date.now`.
   */
  constructor(options: {
    readonly sessions: SessionManager;
    readonly inactivityTtlMs: number;
    readonly cleanupIntervalMs: number;
    readonly cleanupMaxPerSweep: number;
    readonly now?: (() => number) | undefined;
  }) {
    this.#sessions = options.sessions;
    this.#inactivityTtlMs = options.inactivityTtlMs;
    this.#cleanupIntervalMs = options.cleanupIntervalMs;
    this.#cleanupMaxPerSweep = options.cleanupMaxPerSweep;
    this.#now = options.now ?? (() => Date.now());
  }

  /**
   * Runs one "startup" sweep and, when periodic sweeps are enabled, schedules
   * further sweeps every `cleanupIntervalMs`. Does nothing but log when
   * sweeping is disabled.
   */
  async start(): Promise<void> {
    if (!this.#isSweepEnabled()) {
      logger.info("Session artifact janitor disabled", {
        inactivityTtlMs: this.#inactivityTtlMs,
        cleanupIntervalMs: this.#cleanupIntervalMs,
        cleanupMaxPerSweep: this.#cleanupMaxPerSweep
      });
      return;
    }

    await this.runSweep("startup");

    if (!this.#isPeriodicSweepEnabled()) {
      return;
    }

    this.#timer = setInterval(() => {
      // Fire-and-forget: a failed sweep is logged and the next tick retries.
      void this.runSweep("periodic").catch((error: unknown) => {
        logger.error("Session artifact janitor periodic sweep failed", {
          error: error instanceof Error ? error.message : String(error)
        });
      });
    }, this.#cleanupIntervalMs);

    logger.info("Session artifact janitor started", {
      inactivityTtlMs: this.#inactivityTtlMs,
      cleanupIntervalMs: this.#cleanupIntervalMs,
      cleanupMaxPerSweep: this.#cleanupMaxPerSweep
    });
  }

  /** Cancels the periodic timer, if one was started. */
  async stop(): Promise<void> {
    if (!this.#timer) {
      return;
    }

    clearInterval(this.#timer);
    this.#timer = undefined;
  }

  /**
   * Cleans artifacts of inactive sessions, least recently active first.
   *
   * At most `cleanupMaxPerSweep` sessions are cleaned per call. The cap counts
   * sessions that actually had artifacts removed: eligible sessions with
   * nothing to delete are skipped without consuming the budget, so a backlog
   * of already-clean old sessions cannot block newer inactive sessions from
   * ever being reached.
   *
   * A failure while cleaning one session is logged and does not stop the sweep.
   *
   * @param reason Label recorded in the log entry and echoed in the result.
   */
  async runSweep(reason: string): Promise<SessionArtifactJanitorSweepResult> {
    if (!this.#isSweepEnabled()) {
      return {
        reason,
        checkedCount: 0,
        cleanedCount: 0,
        cleanedSessionKeys: []
      };
    }

    const nowMs = this.#now();
    const sessions = this.#sessions.listSessions();
    const candidates = sessions
      .map((session) => this.#getCandidate(session, nowMs))
      .filter((candidate): candidate is SessionArtifactJanitorCandidate => candidate !== null)
      .sort((left, right) => left.lastActivityAtMs - right.lastActivityAtMs);

    // Floor so a fractional setting behaves like an integer cap.
    const cleanupLimit = Math.floor(this.#cleanupMaxPerSweep);
    const cleanedSessionKeys: string[] = [];

    for (const candidate of candidates) {
      if (cleanedSessionKeys.length >= cleanupLimit) {
        break;
      }

      try {
        const cleaned = await this.#cleanupCandidate(candidate, nowMs);
        if (cleaned) {
          cleanedSessionKeys.push(candidate.session.key);
        }
      } catch (error) {
        logger.error("Failed to clean inactive session artifacts", {
          sessionKey: candidate.session.key,
          error: error instanceof Error ? error.message : String(error)
        });
      }
    }

    // Quiet periodic sweeps are not logged; startup always is.
    if (cleanedSessionKeys.length > 0 || reason === "startup") {
      logger.info("Session artifact janitor sweep finished", {
        reason,
        checkedCount: sessions.length,
        candidateCount: candidates.length,
        cleanedCount: cleanedSessionKeys.length,
        cleanupMaxPerSweep: this.#cleanupMaxPerSweep,
        inactivityTtlMs: this.#inactivityTtlMs
      });
    }

    return {
      reason,
      checkedCount: sessions.length,
      cleanedCount: cleanedSessionKeys.length,
      cleanedSessionKeys
    };
  }

  /** True when both the TTL and the per-sweep cap are positive. */
  #isSweepEnabled(): boolean {
    return this.#inactivityTtlMs > 0 && this.#cleanupMaxPerSweep > 0;
  }

  /** True when sweeping is enabled and a positive interval is configured. */
  #isPeriodicSweepEnabled(): boolean {
    return this.#isSweepEnabled() && this.#cleanupIntervalMs > 0;
  }

  /**
   * Returns a cleanup candidate for `session`, or `null` when the session is
   * still busy (active turn, open inbound messages, live background jobs) or
   * has been active within the TTL.
   */
  #getCandidate(session: SlackSessionRecord, nowMs: number): SessionArtifactJanitorCandidate | null {
    if (session.activeTurnId) {
      return null;
    }

    const openInboundMessages = this.#sessions.listInboundMessages({
      channelId: session.channelId,
      rootThreadTs: session.rootThreadTs,
      status: ["pending", "inflight"]
    });
    if (openInboundMessages.length > 0) {
      return null;
    }

    const backgroundJobs = this.#sessions.listBackgroundJobs({
      channelId: session.channelId,
      rootThreadTs: session.rootThreadTs
    });
    if (backgroundJobs.some((job) => job.status === "registered" || job.status === "running")) {
      return null;
    }

    const inboundMessages = this.#sessions.listInboundMessages({
      channelId: session.channelId,
      rootThreadTs: session.rootThreadTs
    });
    const lastActivity = computeLastActivity(session, inboundMessages, backgroundJobs);
    if (nowMs - lastActivity.atMs < this.#inactivityTtlMs) {
      return null;
    }

    return {
      session,
      lastActivityAt: lastActivity.at,
      lastActivityAtMs: lastActivity.atMs
    };
  }

  /**
   * Deletes the disposable artifacts of one candidate.
   *
   * The session is looked up again and re-evaluated first, because earlier
   * candidates in the same sweep are cleaned asynchronously and the session
   * may have become active (or disappeared) in the meantime.
   *
   * @returns `true` when at least one artifact path was removed.
   */
  async #cleanupCandidate(candidate: SessionArtifactJanitorCandidate, nowMs: number): Promise<boolean> {
    const session = this.#sessions.getSession(candidate.session.channelId, candidate.session.rootThreadTs);
    if (!session) {
      return false;
    }

    const freshCandidate = this.#getCandidate(session, nowMs);
    if (!freshCandidate) {
      return false;
    }

    const artifactPaths = await findDisposableArtifactPaths(session.workspacePath);
    if (artifactPaths.length === 0) {
      return false;
    }

    for (const artifactPath of artifactPaths) {
      await fs.rm(artifactPath, { recursive: true, force: true });
    }

    logger.info("Cleaned inactive session artifacts", {
      sessionKey: session.key,
      lastActivityAt: freshCandidate.lastActivityAt,
      cleanedArtifactCount: artifactPaths.length,
      cleanedArtifacts: artifactPaths
    });

    return true;
  }
}

/**
 * Collects removable artifact paths under `workspacePath`.
 *
 * Both the workspace itself and each of its immediate child directories are
 * treated as possible checkout roots. Under `<root>/frontend/macos` the
 * following are reported when present: the `.build` directory, the
 * `default.profraw` file, and regular files named `xcodebuild*.log`.
 *
 * @returns De-duplicated, sorted absolute-or-relative paths (same form as `workspacePath`).
 */
async function findDisposableArtifactPaths(workspacePath: string): Promise<string[]> {
  const artifactPaths = new Set<string>();
  const roots = [workspacePath, ...(await listImmediateChildDirectories(workspacePath))];

  for (const root of roots) {
    const macosRoot = path.join(root, "frontend", "macos");
    const buildRoot = path.join(macosRoot, ".build");
    if (await isDirectory(buildRoot)) {
      artifactPaths.add(buildRoot);
    }

    const defaultProfraw = path.join(macosRoot, "default.profraw");
    if (await isFile(defaultProfraw)) {
      artifactPaths.add(defaultProfraw);
    }

    const macosEntries = await safeReadDir(macosRoot);
    for (const entry of macosEntries) {
      if (!entry.isFile()) {
        continue;
      }

      if (entry.name.startsWith("xcodebuild") && entry.name.endsWith(".log")) {
        artifactPaths.add(path.join(macosRoot, entry.name));
      }
    }
  }

  return [...artifactPaths].sort();
}

/** Lists the directories directly inside `directoryPath` (empty when it does not exist). */
async function listImmediateChildDirectories(directoryPath: string): Promise<string[]> {
  const entries = await safeReadDir(directoryPath);
  return entries
    .filter((entry) => entry.isDirectory())
    .map((entry) => path.join(directoryPath, entry.name));
}

/** `readdir` with file types that yields `[]` for a missing path; other errors are rethrown. */
async function safeReadDir(directoryPath: string): Promise<Dirent[]> {
  try {
    return await fs.readdir(directoryPath, { withFileTypes: true });
  } catch (error) {
    if (isMissingPathError(error)) {
      return [];
    }

    throw error;
  }
}

/** True when `targetPath` exists and is a directory; a missing path yields `false`. */
async function isDirectory(targetPath: string): Promise<boolean> {
  try {
    return (await fs.stat(targetPath)).isDirectory();
  } catch (error) {
    if (isMissingPathError(error)) {
      return false;
    }

    throw error;
  }
}

/** True when `targetPath` exists and is a regular file; a missing path yields `false`. */
async function isFile(targetPath: string): Promise<boolean> {
  try {
    return (await fs.stat(targetPath)).isFile();
  } catch (error) {
    if (isMissingPathError(error)) {
      return false;
    }

    throw error;
  }
}

/**
 * Reports whether `error` means "there is nothing at that path".
 *
 * `ENOTDIR` is included alongside `ENOENT`: it is raised when an intermediate
 * path component (for example `frontend`) exists but is a regular file, which
 * for artifact discovery is equivalent to the artifact not existing.
 */
function isMissingPathError(error: unknown): boolean {
  if (!(error instanceof Error) || !("code" in error)) {
    return false;
  }

  const code = (error as NodeJS.ErrnoException).code;
  return code === "ENOENT" || code === "ENOTDIR";
}

/**
 * Finds the most recent timestamp among the session's own timestamps and those
 * of its inbound messages and background jobs.
 *
 * Missing, empty, and unparseable values are ignored. When nothing parses, the
 * Unix epoch is returned so the session is treated as maximally stale.
 *
 * @returns The winning timestamp string (`at`) and its epoch-millisecond value (`atMs`).
 */
function computeLastActivity(
  session: SlackSessionRecord,
  inboundMessages: readonly PersistedInboundMessage[],
  backgroundJobs: readonly PersistedBackgroundJob[]
): {
  readonly at: string;
  readonly atMs: number;
} {
  const candidates = [
    session.updatedAt,
    session.lastSlackReplyAt,
    session.lastTurnSignalAt,
    ...inboundMessages.flatMap((message) => [message.createdAt, message.updatedAt]),
    ...backgroundJobs.flatMap((job) => [
      job.createdAt,
      job.startedAt,
      job.heartbeatAt,
      job.lastEventAt,
      job.completedAt,
      job.cancelledAt,
      job.updatedAt
    ])
  ].filter((value): value is string => typeof value === "string" && value.length > 0);

  let latestAt = session.updatedAt;
  let latestAtMs = Date.parse(latestAt);

  for (const candidate of candidates) {
    const parsed = Date.parse(candidate);
    if (!Number.isFinite(parsed)) {
      continue;
    }

    if (!Number.isFinite(latestAtMs) || parsed > latestAtMs) {
      latestAt = candidate;
      latestAtMs = parsed;
    }
  }

  if (!Number.isFinite(latestAtMs)) {
    latestAt = new Date(0).toISOString();
    latestAtMs = 0;
  }

  return { at: latestAt, atMs: latestAtMs };
}
IsolatedMcpService } from "./services/codex/isolated-mcp-service.js"; import { GitHubAuthorMappingService } from "./services/github-author-mapping-service.js"; import { JobManager } from "./services/job-manager.js"; +import { SessionArtifactJanitor } from "./services/session-artifact-janitor.js"; import { SessionManager } from "./services/session-manager.js"; import { SlackCodexBridge } from "./services/slack/slack-codex-bridge.js"; import { StateStore } from "./store/state-store.js"; @@ -68,6 +69,12 @@ export async function startWorkerService(): Promise<{ await bridge.acceptBackgroundJobEvent(event); } }); + const sessionArtifactJanitor = new SessionArtifactJanitor({ + sessions: sessionManager, + inactivityTtlMs: config.sessionArtifactInactiveTtlMs, + cleanupIntervalMs: config.sessionArtifactCleanupIntervalMs, + cleanupMaxPerSweep: config.sessionArtifactCleanupMaxPerSweep + }); const server = http.createServer( createHttpHandler({ bridge, @@ -80,11 +87,13 @@ export async function startWorkerService(): Promise<{ try { await bridge.start(); await jobManager.start(); + await sessionArtifactJanitor.start(); await new Promise((resolve, reject) => { server.listen(config.port, config.workerBindHost, () => resolve()); server.once("error", reject); }); } catch (error) { + await sessionArtifactJanitor.stop().catch(() => {}); await jobManager.stop().catch(() => {}); await bridge.stop().catch(() => {}); if (server.listening) { @@ -104,6 +113,7 @@ export async function startWorkerService(): Promise<{ return { stop: async () => { + await sessionArtifactJanitor.stop(); await bridge.stop(); await jobManager.stop(); await new Promise((resolve, reject) => { diff --git a/test/config.test.ts b/test/config.test.ts index 248f036..ff210a0 100644 --- a/test/config.test.ts +++ b/test/config.test.ts @@ -27,6 +27,9 @@ describe("loadConfig", () => { expect(config.slackActiveTurnReconcileIntervalMs).toBe(15_000); expect(config.slackProgressReminderAfterMs).toBe(120_000); 
expect(config.slackProgressReminderRepeatMs).toBe(120_000); + expect(config.sessionArtifactInactiveTtlMs).toBe(6 * 60 * 60 * 1_000); + expect(config.sessionArtifactCleanupIntervalMs).toBe(60 * 60 * 1_000); + expect(config.sessionArtifactCleanupMaxPerSweep).toBe(20); expect(config.logLevel).toBe("info"); expect(config.logRawSlackEvents).toBe(true); expect(config.logRawCodexRpc).toBe(true); diff --git a/test/session-artifact-janitor.test.ts b/test/session-artifact-janitor.test.ts new file mode 100644 index 0000000..1877d95 --- /dev/null +++ b/test/session-artifact-janitor.test.ts @@ -0,0 +1,135 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { SessionArtifactJanitor } from "../src/services/session-artifact-janitor.js"; +import { SessionManager } from "../src/services/session-manager.js"; +import { StateStore } from "../src/store/state-store.js"; + +describe("SessionArtifactJanitor", () => { + it("removes macOS build artifacts for inactive sessions", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-state-")); + const sessionsRoot = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-sessions-")); + const store = new StateStore(stateDir, sessionsRoot); + const manager = new SessionManager({ + stateStore: store, + sessionsRoot + }); + + await manager.load(); + const session = await manager.ensureSession("C123", "111.222"); + const buildRoot = path.join(session.workspacePath, "cueboard", "frontend", "macos", ".build", "DerivedData"); + const profrawPath = path.join(session.workspacePath, "cueboard", "frontend", "macos", "default.profraw"); + const xcodebuildLogPath = path.join(session.workspacePath, "cueboard", "frontend", "macos", "xcodebuild.log"); + await fs.mkdir(buildRoot, { recursive: true }); + await fs.writeFile(path.join(buildRoot, "build.db"), "artifact\n", "utf8"); + await fs.writeFile(profrawPath, "profile\n", 
"utf8"); + await fs.writeFile(xcodebuildLogPath, "log\n", "utf8"); + + const now = Date.parse("2026-04-08T10:00:00.000Z"); + await store.patchSession(session.key, { + updatedAt: "2026-04-08T01:00:00.000Z", + lastSlackReplyAt: "2026-04-08T01:30:00.000Z" + }); + + const janitor = new SessionArtifactJanitor({ + sessions: manager, + inactivityTtlMs: 60 * 60 * 1_000, + cleanupIntervalMs: 0, + cleanupMaxPerSweep: 10, + now: () => now + }); + + const result = await janitor.runSweep("test"); + + expect(result.cleanedCount).toBe(1); + await expect(fs.stat(path.join(session.workspacePath, "cueboard"))).resolves.toBeTruthy(); + await expect(fs.access(path.join(session.workspacePath, "cueboard", "frontend", "macos", ".build"))).rejects.toBeTruthy(); + await expect(fs.access(profrawPath)).rejects.toBeTruthy(); + await expect(fs.access(xcodebuildLogPath)).rejects.toBeTruthy(); + }); + + it("keeps artifacts for sessions that are still active", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-state-")); + const sessionsRoot = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-sessions-")); + const store = new StateStore(stateDir, sessionsRoot); + const manager = new SessionManager({ + stateStore: store, + sessionsRoot + }); + + await manager.load(); + const session = await manager.ensureSession("C123", "111.222"); + const buildRoot = path.join(session.workspacePath, "cueboard", "frontend", "macos", ".build"); + await fs.mkdir(buildRoot, { recursive: true }); + await fs.writeFile(path.join(buildRoot, "build.db"), "artifact\n", "utf8"); + await store.patchSession(session.key, { + updatedAt: "2026-04-08T01:00:00.000Z", + activeTurnId: "turn-1", + activeTurnStartedAt: "2026-04-08T09:30:00.000Z" + }); + + const janitor = new SessionArtifactJanitor({ + sessions: manager, + inactivityTtlMs: 60 * 60 * 1_000, + cleanupIntervalMs: 0, + cleanupMaxPerSweep: 10, + now: () => Date.parse("2026-04-08T10:00:00.000Z") + }); + + const result = await 
janitor.runSweep("test"); + + expect(result.cleanedCount).toBe(0); + await expect(fs.stat(buildRoot)).resolves.toBeTruthy(); + }); + + it("keeps artifacts while a background job is still running", async () => { + const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-state-")); + const sessionsRoot = await fs.mkdtemp(path.join(os.tmpdir(), "slack-codex-sessions-")); + const store = new StateStore(stateDir, sessionsRoot); + const manager = new SessionManager({ + stateStore: store, + sessionsRoot + }); + + await manager.load(); + const session = await manager.ensureSession("C123", "111.222"); + const buildRoot = path.join(session.workspacePath, "cueboard", "frontend", "macos", ".build"); + await fs.mkdir(buildRoot, { recursive: true }); + await fs.writeFile(path.join(buildRoot, "build.db"), "artifact\n", "utf8"); + await store.patchSession(session.key, { + updatedAt: "2026-04-08T01:00:00.000Z" + }); + await manager.upsertBackgroundJob({ + id: "job-1", + token: "token-1", + sessionKey: session.key, + channelId: session.channelId, + rootThreadTs: session.rootThreadTs, + kind: "watch_ci", + shell: "/bin/bash", + cwd: session.workspacePath, + scriptPath: path.join(session.workspacePath, "job.sh"), + restartOnBoot: false, + status: "running", + createdAt: "2026-04-08T09:00:00.000Z", + updatedAt: "2026-04-08T09:30:00.000Z", + startedAt: "2026-04-08T09:00:00.000Z" + }); + + const janitor = new SessionArtifactJanitor({ + sessions: manager, + inactivityTtlMs: 60 * 60 * 1_000, + cleanupIntervalMs: 0, + cleanupMaxPerSweep: 10, + now: () => Date.parse("2026-04-08T10:00:00.000Z") + }); + + const result = await janitor.runSweep("test"); + + expect(result.cleanedCount).toBe(0); + await expect(fs.stat(buildRoot)).resolves.toBeTruthy(); + }); +});