diff --git a/services/bootloader/index.ts b/services/bootloader/index.ts
new file mode 100644
index 0000000..26db0d2
--- /dev/null
+++ b/services/bootloader/index.ts
@@ -0,0 +1,542 @@
+/**
+ * Bootloader service — manages VM bootstrapping with selective module loading.
+ *
+ * When a lieutenant spins up agent VMs, this service handles the boot flow:
+ *   1. Prepare boot config (VM DNA: which modules + capabilities)
+ *   2. Generate boot scripts for different VM types
+ *   3. Track boot status and report to the VM tree
+ *
+ * VM Type Profiles:
+ *   Full agent VM  — reef + all core + lieutenant + pi-vers
+ *   Swarm worker   — reef + minimal (store, cron) + pi-vers
+ *   Lightweight    — reef + minimal, no pi-vers (short-lived haiku sessions)
+ *   Infra VM       — reef + core + specific service module
+ */
+
+import { Hono } from "hono";
+import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
+import { Type } from "@sinclair/typebox";
+import type { FleetClient, ServiceContext, ServiceModule } from "../../src/core/types.js";
+
+// =============================================================================
+// Types
+// =============================================================================
+
+export type VMType = "full" | "swarm" | "lightweight" | "infra";
+
+/** Default "DNA" for one VM type: which service modules and capabilities it boots with. */
+export interface BootProfile {
+  type: VMType;
+  organs: string[];
+  capabilities: string[];
+  installPiVers: boolean;
+  description: string;
+}
+
+/** Input to boot-script generation; the extra* lists are merged into the profile defaults. */
+export interface BootRequest {
+  vmId: string;
+  name: string;
+  type: VMType;
+  parentVmId?: string;
+  extraOrgans?: string[];
+  extraCapabilities?: string[];
+  roofReefUrl?: string;
+}
+
+/** Response shape of POST /generate. */
+export interface BootResult {
+  vmId: string;
+  name: string;
+  type: VMType;
+  script: string;
+  profile: BootProfile;
+}
+
+// =============================================================================
+// VM Type profiles
+// =============================================================================
+
+/** Built-in profiles. Treated as read-only: callers get copies via resolveProfile(). */
+const PROFILES: Record<VMType, BootProfile> = {
+  full: {
+    type: "full",
+    organs: ["store", "cron", "lieutenant", "registry", "vm-tree", "vers-config", "docs", "installer"],
+    capabilities: ["vers-vm", "vers-vm-copy", "vers-swarm", "ssh"],
+    installPiVers: true,
+    description: "Full agent VM — all core modules + pi-vers. Can promote to lieutenant.",
+  },
+  swarm: {
+    type: "swarm",
+    organs: ["store", "cron"],
+    capabilities: ["vers-vm", "vers-vm-copy", "vers-swarm"],
+    installPiVers: true,
+    description: "Swarm worker — minimal reef + pi-vers for fleet task execution.",
+  },
+  lightweight: {
+    type: "lightweight",
+    organs: ["store"],
+    capabilities: [],
+    installPiVers: false,
+    description: "Lightweight worker — minimal reef, no pi-vers. For short-lived sessions.",
+  },
+  infra: {
+    type: "infra",
+    organs: ["store", "cron", "docs"],
+    capabilities: [],
+    installPiVers: false,
+    description: "Infra VM — core reef + specific service. For Gitea, MinIO, persistent services.",
+  },
+};
+
+/** Category reported to the roof reef's VM tree for each VM type. */
+const VM_TREE_CATEGORY: Record<VMType, string> = {
+  full: "agent_vm",
+  swarm: "swarm_vm",
+  lightweight: "swarm_vm",
+  infra: "infra_vm",
+};
+
+/**
+ * Type guard for VM types.
+ *
+ * Checks own keys only, so inherited names such as "constructor" are rejected
+ * instead of slipping through a truthiness test on PROFILES[type].
+ */
+function isVMType(value: unknown): value is VMType {
+  return typeof value === "string" && Object.prototype.hasOwnProperty.call(PROFILES, value);
+}
+
+/** Returns a new array holding `base` followed by any `extra` entries not already present. */
+function mergeUnique(base: readonly string[], extra: readonly string[] = []): string[] {
+  return [...new Set([...base, ...extra])];
+}
+
+/**
+ * Builds the effective profile for a request.
+ *
+ * The organ/capability arrays are fresh copies: pushing extras into the arrays
+ * held by PROFILES would leak one request's extras into every later request.
+ */
+function resolveProfile(
+  type: VMType,
+  extraOrgans?: readonly string[],
+  extraCapabilities?: readonly string[],
+): BootProfile {
+  const base = PROFILES[type];
+  return {
+    ...base,
+    organs: mergeUnique(base.organs, extraOrgans),
+    capabilities: mergeUnique(base.capabilities, extraCapabilities),
+  };
+}
+
+// =============================================================================
+// Request validation
+// =============================================================================
+
+// Quotes, `$`, backticks, backslashes and control characters could close a quoted
+// context or trigger expansion once interpolated into the generated bash script.
+const SHELL_UNSAFE = /['"`$\\\u0000-\u001f\u007f]/;
+const SHELL_UNSAFE_HINT = "must be a non-empty string without quotes, $, backticks, backslashes, or control characters";
+
+function isShellSafeString(value: unknown): value is string {
+  return typeof value === "string" && value.length > 0 && !SHELL_UNSAFE.test(value);
+}
+
+function isShellSafeList(value: unknown): value is string[] {
+  return Array.isArray(value) && value.every(isShellSafeString);
+}
+
+type ParsedBootRequest = { ok: true; req: BootRequest } | { ok: false; error: string };
+
+/**
+ * Validates the JSON body of POST /generate.
+ *
+ * Every field ends up inside a script that is meant to be run on the VM, so
+ * values are checked for type and for shell-unsafe characters before use.
+ * Falsy optional fields are treated as absent.
+ */
+function parseBootRequest(body: unknown): ParsedBootRequest {
+  const fail = (error: string): ParsedBootRequest => ({ ok: false, error });
+  if (typeof body !== "object" || body === null) return fail("Request body must be a JSON object");
+
+  const { vmId, name, type, parentVmId, extraOrgans, extraCapabilities, roofReefUrl } = body as Record<string, unknown>;
+  if (!vmId || !name || !type) return fail("vmId, name, and type are required");
+  if (!isVMType(type)) return fail(`Unknown VM type: ${String(type)}. Valid: full, swarm, lightweight, infra`);
+  if (!isShellSafeString(vmId)) return fail(`vmId ${SHELL_UNSAFE_HINT}`);
+  if (!isShellSafeString(name)) return fail(`name ${SHELL_UNSAFE_HINT}`);
+
+  const req: BootRequest = { vmId, name, type };
+  if (parentVmId) {
+    if (!isShellSafeString(parentVmId)) return fail(`parentVmId ${SHELL_UNSAFE_HINT}`);
+    req.parentVmId = parentVmId;
+  }
+  if (roofReefUrl) {
+    if (!isShellSafeString(roofReefUrl)) return fail(`roofReefUrl ${SHELL_UNSAFE_HINT}`);
+    req.roofReefUrl = roofReefUrl;
+  }
+  if (extraOrgans) {
+    if (!isShellSafeList(extraOrgans)) return fail(`extraOrgans must be an array; each entry ${SHELL_UNSAFE_HINT}`);
+    req.extraOrgans = extraOrgans;
+  }
+  if (extraCapabilities) {
+    if (!isShellSafeList(extraCapabilities)) return fail(`extraCapabilities must be an array; each entry ${SHELL_UNSAFE_HINT}`);
+    req.extraCapabilities = extraCapabilities;
+  }
+  return { ok: true, req };
+}
+
+// =============================================================================
+// Boot script generation
+// =============================================================================
+
+/**
+ * Renders the bash bootstrap script for one VM.
+ *
+ * Values from `req` are interpolated verbatim; HTTP callers must go through
+ * parseBootRequest() first. Bash's own `${...}` expansions are written as
+ * `\${...}` so that they survive the JavaScript template literal.
+ */
+function generateBootScript(req: BootRequest): string {
+  const profile = resolveProfile(req.type, req.extraOrgans, req.extraCapabilities);
+  const roofUrl = req.roofReefUrl || process.env.VERS_INFRA_URL || "http://localhost:3000";
+  const reefConfig = JSON.stringify({ organs: profile.organs, capabilities: profile.capabilities });
+
+  // Non-full profiles mark every service outside the DNA with a .disabled file.
+  const moduleSelection =
+    req.type !== "full"
+      ? `
+# Disable non-DNA services by creating .disabled marker
+for dir in services/*/; do
+  svc=$(basename "$dir")
+  if ! echo "$ACTIVE_ORGANS" | grep -qw "$svc"; then
+    touch "$dir/.disabled"
+    echo "[boot] Disabled: $svc"
+  else
+    rm -f "$dir/.disabled" 2>/dev/null
+    echo "[boot] Enabled: $svc"
+  fi
+done
+`
+      : "# Full profile — all services enabled";
+
+  // Template literal (not a plain string) so the VM type is actually interpolated.
+  const piVersSection = profile.installPiVers
+    ? `
+# ===== 8. Install pi-vers =====
+echo "[boot] Installing pi-vers..."
+if [ -d /root/.pi/extensions ]; then
+  cd /root/.pi/extensions
+  if [ -d pi-vers ]; then
+    cd pi-vers && git pull --ff-only 2>/dev/null || true
+  else
+    git clone https://github.com/hdresearch/pi-vers.git 2>/dev/null || true
+  fi
+fi
+cd /root/reef
+`
+    : `# Skipping pi-vers (not needed for ${req.type} VMs)`;
+
+  return `#!/bin/bash
+# Reef bootloader — auto-generated for ${req.name} (${req.type})
+# VM ID: ${req.vmId}
+# Parent: ${req.parentVmId || "none"}
+# Profile: ${profile.description}
+set -e
+
+echo "[boot] Starting reef bootstrap for ${req.name} (${req.type})"
+
+# ===== 1. Fix DNS =====
+echo "nameserver 8.8.8.8" > /etc/resolv.conf
+
+# ===== 2. Ensure PATH =====
+export PATH="/root/.bun/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"
+
+# ===== 3. Install bun if not present =====
+if ! command -v bun &> /dev/null; then
+  echo "[boot] Installing bun..."
+  curl -fsSL https://bun.sh/install | bash
+  export PATH="/root/.bun/bin:$PATH"
+fi
+
+# ===== 4. Clone/update reef =====
+if [ -d /root/reef ]; then
+  echo "[boot] Updating existing reef..."
+  cd /root/reef && git pull --ff-only 2>/dev/null || true
+else
+  echo "[boot] Cloning reef..."
+  git clone https://github.com/hdresearch/reef.git /root/reef 2>/dev/null || {
+    echo "[boot] Git clone failed — checking for tarball..."
+    # Fallback: download tarball from roof reef
+    curl -sf "${roofUrl}/installer/seeds" > /dev/null && {
+      echo "[boot] Installing from roof reef..."
+      mkdir -p /root/reef
+      cd /root/reef
+      # Will be populated by installer
+    }
+  }
+fi
+
+cd /root/reef
+
+# ===== 5. Install dependencies =====
+bun install --frozen-lockfile 2>/dev/null || bun install
+
+# ===== 6. Configure reef DNA =====
+cat > /root/reef/.env << 'ENVEOF'
+# Auto-generated by reef bootloader
+VERS_VM_ID=${req.vmId}
+VERS_AGENT_NAME=${req.name}
+VERS_AGENT_ROLE=${req.type === "full" ? "agent" : req.type}
+VERS_INFRA_URL=${roofUrl}
+REEF_VM_TYPE=${req.type}
+REEF_ORGANS=${profile.organs.join(",")}
+REEF_CAPABILITIES=${profile.capabilities.join(",")}
+${req.parentVmId ? `REEF_PARENT_VM_ID=${req.parentVmId}` : ""}
+ENVEOF
+
+# Source env
+set -a; source .env; set +a
+
+# ===== 7. Selective module loading =====
+# Core services directory is always loaded.
+# For non-full profiles, we remove services that aren't in the DNA.
+ACTIVE_ORGANS="${profile.organs.join(" ")}"
+echo "[boot] Active organs: $ACTIVE_ORGANS"
+
+${moduleSelection}
+
+${piVersSection}
+
+# ===== 9. Register in roof reef's VM tree =====
+echo "[boot] Registering in VM tree..."
+curl -sf -X POST "${roofUrl}/vm-tree/vms" \\
+  -H "Content-Type: application/json" \\
+  -H "Authorization: Bearer \${VERS_AUTH_TOKEN:-}" \\
+  -d '{
+    "vmId": "${req.vmId}",
+    "name": "${req.name}",
+    "category": "${VM_TREE_CATEGORY[req.type]}",
+    "parentVmId": ${req.parentVmId ? `"${req.parentVmId}"` : "null"},
+    "reefConfig": ${reefConfig}
+  }' 2>/dev/null || echo "[boot] VM tree registration failed (non-fatal)"
+
+# ===== 10. Start reef via systemd =====
+echo "[boot] Starting reef..."
+if systemctl is-system-running &>/dev/null; then
+  cp /root/reef/scripts/reef.service /etc/systemd/system/reef.service
+  systemctl daemon-reload
+  systemctl enable reef
+  systemctl start reef
+else
+  # No systemd — run directly
+  nohup bun run src/main.ts > /tmp/reef.log 2>&1 &
+fi
+
+# ===== 11. Health check =====
+echo "[boot] Waiting for reef..."
+for i in $(seq 1 30); do
+  if curl -sf http://localhost:3000/health > /dev/null 2>&1; then
+    echo "[boot] Reef up in \${i}s"
+
+    # Register in registry
+    curl -sf -X POST "${roofUrl}/registry/vms" \\
+      -H "Content-Type: application/json" \\
+      -H "Authorization: Bearer \${VERS_AUTH_TOKEN:-}" \\
+      -d '{
+        "id": "${req.vmId}",
+        "name": "${req.name}",
+        "role": "${req.type === "infra" ? "infra" : "worker"}",
+        "address": "${req.vmId}.vm.vers.sh",
+        "registeredBy": "bootloader",
+        "reefConfig": ${reefConfig}
+      }' 2>/dev/null || true
+
+    echo "[boot] Bootstrap complete for ${req.name}"
+    exit 0
+  fi
+  sleep 1
+done
+
+echo "[boot] Reef failed to start"
+tail -20 /tmp/reef.log 2>/dev/null
+exit 1
+`;
+}
+
+// =============================================================================
+// Routes
+// =============================================================================
+
+const routes = new Hono();
+
+// GET /profiles — list available VM type profiles
+routes.get("/profiles", (c) => {
+  return c.json({ profiles: PROFILES });
+});
+
+// GET /profiles/:type — get a specific profile
+routes.get("/profiles/:type", (c) => {
+  const type = c.req.param("type");
+  if (!isVMType(type)) return c.json({ error: `Unknown VM type: ${type}` }, 404);
+  return c.json(PROFILES[type]);
+});
+
+// POST /generate — generate a boot script for a VM
+routes.post("/generate", async (c) => {
+  let body: unknown;
+  try {
+    body = await c.req.json();
+  } catch {
+    // Malformed JSON is a client error, not a 500.
+    return c.json({ error: "Request body must be valid JSON" }, 400);
+  }
+
+  const parsed = parseBootRequest(body);
+  if (!parsed.ok) return c.json({ error: parsed.error }, 400);
+
+  const { req } = parsed;
+  const result: BootResult = {
+    vmId: req.vmId,
+    name: req.name,
+    type: req.type,
+    profile: resolveProfile(req.type, req.extraOrgans, req.extraCapabilities),
+    script: generateBootScript(req),
+  };
+  return c.json(result, 201);
+});
+
+// GET /script/:type — get a generic boot script for a VM type
+// NOTE(review): the placeholders below are bash variables. They expand where the
+// script uses them unquoted or in double quotes, but the curl JSON payloads are
+// single-quoted, so there they are sent literally — confirm this is intended.
+routes.get("/script/:type", (c) => {
+  const type = c.req.param("type");
+  if (!isVMType(type)) return c.json({ error: `Unknown VM type: ${type}` }, 404);
+
+  const script = generateBootScript({
+    vmId: "${VERS_VM_ID}",
+    name: "${VERS_AGENT_NAME}",
+    type,
+    parentVmId: "${REEF_PARENT_VM_ID}",
+  });
+
+  return new Response(script, {
+    headers: { "Content-Type": "text/x-shellscript" },
+  });
+});
+
+// =============================================================================
+// Module export
+// =============================================================================
+
+/** Normalises a caught value to a message string for client.err(). */
+function errorMessage(e: unknown): string {
+  return e instanceof Error ? e.message : String(e);
+}
+
+const bootloader: ServiceModule = {
+  name: "bootloader",
+  description: "VM bootstrapping — generates boot scripts based on VM DNA profiles",
+  routes,
+
+  registerTools(pi: ExtensionAPI, client: FleetClient) {
+    pi.registerTool({
+      name: "reef_boot_generate",
+      label: "Bootloader: Generate Script",
+      description:
+        "Generate a boot script to bootstrap reef on a new VM. Configures modules and capabilities based on VM type.",
+      parameters: Type.Object({
+        vmId: Type.String({ description: "VM ID to bootstrap" }),
+        name: Type.String({ description: "VM name" }),
+        type: Type.Union(
+          [Type.Literal("full"), Type.Literal("swarm"), Type.Literal("lightweight"), Type.Literal("infra")],
+          { description: "VM type profile" },
+        ),
+        parentVmId: Type.Optional(Type.String({ description: "Parent VM ID" })),
+        extraOrgans: Type.Optional(Type.Array(Type.String(), { description: "Additional service modules" })),
+      }),
+      async execute(_id, params) {
+        if (!client.getBaseUrl()) return client.noUrl();
+        try {
+          const result = await client.api<BootResult>("POST", "/bootloader/generate", params);
+          return client.ok(
+            [
+              `Boot script generated for "${result.name}" (${result.type})`,
+              ` Organs: ${result.profile.organs.join(", ")}`,
+              ` Capabilities: ${result.profile.capabilities.join(", ")}`,
+              ` Pi-vers: ${result.profile.installPiVers ? "yes" : "no"}`,
+              ` Script length: ${result.script.length} chars`,
+              "",
+              "Use SCP to copy and execute on the VM:",
+              ` scp boot.sh root@${params.vmId}.vm.vers.sh:/tmp/boot.sh`,
+              ` ssh root@${params.vmId}.vm.vers.sh 'bash /tmp/boot.sh'`,
+            ].join("\n"),
+            { result },
+          );
+        } catch (e) {
+          return client.err(errorMessage(e));
+        }
+      },
+    });
+
+    pi.registerTool({
+      name: "reef_boot_profiles",
+      label: "Bootloader: List Profiles",
+      description: "List available VM type profiles and their default DNA (modules + capabilities).",
+      parameters: Type.Object({}),
+      async execute() {
+        if (!client.getBaseUrl()) return client.noUrl();
+        try {
+          const result = await client.api<{ profiles: Record<VMType, BootProfile> }>("GET", "/bootloader/profiles");
+          const sections = Object.entries(result.profiles).map(([type, profile]) =>
+            [
+              `${type}:`,
+              ` ${profile.description}`,
+              ` Organs: ${profile.organs.join(", ")}`,
+              ` Capabilities: ${profile.capabilities.join(", ") || "none"}`,
+              ` Pi-vers: ${profile.installPiVers ? "yes" : "no"}`,
+            ].join("\n"),
+          );
+          return client.ok(sections.join("\n\n"), { profiles: result.profiles });
+        } catch (e) {
+          return client.err(errorMessage(e));
+        }
+      },
+    });
+  },
+
+  capabilities: ["vm.boot", "vm.provision"],
+
+  routeDocs: {
+    "GET /profiles": {
+      summary: "List available VM type profiles",
+      response: "{ profiles: { full, swarm, lightweight, infra } }",
+    },
+    "GET /profiles/:type": {
+      summary: "Get a specific VM profile",
+      params: { type: { type: "string", required: true, description: "full | swarm | lightweight | infra" } },
+    },
+    "POST /generate": {
+      summary: "Generate a boot script for a VM",
+      body: {
+        vmId: { type: "string", required: true, description: "VM ID" },
+        name: { type: "string", required: true, description: "VM name" },
+        type: { type: "string", required: true, description: "VM type: full | swarm | lightweight | infra" },
+        parentVmId: { type: "string", description: "Parent VM ID" },
+        extraOrgans: { type: "string[]", description: "Additional modules to include" },
+        extraCapabilities: { type: "string[]", description: "Additional capabilities to include" },
+        roofReefUrl: { type: "string", description: "Roof reef base URL (default: VERS_INFRA_URL, then http://localhost:3000)" },
+      },
+      response: "{ vmId, name, type, profile, script }",
+    },
+    "GET /script/:type": {
+      summary: "Get a generic boot script for a VM type (shell script)",
+      params: { type: { type: "string", required: true } },
+      response: "text/x-shellscript",
+    },
+  },
+};
+
+export default bootloader;
diff --git a/services/lieutenant/index.ts b/services/lieutenant/index.ts
new file mode 100644
index 0000000..9cbe7d4
--- /dev/null
+++ b/services/lieutenant/index.ts
@@ -0,0 +1,157 @@
+/**
+ * Lieutenant service — persistent, conversational agent sessions.
+ *
+ * Unlike ephemeral tasks (one pi process per prompt), lieutenants are
+ * long-lived agent sessions that persist across tasks, accumulate context,
+ * and support multi-turn interaction. They can run locally or on Vers VMs.
+ *
+ * Tools (8):
+ *   reef_lt_create   — Spawn a lieutenant (local or remote)
+ *   reef_lt_send     — Send a message (prompt, steer, followUp)
+ *   reef_lt_read     — Read current/historical output
+ *   reef_lt_status   — Overview of all lieutenants
+ *   reef_lt_pause    — Pause a VM lieutenant (preserves state)
+ *   reef_lt_resume   — Resume a paused lieutenant
+ *   reef_lt_destroy  — Tear down a lieutenant (or all)
+ *   reef_lt_discover — Recover lieutenants from registry
+ *
+ * State: data/lieutenants.sqlite (via LieutenantStore)
+ * Events: lieutenant:created, lieutenant:completed, lieutenant:paused,
+ *         lieutenant:resumed, lieutenant:destroyed
+ */
+
+import { Hono } from "hono";
+import type { FleetClient, ServiceContext, ServiceModule } from "../../src/core/types.js";
+import { ServiceEventBus } from "../../src/core/events.js";
+import { createRoutes } from "./routes.js";
+import { LieutenantRuntime } from "./runtime.js";
+import { LieutenantStore } from "./store.js";
+import { registerTools } from "./tools.js";
+
+const store = new LieutenantStore();
+
+// Bootstrap the runtime on a stand-in event bus so the module can be built at
+// import time; init() swaps in a runtime bound to ctx.events.
+let runtime = new LieutenantRuntime({ store, events: new ServiceEventBus() });
+const routes = createRoutes(store, runtime);
+
+const lieutenant: ServiceModule = {
+  name: "lieutenant",
+  description: "Persistent agent sessions — long-lived lieutenants with multi-turn context",
+  routes,
+
+  init(ctx: ServiceContext) {
+    // The runtime above was built on a stand-in bus; rebuild it on the bus
+    // supplied by the service context.
+    runtime = new LieutenantRuntime({ store, events: ctx.events });
+    // createRoutes() bound the previous runtime instance, so the router is
+    // rebuilt too and swapped onto the module object.
+    // NOTE(review): a host that read `routes` before calling init() would
+    // keep serving the old router — confirm the load order.
+    (lieutenant as any).routes = createRoutes(store, runtime);
+  },
+
+  store: {
+    flush() {
+      store.flush();
+    },
+    async close() {
+      await runtime.shutdown();
+      store.close();
+    },
+  },
+
+  registerTools(pi, client: FleetClient) {
+    registerTools(pi, client);
+  },
+
+  widget: {
+    async getLines(client: FleetClient) {
+      try {
+        const res = await client.api<{ lieutenants: any[]; count: number }>("GET", "/lieutenant/lieutenants");
+        if (res.count === 0) return [];
+        const tally = { working: 0, idle: 0, paused: 0 };
+        for (const { status } of res.lieutenants) {
+          if (status === "working" || status === "idle" || status === "paused") tally[status] += 1;
+        }
+        const segments = [`${res.count} LT`];
+        for (const [status, n] of Object.entries(tally)) {
+          if (n) segments.push(`${n} ${status}`);
+        }
+        return [`Lieutenants: ${segments.join(", ")}`];
+      } catch {
+        return [];
+      }
+    },
+  },
+
+  dependencies: ["store"],
+  capabilities: ["agent.spawn", "agent.communicate", "agent.lifecycle"],
+
+  routeDocs: {
+    "POST /lieutenants": {
+      summary: "Create a new lieutenant",
+      body: {
+        name: { type: "string", required: true, description: "Lieutenant name" },
+        role: { type: "string", required: true, description: "Role description (becomes system prompt context)" },
+        local: { type: "boolean", description: "Run locally as subprocess (default: true)" },
+        model: { type: "string", description: "Model ID" },
+        commitId: { type: "string", description: "Golden image commit ID (remote mode)" },
+      },
+      response: "The created lieutenant object",
+    },
+    "GET /lieutenants": {
+      summary: "List all active lieutenants",
+      query: { status: { type: "string", description: "Filter by status" } },
+      response: "{ lieutenants: [...], count }",
+    },
+    "GET /lieutenants/:name": {
+      summary: "Get a lieutenant by name",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+      response: "Lieutenant object",
+    },
+    "POST /lieutenants/:name/send": {
+      summary: "Send a message to a lieutenant",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+      body: {
+        message: { type: "string", required: true, description: "Message to send" },
+        mode: { type: "string", description: "prompt | steer | followUp" },
+      },
+      response: "{ sent, mode, note? }",
+    },
+    "GET /lieutenants/:name/read": {
+      summary: "Read lieutenant output",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+      query: {
+        tail: { type: "number", description: "Characters from end" },
+        history: { type: "number", description: "Previous responses to include" },
+      },
+      response: "{ name, status, taskCount, output, ... }",
+    },
+    "POST /lieutenants/:name/pause": {
+      summary: "Pause a VM lieutenant (preserves state)",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+    },
+    "POST /lieutenants/:name/resume": {
+      summary: "Resume a paused lieutenant",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+    },
+    "DELETE /lieutenants/:name": {
+      summary: "Destroy a lieutenant",
+      params: { name: { type: "string", required: true, description: "Lieutenant name" } },
+    },
+    "POST /lieutenants/destroy-all": {
+      summary: "Destroy all lieutenants",
+    },
+    "POST /lieutenants/discover": {
+      summary: "Discover lieutenants from the registry",
+      response: "{ results: [...] }",
+    },
+    "GET /_panel": {
+      summary: "HTML dashboard showing active lieutenants",
+      response: "text/html",
+    },
+  },
+};
+
+export default lieutenant;
diff --git a/services/lieutenant/routes.ts b/services/lieutenant/routes.ts
new file mode 100644
index 0000000..6ee5cb5
--- /dev/null
+++ b/services/lieutenant/routes.ts
@@ -0,0 +1,214 @@
+/**
+ * Lieutenant HTTP routes — management, messaging, status.
+ */ + +import { Hono } from "hono"; +import type { LieutenantStore, LtStatus } from "./store.js"; +import { ConflictError, NotFoundError, ValidationError } from "./store.js"; +import type { LieutenantRuntime } from "./runtime.js"; + +export function createRoutes(store: LieutenantStore, runtime: LieutenantRuntime): Hono { + const routes = new Hono(); + + // POST /lieutenants — create a new lieutenant + routes.post("/lieutenants", async (c) => { + try { + const body = await c.req.json(); + const { name, role, local, model, commitId } = body; + + if (!name || typeof name !== "string") return c.json({ error: "name is required" }, 400); + if (!role || typeof role !== "string") return c.json({ error: "role is required" }, 400); + + const lt = await runtime.create({ name, role, isLocal: !!local, model, commitId }); + return c.json(lt, 201); + } catch (e) { + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + if (e instanceof ConflictError) return c.json({ error: e.message }, 409); + throw e; + } + }); + + // GET /lieutenants — list all active lieutenants + routes.get("/lieutenants", (c) => { + const status = c.req.query("status") as LtStatus | undefined; + const lts = store.list(status ? 
{ status } : undefined); + return c.json({ lieutenants: lts, count: lts.length }); + }); + + // GET /lieutenants/:name — get a lieutenant by name + routes.get("/lieutenants/:name", (c) => { + const lt = store.getByName(c.req.param("name")); + if (!lt || lt.status === "destroyed") return c.json({ error: "Lieutenant not found" }, 404); + return c.json(lt); + }); + + // POST /lieutenants/:name/send — send a message to a lieutenant + routes.post("/lieutenants/:name/send", async (c) => { + try { + const body = await c.req.json(); + const { message, mode } = body; + if (!message || typeof message !== "string") return c.json({ error: "message is required" }, 400); + + const result = await runtime.send(c.req.param("name"), message, mode); + return c.json(result); + } catch (e) { + if (e instanceof NotFoundError) return c.json({ error: e.message }, 404); + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + throw e; + } + }); + + // GET /lieutenants/:name/read — read lieutenant output + routes.get("/lieutenants/:name/read", (c) => { + const name = c.req.param("name"); + const tail = c.req.query("tail") ? parseInt(c.req.query("tail")!, 10) : undefined; + const history = c.req.query("history") ? 
parseInt(c.req.query("history")!, 10) : undefined; + + const lt = store.getByName(name); + if (!lt || lt.status === "destroyed") return c.json({ error: "Lieutenant not found" }, 404); + + let output = lt.lastOutput || "(no output yet)"; + if (tail && output.length > tail) { + output = `...${output.slice(-tail)}`; + } + + const parts: string[] = []; + if (history && history > 0) { + const count = Math.min(history, lt.outputHistory.length); + const start = lt.outputHistory.length - count; + for (let i = start; i < lt.outputHistory.length; i++) { + parts.push(`=== Response ${i + 1} ===\n${lt.outputHistory[i]}\n`); + } + parts.push(`=== Current (${lt.status}) ===\n${output}`); + } else { + parts.push(output); + } + + return c.json({ + name, + status: lt.status, + taskCount: lt.taskCount, + outputLength: lt.lastOutput.length, + historyCount: lt.outputHistory.length, + output: parts.join("\n"), + }); + }); + + // POST /lieutenants/:name/pause — pause a lieutenant + routes.post("/lieutenants/:name/pause", async (c) => { + try { + const result = await runtime.pause(c.req.param("name")); + return c.json(result); + } catch (e) { + if (e instanceof NotFoundError) return c.json({ error: e.message }, 404); + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + throw e; + } + }); + + // POST /lieutenants/:name/resume — resume a paused lieutenant + routes.post("/lieutenants/:name/resume", async (c) => { + try { + const result = await runtime.resume(c.req.param("name")); + return c.json(result); + } catch (e) { + if (e instanceof NotFoundError) return c.json({ error: e.message }, 404); + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + throw e; + } + }); + + // DELETE /lieutenants/:name — destroy a lieutenant + routes.delete("/lieutenants/:name", async (c) => { + try { + const name = c.req.param("name"); + const result = await runtime.destroy(name); + return c.json(result); + } catch (e) { + if (e instanceof NotFoundError) 
return c.json({ error: e.message }, 404); + throw e; + } + }); + + // POST /lieutenants/destroy-all — destroy all lieutenants + routes.post("/lieutenants/destroy-all", async (c) => { + const results = await runtime.destroyAll(); + return c.json({ results }); + }); + + // POST /lieutenants/discover — discover lieutenants from registry + routes.post("/lieutenants/discover", async (c) => { + const results = await runtime.discover(); + return c.json({ results }); + }); + + // GET /_panel — HTML dashboard + routes.get("/_panel", (c) => { + const lts = store.list(); + const rows = lts + .map((lt) => { + const statusColor = + lt.status === "idle" + ? "#4f9" + : lt.status === "working" + ? "#ff9800" + : lt.status === "paused" + ? "#888" + : lt.status === "error" + ? "#f55" + : "#aaa"; + const icon = + lt.status === "working" + ? "⟳" + : lt.status === "idle" + ? "●" + : lt.status === "paused" + ? "⏸" + : lt.status === "error" + ? "✗" + : "○"; + const location = lt.isLocal ? "local" : `VM: ${lt.vmId.slice(0, 12)}`; + return ` + ${icon} ${lt.name} + ${lt.role.slice(0, 50)} + ${lt.status} + ${location} + ${lt.taskCount} + ${lt.lastActivityAt ? new Date(lt.lastActivityAt).toLocaleTimeString() : "---"} + `; + }) + .join("\n"); + + const html = ` + + + Lieutenant Dashboard + + + +

