diff --git a/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/hooks.ts b/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/hooks.ts new file mode 100644 index 0000000000..dca1b0abfb --- /dev/null +++ b/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/hooks.ts @@ -0,0 +1,34 @@ +'use client'; + +import { useQuery } from '@tanstack/react-query'; + +export type ControllerTelemetryRow = { + timestamp: string; + sandbox_id: string; + machine_id: string; + disk_used_bytes: number; + disk_total_bytes: number; +}; + +type AnalyticsEngineResponse = { + data: T[]; + meta: { name: string; type: string }[]; + rows: number; +}; + +export function useControllerTelemetryDiskUsage(sandboxId: string) { + return useQuery>({ + queryKey: ['kiloclaw-controller-telemetry', 'disk-usage', sandboxId], + queryFn: async () => { + const response = await fetch( + `/admin/api/kiloclaw-controller-telemetry?sandboxId=${encodeURIComponent(sandboxId)}` + ); + if (!response.ok) { + throw new Error('Failed to fetch controller telemetry disk usage'); + } + return response.json() as Promise>; + }, + enabled: !!sandboxId, + refetchInterval: 60_000, + }); +} diff --git a/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/route.ts b/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/route.ts new file mode 100644 index 0000000000..7ab168b87d --- /dev/null +++ b/apps/web/src/app/admin/api/kiloclaw-controller-telemetry/route.ts @@ -0,0 +1,82 @@ +import type { NextRequest } from 'next/server'; +import { NextResponse } from 'next/server'; +import { getUserFromAuth } from '@/lib/user.server'; +import { getEnvVariable } from '@/lib/dotenvx'; + +function isSafeIdentifier(value: string): boolean { + return /^[A-Za-z0-9_-]+$/.test(value); +} + +function buildQuery(sandboxId: string): string { + return `SELECT + timestamp, + blob1 AS sandbox_id, + blob8 AS machine_id, + double7 AS disk_used_bytes, + double8 AS disk_total_bytes +FROM kiloclaw_controller_telemetry +WHERE index1 = '${sandboxId}' +ORDER BY timestamp DESC +LIMIT 1 +FORMAT JSON`; +} + +type AnalyticsEngineResponse = { + data: Record[]; + meta: { name: string; type: string }[]; + rows: number; +}; + +export async function GET( + request: NextRequest +): Promise> { + const { authFailedResponse } = await getUserFromAuth({ adminOnly: true }); + if (authFailedResponse) { + return authFailedResponse; + } + + const { searchParams } = new URL(request.url); + const sandboxId = searchParams.get('sandboxId'); + + if (!sandboxId || !isSafeIdentifier(sandboxId)) { + return NextResponse.json({ error: 'Invalid or missing sandboxId' }, { status: 400 }); + } + + const accountId = getEnvVariable('R2_ACCOUNT_ID'); + const token = getEnvVariable('CF_ANALYTICS_ENGINE_TOKEN'); + + if (!accountId || !token) { + return NextResponse.json( + { error: 'Missing Cloudflare Analytics Engine configuration' }, + { status: 500 } + ); + } + + const sqlQuery = buildQuery(sandboxId); + + try { + const response = await fetch( + `https://api.cloudflare.com/client/v4/accounts/${accountId}/analytics_engine/sql`, + { + method: 'POST', + headers: { Authorization: `Bearer ${token}` }, + body: sqlQuery, + } + ); + + if (!response.ok) { + const errorText = await response.text(); + console.error('Analytics Engine API error:', response.status, errorText); + return NextResponse.json( + { error: `Analytics Engine API error: ${response.status}` }, + { status: 500 } + ); + } + + const result: AnalyticsEngineResponse = await response.json(); + return NextResponse.json(result); + } catch (error) { + console.error('Analytics Engine request failed:', error); + return NextResponse.json({ error: 'Failed to query Analytics Engine' }, { status: 500 }); + } +} diff --git a/apps/web/src/app/admin/components/KiloclawInstances/KiloclawInstanceDetail.tsx b/apps/web/src/app/admin/components/KiloclawInstances/KiloclawInstanceDetail.tsx index ef0663d3e5..ce3911b87c 100644 --- a/apps/web/src/app/admin/components/KiloclawInstances/KiloclawInstanceDetail.tsx +++ b/apps/web/src/app/admin/components/KiloclawInstances/KiloclawInstanceDetail.tsx @@ -70,6 +70,7 @@ import { type KiloclawEventRow, type KiloclawAllEventRow, } from '@/app/admin/api/kiloclaw-analytics/hooks'; +import { useControllerTelemetryDiskUsage } from '@/app/admin/api/kiloclaw-controller-telemetry/hooks'; function parseTimestamp(timestamp: string): Date { const normalized = timestamp.includes('T') ? timestamp : timestamp.replace(' ', 'T'); @@ -1198,6 +1199,12 @@ export function KiloclawInstanceDetail({ instanceId }: { instanceId: string }) { enabled: !!userId, }); + const sandboxId = data?.sandbox_id; + const aeDiskUsage = useControllerTelemetryDiskUsage(sandboxId ?? ''); + const aeRow = aeDiskUsage.data?.data?.[0]; + const diskUsed = aeRow && aeRow.disk_used_bytes > 0 ? aeRow.disk_used_bytes : null; + const diskTotal = aeRow && aeRow.disk_total_bytes > 0 ? aeRow.disk_total_bytes : null; + const { mutateAsync: destroyInstance, isPending: isDestroying } = useMutation( trpc.admin.kiloclawInstances.destroy.mutationOptions({ onSuccess: () => { @@ -1687,10 +1694,7 @@ export function KiloclawInstanceDetail({ instanceId }: { instanceId: string }) {
- {formatVolumeUsageLine( - data.workerStatus?.diskUsedBytes, - data.workerStatus?.diskTotalBytes - )} + {formatVolumeUsageLine(diskUsed, diskTotal)}
diff --git a/apps/web/src/lib/kiloclaw/types.ts b/apps/web/src/lib/kiloclaw/types.ts index 45ed644689..08eea00e44 100644 --- a/apps/web/src/lib/kiloclaw/types.ts +++ b/apps/web/src/lib/kiloclaw/types.ts @@ -219,8 +219,6 @@ export type PlatformDebugStatusResponse = PlatformStatusResponse & { restoreStartedAt: string | null; pendingRestoreVolumeId: string | null; instanceReadyEmailSent: boolean; - diskUsedBytes: number | null; - diskTotalBytes: number | null; }; export type CleanupRecoveryPreviousVolumeResponse = { diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts index 8a0d41bfa7..2a09368b8d 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts @@ -1674,8 +1674,6 @@ export class KiloClawInstance extends DurableObject { restoreStartedAt: string | null; pendingRestoreVolumeId: string | null; instanceReadyEmailSent: boolean; - diskUsedBytes: number | null; - diskTotalBytes: number | null; }> { await this.loadState(); const alarmScheduledAt = await this.ctx.storage.getAlarm(); @@ -1729,8 +1727,6 @@ export class KiloClawInstance extends DurableObject { restoreStartedAt: this.s.restoreStartedAt, pendingRestoreVolumeId: this.s.pendingRestoreVolumeId, instanceReadyEmailSent: this.s.instanceReadyEmailSent, - diskUsedBytes: this.s.diskUsedBytes, - diskTotalBytes: this.s.diskTotalBytes, }; } @@ -1770,17 +1766,6 @@ export class KiloClawInstance extends DurableObject { return { shouldNotify: true, userId: this.s.userId }; } - /** Persist disk usage from controller checkin, or clear when the pair is missing/null. */ - async recordDiskStats(usedBytes: number | null, totalBytes: number | null): Promise { - await this.loadState(); - this.s.diskUsedBytes = usedBytes; - this.s.diskTotalBytes = totalBytes; - await this.persist({ - diskUsedBytes: this.s.diskUsedBytes, - diskTotalBytes: this.s.diskTotalBytes, - }); - } - async listVolumeSnapshots(): Promise { await this.loadState(); if (!this.s.flyVolumeId) return []; diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance/state.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance/state.ts index a4907945c6..e4f409fb7c 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance/state.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance/state.ts @@ -224,8 +224,6 @@ export async function loadState(ctx: DurableObjectState, s: InstanceMutableState // Legacy instances pre-dating this field treat absence as already-sent // to avoid spurious emails after deploy. s.instanceReadyEmailSent = 'instanceReadyEmailSent' in raw ? d.instanceReadyEmailSent : true; - s.diskUsedBytes = d.diskUsedBytes; - s.diskTotalBytes = d.diskTotalBytes; s.customSecretMeta = d.customSecretMeta; s.streamChatApiKey = d.streamChatApiKey; s.streamChatBotUserId = d.streamChatBotUserId; @@ -311,8 +309,6 @@ export function resetMutableState(s: InstanceMutableState): void { s.preRestoreStatus = null; s.pendingRestoreVolumeId = null; s.instanceReadyEmailSent = false; - s.diskUsedBytes = null; - s.diskTotalBytes = null; s.streamChatApiKey = null; s.streamChatBotUserId = null; s.streamChatBotUserToken = null; @@ -391,8 +387,6 @@ export function createMutableState(): InstanceMutableState { preRestoreStatus: null, pendingRestoreVolumeId: null, instanceReadyEmailSent: false, - diskUsedBytes: null, - diskTotalBytes: null, customSecretMeta: null, streamChatApiKey: null, streamChatBotUserId: null, diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance/types.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance/types.ts index c4f83140b0..d9546f09dc 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance/types.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance/types.ts @@ -117,8 +117,6 @@ export type InstanceMutableState = { preRestoreStatus: InstanceStatus | null; pendingRestoreVolumeId: string | null; instanceReadyEmailSent: boolean; - diskUsedBytes: number | null; - diskTotalBytes: number | null; customSecretMeta: PersistedState['customSecretMeta']; // Stream Chat default channel (auto-provisioned) streamChatApiKey: string | null; diff --git a/services/kiloclaw/src/routes/controller.test.ts b/services/kiloclaw/src/routes/controller.test.ts index 63749a6afc..e5b1ff7be4 100644 --- a/services/kiloclaw/src/routes/controller.test.ts +++ b/services/kiloclaw/src/routes/controller.test.ts @@ -2,6 +2,12 @@ import { describe, expect, it, vi, type Mock } from 'vitest'; import { controller } from './controller'; import { deriveGatewayToken } from '../auth/gateway-token'; +type AnalyticsEngineDataPoint = { + blobs: string[]; + doubles: number[]; + indexes: string[]; +}; + vi.mock('cloudflare:workers', () => ({ waitUntil: (p: Promise) => p, })); @@ -30,7 +36,7 @@ const sandboxId = 'dXNlci0x'; function makeEnv(options?: { gatewayTokenSecret?: string; kilocodeApiKey?: string; - writeDataPoint?: (payload: unknown) => void; + writeDataPoint?: (payload: AnalyticsEngineDataPoint) => void; posthogKey?: string; hyperdriveConnectionString?: string; workerEnv?: string; @@ -47,7 +53,6 @@ function makeEnv(options?: { botVibe: 'Dry wit', botEmoji: '🤖', }); - const recordDiskStats = vi.fn().mockResolvedValue(undefined); const tryMarkInstanceReady = options?.tryMarkInstanceReady ?? vi.fn().mockResolvedValue({ shouldNotify: false, userId: null }); @@ -58,7 +63,7 @@ function makeEnv(options?: { INTERNAL_API_SECRET: options?.internalApiSecret, KILOCLAW_INSTANCE: { idFromName: (userId: string) => userId, - get: () => ({ getConfig, getStatus, recordDiskStats, tryMarkInstanceReady }), + get: () => ({ getConfig, getStatus, tryMarkInstanceReady }), }, KILOCLAW_CONTROLLER_AE: options?.writeDataPoint ? { @@ -114,6 +119,17 @@ async function makeAuthHeaders(targetSandboxId = sandboxId) { }; } +function analyticsEvents(writeDataPoint: Mock): AnalyticsEngineDataPoint[] { + const calls = writeDataPoint.mock.calls as [AnalyticsEngineDataPoint][]; + return calls.map(([call]) => call); +} + +function firstAnalyticsEvent(writeDataPoint: Mock): AnalyticsEngineDataPoint { + const [call] = analyticsEvents(writeDataPoint); + expect(call).toBeDefined(); + return call; +} + describe('POST /checkin', () => { it('returns 401 when required auth headers are missing', async () => { const response = await controller.request( @@ -148,7 +164,7 @@ describe('POST /checkin', () => { }); it('returns 204 and writes AE datapoint when both tokens are valid', async () => { - const writeDataPoint = vi.fn(); + const writeDataPoint = vi.fn<(payload: AnalyticsEngineDataPoint) => void>(); const env = makeEnv({ writeDataPoint }); const headers = await makeAuthHeaders(); @@ -160,6 +176,99 @@ describe('POST /checkin', () => { expect(response.status).toBe(204); expect(writeDataPoint).toHaveBeenCalledTimes(1); + + const call = firstAnalyticsEvent(writeDataPoint); + expect(call.doubles).toHaveLength(8); + expect(call.doubles[6]).toBe(0); + expect(call.doubles[7]).toBe(0); + }); + + it('writes disk usage doubles when disk stats are present', async () => { + const writeDataPoint = vi.fn<(payload: AnalyticsEngineDataPoint) => void>(); + const env = makeEnv({ writeDataPoint }); + const headers = await makeAuthHeaders(); + + const response = await controller.request( + '/checkin', + { + method: 'POST', + headers, + body: JSON.stringify(makeBody({ diskUsedBytes: 1024000, diskTotalBytes: 5368709120 })), + }, + env + ); + + expect(response.status).toBe(204); + expect(writeDataPoint).toHaveBeenCalledTimes(1); + + const call = firstAnalyticsEvent(writeDataPoint); + expect(call.doubles).toHaveLength(8); + expect(call.doubles[6]).toBe(1024000); + expect(call.doubles[7]).toBe(5368709120); + }); + + it('normalizes null disk usage doubles to zero', async () => { + const writeDataPoint = vi.fn<(payload: AnalyticsEngineDataPoint) => void>(); + const env = makeEnv({ writeDataPoint }); + const headers = await makeAuthHeaders(); + + const response = await controller.request( + '/checkin', + { + method: 'POST', + headers, + body: JSON.stringify(makeBody({ diskUsedBytes: null, diskTotalBytes: null })), + }, + env + ); + + expect(response.status).toBe(204); + expect(writeDataPoint).toHaveBeenCalledTimes(1); + + const call = firstAnalyticsEvent(writeDataPoint); + expect(call.doubles).toHaveLength(8); + expect(call.doubles[6]).toBe(0); + expect(call.doubles[7]).toBe(0); + }); + + it('clamps negative disk usage doubles to zero', async () => { + const writeDataPoint = vi.fn<(payload: AnalyticsEngineDataPoint) => void>(); + const env = makeEnv({ writeDataPoint }); + const headers = await makeAuthHeaders(); + + const response = await controller.request( + '/checkin', + { + method: 'POST', + headers, + body: JSON.stringify(makeBody({ diskUsedBytes: -1, diskTotalBytes: -1 })), + }, + env + ); + + expect(response.status).toBe(204); + expect(writeDataPoint).toHaveBeenCalledTimes(1); + + const call = firstAnalyticsEvent(writeDataPoint); + expect(call.doubles).toHaveLength(8); + expect(call.doubles[6]).toBe(0); + expect(call.doubles[7]).toBe(0); + }); + + it('still returns 204 when AE write throws', async () => { + const writeDataPoint = vi + .fn<(payload: AnalyticsEngineDataPoint) => Promise>() + .mockRejectedValue(new Error('AE error')); + const env = makeEnv({ writeDataPoint }); + const headers = await makeAuthHeaders(); + + const response = await controller.request( + '/checkin', + { method: 'POST', headers, body: JSON.stringify(makeBody()) }, + env + ); + + expect(response.status).toBe(204); }); it('does not call PostHog when productTelemetry is absent', async () => { diff --git a/services/kiloclaw/src/routes/controller.ts b/services/kiloclaw/src/routes/controller.ts index c4efe7c6ed..b8003415ff 100644 --- a/services/kiloclaw/src/routes/controller.ts +++ b/services/kiloclaw/src/routes/controller.ts @@ -29,6 +29,13 @@ const ProductTelemetrySchema = z.object({ const INSTANCE_READY_LOAD_THRESHOLD = 0.1; +const DiskBytesSchema = z + .number() + .int() + .nullable() + .optional() + .transform(value => Math.max(value ?? 0, 0)); + const CheckinSchema = z.object({ sandboxId: z.string().min(1), machineId: z.string().optional(), @@ -44,8 +51,8 @@ const CheckinSchema = z.object({ bandwidthBytesIn: z.number().min(0), bandwidthBytesOut: z.number().min(0), lastExitReason: z.string().optional(), - diskUsedBytes: z.number().int().min(0).nullable().optional(), - diskTotalBytes: z.number().int().min(0).nullable().optional(), + diskUsedBytes: DiskBytesSchema, + diskTotalBytes: DiskBytesSchema, productTelemetry: ProductTelemetrySchema.optional(), }); @@ -161,6 +168,8 @@ controller.post('/checkin', async (c: Context) => { data.loadAvg5m, data.bandwidthBytesIn, data.bandwidthBytesOut, + data.diskUsedBytes, + data.diskTotalBytes, ], indexes: [data.sandboxId], }); @@ -203,13 +212,6 @@ controller.post('/checkin', async (c: Context) => { waitUntil(telemetryPromise); } - // Persist disk stats (best-effort). Missing/null pair clears DO storage so the admin UI does not show stale usage. - try { - await stub.recordDiskStats(data.diskUsedBytes ?? null, data.diskTotalBytes ?? null); - } catch (err) { - console.error('[controller] recordDiskStats failed (non-fatal):', err); - } - // Instance readiness detection: when load drops below threshold, notify the // backend so it can send the one-time "instance ready" email and finalize // any pending async auto-resume state for this instance. diff --git a/services/kiloclaw/src/schemas/instance-config.ts b/services/kiloclaw/src/schemas/instance-config.ts index 2aa1106f55..b26e5bda01 100644 --- a/services/kiloclaw/src/schemas/instance-config.ts +++ b/services/kiloclaw/src/schemas/instance-config.ts @@ -289,9 +289,6 @@ export const PersistedStateSchema = z.object({ // Tracks whether the "instance ready" email has been sent for this provision lifecycle. // Set to true on first low-load checkin; reset on DO wipe (destroy + re-provision). instanceReadyEmailSent: z.boolean().default(false), - // Disk usage reported by the controller checkin. - diskUsedBytes: z.number().int().nullable().default(null), - diskTotalBytes: z.number().int().nullable().default(null), // Metadata for custom (non-catalog) secrets: env var name → { configPath? }. // configPath is a JSON dot-notation path for patching into openclaw.json at boot. customSecretMeta: z.record(z.string(), CustomSecretMetaSchema).nullable().default(null),