Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const eventAbortControllers = new Map<string, AbortController>();
const eventSinks = new Set<(agentId: string, event: string, data: unknown) => void>();
// Per-agent idle timers — fires exit when no nudges arrive
const idleTimers = new Map<string, ReturnType<typeof setTimeout>>();
// LOC diff stats poller interval
let locStatsTimer: ReturnType<typeof setInterval> | undefined;
const LOC_STATS_INTERVAL_MS = 30_000; // 30s

let nextPort = 4096;
const startTime = Date.now();
Expand Down Expand Up @@ -135,6 +138,143 @@ function broadcastEvent(agentId: string, event: string, data: unknown): void {
}
}

/**
* Report token/cost usage to TownDO for metrics aggregation.
* Extracts input_tokens, output_tokens, and cost from message.completed
* or assistant.completed event properties.
* Fire-and-forget — errors are silently ignored.
*/
function reportTokenUsage(agent: ManagedAgent, properties: Record<string, unknown>): void {
// The SDK emits usage data in various shapes depending on the provider.
// Common shapes: { usage: { inputTokens, outputTokens } },
// { input_tokens, output_tokens }, { usage: { input_tokens, output_tokens } }
const usage =
properties.usage && typeof properties.usage === 'object'
? (properties.usage as Record<string, unknown>)
: properties;

const inputTokens = Number(usage.inputTokens ?? usage.input_tokens ?? 0) || 0;
const outputTokens = Number(usage.outputTokens ?? usage.output_tokens ?? 0) || 0;

// Cost might be in microdollars or dollars depending on provider
let costMicrodollars = 0;
if (usage.cost_microdollars != null) {
costMicrodollars = Number(usage.cost_microdollars) || 0;
} else if (usage.cost != null) {
// Assume cost is in dollars, convert to microdollars
costMicrodollars = Math.round((Number(usage.cost) || 0) * 1_000_000);
}

if (inputTokens === 0 && outputTokens === 0 && costMicrodollars === 0) return;

const apiUrl = agent.gastownApiUrl;
const authToken =
process.env.GASTOWN_CONTAINER_TOKEN ?? agent.gastownContainerToken ?? agent.gastownSessionToken;
if (!apiUrl || !authToken) return;

const headers: Record<string, string> = {
'Content-Type': 'application/json',
Authorization: `Bearer ${authToken}`,
};
if (process.env.GASTOWN_CONTAINER_TOKEN || agent.gastownContainerToken) {
headers['X-Gastown-Agent-Id'] = agent.agentId;
if (agent.rigId) headers['X-Gastown-Rig-Id'] = agent.rigId;
}

fetch(`${apiUrl}/api/towns/${agent.townId ?? '_'}/usage`, {
method: 'POST',
headers,
body: JSON.stringify({
input_tokens: inputTokens,
output_tokens: outputTokens,
cost_microdollars: costMicrodollars,
}),
}).catch(() => {
// Best-effort — don't block the event loop
});
}

/**
* Poll git diff stats for all active agents and report aggregate LOC to TownDO.
* Uses the SDK's worktree.diffSummary() to compare against the default branch.
* Runs every 30s.
*/
async function pollLocStats(): Promise<void> {
let totalAdditions = 0;
let totalDeletions = 0;
let townId: string | null = null;
let apiUrl: string | null = null;
let authToken: string | null = null;

for (const agent of agents.values()) {
if (agent.status !== 'running' && agent.status !== 'starting') continue;
if (!agent.defaultBranch) continue;

// Capture credentials from first viable agent
if (!townId) {
townId = agent.townId;
apiUrl = agent.gastownApiUrl;
authToken =
process.env.GASTOWN_CONTAINER_TOKEN ??
agent.gastownContainerToken ??
agent.gastownSessionToken;
}

const instance = sdkInstances.get(agent.workdir);
if (!instance) continue;

try {
const base = `origin/${agent.defaultBranch}`;
const result = await instance.client.worktree.diffSummary(
{ directory: agent.workdir, base },
{ throwOnError: true }
);
const diffs = result.data;
if (Array.isArray(diffs)) {
for (const diff of diffs) {
const d = diff as { additions?: number; deletions?: number };
totalAdditions += Number(d.additions ?? 0);
totalDeletions += Number(d.deletions ?? 0);
}
}
} catch {
// Agent may not have a git worktree yet or diff may fail — skip
}
}

// Report aggregate LOC to TownDO
if (townId && apiUrl && authToken && (totalAdditions > 0 || totalDeletions > 0)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: The poller never clears LOC back to zero

This POST is skipped whenever totalAdditions and totalDeletions both become 0. Because setLoc() keeps the last reported snapshot until another /loc call arrives, the town can keep showing stale non-zero line counts after a branch is rebased/merged cleanly or after all agents go idle. The poller needs to report the 0/0 state as well so the gauges can reset.

const headers: Record<string, string> = {
'Content-Type': 'application/json',
Authorization: `Bearer ${authToken}`,
};
fetch(`${apiUrl}/api/towns/${townId}/loc`, {
method: 'POST',
headers,
body: JSON.stringify({ additions: totalAdditions, deletions: totalDeletions }),
}).catch(() => {});
}
}

function ensureLocStatsPoller(): void {
if (locStatsTimer) return;
locStatsTimer = setInterval(() => {
void pollLocStats();
}, LOC_STATS_INTERVAL_MS);
// Run immediately on first start
void pollLocStats();
}