Lieutenants

+

${lts.length} lieutenant${lts.length !== 1 ? "s" : ""} active

+ + + + + + ${rows || ''} + +
NameRoleStatusLocationTasksLast Active
No lieutenants active
+ +`; + + return c.html(html); + }); + + return routes; +} diff --git a/services/lieutenant/rpc.ts b/services/lieutenant/rpc.ts new file mode 100644 index 0000000..496b811 --- /dev/null +++ b/services/lieutenant/rpc.ts @@ -0,0 +1,204 @@ +/** + * RPC agent management — spawns and manages pi processes for lieutenants. + * + * Two modes: + * - Local: pi child process on the same machine (no VM required) + * - Remote: pi on a Vers VM via SSH + RPC (FIFOs + tmux) + * + * Each lieutenant gets its own long-lived pi process that persists across tasks. + */ + +import { type ChildProcess, spawn } from "node:child_process"; +import { mkdirSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { homedir } from "node:os"; + +// ============================================================================= +// Types +// ============================================================================= + +export interface RpcHandle { + send: (cmd: object) => void; + onEvent: (handler: (event: any) => void) => void; + kill: () => Promise; + vmId: string; + isAlive: () => boolean; +} + +export interface LocalRpcOptions { + systemPrompt?: string; + model?: string; + cwd?: string; +} + +// ============================================================================= +// Local RPC — pi subprocess on the same machine +// ============================================================================= + +export async function startLocalRpcAgent(name: string, opts: LocalRpcOptions): Promise { + const ltDir = join(homedir(), ".reef", "lieutenants", name); + const workDir = opts.cwd || join(ltDir, "workspace"); + const sessionDir = join(ltDir, "sessions"); + + if (!existsSync(workDir)) mkdirSync(workDir, { recursive: true }); + if (!existsSync(sessionDir)) mkdirSync(sessionDir, { recursive: true }); + + // Write system prompt file if provided + if (opts.systemPrompt) { + const promptPath = join(ltDir, "system-prompt.md"); + await Bun.write(promptPath, opts.systemPrompt); + } 
+ + const piPath = process.env.PI_PATH ?? "pi"; + const args = ["--mode", "rpc", "--no-session"]; + + if (opts.systemPrompt) { + args.push("--append-system-prompt", opts.systemPrompt); + } + + const env: Record = { ...(process.env as Record) }; + if (opts.model) { + env.PI_MODEL = opts.model; + } + + const child: ChildProcess = spawn(piPath, args, { + cwd: workDir, + env, + stdio: ["pipe", "pipe", "pipe"], + }); + + if (!child.stdin || !child.stdout || !child.stderr) { + throw new Error(`Failed to spawn pi process for lieutenant "${name}"`); + } + + let eventHandler: ((event: any) => void) | undefined; + let killed = false; + let lineBuf = ""; + + child.stdout.on("data", (data: Buffer) => { + lineBuf += data.toString(); + const lines = lineBuf.split("\n"); + lineBuf = lines.pop() || ""; + for (const line of lines) { + if (!line.trim()) continue; + try { + const event = JSON.parse(line); + if (eventHandler) eventHandler(event); + } catch { + /* not JSON */ + } + } + }); + + child.stderr.on("data", (data: Buffer) => { + const msg = data.toString().trim(); + if (msg) console.error(` [lt:${name}] ${msg}`); + }); + + child.on("exit", (code) => { + if (!killed) { + console.error(` [lt:${name}] pi exited with code ${code}`); + } + }); + + // Wait briefly for process to start + await new Promise((resolve) => setTimeout(resolve, 500)); + if (child.exitCode !== null) { + throw new Error(`Pi process for "${name}" exited immediately (code ${child.exitCode})`); + } + + function send(cmd: object) { + if (killed || !child.stdin || child.exitCode !== null) return; + try { + child.stdin.write(`${JSON.stringify(cmd)}\n`); + } catch (err) { + console.error(` [lt:${name}] send failed: ${err instanceof Error ? 
err.message : err}`); + } + } + + async function kill() { + killed = true; + if (child.exitCode === null) { + child.kill("SIGTERM"); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + try { + child.kill("SIGKILL"); + } catch { + /* ignore */ + } + resolve(); + }, 3000); + child.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + } + } + + return { + send, + onEvent: (h) => { + eventHandler = h; + }, + kill, + vmId: `local-${name}`, + isAlive: () => !killed && child.exitCode === null, + }; +} + +// ============================================================================= +// RPC readiness check — poll for pi to be ready to accept prompts +// ============================================================================= + +export function waitForRpcReady(handle: RpcHandle, timeoutMs = 30000): Promise { + return new Promise((resolve) => { + let resolved = false; + const timeout = setTimeout(() => { + if (!resolved) { + resolved = true; + resolve(false); + } + }, timeoutMs); + + const originalHandler = handle.onEvent; + handle.onEvent((event) => { + if (!resolved && event.type === "response" && event.command === "get_state") { + resolved = true; + clearTimeout(timeout); + resolve(true); + } + }); + + let attempts = 0; + const trySend = () => { + if (resolved || attempts > 10) return; + attempts++; + handle.send({ id: "startup-check", type: "get_state" }); + setTimeout(trySend, 2000); + }; + setTimeout(trySend, 1000); + }); +} + +// ============================================================================= +// Build system prompt for a lieutenant +// ============================================================================= + +export function buildSystemPrompt(name: string, role: string): string { + return [ + `You are a lieutenant agent named "${name}".`, + `Your role: ${role}`, + "", + "You are a persistent, long-lived agent session managed by a coordinator.", + "You accumulate context across multiple tasks. 
/** Dependencies injected into {@link LieutenantRuntime}. */
export interface LieutenantRuntimeOptions {
  events: ServiceEventBus;
  store: LieutenantStore;
}

/**
 * Manages the lifecycle of lieutenant pi processes.
 *
 * Bridges the store (persistent state) with RPC handles (live processes):
 * creation, messaging, pause/resume, destruction and registry discovery.
 */
export class LieutenantRuntime {
  /** Live RPC handles keyed by lieutenant name; populated by `create()` for local lieutenants. */
  private handles = new Map<string, RpcHandle>();
  private events: ServiceEventBus;
  private store: LieutenantStore;

  constructor(opts: LieutenantRuntimeOptions) {
    this.events = opts.events;
    this.store = opts.store;
  }

  // =========================================================================
  // Create
  // =========================================================================

  /**
   * Create a lieutenant record and, in local mode, spawn its pi process.
   *
   * @returns The stored lieutenant after it has been marked `idle`.
   * @throws Whatever `store.create` throws (validation / name conflict), the
   *         spawn error from `startLocalRpcAgent`, or an `Error` when the
   *         local process never becomes RPC-ready. In the latter two cases the
   *         store record is marked destroyed so the name can be reused.
   */
  async create(params: {
    name: string;
    role: string;
    isLocal?: boolean;
    model?: string;
    commitId?: string;
  }): Promise<Lieutenant> {
    const { name, role, isLocal = true, model } = params;
    const systemPrompt = buildSystemPrompt(name, role);

    // Create DB record
    const lt = this.store.create({
      name,
      role,
      isLocal,
      systemPrompt,
      model,
      parentAgent: process.env.VERS_AGENT_NAME,
    });

    if (isLocal) {
      // Spawn local pi process. If the spawn itself fails, roll the record
      // back; otherwise it would stay in "starting" forever and block the name.
      let handle: RpcHandle;
      try {
        handle = await startLocalRpcAgent(name, { systemPrompt, model });
      } catch (err) {
        this.store.destroy(name);
        throw err;
      }
      this.handles.set(name, handle);

      // Wait for RPC readiness
      const ready = await waitForRpcReady(handle);
      if (!ready) {
        await handle.kill();
        this.handles.delete(name);
        this.store.destroy(name);
        throw new Error(`Local pi RPC failed to start for "${name}"`);
      }

      // Set model if specified
      if (model) {
        handle.send({ type: "set_model", provider: "anthropic", modelId: model });
      }

      this.store.update(name, { status: "idle" });
      // Installed after the readiness check because waitForRpcReady replaces
      // the handle's event handler with its own.
      this.installEventHandler(name);

      this.events.fire("lieutenant:created", { name, vmId: lt.vmId, role, isLocal: true });
    } else {
      // Remote mode — VM-based lieutenant.
      // The actual VM provisioning is handled by pi-vers (vers-vm.ts).
      // Reef stores the state; the caller is responsible for providing vmId
      // after VM creation and starting the RPC connection.
      this.store.update(name, { status: "idle" });
      this.events.fire("lieutenant:created", {
        name,
        vmId: lt.vmId,
        role,
        isLocal: false,
        commitId: params.commitId,
      });
    }

    return this.store.getByName(name)!;
  }

  // =========================================================================
  // Send message
  // =========================================================================

  /**
   * Send a message to a lieutenant over its RPC handle.
   *
   * A `prompt` sent while the lieutenant is `working` is downgraded to a
   * `followUp`, and the returned `note` says so.
   *
   * @throws NotFoundError   Unknown or destroyed lieutenant.
   * @throws ValidationError Lieutenant is paused, or has no live RPC handle.
   */
  async send(
    name: string,
    message: string,
    mode?: "prompt" | "steer" | "followUp",
  ): Promise<{ sent: boolean; mode: string; note?: string }> {
    const lt = this.store.getByName(name);
    if (!lt) throw new NotFoundError(`Lieutenant '${name}' not found`);
    if (lt.status === "paused") throw new ValidationError(`Lieutenant '${name}' is paused. Resume it first.`);
    if (lt.status === "destroyed") throw new NotFoundError(`Lieutenant '${name}' has been destroyed`);

    const handle = this.handles.get(name);
    if (!handle || !handle.isAlive()) {
      throw new ValidationError(`No active RPC connection for '${name}'`);
    }

    let actualMode = mode || "prompt";
    let note: string | undefined;

    // Auto-select mode based on lieutenant state
    if (lt.status === "working" && actualMode === "prompt") {
      actualMode = "followUp";
      note = "auto-queued as follow-up since lieutenant is working";
    }

    if (actualMode === "prompt") {
      // A prompt starts a new task: bump the counter and clear stale output.
      this.store.update(name, { taskCount: lt.taskCount + 1 });
      this.store.update(name, { lastOutput: "" });
      handle.send({ type: "prompt", message });
    } else if (actualMode === "steer") {
      handle.send({ type: "steer", message });
    } else if (actualMode === "followUp") {
      handle.send({ type: "follow_up", message });
    }

    this.store.update(name, { lastActivityAt: new Date().toISOString() });

    return { sent: true, mode: actualMode, note };
  }

  // =========================================================================
  // Pause / Resume
  // =========================================================================

  /**
   * Mark a remote lieutenant as paused. Idempotent for already-paused ones.
   *
   * @throws NotFoundError   Unknown lieutenant.
   * @throws ValidationError Lieutenant is local, or currently working.
   */
  async pause(name: string): Promise<{ paused: boolean }> {
    const lt = this.store.getByName(name);
    if (!lt) throw new NotFoundError(`Lieutenant '${name}' not found`);
    if (lt.isLocal) throw new ValidationError(`Lieutenant '${name}' is local — pause/resume requires a VM`);
    if (lt.status === "paused") return { paused: true };
    if (lt.status === "working") {
      throw new ValidationError(`Lieutenant '${name}' is working. Wait for it to finish or steer it first.`);
    }

    // For remote VMs, the actual pause is done via Vers API (pi-vers handles this).
    // We just update state here.
    this.store.update(name, { status: "paused" });
    this.events.fire("lieutenant:paused", { name, vmId: lt.vmId });
    return { paused: true };
  }

  /**
   * Mark a paused remote lieutenant as idle again.
   *
   * @throws NotFoundError   Unknown lieutenant.
   * @throws ValidationError Lieutenant is local, or not currently paused.
   */
  async resume(name: string): Promise<{ resumed: boolean }> {
    const lt = this.store.getByName(name);
    if (!lt) throw new NotFoundError(`Lieutenant '${name}' not found`);
    if (lt.isLocal) throw new ValidationError(`Lieutenant '${name}' is local — pause/resume requires a VM`);
    if (lt.status !== "paused") {
      throw new ValidationError(`Lieutenant '${name}' is not paused (status: ${lt.status})`);
    }

    // For remote VMs, the actual resume is done via Vers API (pi-vers handles this).
    // Only the stored state is updated here; no RPC handle is (re)created.
    this.store.update(name, { status: "idle" });
    this.events.fire("lieutenant:resumed", { name, vmId: lt.vmId });
    return { resumed: true };
  }

  // =========================================================================
  // Destroy
  // =========================================================================

  /**
   * Kill the lieutenant's RPC process (if any) and mark it destroyed.
   *
   * @throws NotFoundError Unknown lieutenant.
   */
  async destroy(name: string): Promise<{ destroyed: boolean; detail: string }> {
    const lt = this.store.getByName(name);
    if (!lt) throw new NotFoundError(`Lieutenant '${name}' not found`);

    // Kill RPC handle
    const handle = this.handles.get(name);
    if (handle) {
      try {
        await handle.kill();
      } catch {
        /* best-effort: the record is marked destroyed regardless */
      }
      this.handles.delete(name);
    }

    this.store.destroy(name);
    this.events.fire("lieutenant:destroyed", { name, vmId: lt.vmId, isLocal: lt.isLocal });

    const detail = lt.isLocal
      ? `${name}: destroyed (local, ${lt.taskCount} tasks completed)`
      : `${name}: destroyed (VM ${lt.vmId.slice(0, 12)}, ${lt.taskCount} tasks completed)`;

    return { destroyed: true, detail };
  }

  /** Destroy every non-destroyed lieutenant; returns one human-readable line per lieutenant. */
  async destroyAll(): Promise<string[]> {
    const results: string[] = [];
    for (const name of this.store.names()) {
      try {
        const result = await this.destroy(name);
        results.push(result.detail);
      } catch (e) {
        results.push(`${name}: failed — ${e instanceof Error ? e.message : String(e)}`);
      }
    }
    return results;
  }

  // =========================================================================
  // Discover — re-track lieutenants listed in the registry
  // =========================================================================

  /**
   * Query the registry for VMs with role `lieutenant` and create store
   * records for the ones not already tracked. No RPC handle is created.
   *
   * @returns Human-readable result lines; failures are reported as a line
   *          rather than thrown.
   */
  async discover(): Promise<string[]> {
    const infraUrl = process.env.VERS_INFRA_URL;
    const authToken = process.env.VERS_AUTH_TOKEN;
    if (!infraUrl || !authToken) return ["No VERS_INFRA_URL or VERS_AUTH_TOKEN configured"];

    try {
      const res = await fetch(`${infraUrl}/registry/vms?role=lieutenant`, {
        headers: { Authorization: `Bearer ${authToken}` },
      });
      if (!res.ok) return [`Registry query failed: HTTP ${res.status}`];

      const data = (await res.json()) as { vms: any[]; count: number };
      const results: string[] = [];

      for (const vm of data.vms || []) {
        const name = vm.metadata?.agentId || vm.name;
        const existing = this.store.getByName(name);
        if (existing && existing.status !== "destroyed") {
          results.push(`${name}: already tracked`);
          continue;
        }

        // Re-create as a tracked lieutenant
        this.store.create({
          name,
          role: vm.metadata?.role || "recovered lieutenant",
          vmId: vm.id,
          isLocal: false,
        });
        this.store.update(name, { status: vm.status === "paused" ? "paused" : "idle" });
        results.push(`${name}: discovered (VM ${vm.id.slice(0, 12)}, ${vm.status})`);
      }

      return results.length > 0 ? results : ["No lieutenants found in registry"];
    } catch (e) {
      return [`Discovery failed: ${e instanceof Error ? e.message : String(e)}`];
    }
  }

  // =========================================================================
  // Event handler installation — wires pi events to store updates
  // =========================================================================

  /** Route pi events from the named handle into store updates and completion broadcasts. */
  private installEventHandler(name: string): void {
    const handle = this.handles.get(name);
    if (!handle) return;

    handle.onEvent((event) => {
      // Snapshot taken before any update below, so `lt.lastOutput` still holds
      // the finished response after rotateOutput() clears it in the store.
      const lt = this.store.getByName(name);
      if (!lt) return;

      if (event.type === "agent_start") {
        this.store.update(name, { status: "working" });
        this.store.update(name, { lastOutput: "" });
      } else if (event.type === "agent_end") {
        // Rotate output to history before clearing
        this.store.rotateOutput(name);
        this.store.update(name, { status: "idle" });

        // Broadcast completion
        const rawOutput = lt.lastOutput.trim();
        const summary = rawOutput.length > 200 ? `...${rawOutput.slice(-200)}` : rawOutput;
        // Keyword heuristic over the last 500 characters of output.
        const hasError = /\b(error|failed|exception|fatal)\b/i.test(rawOutput.slice(-500));

        this.events.fire("lieutenant:completed", {
          name: lt.name,
          role: lt.role,
          status: hasError ? "error" : "success",
          summary,
          taskCount: lt.taskCount,
        });
      } else if (event.type === "message_update" && event.assistantMessageEvent?.type === "text_delta") {
        this.store.appendOutput(name, event.assistantMessageEvent.delta);
      }
    });
  }

  // =========================================================================
  // Accessors
  // =========================================================================

  getHandle(name: string): RpcHandle | undefined {
    return this.handles.get(name);
  }

  hasHandle(name: string): boolean {
    return this.handles.has(name);
  }

  /** Kill every live RPC process and forget the handles; store records are left untouched. */
  async shutdown(): Promise<void> {
    for (const handle of this.handles.values()) {
      try {
        await handle.kill();
      } catch {
        /* best-effort during shutdown */
      }
    }
    this.handles.clear();
  }
}
// =============================================================================
// Types
// =============================================================================

/**
 * Every lifecycle status a lieutenant can be in. Single source of truth for
 * both the `LtStatus` type and the runtime `VALID_STATUSES` check, so the two
 * cannot drift apart.
 */
const LT_STATUSES = ["starting", "idle", "working", "paused", "error", "destroyed"] as const;

export type LtStatus = (typeof LT_STATUSES)[number];

/** A lieutenant record as exposed to callers (camelCase view of a table row). */
export interface Lieutenant {
  id: string;
  name: string;
  role: string;
  /** VM identifier; `local-<name>` for local lieutenants, may be empty for remote ones without a VM yet. */
  vmId: string;
  isLocal: boolean;
  status: LtStatus;
  /** Output of the current / most recent response. */
  lastOutput: string;
  /** Earlier responses, oldest first, capped at MAX_OUTPUT_HISTORY entries. */
  outputHistory: string[];
  taskCount: number;
  createdAt: string;
  lastActivityAt: string;
  systemPrompt?: string;
  model?: string;
  parentAgent?: string;
}

/** Fields accepted when creating a lieutenant. */
export interface CreateInput {
  name: string;
  role: string;
  vmId?: string;
  isLocal?: boolean;
  systemPrompt?: string;
  model?: string;
  parentAgent?: string;
}

/** Fields that may be changed on an existing lieutenant. */
export interface UpdateInput {
  status?: LtStatus;
  lastOutput?: string;
  taskCount?: number;
  lastActivityAt?: string;
  vmId?: string;
}

// =============================================================================
// Errors
// =============================================================================

/** The requested lieutenant does not exist (or has been destroyed). */
export class NotFoundError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "NotFoundError";
  }
}

/** The request is well-formed but not valid for the lieutenant's current state or input. */
export class ValidationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ValidationError";
  }
}

/** A lieutenant with the same name already exists. */
export class ConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConflictError";
  }
}

// =============================================================================
// Constants
// =============================================================================

const VALID_STATUSES = new Set<string>(LT_STATUSES);
const MAX_OUTPUT_HISTORY = 20;
// =============================================================================
// Store
// =============================================================================

/**
 * SQLite-backed store of lieutenant records.
 *
 * Destroyed lieutenants are kept as rows with status `destroyed` (soft
 * delete); `list`, `names` and `count` skip them, and `create` replaces them
 * when the name is reused.
 */
export class LieutenantStore {
  private db: Database;

  /** Open (creating the parent directory and tables if needed) the database at `dbPath`. */
  constructor(dbPath = "data/lieutenants.sqlite") {
    const dir = dirname(dbPath);
    if (!existsSync(dir)) mkdirSync(dir, { recursive: true });

    this.db = new Database(dbPath);
    this.db.exec("PRAGMA journal_mode=WAL");
    this.initTables();
  }

  private initTables(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS lieutenants (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL UNIQUE,
        role TEXT NOT NULL,
        vm_id TEXT NOT NULL,
        is_local INTEGER NOT NULL DEFAULT 0,
        status TEXT NOT NULL DEFAULT 'starting',
        last_output TEXT NOT NULL DEFAULT '',
        output_history TEXT NOT NULL DEFAULT '[]',
        task_count INTEGER NOT NULL DEFAULT 0,
        system_prompt TEXT,
        model TEXT,
        parent_agent TEXT,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        last_activity_at TEXT NOT NULL DEFAULT (datetime('now'))
      )
    `);

    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_lt_status ON lieutenants(status)`);
    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_lt_vm_id ON lieutenants(vm_id)`);
  }

  /**
   * Insert a new lieutenant in status `starting`.
   *
   * `name` and `role` are trimmed once up front, and the trimmed name is used
   * for the conflict lookup, the insert and the derived `local-<name>` vmId,
   * so " foo" and "foo" are the same lieutenant.
   *
   * @throws ValidationError `name` or `role` is missing or blank.
   * @throws ConflictError   A non-destroyed lieutenant with that name exists.
   */
  create(input: CreateInput): Lieutenant {
    const name = input.name?.trim();
    const role = input.role?.trim();
    if (!name) throw new ValidationError("name is required");
    if (!role) throw new ValidationError("role is required");

    const existing = this.getByName(name);
    if (existing && existing.status !== "destroyed") {
      throw new ConflictError(`Lieutenant '${name}' already exists. Destroy it first or use a different name.`);
    }

    // If re-creating over a destroyed entry, remove it first (name is UNIQUE).
    if (existing) {
      this.db.run("DELETE FROM lieutenants WHERE name = ?", [name]);
    }

    const id = ulid();
    const now = new Date().toISOString();
    const vmId = input.vmId || (input.isLocal ? `local-${name}` : "");

    this.db.run(
      `INSERT INTO lieutenants (id, name, role, vm_id, is_local, status, system_prompt, model, parent_agent, created_at, last_activity_at)
       VALUES (?, ?, ?, ?, ?, 'starting', ?, ?, ?, ?, ?)`,
      [
        id,
        name,
        role,
        vmId,
        input.isLocal ? 1 : 0,
        input.systemPrompt || null,
        input.model || null,
        input.parentAgent || null,
        now,
        now,
      ],
    );

    return this.get(id)!;
  }

  /** Look up a lieutenant by id (includes destroyed ones). */
  get(id: string): Lieutenant | undefined {
    const row = this.db.query("SELECT * FROM lieutenants WHERE id = ?").get(id) as any;
    return row ? rowToLieutenant(row) : undefined;
  }

  /** Look up a lieutenant by name (includes destroyed ones). */
  getByName(name: string): Lieutenant | undefined {
    const row = this.db.query("SELECT * FROM lieutenants WHERE name = ?").get(name) as any;
    return row ? rowToLieutenant(row) : undefined;
  }

  /** List non-destroyed lieutenants, newest first, optionally filtered. */
  list(filters?: { status?: LtStatus; isLocal?: boolean }): Lieutenant[] {
    let sql = "SELECT * FROM lieutenants WHERE status != 'destroyed'";
    const params: any[] = [];

    if (filters?.status) {
      sql += " AND status = ?";
      params.push(filters.status);
    }
    if (filters?.isLocal !== undefined) {
      sql += " AND is_local = ?";
      params.push(filters.isLocal ? 1 : 0);
    }

    sql += " ORDER BY created_at DESC";

    const rows = this.db.query(sql).all(...params) as any[];
    return rows.map((row) => rowToLieutenant(row));
  }

  /**
   * Apply the given fields to the named lieutenant.
   *
   * `last_activity_at` is always written: to `input.lastActivityAt` when
   * given, otherwise to the current time.
   *
   * @throws NotFoundError   Unknown lieutenant.
   * @throws ValidationError `input.status` is not a known status.
   */
  update(name: string, input: UpdateInput): Lieutenant {
    const lt = this.getByName(name);
    if (!lt) throw new NotFoundError(`Lieutenant '${name}' not found`);

    if (input.status !== undefined && !VALID_STATUSES.has(input.status)) {
      throw new ValidationError(`invalid status: ${input.status}`);
    }

    const sets: string[] = [];
    const params: any[] = [];

    if (input.status !== undefined) {
      sets.push("status = ?");
      params.push(input.status);
    }
    if (input.lastOutput !== undefined) {
      sets.push("last_output = ?");
      params.push(input.lastOutput);
    }
    if (input.taskCount !== undefined) {
      sets.push("task_count = ?");
      params.push(input.taskCount);
    }
    if (input.vmId !== undefined) {
      sets.push("vm_id = ?");
      params.push(input.vmId);
    }

    // Always present, so the SET clause is never empty.
    sets.push("last_activity_at = ?");
    params.push(input.lastActivityAt || new Date().toISOString());

    params.push(name);
    this.db.run(`UPDATE lieutenants SET ${sets.join(", ")} WHERE name = ?`, params);

    return this.getByName(name)!;
  }

  /** Append `delta` to the lieutenant's current output in a single SQL statement. */
  appendOutput(name: string, delta: string): void {
    this.db.run(
      "UPDATE lieutenants SET last_output = last_output || ?, last_activity_at = ? WHERE name = ?",
      [delta, new Date().toISOString(), name],
    );
  }

  /** Push current lastOutput to history and reset it. No-op when the output is blank. */
  rotateOutput(name: string): void {
    const lt = this.getByName(name);
    if (!lt || !lt.lastOutput.trim()) return;

    const history = [...lt.outputHistory, lt.lastOutput];
    // History grows by one entry per rotation, so dropping one keeps it capped.
    if (history.length > MAX_OUTPUT_HISTORY) history.shift();

    this.db.run("UPDATE lieutenants SET output_history = ?, last_output = '' WHERE name = ?", [
      JSON.stringify(history),
      name,
    ]);
  }

  /** Soft-delete: mark the lieutenant `destroyed`. Returns false when the name is unknown. */
  destroy(name: string): boolean {
    const lt = this.getByName(name);
    if (!lt) return false;

    this.db.run("UPDATE lieutenants SET status = 'destroyed', last_activity_at = ? WHERE name = ?", [
      new Date().toISOString(),
      name,
    ]);
    return true;
  }

  /** Names of all non-destroyed lieutenants. */
  names(): string[] {
    const rows = this.db.query("SELECT name FROM lieutenants WHERE status != 'destroyed'").all() as any[];
    return rows.map((r) => r.name);
  }

  /** Number of non-destroyed lieutenants. */
  count(): number {
    const row = this.db.query("SELECT COUNT(*) as c FROM lieutenants WHERE status != 'destroyed'").get() as any;
    return row?.c || 0;
  }

  flush(): void {
    // WAL mode handles this — no-op
  }

  close(): void {
    this.db.close();
  }
}

/** Parse the JSON-encoded output history column; a missing, corrupt or non-array value yields []. */
function parseOutputHistory(raw: unknown): string[] {
  if (typeof raw !== "string" || raw === "") return [];
  try {
    const parsed: unknown = JSON.parse(raw);
    return Array.isArray(parsed) ? (parsed as string[]) : [];
  } catch {
    return [];
  }
}

/** Convert a snake_case table row into the camelCase `Lieutenant` shape. */
function rowToLieutenant(row: any): Lieutenant {
  return {
    id: row.id,
    name: row.name,
    role: row.role,
    vmId: row.vm_id,
    isLocal: !!row.is_local,
    status: row.status,
    lastOutput: row.last_output || "",
    outputHistory: parseOutputHistory(row.output_history),
    taskCount: row.task_count,
    createdAt: row.created_at,
    lastActivityAt: row.last_activity_at,
    // NULL columns surface as absent optional fields.
    systemPrompt: row.system_prompt || undefined,
    model: row.model || undefined,
    parentAgent: row.parent_agent || undefined,
  };
}
/** Display glyph per lieutenant status in the status overview; unknown statuses fall back to "o". */
const STATUS_ICONS: Record<string, string> = {
  working: "~",
  idle: "*",
  paused: "||",
  error: "X",
};