function stopLocStatsPollerIfIdle(): void {
const hasActive = [...agents.values()].some(
a => a.status === 'running' || a.status === 'starting'
);
if (!hasActive && locStatsTimer) {
clearInterval(locStatsTimer);
locStatsTimer = undefined;
}
}

/**
* Get or create an SDK server instance for a workdir.
*
Expand Down Expand Up @@ -461,6 +601,14 @@ async function subscribeToEvents(
// Broadcast to WebSocket sinks
broadcastEvent(agent.agentId, event.type ?? 'unknown', event.properties ?? {});

// Report token usage to TownDO for throughput gauges
if (
(event.type === 'message.completed' || event.type === 'assistant.completed') &&
event.properties
) {
reportTokenUsage(agent, event.properties as Record<string, unknown>);
}

if (event.type === 'session.idle') {
if (request.role === 'mayor') {
// Mayor agents are persistent — session.idle means "turn done", not exit.
Expand Down Expand Up @@ -543,6 +691,7 @@ export async function startAgent(
gastownSessionToken: request.envVars?.GASTOWN_SESSION_TOKEN ?? null,
completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null,
model: request.model ?? null,
defaultBranch: request.defaultBranch ?? null,
};
agents.set(request.agentId, agent);

Expand Down Expand Up @@ -600,6 +749,9 @@ export async function startAgent(
}
agent.messageCount = 1;

// Start LOC diff stats poller when first agent becomes active
ensureLocStatsPoller();

log.info('agent.start', {
agentId: request.agentId,
role: request.role,
Expand Down Expand Up @@ -660,6 +812,7 @@ export async function stopAgent(agentId: string): Promise<void> {
agent.exitReason = 'stopped';
log.info('agent.exit', { agentId, reason: 'stopped', exitReason: 'stopped' });
broadcastEvent(agentId, 'agent.exited', { reason: 'stopped' });
stopLocStatsPollerIfIdle();
}

/**
Expand Down
2 changes: 2 additions & 0 deletions cloudflare-gastown/container/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ export type ManagedAgent = {
completionCallbackUrl: string | null;
/** Model ID used for this agent's sessions (e.g. "anthropic/claude-sonnet-4.6") */
model: string | null;
/** Default branch for git diff stats (e.g. "main") */
defaultBranch: string | null;
};

export type AgentStatusResponse = {
Expand Down
72 changes: 72 additions & 0 deletions cloudflare-gastown/src/db/tables/metrics-snapshots.table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { z } from 'zod';
import { getTableFromZodSchema, getCreateTableQueryFromTable } from '../../util/table';

/**
* Periodic metrics snapshots collected by the alarm handler.
* One row per alarm tick (~5s when active, ~60s when idle).
* Used for timeseries charts on the observability tab.
*/
export const MetricsSnapshotRecord = z.object({
snapshot_id: z.string(),
/** ISO timestamp of when the snapshot was taken */
captured_at: z.string(),
/** Number of agents in 'working' status */
agents_working: z.coerce.number(),
/** Number of agents in 'idle' status */
agents_idle: z.coerce.number(),
/** Total registered agents */
agents_total: z.coerce.number(),
/** Beads currently open */
beads_open: z.coerce.number(),
/** Beads currently in_progress */
beads_in_progress: z.coerce.number(),
/** Beads currently in_review */
beads_in_review: z.coerce.number(),
/** Bead events recorded since last snapshot */
events_since_last: z.coerce.number(),
/** Beads created since last snapshot */
beads_created_since_last: z.coerce.number(),
/** Beads closed since last snapshot */
beads_closed_since_last: z.coerce.number(),
/** Accumulated input tokens since last snapshot */
input_tokens_since_last: z.coerce.number(),
/** Accumulated output tokens since last snapshot */
output_tokens_since_last: z.coerce.number(),
/** Accumulated cost in microdollars since last snapshot */
cost_microdollars_since_last: z.coerce.number(),
/** Total LOC additions across all agents (absolute snapshot, not delta) */
loc_additions: z.coerce.number(),
/** Total LOC deletions across all agents (absolute snapshot, not delta) */
loc_deletions: z.coerce.number(),
});

export type MetricsSnapshotRecord = z.output<typeof MetricsSnapshotRecord>;

export const metrics_snapshots = getTableFromZodSchema('metrics_snapshots', MetricsSnapshotRecord);

export function createTableMetricsSnapshots(): string {
return getCreateTableQueryFromTable(metrics_snapshots, {
snapshot_id: 'text primary key',
captured_at: 'text not null',
agents_working: 'integer not null default 0',
agents_idle: 'integer not null default 0',
agents_total: 'integer not null default 0',
beads_open: 'integer not null default 0',
beads_in_progress: 'integer not null default 0',
beads_in_review: 'integer not null default 0',
events_since_last: 'integer not null default 0',
beads_created_since_last: 'integer not null default 0',
beads_closed_since_last: 'integer not null default 0',
input_tokens_since_last: 'integer not null default 0',
output_tokens_since_last: 'integer not null default 0',
cost_microdollars_since_last: 'integer not null default 0',
loc_additions: 'integer not null default 0',
loc_deletions: 'integer not null default 0',
});
}

export function getIndexesMetricsSnapshots(): string[] {
return [
`CREATE INDEX IF NOT EXISTS idx_metrics_snapshots_captured ON ${metrics_snapshots}(${metrics_snapshots.columns.captured_at})`,
];
}
Loading