/** Text for a caught value: the message of an Error, otherwise its string form. */
function toErrorText(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

/**
 * Register the eight `reef_lt_*` agent tools on `pi`.
 *
 * Every tool is a thin wrapper over the lieutenant HTTP API reached through
 * `client.api`. Each returns `client.noUrl()` when no base URL is configured
 * and `client.err(...)` with the failure text when the API call throws.
 */
export function registerTools(pi: ExtensionAPI, client: FleetClient) {
  /** API path for one lieutenant, with the name URL-encoded. */
  const ltPath = (name: string, suffix = "") => `/lieutenant/lieutenants/${encodeURIComponent(name)}${suffix}`;

  // --- reef_lt_create ---
  pi.registerTool({
    name: "reef_lt_create",
    label: "Create Lieutenant",
    description: [
      "Spawn a persistent agent session (lieutenant).",
      "Lieutenants persist across tasks, accumulate context, and support multi-turn interaction.",
      "Set local=true to run as a local subprocess (no VM). Remote mode requires a Vers VM.",
    ].join(" "),
    parameters: Type.Object({
      name: Type.String({ description: "Short name for this lieutenant (e.g., 'infra', 'billing')" }),
      role: Type.String({ description: "Role description — becomes the lieutenant's system prompt context" }),
      local: Type.Optional(Type.Boolean({ description: "Run locally as a subprocess instead of on a VM (default: true)" })),
      model: Type.Optional(Type.String({ description: "Model ID (e.g., claude-sonnet-4-20250514)" })),
      commitId: Type.Optional(Type.String({ description: "Golden image commit ID for VM creation (remote mode only)" })),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        const result = await client.api("POST", "/lieutenant/lieutenants", {
          name: params.name,
          role: params.role,
          local: params.local ?? true,
          model: params.model,
          commitId: params.commitId,
        });
        const loc = result.isLocal ? "[local]" : `[VM: ${result.vmId}]`;
        return client.ok(
          [`Lieutenant "${result.name}" is ready ${loc}.`, ` Role: ${result.role}`, ` Status: ${result.status}`].join(
            "\n",
          ),
          { lieutenant: result },
        );
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_send ---
  pi.registerTool({
    name: "reef_lt_send",
    label: "Send to Lieutenant",
    description: [
      "Send a message to a lieutenant. Modes:",
      " 'prompt' (default when idle) — start a new task",
      " 'steer' — interrupt current work and redirect",
      " 'followUp' — queue message for after current task finishes",
    ].join("\n"),
    parameters: Type.Object({
      name: Type.String({ description: "Lieutenant name" }),
      message: Type.String({ description: "The message to send" }),
      mode: Type.Optional(
        Type.Union([Type.Literal("prompt"), Type.Literal("steer"), Type.Literal("followUp")], {
          description: "Message mode (default: prompt, auto-selects followUp if busy)",
        }),
      ),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        const result = await client.api("POST", ltPath(params.name, "/send"), {
          message: params.message,
          mode: params.mode,
        });
        const msg = params.message;
        // Echo at most the first 120 characters of the message back to the caller.
        const preview = msg.length > 120 ? `${msg.slice(0, 120)}...` : msg;
        const note = result.note ? ` (${result.note})` : "";
        return client.ok(`Sent to ${params.name} (${result.mode})${note}: "${preview}"`);
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_read ---
  pi.registerTool({
    name: "reef_lt_read",
    label: "Read Lieutenant Output",
    description: "Read output from a lieutenant. Shows current response if working, or last completed response if idle.",
    parameters: Type.Object({
      name: Type.String({ description: "Lieutenant name" }),
      tail: Type.Optional(Type.Number({ description: "Characters from end (default: all)" })),
      history: Type.Optional(Type.Number({ description: "Number of previous responses to include (default: 0, max: 20)" })),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        // Omitted or zero values are left off the query string.
        const qs = new URLSearchParams();
        if (params.tail) qs.set("tail", String(params.tail));
        if (params.history) qs.set("history", String(params.history));
        const query = qs.toString();
        const result = await client.api("GET", ltPath(params.name, `/read${query ? `?${query}` : ""}`));
        return client.ok(`[${result.name}] (${result.status}, ${result.taskCount} tasks):\n\n${result.output}`, {
          name: result.name,
          status: result.status,
          taskCount: result.taskCount,
          outputLength: result.outputLength,
          historyCount: result.historyCount,
        });
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_status ---
  pi.registerTool({
    name: "reef_lt_status",
    label: "Lieutenant Status",
    description: "Overview of all lieutenants: status, role, task count, last activity.",
    parameters: Type.Object({}),
    async execute() {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        const result = await client.api("GET", "/lieutenant/lieutenants");
        if (result.count === 0) {
          return client.ok("No lieutenants active.");
        }
        const lines = result.lieutenants.map((lt: any) => {
          const icon = STATUS_ICONS[lt.status] ?? "o";
          const location = lt.isLocal ? "local" : `VM: ${String(lt.vmId ?? "").slice(0, 12)}`;
          return [
            `${icon} ${lt.name} [${lt.status}]`,
            ` Role: ${lt.role}`,
            ` ${location}`,
            ` Tasks: ${lt.taskCount}`,
            ` Last active: ${lt.lastActivityAt}`,
          ].join("\n");
        });
        return client.ok(lines.join("\n\n"), { lieutenants: result.lieutenants });
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_pause ---
  pi.registerTool({
    name: "reef_lt_pause",
    label: "Pause Lieutenant",
    description:
      "Pause a lieutenant's VM. Preserves full state (memory + disk). Can be resumed later. Only works for remote (VM) lieutenants.",
    parameters: Type.Object({
      name: Type.String({ description: "Lieutenant name" }),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        await client.api("POST", ltPath(params.name, "/pause"));
        return client.ok(`Lieutenant "${params.name}" paused. Use reef_lt_resume to wake it.`);
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_resume ---
  pi.registerTool({
    name: "reef_lt_resume",
    label: "Resume Lieutenant",
    description: "Resume a paused lieutenant. VM resumes from exact state including the pi session.",
    parameters: Type.Object({
      name: Type.String({ description: "Lieutenant name" }),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        await client.api("POST", ltPath(params.name, "/resume"));
        return client.ok(`Lieutenant "${params.name}" resumed. Ready for tasks.`);
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_destroy ---
  pi.registerTool({
    name: "reef_lt_destroy",
    label: "Destroy Lieutenant",
    description: "Tear down a lieutenant — kills pi process, removes from tracking. Pass name='*' to destroy all.",
    parameters: Type.Object({
      name: Type.String({ description: "Lieutenant name, or '*' for all" }),
    }),
    async execute(_id, params) {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        // '*' is the bulk form and goes to a dedicated endpoint.
        if (params.name === "*") {
          const result = await client.api("POST", "/lieutenant/lieutenants/destroy-all");
          return client.ok(result.results.join("\n") || "No lieutenants to destroy.");
        }
        const result = await client.api("DELETE", ltPath(params.name));
        return client.ok(result.detail);
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });

  // --- reef_lt_discover ---
  pi.registerTool({
    name: "reef_lt_discover",
    label: "Discover Lieutenants",
    description:
      "Discover running lieutenants from the registry and reconnect to them. Use after session restart to recover lieutenant state.",
    parameters: Type.Object({}),
    async execute() {
      if (!client.getBaseUrl()) return client.noUrl();
      try {
        const result = await client.api("POST", "/lieutenant/lieutenants/discover");
        return client.ok(`Discovery results:\n${result.results.join("\n")}`);
      } catch (e: unknown) {
        return client.err(toErrorText(e));
      }
    },
  });
}
/**
 * Wire registry behaviors onto the pi extension API:
 *  - register this VM when the agent starts and mark it stopped when it ends,
 *  - send a heartbeat every 60s between session start and session shutdown,
 *  - mirror `vers:*` spawn/destroy events from other extensions into the registry.
 *
 * All registry calls are best-effort: failures are swallowed or logged and
 * never thrown to the caller.
 */
export function registerBehaviors(pi: ExtensionAPI, client: FleetClient) {
  let heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  /** Registry path for one VM, with the id URL-encoded. */
  const vmPath = (vmId: string, suffix = "") => `/registry/vms/${encodeURIComponent(vmId)}${suffix}`;

  /** Text for a caught value, for log lines. */
  const errText = (err: unknown) => (err instanceof Error ? err.message : err);

  function stopHeartbeat(): void {
    if (heartbeatTimer) {
      clearInterval(heartbeatTimer);
      heartbeatTimer = null;
    }
  }

  // Auto-register this VM on agent start
  pi.on("agent_start", async () => {
    if (!client.getBaseUrl() || !client.vmId) return;

    try {
      await client.api("POST", "/registry/vms", {
        id: client.vmId,
        name: client.agentName,
        role: client.agentRole,
        address: `${client.vmId}.vm.vers.sh`,
        registeredBy: client.agentName,
        metadata: { pid: process.pid, startedAt: new Date().toISOString() },
      });
    } catch {
      // Might already exist — try update instead
      try {
        await client.api("PATCH", vmPath(client.vmId), {
          name: client.agentName,
          status: "running",
        });
      } catch {
        /* best-effort */
      }
    }
  });

  // Mark stopped on agent end
  pi.on("agent_end", async () => {
    if (!client.getBaseUrl() || !client.vmId) return;
    try {
      await client.api("PATCH", vmPath(client.vmId), { status: "stopped" });
    } catch {
      /* best-effort */
    }
  });

  // Start heartbeat timer on session start
  pi.on("session_start", async () => {
    if (!client.getBaseUrl() || !client.vmId) return;

    // A repeated session_start must not leave the previous interval running.
    stopHeartbeat();
    heartbeatTimer = setInterval(async () => {
      try {
        await client.api("POST", vmPath(client.vmId, "/heartbeat"));
      } catch {
        /* best-effort */
      }
    }, 60_000);
  });

  // Stop heartbeat on shutdown
  pi.on("session_shutdown", async () => {
    stopHeartbeat();
  });

  // Handle lifecycle events from other extensions
  pi.events.on(
    "vers:agent_spawned",
    async (data: { vmId: string; label: string; role: string; address: string; commitId?: string }) => {
      if (!client.getBaseUrl()) return;
      try {
        await client.api("POST", "/registry/vms", {
          id: data.vmId,
          name: data.label,
          role: data.role || "worker",
          address: data.address,
          registeredBy: "reef",
          metadata: {
            agentId: data.label,
            commitId: data.commitId,
            registeredVia: "vers:agent_spawned",
            createdAt: new Date().toISOString(),
          },
        });
      } catch (err) {
        console.error(`[registry] Registration failed for ${data.label}: ${errText(err)}`);
      }
    },
  );

  pi.events.on("vers:agent_destroyed", async (data: { vmId: string; label: string }) => {
    if (!client.getBaseUrl()) return;
    try {
      await client.api("DELETE", vmPath(data.vmId));
    } catch (err) {
      console.error(`[registry] Delete failed for ${data.label}: ${errText(err)}`);
    }
  });

  pi.events.on(
    "vers:lt_created",
    async (data: {
      vmId: string;
      name: string;
      role: string;
      address: string;
      ltRole?: string;
      commitId?: string;
      createdAt?: string;
    }) => {
      if (!client.getBaseUrl()) return;
      try {
        await client.api("POST", "/registry/vms", {
          id: data.vmId,
          name: data.name,
          role: data.role || "lieutenant",
          address: data.address,
          registeredBy: "reef",
          metadata: {
            agentId: data.name,
            role: data.ltRole,
            commitId: data.commitId,
            createdAt: data.createdAt,
            registeredVia: "vers:lt_created",
          },
        });
      } catch (err) {
        console.error(`[registry] LT registration failed for ${data.name}: ${errText(err)}`);
      }
    },
  );

  pi.events.on("vers:lt_destroyed", async (data: { vmId: string; name: string }) => {
    if (!client.getBaseUrl()) return;
    try {
      await client.api("DELETE", vmPath(data.vmId));
    } catch (err) {
      console.error(`[registry] LT delete failed for ${data.name}: ${errText(err)}`);
    }
  });
}
}" }, + registeredBy: { type: "string", required: true, description: "Agent or system that registered" }, + }, + response: "The registered VM object", + }, + "GET /vms": { + summary: "List VMs with optional filters", + query: { + role: { type: "string", description: "Filter by role" }, + status: { type: "string", description: "Filter by status: running | paused | stopped" }, + parentVmId: { type: "string", description: "Filter by parent VM" }, + }, + response: "{ vms, count }", + }, + "GET /vms/:id": { + summary: "Get a VM by ID", + params: { id: { type: "string", required: true, description: "VM ID" } }, + }, + "PATCH /vms/:id": { + summary: "Update a VM's fields", + params: { id: { type: "string", required: true, description: "VM ID" } }, + body: { + status: { type: "string", description: "New status" }, + reefConfig: { type: "object", description: "Updated reef config (DNA)" }, + }, + }, + "DELETE /vms/:id": { + summary: "Deregister a VM", + params: { id: { type: "string", required: true, description: "VM ID" } }, + }, + "POST /vms/:id/heartbeat": { + summary: "Send a heartbeat for a VM", + params: { id: { type: "string", required: true, description: "VM ID" } }, + response: "{ id, lastSeen }", + }, + "GET /discover/:role": { + summary: "Discover running VMs by role (excludes stale)", + params: { role: { type: "string", required: true, description: "Role to discover" } }, + response: "{ vms, count }", + }, + "GET /vms/:id/children": { + summary: "Get direct child VMs", + params: { id: { type: "string", required: true, description: "VM ID" } }, + }, + "GET /vms/:id/ancestors": { + summary: "Get ancestor chain to root", + params: { id: { type: "string", required: true, description: "VM ID" } }, + }, + "GET /vms/:id/subtree": { + summary: "Get full subtree (BFS)", + params: { id: { type: "string", required: true, description: "VM ID" } }, + }, + "GET /vms/:idA/diff/:idB": { + summary: "Compare reef configs between two VMs", + params: { + idA: { type: "string", 
required: true, description: "First VM ID" }, + idB: { type: "string", required: true, description: "Second VM ID" }, + }, + response: "{ added: { organs, capabilities }, removed: { organs, capabilities } }", + }, + "GET /_panel": { + summary: "HTML dashboard showing registered VMs with lineage", + response: "text/html", + }, + }, + + widget: { + async getLines(client: FleetClient) { + try { + const res = await client.api<{ vms: { status: string }[]; count: number }>("GET", "/registry/vms"); + const running = res.vms.filter((v) => v.status === "running").length; + return [`Registry: ${res.count} VMs (${running} running)`]; + } catch { + return []; + } + }, + }, +}; + +export default registry; diff --git a/services/registry/routes.ts b/services/registry/routes.ts new file mode 100644 index 0000000..84c4608 --- /dev/null +++ b/services/registry/routes.ts @@ -0,0 +1,175 @@ +/** + * Registry HTTP routes — VM registration, discovery, heartbeat, lineage. + */ + +import { Hono } from "hono"; +import type { RegistryStore, VMFilters, VMRole, VMStatus } from "./store.js"; +import { ConflictError, NotFoundError, ValidationError } from "./store.js"; + +export function createRoutes(store: RegistryStore): Hono { + const routes = new Hono(); + + // POST /vms — register a VM + routes.post("/vms", async (c) => { + try { + const body = await c.req.json(); + const vm = store.register(body); + return c.json(vm, 201); + } catch (e) { + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + if (e instanceof ConflictError) return c.json({ error: e.message }, 409); + throw e; + } + }); + + // GET /vms — list VMs with optional filters + routes.get("/vms", (c) => { + const filters: VMFilters = {}; + const role = c.req.query("role"); + const status = c.req.query("status"); + const parentVmId = c.req.query("parentVmId"); + if (role) filters.role = role as VMRole; + if (status) filters.status = status as VMStatus; + if (parentVmId) filters.parentVmId = parentVmId; + + 
const vms = store.list(filters); + return c.json({ vms, count: vms.length }); + }); + + // GET /vms/:id — get a VM by ID + routes.get("/vms/:id", (c) => { + const vm = store.get(c.req.param("id")); + if (!vm) return c.json({ error: "VM not found" }, 404); + return c.json(vm); + }); + + // PATCH /vms/:id — update a VM + routes.patch("/vms/:id", async (c) => { + try { + const body = await c.req.json(); + const vm = store.update(c.req.param("id"), body); + return c.json(vm); + } catch (e) { + if (e instanceof NotFoundError) return c.json({ error: e.message }, 404); + if (e instanceof ValidationError) return c.json({ error: e.message }, 400); + throw e; + } + }); + + // DELETE /vms/:id — deregister a VM + routes.delete("/vms/:id", (c) => { + const deleted = store.deregister(c.req.param("id")); + if (!deleted) return c.json({ error: "VM not found" }, 404); + return c.json({ deleted: true }); + }); + + // POST /vms/:id/heartbeat — heartbeat + routes.post("/vms/:id/heartbeat", (c) => { + try { + const vm = store.heartbeat(c.req.param("id")); + return c.json({ id: vm.id, lastSeen: vm.lastSeen }); + } catch (e) { + if (e instanceof NotFoundError) return c.json({ error: e.message }, 404); + throw e; + } + }); + + // GET /discover/:role — discover VMs by role + routes.get("/discover/:role", (c) => { + const role = c.req.param("role") as VMRole; + const vms = store.discover(role); + return c.json({ vms, count: vms.length }); + }); + + // ========================================================================= + // Lineage endpoints + // ========================================================================= + + // GET /vms/:id/children — direct children + routes.get("/vms/:id/children", (c) => { + const vm = store.get(c.req.param("id")); + if (!vm) return c.json({ error: "VM not found" }, 404); + const children = store.children(c.req.param("id")); + return c.json({ children, count: children.length }); + }); + + // GET /vms/:id/ancestors — path to root + 
routes.get("/vms/:id/ancestors", (c) => { + const vm = store.get(c.req.param("id")); + if (!vm) return c.json({ error: "VM not found" }, 404); + const ancestors = store.ancestors(c.req.param("id")); + return c.json({ ancestors, count: ancestors.length }); + }); + + // GET /vms/:id/subtree — full subtree (BFS) + routes.get("/vms/:id/subtree", (c) => { + const vm = store.get(c.req.param("id")); + if (!vm) return c.json({ error: "VM not found" }, 404); + const subtree = store.subtree(c.req.param("id")); + return c.json({ subtree, count: subtree.length }); + }); + + // GET /vms/:idA/diff/:idB — config diff between two VMs + routes.get("/vms/:idA/diff/:idB", (c) => { + const diff = store.configDiff(c.req.param("idA"), c.req.param("idB")); + if (!diff) return c.json({ error: "One or both VMs not found" }, 404); + return c.json(diff); + }); + + // ========================================================================= + // Dashboard + // ========================================================================= + + routes.get("/_panel", (c) => { + const vms = store.list({}); + const rows = vms + .map((vm) => { + const statusColor = vm.status === "running" ? "#4f9" : vm.status === "paused" ? "#ff9800" : "#888"; + const lastSeen = vm.lastSeen ? new Date(vm.lastSeen).toLocaleTimeString() : "---"; + const parent = vm.parentVmId ? vm.parentVmId.slice(0, 8) : "---"; + const organs = vm.reefConfig.organs.join(", ") || "none"; + return ` + ${vm.id.slice(0, 12)} + ${vm.name} + ${vm.role} + ${vm.status} + ${parent} + ${organs} + ${lastSeen} + `; + }) + .join("\n"); + + const html = ` + + + Registry Dashboard + + + +

VM Registry

+

${vms.length} VM${vms.length !== 1 ? "s" : ""} registered

+ + + + + + ${rows || ''} + +
IDNameRoleStatusParentOrgansLast Seen
No VMs registered
+ +`; + + return c.html(html); + }); + + return routes; +} diff --git a/services/registry/store.ts b/services/registry/store.ts new file mode 100644 index 0000000..66e2fd6 --- /dev/null +++ b/services/registry/store.ts @@ -0,0 +1,432 @@ +/** + * Registry store — VM service discovery backed by SQLite. + * + * Upgraded from in-memory to SQLite with: + * - VM lineage tracking (parent-child relationships) + * - Reef config per VM (the "DNA" concept — organs + capabilities) + * - Heartbeat-based liveness detection + */ + +import { Database } from "bun:sqlite"; +import { existsSync, mkdirSync } from "node:fs"; +import { dirname } from "node:path"; +import { ulid } from "ulid"; + +// ============================================================================= +// Types +// ============================================================================= + +export type VMRole = "infra" | "lieutenant" | "worker" | "golden" | "custom"; +export type VMStatus = "running" | "paused" | "stopped"; + +export interface VMService { + name: string; + port: number; + protocol?: string; +} + +export interface ReefConfig { + organs: string[]; + capabilities: string[]; +} + +export interface VM { + id: string; + name: string; + role: VMRole; + status: VMStatus; + address: string; + parentVmId: string | null; + services: VMService[]; + reefConfig: ReefConfig; + registeredBy: string; + registeredAt: string; + lastSeen: string; + metadata?: Record; +} + +export interface RegisterInput { + id: string; + name: string; + role: VMRole; + address: string; + parentVmId?: string; + services?: VMService[]; + reefConfig?: ReefConfig; + registeredBy: string; + metadata?: Record; +} + +export interface UpdateInput { + name?: string; + status?: VMStatus; + address?: string; + services?: VMService[]; + reefConfig?: ReefConfig; + metadata?: Record; +} + +export interface VMFilters { + role?: VMRole; + status?: VMStatus; + parentVmId?: string; +} + +// 
// =============================================================================
// Errors
// =============================================================================

/** Raised when an operation targets a VM id that has no row in the registry. */
export class NotFoundError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "NotFoundError";
  }
}

/** Raised when caller-supplied input fails validation. */
export class ValidationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ValidationError";
  }
}

/** Conflict error type exported for route handlers; nothing in this store throws it. */
export class ConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConflictError";
  }
}

// =============================================================================
// Constants
// =============================================================================

const VALID_ROLES = new Set<string>(["infra", "lieutenant", "worker", "golden", "custom"]);
const VALID_STATUSES = new Set<string>(["running", "paused", "stopped"]);
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes

const DEFAULT_REEF_CONFIG: ReefConfig = { organs: [], capabilities: [] };

/**
 * Returns `value` trimmed.
 *
 * @throws ValidationError `"<field> is required"` when `value` is not a string or is blank.
 *   Non-string JSON values (numbers, objects) are rejected here instead of crashing on `.trim()`.
 */
function requireText(value: unknown, field: string): string {
  if (typeof value !== "string") throw new ValidationError(`${field} is required`);
  const trimmed = value.trim();
  if (!trimmed) throw new ValidationError(`${field} is required`);
  return trimmed;
}

/** Parses a JSON column; returns `undefined` for non-string, empty, or malformed text. */
function parseJsonColumn(text: unknown): unknown {
  if (typeof text !== "string" || text === "") return undefined;
  try {
    return JSON.parse(text);
  } catch {
    return undefined;
  }
}

// =============================================================================
// Store
// =============================================================================

/**
 * SQLite-backed VM registry: registration, liveness, lineage and config diff.
 *
 * A VM counts as stale once its `lastSeen` is older than five minutes; stale VMs
 * are hidden from `discover()` and from `list({ status: "running" })`.
 */
export class RegistryStore {
  private db: Database;

  /**
   * Opens (creating if needed) the database file and its parent directory,
   * switches to WAL journaling and ensures the schema exists.
   *
   * @param dbPath - Path of the SQLite file.
   */
  constructor(dbPath = "data/registry.sqlite") {
    const dir = dirname(dbPath);
    if (!existsSync(dir)) mkdirSync(dir, { recursive: true });

    this.db = new Database(dbPath);
    this.db.exec("PRAGMA journal_mode=WAL");
    this.initTables();
  }

  /** Creates the `vms` table and its lookup indexes when they do not exist. */
  private initTables(): void {
    // NOTE(review): stock SQLite only enforces REFERENCES / ON DELETE SET NULL when
    // `PRAGMA foreign_keys = ON`; this store never issues that pragma, so the
    // parent_vm_id constraint is presumably inert — confirm against the runtime's defaults.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS vms (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        role TEXT NOT NULL CHECK(role IN ('infra', 'lieutenant', 'worker', 'golden', 'custom')),
        status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'paused', 'stopped')),
        address TEXT NOT NULL,
        parent_vm_id TEXT REFERENCES vms(id) ON DELETE SET NULL,
        services TEXT NOT NULL DEFAULT '[]',
        reef_config TEXT NOT NULL DEFAULT '{"organs":[],"capabilities":[]}',
        registered_by TEXT NOT NULL,
        registered_at TEXT NOT NULL DEFAULT (datetime('now')),
        last_seen TEXT NOT NULL DEFAULT (datetime('now')),
        metadata TEXT
      )
    `);

    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_vms_role ON vms(role)`);
    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_vms_status ON vms(status)`);
    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_vms_parent ON vms(parent_vm_id)`);
  }

  /** True when `lastSeen` is more than `STALE_THRESHOLD_MS` in the past. */
  private isStale(lastSeen: string): boolean {
    return Date.now() - new Date(lastSeen).getTime() > STALE_THRESHOLD_MS;
  }

  /** Fetches a VM or throws `NotFoundError("VM not found")`. */
  private getOrThrow(id: string): VM {
    const vm = this.get(id);
    if (!vm) throw new NotFoundError("VM not found");
    return vm;
  }

  /**
   * Registers a VM, or refreshes the existing row when the id is already known.
   *
   * On upsert, `parentVmId`, `services`, `reefConfig` and `metadata` fall back to
   * the stored values when omitted; status is reset to `running`.
   *
   * @returns The stored VM.
   * @throws ValidationError when a required field is missing, blank, not a string,
   *   or `role` is not one of the known roles.
   */
  register(input: RegisterInput): VM {
    // Validation order is part of the contract: the first failing field wins.
    const id = requireText(input.id, "id");
    const name = requireText(input.name, "name");
    if (!input.role || !VALID_ROLES.has(input.role)) throw new ValidationError(`invalid role: ${input.role}`);
    const address = requireText(input.address, "address");
    const registeredBy = requireText(input.registeredBy, "registeredBy");

    const now = new Date().toISOString();
    // Look up, write and re-read with the same trimmed id so " a " and "a" are one VM.
    const existing = this.get(id);

    if (existing) {
      // Upsert — update existing registration
      const metadata = input.metadata ?? existing.metadata;
      this.db.run(
        `UPDATE vms SET name = ?, role = ?, status = 'running', address = ?,
         parent_vm_id = ?, services = ?, reef_config = ?, registered_by = ?,
         last_seen = ?, metadata = ? WHERE id = ?`,
        [
          name,
          input.role,
          address,
          input.parentVmId || existing.parentVmId || null,
          JSON.stringify(input.services || existing.services),
          JSON.stringify(input.reefConfig || existing.reefConfig),
          registeredBy,
          now,
          metadata ? JSON.stringify(metadata) : null,
          id,
        ],
      );
    } else {
      this.db.run(
        `INSERT INTO vms (id, name, role, status, address, parent_vm_id, services, reef_config, registered_by, registered_at, last_seen, metadata)
         VALUES (?, ?, ?, 'running', ?, ?, ?, ?, ?, ?, ?, ?)`,
        [
          id,
          name,
          input.role,
          address,
          input.parentVmId || null,
          JSON.stringify(input.services || []),
          JSON.stringify(input.reefConfig || DEFAULT_REEF_CONFIG),
          registeredBy,
          now,
          now,
          input.metadata ? JSON.stringify(input.metadata) : null,
        ],
      );
    }

    return this.getOrThrow(id);
  }

  /** Returns the VM with this id, or `undefined` when it is not registered. */
  get(id: string): VM | undefined {
    const row = this.db.query("SELECT * FROM vms WHERE id = ?").get(id) as any;
    return row ? rowToVM(row) : undefined;
  }

  /**
   * Lists VMs, most recently seen first, optionally filtered by role, status and parent.
   * With `status: "running"`, VMs whose heartbeat is stale are dropped.
   */
  list(filters?: VMFilters): VM[] {
    const conditions: string[] = [];
    const params: any[] = [];

    if (filters?.role) {
      conditions.push("role = ?");
      params.push(filters.role);
    }
    if (filters?.status) {
      conditions.push("status = ?");
      params.push(filters.status);
    }
    if (filters?.parentVmId) {
      conditions.push("parent_vm_id = ?");
      params.push(filters.parentVmId);
    }

    const where = conditions.length ? ` WHERE ${conditions.join(" AND ")}` : "";
    const sql = `SELECT * FROM vms${where} ORDER BY last_seen DESC`;

    const results = this.db
      .query(sql)
      .all(...params)
      .map(rowToVM);

    // Exclude stale VMs from "running" filter
    return filters?.status === "running" ? results.filter((v) => !this.isStale(v.lastSeen)) : results;
  }

  /**
   * Applies the provided fields to an existing VM and refreshes `lastSeen`.
   *
   * @throws NotFoundError when the VM does not exist.
   * @throws ValidationError on an unknown status, or a `name`/`address` that is
   *   not a non-blank string (same rule `register` applies).
   */
  update(id: string, input: UpdateInput): VM {
    this.getOrThrow(id);

    if (input.status !== undefined && !VALID_STATUSES.has(input.status)) {
      throw new ValidationError(`invalid status: ${input.status}`);
    }

    const sets: string[] = [];
    const params: any[] = [];
    const assign = (column: string, value: unknown): void => {
      sets.push(`${column} = ?`);
      params.push(value);
    };

    if (input.name !== undefined) assign("name", requireText(input.name, "name"));
    if (input.status !== undefined) assign("status", input.status);
    if (input.address !== undefined) assign("address", requireText(input.address, "address"));
    if (input.services !== undefined) assign("services", JSON.stringify(input.services));
    if (input.reefConfig !== undefined) assign("reef_config", JSON.stringify(input.reefConfig));
    if (input.metadata !== undefined) assign("metadata", JSON.stringify(input.metadata));

    // Any update counts as a sign of life, so there is always at least one column to set.
    assign("last_seen", new Date().toISOString());

    params.push(id);
    this.db.run(`UPDATE vms SET ${sets.join(", ")} WHERE id = ?`, params);

    return this.getOrThrow(id);
  }

  /** Deletes a VM. Returns `true` when a row was removed. */
  deregister(id: string): boolean {
    const result = this.db.run("DELETE FROM vms WHERE id = ?", [id]);
    return result.changes > 0;
  }

  /**
   * Records a heartbeat: refreshes `lastSeen` and forces status back to `running`.
   *
   * @throws NotFoundError when the VM does not exist.
   */
  heartbeat(id: string): VM {
    this.getOrThrow(id);

    this.db.run("UPDATE vms SET last_seen = ?, status = 'running' WHERE id = ?", [new Date().toISOString(), id]);
    return this.getOrThrow(id);
  }

  /** Returns running, non-stale VMs with the given role. */
  discover(role: VMRole): VM[] {
    return this.db
      .query("SELECT * FROM vms WHERE role = ? AND status = 'running'")
      .all(role)
      .map(rowToVM)
      .filter((v) => !this.isStale(v.lastSeen));
  }

  // =========================================================================
  // Lineage queries
  // =========================================================================

  /** Get all direct children of a VM, oldest registration first. */
  children(vmId: string): VM[] {
    return this.db
      .query("SELECT * FROM vms WHERE parent_vm_id = ? ORDER BY registered_at")
      .all(vmId)
      .map(rowToVM);
  }

  /**
   * Get the chain from the root down to (and including) the given VM.
   * Stops at the first missing parent or on a parent cycle.
   */
  ancestors(vmId: string): VM[] {
    const chain: VM[] = [];
    const seen = new Set<string>();
    let currentId: string | null = vmId;

    while (currentId && !seen.has(currentId)) {
      seen.add(currentId); // prevent cycles
      const vm = this.get(currentId);
      if (!vm) break;
      chain.unshift(vm);
      currentId = vm.parentVmId;
    }

    return chain;
  }

  /** Get the entire subtree rooted at a VM in breadth-first order, root first. */
  subtree(vmId: string): VM[] {
    const result: VM[] = [];
    const queue: string[] = [vmId];
    const seen = new Set<string>();

    for (let id = queue.shift(); id !== undefined; id = queue.shift()) {
      if (seen.has(id)) continue;
      seen.add(id);

      const vm = this.get(id);
      if (!vm) continue;
      result.push(vm);

      for (const kid of this.children(id)) queue.push(kid.id);
    }

    return result;
  }

  // =========================================================================
  // Config diff
  // =========================================================================

  /**
   * Compare reef configs between two VMs.
   *
   * @returns What B has that A lacks (`added`) and what A has that B lacks
   *   (`removed`), or `null` when either VM is missing.
   */
  configDiff(vmIdA: string, vmIdB: string): { added: ReefConfig; removed: ReefConfig } | null {
    const a = this.get(vmIdA);
    const b = this.get(vmIdB);
    if (!a || !b) return null;

    const missingFrom = (items: string[], other: string[]): string[] => items.filter((x) => !other.includes(x));

    return {
      added: {
        organs: missingFrom(b.reefConfig.organs, a.reefConfig.organs),
        capabilities: missingFrom(b.reefConfig.capabilities, a.reefConfig.capabilities),
      },
      removed: {
        organs: missingFrom(a.reefConfig.organs, b.reefConfig.organs),
        capabilities: missingFrom(a.reefConfig.capabilities, b.reefConfig.capabilities),
      },
    };
  }

  // =========================================================================
  // Lifecycle
  // =========================================================================

  /** Total number of registered VMs, regardless of status. */
  count(): number {
    const row = this.db.query("SELECT COUNT(*) as c FROM vms").get() as any;
    return row?.c ?? 0;
  }

  /** No-op: writes go straight to SQLite, there is nothing buffered to flush. */
  flush(): void {
    // WAL mode handles durability
  }

  /** Closes the underlying database handle. */
  close(): void {
    this.db.close();
  }
}

// =============================================================================
// Row mapper
// =============================================================================

/**
 * Converts a raw `vms` row into a `VM`.
 *
 * JSON columns are parsed defensively: malformed or wrongly-shaped `services`
 * becomes `[]`, and `reefConfig` always carries both `organs` and `capabilities`
 * arrays so consumers can call array methods on them without checking.
 * Malformed `metadata` is dropped.
 */
function rowToVM(row: any): VM {
  const services = parseJsonColumn(row.services);
  const reef = parseJsonColumn(row.reef_config);
  const reefFields: Record<string, unknown> =
    typeof reef === "object" && reef !== null ? (reef as Record<string, unknown>) : {};

  const vm: VM = {
    id: row.id,
    name: row.name,
    role: row.role,
    status: row.status,
    address: row.address,
    parentVmId: row.parent_vm_id || null,
    services: Array.isArray(services) ? services : [],
    reefConfig: {
      organs: Array.isArray(reefFields.organs) ? reefFields.organs : [],
      capabilities: Array.isArray(reefFields.capabilities) ? reefFields.capabilities : [],
    },
    registeredBy: row.registered_by,
    registeredAt: row.registered_at,
    lastSeen: row.last_seen,
  };

  if (row.metadata) {
    const metadata = parseJsonColumn(row.metadata);
    // undefined means the text was malformed — ignore it rather than fail the read
    if (metadata !== undefined) vm.metadata = metadata as VM["metadata"];
  }
  return vm;
}
Optionally filter by role, status, or parent.", + parameters: Type.Object({ + role: Type.Optional(Type.String({ description: "Filter by role: infra | lieutenant | worker | golden | custom" })), + status: Type.Optional(Type.String({ description: "Filter by status: running | paused | stopped" })), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const qs = new URLSearchParams(); + if (params.role) qs.set("role", params.role); + if (params.status) qs.set("status", params.status); + const query = qs.toString(); + const result = await client.api("GET", `/registry/vms${query ? `?${query}` : ""}`); + return client.ok(JSON.stringify(result, null, 2), { result }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + + pi.registerTool({ + name: "registry_register", + label: "Registry: Register VM", + description: "Register a VM so other agents can discover it. Supports lineage tracking via parentVmId.", + parameters: Type.Object({ + id: Type.String({ description: "VM ID" }), + name: Type.String({ description: "Human-readable name" }), + role: Type.String({ description: "VM role: infra | lieutenant | worker | golden | custom" }), + address: Type.String({ description: "Network address or endpoint" }), + parentVmId: Type.Optional(Type.String({ description: "Parent VM ID for lineage tracking" })), + reefConfig: Type.Optional( + Type.Object( + { + organs: Type.Array(Type.String(), { description: "Service modules loaded" }), + capabilities: Type.Array(Type.String(), { description: "Extension capabilities" }), + }, + { description: "VM DNA — modules and capabilities" }, + ), + ), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const vm = await client.api("POST", "/registry/vms", { + ...params, + registeredBy: client.agentName, + }); + return client.ok(JSON.stringify(vm, null, 2), { vm }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + + 
pi.registerTool({ + name: "registry_discover", + label: "Registry: Discover VMs", + description: "Discover running VMs by role — find workers, lieutenants, or other agents.", + parameters: Type.Object({ + role: Type.String({ description: "VM role to discover" }), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const result = await client.api("GET", `/registry/discover/${encodeURIComponent(params.role)}`); + return client.ok(JSON.stringify(result, null, 2), { result }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + + pi.registerTool({ + name: "registry_heartbeat", + label: "Registry: Heartbeat", + description: "Send a heartbeat to keep a VM's registration active.", + parameters: Type.Object({ + id: Type.String({ description: "VM ID to heartbeat" }), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const result = await client.api("POST", `/registry/vms/${encodeURIComponent(params.id)}/heartbeat`); + return client.ok(JSON.stringify(result, null, 2), { result }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + + pi.registerTool({ + name: "registry_lineage", + label: "Registry: VM Lineage", + description: "View a VM's lineage — ancestors (path to root) or subtree (all descendants).", + parameters: Type.Object({ + id: Type.String({ description: "VM ID" }), + direction: Type.Optional( + Type.Union([Type.Literal("ancestors"), Type.Literal("subtree"), Type.Literal("children")], { + description: "Direction: ancestors (default), subtree, or children", + }), + ), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const dir = params.direction || "ancestors"; + const result = await client.api("GET", `/registry/vms/${encodeURIComponent(params.id)}/${dir}`); + return client.ok(JSON.stringify(result, null, 2), { result }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); +} 
// =============================================================================
// Config resolution
// =============================================================================

/** Fully resolved Vers configuration; `null` means the value is not set in any source. */
interface VersConfig {
  apiKey: string | null;
  infraUrl: string | null;
  authToken: string | null;
  baseUrl: string | null;
  agentName: string | null;
  vmId: string | null;
  agentRole: string | null;
}

/**
 * Overrides set through the HTTP API, keyed by env-style name (e.g. `VERS_API_KEY`).
 * They live in this in-memory map only: nothing in this module writes them to the
 * reef store, so they are lost when the process exits.
 */
const overrides: Record<string, string> = {};

/** Returns `value` when it is a non-empty string, otherwise `null`. */
function stringOrNull(value: unknown): string | null {
  return typeof value === "string" && value !== "" ? value : null;
}

/**
 * Reads `~/.vers/<filename>` as a JSON object.
 *
 * @returns The parsed object, or `{}` when the file is missing, unreadable,
 *   malformed, or its top-level value is not a plain object (`null`, array,
 *   primitive) — callers can always index the result safely.
 */
function loadFileConfig(filename: string): Record<string, unknown> {
  try {
    const filePath = join(homedir(), ".vers", filename);
    if (existsSync(filePath)) {
      const parsed: unknown = JSON.parse(readFileSync(filePath, "utf-8"));
      if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
        return parsed as Record<string, unknown>;
      }
    }
  } catch {
    /* ignore */
  }
  return {};
}

/**
 * Resolves the Vers API key: environment first, then API override, then
 * `keys.VERS_API_KEY` inside `~/.vers/keys.json`.
 */
function resolveApiKey(): string | null {
  // 1. Environment variable
  if (process.env.VERS_API_KEY) return process.env.VERS_API_KEY;
  // 2. Store override
  if (overrides.VERS_API_KEY) return overrides.VERS_API_KEY;
  // 3. File fallback
  const keys = loadFileConfig("keys.json").keys;
  if (typeof keys !== "object" || keys === null) return null;
  return stringOrNull((keys as Record<string, unknown>).VERS_API_KEY);
}

/**
 * Resolves every config value using the same priority for each key:
 * environment variable, then API override, then file fallback (where one exists).
 * File values that are not non-empty strings are treated as unset.
 */
function resolveConfig(): VersConfig {
  const agentServices = loadFileConfig("agent-services.json");
  const config = loadFileConfig("config.json");

  const pick = (envKey: string, fileValue?: unknown): string | null =>
    process.env[envKey] || overrides[envKey] || stringOrNull(fileValue);

  return {
    apiKey: resolveApiKey(),
    infraUrl: pick("VERS_INFRA_URL", agentServices.infraUrl),
    authToken: pick("VERS_AUTH_TOKEN", agentServices.authToken),
    baseUrl: pick("VERS_BASE_URL", config.baseUrl),
    agentName: pick("VERS_AGENT_NAME"),
    vmId: pick("VERS_VM_ID"),
    agentRole: pick("VERS_AGENT_ROLE"),
  };
}
"env" : overrides.VERS_AUTH_TOKEN ? "store" : "file", + }, + }); +}); + +// PUT /:key — set a config override +routes.put("/:key", async (c) => { + const key = c.req.param("key").toUpperCase(); + const validKeys = new Set([ + "VERS_API_KEY", + "VERS_INFRA_URL", + "VERS_AUTH_TOKEN", + "VERS_BASE_URL", + "VERS_AGENT_NAME", + "VERS_VM_ID", + "VERS_AGENT_ROLE", + ]); + + if (!validKeys.has(key)) { + return c.json({ error: `Invalid config key: ${key}. Valid: ${Array.from(validKeys).join(", ")}` }, 400); + } + + const body = await c.req.json(); + if (typeof body.value !== "string") { + return c.json({ error: "value must be a string" }, 400); + } + + overrides[key] = body.value; + return c.json({ key, set: true, source: "store" }); +}); + +// DELETE /:key — remove a config override +routes.delete("/:key", (c) => { + const key = c.req.param("key").toUpperCase(); + if (overrides[key]) { + delete overrides[key]; + return c.json({ key, deleted: true }); + } + return c.json({ error: "Key not found in overrides" }, 404); +}); + +// GET /resolve/:key — resolve a single config value (full value, not masked) +routes.get("/resolve/:key", (c) => { + const key = c.req.param("key").toUpperCase(); + const cfg = resolveConfig(); + const map: Record = { + VERS_API_KEY: cfg.apiKey, + VERS_INFRA_URL: cfg.infraUrl, + VERS_AUTH_TOKEN: cfg.authToken, + VERS_BASE_URL: cfg.baseUrl, + VERS_AGENT_NAME: cfg.agentName, + VERS_VM_ID: cfg.vmId, + VERS_AGENT_ROLE: cfg.agentRole, + }; + + if (!(key in map)) { + return c.json({ error: `Unknown config key: ${key}` }, 400); + } + + return c.json({ + key, + value: map[key], + source: process.env[key] ? "env" : overrides[key] ? 
"store" : "file", + }); +}); + +// ============================================================================= +// Module export +// ============================================================================= + +let storeRef: any = null; + +const versConfig: ServiceModule = { + name: "vers-config", + description: "Centralized Vers platform config resolution", + routes, + + init(ctx: ServiceContext) { + // Load saved overrides from reef store if available + const storeModule = ctx.getStore("store"); + storeRef = storeModule; + }, + + registerTools(pi: ExtensionAPI, client: FleetClient) { + pi.registerTool({ + name: "vers_config_get", + label: "Vers Config: View", + description: "View the current Vers platform configuration. Shows resolved values from env, store, or file sources.", + parameters: Type.Object({}), + async execute() { + if (!client.getBaseUrl()) return client.noUrl(); + try { + const result = await client.api("GET", "/vers-config/"); + const lines = [ + `API Key: ${result.apiKey || "(not set)"} [${result.sources.apiKey}]`, + `Infra URL: ${result.infraUrl || "(not set)"} [${result.sources.infraUrl}]`, + `Auth Token: ${result.authToken || "(not set)"} [${result.sources.authToken}]`, + `Base URL: ${result.baseUrl || "(not set)"}`, + `Agent Name: ${result.agentName || "(not set)"}`, + `VM ID: ${result.vmId || "(not set)"}`, + `Agent Role: ${result.agentRole || "(not set)"}`, + ]; + return client.ok(lines.join("\n"), { config: result }); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + + pi.registerTool({ + name: "vers_config_set", + label: "Vers Config: Set", + description: + "Set a Vers config value. 
Stored in reef (takes priority over file-based config, overridden by env vars).", + parameters: Type.Object({ + key: Type.String({ + description: + "Config key: VERS_API_KEY, VERS_INFRA_URL, VERS_AUTH_TOKEN, VERS_BASE_URL, VERS_AGENT_NAME, VERS_VM_ID, VERS_AGENT_ROLE", + }), + value: Type.String({ description: "Config value" }), + }), + async execute(_id, params) { + if (!client.getBaseUrl()) return client.noUrl(); + try { + await client.api("PUT", `/vers-config/${encodeURIComponent(params.key)}`, { value: params.value }); + return client.ok(`Set ${params.key} (source: store).`); + } catch (e: any) { + return client.err(e.message); + } + }, + }); + }, + + routeDocs: { + "GET /": { + summary: "View resolved config (sensitive values masked)", + response: "{ apiKey, infraUrl, authToken, baseUrl, agentName, vmId, agentRole, sources }", + }, + "PUT /:key": { + summary: "Set a config override", + params: { key: { type: "string", required: true, description: "Config key (e.g., VERS_API_KEY)" } }, + body: { value: { type: "string", required: true, description: "Config value" } }, + }, + "DELETE /:key": { + summary: "Remove a config override", + params: { key: { type: "string", required: true, description: "Config key" } }, + }, + "GET /resolve/:key": { + summary: "Resolve a single config value (full, unmasked)", + params: { key: { type: "string", required: true, description: "Config key" } }, + }, + }, + + dependencies: ["store"], + capabilities: ["config.resolve", "config.manage"], +}; + +export default versConfig; diff --git a/services/vm-tree/index.ts b/services/vm-tree/index.ts new file mode 100644 index 0000000..69b8e71 --- /dev/null +++ b/services/vm-tree/index.ts @@ -0,0 +1,404 @@ +/** + * VM Tree service — SQLite-backed VM lineage tree. 
// Single module-level store instance shared by every route below.
const store = new VMTreeStore();

// =============================================================================
// Routes
// =============================================================================

const routes = new Hono();

// GET /vms — list all VMs
// Optional ?category= and ?parentVmId= filters; empty values are treated as absent.
routes.get("/vms", (c) => {
  const category = c.req.query("category") as VMCategory | undefined;
  const parentVmId = c.req.query("parentVmId");
  const vms = store.list({ category: category || undefined, parentVmId: parentVmId || undefined });
  return c.json({ vms, count: vms.length });
});

// POST /vms — register a VM in the tree
// Any failure (malformed JSON, rejected input) is reported as 400.
routes.post("/vms", async (c) => {
  try {
    const body = await c.req.json();
    const vm = store.create(body);
    return c.json(vm, 201);
  } catch (e: unknown) {
    return c.json({ error: e instanceof Error ? e.message : String(e) }, 400);
  }
});

// GET /vms/:id — get a VM
routes.get("/vms/:id", (c) => {
  const vm = store.get(c.req.param("id"));
  if (!vm) return c.json({ error: "VM not found" }, 404);
  return c.json(vm);
});

// PATCH /vms/:id — update a VM
routes.patch("/vms/:id", async (c) => {
  const id = c.req.param("id");
  // Report a missing VM as 404, like GET and DELETE do, rather than letting
  // the store's "not found" error fall into the generic 400 branch.
  if (!store.get(id)) return c.json({ error: "VM not found" }, 404);
  try {
    const body = await c.req.json();
    const vm = store.update(id, body);
    return c.json(vm);
  } catch (e: unknown) {
    return c.json({ error: e instanceof Error ? e.message : String(e) }, 400);
  }
});

// DELETE /vms/:id — remove a VM
// 404 when the VM does not exist; 400 when the store refuses the removal.
routes.delete("/vms/:id", (c) => {
  try {
    const removed = store.remove(c.req.param("id"));
    if (!removed) return c.json({ error: "VM not found" }, 404);
    return c.json({ deleted: true });
  } catch (e: unknown) {
    return c.json({ error: e instanceof Error ? e.message : String(e) }, 400);
  }
});

// GET /tree — full tree view (all roots or from a specific VM)
// `count` is the total number of VMs in the store, not the size of the returned subtree.
routes.get("/tree", (c) => {
  const rootId = c.req.query("root");
  const tree = store.tree(rootId || undefined);
  return c.json({ tree, count: store.count() });
});

// GET /vms/:id/ancestors — path to root
routes.get("/vms/:id/ancestors", (c) => {
  const id = c.req.param("id");
  if (!store.get(id)) return c.json({ error: "VM not found" }, 404);
  return c.json({ ancestors: store.ancestors(id) });
});

// GET /vms/:id/descendants — all descendants (BFS)
routes.get("/vms/:id/descendants", (c) => {
  const id = c.req.param("id");
  if (!store.get(id)) return c.json({ error: "VM not found" }, 404);
  return c.json({ descendants: store.descendants(id) });
});

// GET /vms/:id/children — direct children
routes.get("/vms/:id/children", (c) => {
  const id = c.req.param("id");
  if (!store.get(id)) return c.json({ error: "VM not found" }, 404);
  return c.json({ children: store.children(id) });
});

// GET /vms/:a/diff/:b — config diff
routes.get("/vms/:a/diff/:b", (c) => {
  const diff = store.configDiff(c.req.param("a"), c.req.param("b"));
  if (!diff) return c.json({ error: "One or both VMs not found" }, 404);
  return c.json(diff);
});

// GET /find/organ/:name — find VMs with a specific organ
routes.get("/find/organ/:name", (c) => {
  const vms = store.findByOrgan(c.req.param("name"));
  return c.json({ vms, count: vms.length });
});

// GET /find/capability/:name — find VMs with a specific capability
routes.get("/find/capability/:name", (c) => {
  const vms = store.findByCapability(c.req.param("name"));
  return c.json({ vms, count: vms.length });
});

// GET /stats — summary statistics
routes.get("/stats", (c) => {
  return c.json(store.stats());
});

// POST /snapshot — create a snapshot now
// Also prunes old snapshots and reports how many were removed.
routes.post("/snapshot", (c) => {
  const path = store.snapshot();
  const removed = store.pruneSnapshots();
  return c.json({ snapshot: path, prunedOldSnapshots: removed });
});
+ ${indent}${prefix}${v.vm.name} + [${v.vm.category}] + ${v.vm.vmId.slice(0, 12)} +
${indent}     + organs: ${organs} | caps: ${caps} +
`; + + if (v.children.length > 0) { + html += renderTree(v.children, depth + 1); + } + return html; + }) + .join(""); + } + + const html = ` + + + VM Tree + + + +

VM Lineage Tree

+
+ ${stats.total} VM${stats.total !== 1 ? "s" : ""} | + ${stats.roots} root${stats.roots !== 1 ? "s" : ""} | + ${Object.entries(stats.byCategory).map(([k, v]) => `${v} ${k}`).join(", ") || "empty"} +
+
+ ${tree.length > 0 ? renderTree(tree) : 'No VMs in tree'} +
// =============================================================================
// Module export
// =============================================================================

/**
 * Service module for the VM lineage tree.
 *
 * Exposes the HTTP routes, three agent tools (`vm_tree_view`,
 * `vm_tree_register`, `vm_tree_find`) that call those routes through the fleet
 * client, a one-line status widget, and store lifecycle hooks.
 */
const vmTree: ServiceModule = {
  name: "vm-tree",
  description: "VM lineage tree — SQLite-backed hierarchy with DNA tracking",
  routes,

  init(ctx: ServiceContext) {
    // Register hourly snapshot cron job if cron service is available
    const cron = ctx.getModule("cron");
    if (cron) {
      // Schedule via the cron service's HTTP API
      const port = parseInt(process.env.PORT || "3000", 10);
      const token = process.env.VERS_AUTH_TOKEN || "";
      // Fire-and-forget: init() does not wait for the request, and only a
      // rejected fetch (e.g. connection refused) is swallowed. A non-2xx
      // response is not inspected.
      fetch(`http://localhost:${port}/cron/jobs`, {
        method: "POST",
        headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` },
        body: JSON.stringify({
          name: "vm-tree-snapshot",
          schedule: "1h",
          type: "http",
          config: { method: "POST", path: "/vm-tree/snapshot" },
          enabled: true,
        }),
      }).catch(() => {
        /* best-effort — cron may not be ready yet */
      });
    }
  },

  store: {
    flush() {
      store.flush();
    },
    async close() {
      store.close();
    },
  },

  registerTools(pi: ExtensionAPI, client: FleetClient) {
    pi.registerTool({
      name: "vm_tree_view",
      label: "VM Tree: View",
      description:
        "View the VM lineage tree. Shows which modules (organs) and extensions (capabilities) are on each VM and where it sits in the hierarchy.",
      parameters: Type.Object({
        vmId: Type.Optional(Type.String({ description: "Root VM ID to view subtree from (default: all roots)" })),
      }),
      async execute(_id, params) {
        if (!client.getBaseUrl()) return client.noUrl();
        try {
          const qs = params.vmId ? `?root=${encodeURIComponent(params.vmId)}` : "";
          const result = await client.api("GET", `/vm-tree/tree${qs}`);
          return client.ok(JSON.stringify(result, null, 2), { tree: result });
        } catch (e: unknown) {
          // Anything can be thrown; only Error instances carry a message.
          return client.err(e instanceof Error ? e.message : String(e));
        }
      },
    });

    pi.registerTool({
      name: "vm_tree_register",
      label: "VM Tree: Register",
      description: "Register a VM in the lineage tree with its category and DNA (organs + capabilities).",
      parameters: Type.Object({
        name: Type.String({ description: "VM name" }),
        category: Type.Union(
          [
            Type.Literal("lieutenant"),
            Type.Literal("swarm_vm"),
            Type.Literal("agent_vm"),
            Type.Literal("infra_vm"),
          ],
          { description: "VM category" },
        ),
        parentVmId: Type.Optional(Type.String({ description: "Parent VM ID in the lineage tree" })),
        vmId: Type.Optional(Type.String({ description: "VM ID (auto-generated if not provided)" })),
        reefConfig: Type.Optional(
          Type.Object(
            {
              organs: Type.Array(Type.String(), { description: "Service modules loaded" }),
              capabilities: Type.Array(Type.String(), { description: "Extension capabilities" }),
            },
            { description: "VM DNA" },
          ),
        ),
      }),
      async execute(_id, params) {
        if (!client.getBaseUrl()) return client.noUrl();
        try {
          // The tool parameters are forwarded unchanged as the POST body.
          const result = await client.api("POST", "/vm-tree/vms", params);
          return client.ok(
            `Registered "${result.name}" (${result.category}) in tree. ID: ${result.vmId}`,
            { vm: result },
          );
        } catch (e: unknown) {
          return client.err(e instanceof Error ? e.message : String(e));
        }
      },
    });

    pi.registerTool({
      name: "vm_tree_find",
      label: "VM Tree: Find",
      description: "Find VMs by organ (module) or capability. Useful for answering 'which VMs have X loaded?'",
      parameters: Type.Object({
        type: Type.Union([Type.Literal("organ"), Type.Literal("capability")], {
          description: "Search type",
        }),
        name: Type.String({ description: "Organ or capability name to search for" }),
      }),
      async execute(_id, params) {
        if (!client.getBaseUrl()) return client.noUrl();
        try {
          const result = await client.api(
            "GET",
            `/vm-tree/find/${params.type}/${encodeURIComponent(params.name)}`,
          );
          if (result.count === 0) {
            return client.ok(`No VMs found with ${params.type} "${params.name}".`);
          }
          return client.ok(
            `${result.count} VM(s) with ${params.type} "${params.name}":\n${JSON.stringify(result.vms, null, 2)}`,
            { result },
          );
        } catch (e: unknown) {
          return client.err(e instanceof Error ? e.message : String(e));
        }
      },
    });
  },

  widget: {
    // Returns no lines when the tree is empty or the stats call fails.
    async getLines(client: FleetClient) {
      try {
        const res = await client.api("GET", "/vm-tree/stats");
        if (res.total === 0) return [];
        return [`VM Tree: ${res.total} VMs, ${res.roots} roots`];
      } catch {
        return [];
      }
    },
  },

  dependencies: [],
  capabilities: ["vm.tree", "vm.lineage", "vm.config"],

  routeDocs: {
    "GET /vms": {
      summary: "List VMs with optional category/parent filter",
      query: {
        category: { type: "string", description: "lieutenant | swarm_vm | agent_vm | infra_vm" },
        parentVmId: { type: "string", description: "Filter by parent" },
      },
      response: "{ vms: [...], count }",
    },
    "POST /vms": {
      summary: "Register a VM in the lineage tree",
      body: {
        name: { type: "string", required: true, description: "VM name" },
        category: { type: "string", required: true, description: "VM category" },
        parentVmId: { type: "string", description: "Parent VM ID" },
        reefConfig: { type: "object", description: "{ organs: [...], capabilities: [...] }" },
      },
      response: "The created VM node",
    },
    "GET /vms/:id": {
      summary: "Get a VM by ID",
      params: { id: { type: "string", required: true } },
    },
    "PATCH /vms/:id": {
      summary: "Update a VM",
      params: { id: { type: "string", required: true } },
    },
    "DELETE /vms/:id": {
      summary: "Remove a VM (fails if has children)",
      params: { id: { type: "string", required: true } },
    },
    "GET /tree": {
      summary: "Full tree view — all roots or subtree from ?root=vmId",
      query: { root: { type: "string", description: "Root VM ID" } },
      response: "{ tree: [...], count }",
    },
    "GET /vms/:id/ancestors": { summary: "Ancestor chain to root" },
    "GET /vms/:id/descendants": { summary: "All descendants (BFS)" },
    "GET /vms/:id/children": { summary: "Direct children" },
    "GET /vms/:a/diff/:b": { summary: "Config diff between two VMs" },
    "GET /find/organ/:name": { summary: "Find VMs with a specific organ" },
    "GET /find/capability/:name": { summary: "Find VMs with a specific capability" },
    "GET /stats": { summary: "Summary statistics" },
    "POST /snapshot": { summary: "Create a DB snapshot and prune old ones" },
    "GET /_panel": { summary: "HTML dashboard with tree visualization", response: "text/html" },
  },
};

export default vmTree;
+ * + * This is the canonical VM tree from the architecture spec: + * roof reef (SQLite VM tree, module distribution) + * └── lieutenants (1:many, snapshot to create) + * └── swarm workers / agent VMs (fleets) + * + * Schema tracks: + * - Parent-child relationships (lineage) + * - VM category (lieutenant, swarm_vm, agent_vm, infra_vm) + * - Reef config per VM (the "DNA" — organs + capabilities) + * - Creation/update timestamps + * + * Separate from registry: registry tracks live VM health/heartbeats, + * vm-tree tracks the permanent lineage and config history. + */ + +import { Database } from "bun:sqlite"; +import { existsSync, mkdirSync, copyFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { ulid } from "ulid"; + +// ============================================================================= +// Types +// ============================================================================= + +export type VMCategory = "lieutenant" | "swarm_vm" | "agent_vm" | "infra_vm"; + +export interface ReefConfig { + organs: string[]; + capabilities: string[]; +} + +export interface VMNode { + vmId: string; + name: string; + parentVmId: string | null; + category: VMCategory; + reefConfig: ReefConfig; + createdAt: string; + updatedAt: string; +} + +export interface CreateVMInput { + vmId?: string; + name: string; + parentVmId?: string; + category: VMCategory; + reefConfig?: ReefConfig; +} + +export interface UpdateVMInput { + name?: string; + category?: VMCategory; + reefConfig?: ReefConfig; +} + +export interface TreeView { + vm: VMNode; + children: TreeView[]; +} + +// ============================================================================= +// Constants +// ============================================================================= + +const VALID_CATEGORIES = new Set(["lieutenant", "swarm_vm", "agent_vm", "infra_vm"]); +const DEFAULT_CONFIG: ReefConfig = { organs: [], capabilities: [] }; + +// 
// =============================================================================
// Store
// =============================================================================

/**
 * SQLite-backed store for the VM lineage tree.
 *
 * One row per VM in table `vms`; the parent link is a self-reference and the
 * reef config ("DNA") is stored as a JSON string. The database is opened in
 * WAL journal mode.
 */
export class VMTreeStore {
  private db: Database;
  private dbPath: string;

  /**
   * Open (creating if needed) the database at `dbPath`, creating its parent
   * directory first when it does not exist.
   */
  constructor(dbPath = "data/vms.sqlite") {
    this.dbPath = dbPath;
    const dir = dirname(dbPath);
    if (!existsSync(dir)) mkdirSync(dir, { recursive: true });

    this.db = new Database(dbPath);
    this.db.exec("PRAGMA journal_mode=WAL");
    this.initTables();
  }

  /** Create the `vms` table and its lookup indexes if they are missing. */
  private initTables(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS vms (
        vm_id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        parent_vm_id TEXT REFERENCES vms(vm_id),
        category TEXT NOT NULL CHECK(category IN ('lieutenant', 'swarm_vm', 'agent_vm', 'infra_vm')),
        reef_config TEXT NOT NULL DEFAULT '{"organs":[],"capabilities":[]}',
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
      )
    `);

    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_vms_parent ON vms(parent_vm_id)`);
    this.db.exec(`CREATE INDEX IF NOT EXISTS idx_vms_category ON vms(category)`);
  }

  // =========================================================================
  // CRUD
  // =========================================================================

  /**
   * Insert a new VM and return the stored node.
   *
   * @throws Error when `name` is blank, `category` is not a known category, or
   *   `parentVmId` names a VM that does not exist.
   */
  create(input: CreateVMInput): VMNode {
    if (!input.name?.trim()) throw new Error("name is required");
    if (!input.category || !VALID_CATEGORIES.has(input.category)) {
      throw new Error(`invalid category: ${input.category}`);
    }

    // Validate parent exists if specified
    if (input.parentVmId) {
      const parent = this.get(input.parentVmId);
      if (!parent) throw new Error(`parent VM '${input.parentVmId}' not found`);
    }

    const vmId = input.vmId || ulid();
    const now = new Date().toISOString();

    this.db.run(
      `INSERT INTO vms (vm_id, name, parent_vm_id, category, reef_config, created_at, updated_at)
       VALUES (?, ?, ?, ?, ?, ?, ?)`,
      [
        vmId,
        input.name.trim(),
        input.parentVmId || null,
        input.category,
        JSON.stringify(input.reefConfig || DEFAULT_CONFIG),
        now,
        now,
      ],
    );

    return this.get(vmId)!;
  }

  /** Fetch one VM by id, or undefined when no such row exists. */
  get(vmId: string): VMNode | undefined {
    const row = this.db.query("SELECT * FROM vms WHERE vm_id = ?").get(vmId) as any;
    return row ? rowToNode(row) : undefined;
  }

  /**
   * Apply a partial update (name, category, reefConfig) and refresh
   * `updated_at`. The parent link cannot be changed through this method.
   *
   * @throws Error when the VM does not exist, `name` is provided but is not a
   *   non-blank string, or `category` is not a known category.
   */
  update(vmId: string, input: UpdateVMInput): VMNode {
    const vm = this.get(vmId);
    if (!vm) throw new Error(`VM '${vmId}' not found`);

    // Same rule create() enforces: a VM must never end up with a blank name.
    if (input.name !== undefined && (typeof input.name !== "string" || !input.name.trim())) {
      throw new Error("name must be a non-empty string");
    }
    if (input.category && !VALID_CATEGORIES.has(input.category)) {
      throw new Error(`invalid category: ${input.category}`);
    }

    const sets: string[] = [];
    const params: any[] = [];

    if (input.name !== undefined) {
      sets.push("name = ?");
      params.push(input.name.trim());
    }
    if (input.category !== undefined) {
      sets.push("category = ?");
      params.push(input.category);
    }
    if (input.reefConfig !== undefined) {
      sets.push("reef_config = ?");
      params.push(JSON.stringify(input.reefConfig));
    }

    sets.push("updated_at = ?");
    params.push(new Date().toISOString());
    params.push(vmId);

    this.db.run(`UPDATE vms SET ${sets.join(", ")} WHERE vm_id = ?`, params);
    return this.get(vmId)!;
  }

  /**
   * Delete a VM. Returns true when a row was deleted, false when the id was
   * unknown.
   *
   * @throws Error when the VM still has children.
   */
  remove(vmId: string): boolean {
    // Check for children — don't orphan them
    const kids = this.children(vmId);
    if (kids.length > 0) {
      throw new Error(`VM '${vmId}' has ${kids.length} children. Remove or reassign them first.`);
    }
    const result = this.db.run("DELETE FROM vms WHERE vm_id = ?", [vmId]);
    return result.changes > 0;
  }

  /** List VMs ordered by creation time, optionally filtered by category and/or parent. */
  list(filters?: { category?: VMCategory; parentVmId?: string }): VMNode[] {
    let sql = "SELECT * FROM vms";
    const conditions: string[] = [];
    const params: any[] = [];

    if (filters?.category) {
      conditions.push("category = ?");
      params.push(filters.category);
    }
    if (filters?.parentVmId) {
      conditions.push("parent_vm_id = ?");
      params.push(filters.parentVmId);
    }

    if (conditions.length) sql += ` WHERE ${conditions.join(" AND ")}`;
    sql += " ORDER BY created_at";

    return this.db
      .query(sql)
      .all(...params)
      .map(rowToNode);
  }

  // =========================================================================
  // Lineage queries
  // =========================================================================

  /** Direct children of a VM, ordered by creation time. */
  children(vmId: string): VMNode[] {
    return this.db
      .query("SELECT * FROM vms WHERE parent_vm_id = ? ORDER BY created_at")
      .all(vmId)
      .map(rowToNode);
  }

  /**
   * Chain from the root down to (and including) the given VM.
   * Returns an empty list when the VM does not exist.
   */
  ancestors(vmId: string): VMNode[] {
    const result: VMNode[] = [];
    let currentId: string | null = vmId;
    // Guards against looping forever should the data ever contain a cycle.
    const seen = new Set<string>();

    while (currentId) {
      if (seen.has(currentId)) break;
      seen.add(currentId);
      const vm = this.get(currentId);
      if (!vm) break;
      result.unshift(vm);
      currentId = vm.parentVmId;
    }

    return result;
  }

  /** All descendants of a VM in breadth-first order (the VM itself is not included). */
  descendants(vmId: string): VMNode[] {
    const result: VMNode[] = [];
    const queue: string[] = [vmId];
    const seen = new Set<string>();

    // Walk the queue with a cursor instead of shift() so dequeuing stays O(1).
    for (let head = 0; head < queue.length; head++) {
      const id = queue[head];
      if (seen.has(id)) continue;
      seen.add(id);

      for (const kid of this.children(id)) {
        result.push(kid);
        queue.push(kid.vmId);
      }
    }

    return result;
  }

  /** Build a full tree view from a root (or all roots if no vmId given) */
  tree(vmId?: string): TreeView[] {
    if (vmId) {
      const vm = this.get(vmId);
      if (!vm) return [];
      return [this.buildTree(vm)];
    }

    // All roots (VMs with no parent)
    const roots = this.db
      .query("SELECT * FROM vms WHERE parent_vm_id IS NULL ORDER BY created_at")
      .all()
      .map(rowToNode);

    return roots.map((r) => this.buildTree(r));
  }

  /** Recursively attach children to a node (one children query per node). */
  private buildTree(vm: VMNode): TreeView {
    const kids = this.children(vm.vmId);
    return {
      vm,
      children: kids.map((k) => this.buildTree(k)),
    };
  }

  // =========================================================================
  // Config queries
  // =========================================================================

  /**
   * Compare reef configs between two VMs.
   * `added` lists what B has and A lacks; `removed` lists what A has and B
   * lacks. Returns null when either VM does not exist.
   */
  configDiff(vmIdA: string, vmIdB: string): { added: ReefConfig; removed: ReefConfig } | null {
    const a = this.get(vmIdA);
    const b = this.get(vmIdB);
    if (!a || !b) return null;

    return {
      added: {
        organs: b.reefConfig.organs.filter((o) => !a.reefConfig.organs.includes(o)),
        capabilities: b.reefConfig.capabilities.filter((c) => !a.reefConfig.capabilities.includes(c)),
      },
      removed: {
        organs: a.reefConfig.organs.filter((o) => !b.reefConfig.organs.includes(o)),
        capabilities: a.reefConfig.capabilities.filter((c) => !b.reefConfig.capabilities.includes(c)),
      },
    };
  }

  /**
   * Find VMs that have a specific organ loaded.
   *
   * Matches exact membership in the parsed `organs` list. A substring match on
   * the raw JSON would also hit capabilities with the same name and would
   * treat `%` and `_` in the search term as wildcards.
   */
  findByOrgan(organ: string): VMNode[] {
    return this.list().filter((vm) => vm.reefConfig.organs.includes(organ));
  }

  /** Find VMs that have a specific capability (exact match on the parsed `capabilities` list). */
  findByCapability(capability: string): VMNode[] {
    return this.list().filter((vm) => vm.reefConfig.capabilities.includes(capability));
  }

  // =========================================================================
  // Snapshots
  // =========================================================================

  /**
   * Create a snapshot of the database and return the snapshot file path.
   * The file is named `vms-<ISO timestamp with ':' and '.' replaced by '-'>.sqlite`.
   */
  snapshot(snapshotDir = "data/snapshots"): string {
    if (!existsSync(snapshotDir)) mkdirSync(snapshotDir, { recursive: true });

    // The database runs in WAL mode, so recently committed rows may exist only
    // in the `-wal` side file. Fold them into the main file first; otherwise
    // copying just the main file can miss them.
    this.db.exec("PRAGMA wal_checkpoint(TRUNCATE)");

    const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
    const snapshotPath = join(snapshotDir, `vms-${timestamp}.sqlite`);
    copyFileSync(this.dbPath, snapshotPath);
    return snapshotPath;
  }

  /**
   * Clean old snapshots, keeping the most recent N.
   * Returns how many files were deleted; files that fail to delete are skipped.
   */
  pruneSnapshots(snapshotDir = "data/snapshots", keep = 24): number {
    if (!existsSync(snapshotDir)) return 0;

    const { readdirSync, unlinkSync } = require("node:fs") as typeof import("node:fs");
    // The timestamped names sort lexicographically in chronological order, so
    // a reversed sort puts the newest snapshot first.
    const files = readdirSync(snapshotDir)
      .filter((f: string) => f.startsWith("vms-") && f.endsWith(".sqlite"))
      .sort()
      .reverse();

    let removed = 0;
    for (let i = Math.max(0, keep); i < files.length; i++) {
      try {
        unlinkSync(join(snapshotDir, files[i]));
        removed++;
      } catch {
        /* ignore */
      }
    }
    return removed;
  }

  // =========================================================================
  // Stats
  // =========================================================================

  /** Total VM count, count per category, and number of root VMs (no parent). */
  stats(): { total: number; byCategory: Record<string, number>; roots: number } {
    const total = (this.db.query("SELECT COUNT(*) as c FROM vms").get() as any)?.c || 0;
    const roots = (this.db.query("SELECT COUNT(*) as c FROM vms WHERE parent_vm_id IS NULL").get() as any)?.c || 0;

    const byCategory: Record<string, number> = {};
    const rows = this.db.query("SELECT category, COUNT(*) as c FROM vms GROUP BY category").all() as any[];
    for (const row of rows) {
      byCategory[row.category] = row.c;
    }

    return { total, byCategory, roots };
  }

  /** Total number of VMs in the store. */
  count(): number {
    return (this.db.query("SELECT COUNT(*) as c FROM vms").get() as any)?.c || 0;
  }

  /** No-op: writes go straight to SQLite, so there is nothing buffered to flush. */
  flush(): void {}

  /** Close the underlying database handle. */
  close(): void {
    this.db.close();
  }
}

// =============================================================================
// Row mapper
// =============================================================================

/** Keep only the string entries of `value`; anything that is not an array becomes []. */
function toStringArray(value: unknown): string[] {
  return Array.isArray(value) ? value.filter((v): v is string => typeof v === "string") : [];
}

/**
 * Convert a raw `vms` row (snake_case columns) into a VMNode.
 *
 * `reef_config` is parsed from JSON and its `organs` / `capabilities` are
 * coerced to string arrays, so callers can rely on both lists existing even
 * if a malformed config object was written. Other keys in the stored config
 * are passed through unchanged. Text that is not valid JSON still throws.
 */
function rowToNode(row: any): VMNode {
  const parsed: unknown = JSON.parse(row.reef_config || '{"organs":[],"capabilities":[]}');
  const config: Record<string, unknown> =
    parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)
      ? (parsed as Record<string, unknown>)
      : {};

  return {
    vmId: row.vm_id,
    name: row.name,
    parentVmId: row.parent_vm_id || null,
    category: row.category,
    reefConfig: {
      ...config,
      organs: toStringArray(config.organs),
      capabilities: toStringArray(config.capabilities),
    },
    createdAt: row.created_at,
    updatedAt: row.updated_at,
  };
}