Skip to content
34 changes: 34 additions & 0 deletions apps/web/src/app/admin/api/kiloclaw-controller-telemetry/hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use client';

import { useQuery } from '@tanstack/react-query';

/**
 * One controller telemetry row as returned by the admin
 * `/admin/api/kiloclaw-controller-telemetry` endpoint.
 *
 * Field names mirror the column aliases selected by that endpoint's SQL query.
 * NOTE(review): the numeric fields are typed as `number`; confirm the Analytics Engine
 * `FORMAT JSON` output encodes doubles as JSON numbers rather than strings.
 */
export type ControllerTelemetryRow = {
// Value of the `timestamp` column for the data point.
timestamp: string;
// Sandbox identifier (selected from `blob1`).
sandbox_id: string;
// Machine identifier (selected from `blob8`).
machine_id: string;
// Disk bytes in use (selected from `double7`); the instance detail view treats values <= 0 as "unknown".
disk_used_bytes: number;
// Disk capacity in bytes (selected from `double8`); the instance detail view treats values <= 0 as "unknown".
disk_total_bytes: number;
};

/**
 * Envelope of the JSON payload relayed by the admin telemetry endpoint,
 * parameterized by the row shape `T`.
 */
type AnalyticsEngineResponse<T> = {
// Result rows.
data: T[];
// One `{ name, type }` descriptor per entry; presumably describes the result columns — TODO confirm.
meta: { name: string; type: string }[];
// Presumably the number of rows in `data` — TODO confirm against the upstream API.
rows: number;
};

/**
 * React Query hook that loads controller telemetry (disk usage) for a sandbox from the
 * admin endpoint `/admin/api/kiloclaw-controller-telemetry`.
 *
 * The query is disabled while `sandboxId` is empty and is re-fetched every 60 seconds
 * once enabled.
 *
 * @param sandboxId - Sandbox identifier sent as the `sandboxId` query parameter. Pass `''`
 *   to keep the query idle (e.g. while the owning record is still loading).
 * @returns The `useQuery` result; on success `data` is the response envelope whose `data`
 *   array holds the telemetry rows. The query errors when the endpoint answers non-2xx.
 */
export function useControllerTelemetryDiskUsage(sandboxId: string) {
  return useQuery<AnalyticsEngineResponse<ControllerTelemetryRow>>({
    queryKey: ['kiloclaw-controller-telemetry', 'disk-usage', sandboxId],
    // Consume React Query's AbortSignal so an in-flight request is cancelled when the
    // component unmounts or the query key changes, instead of running to completion unused.
    queryFn: async ({ signal }) => {
      const response = await fetch(
        `/admin/api/kiloclaw-controller-telemetry?sandboxId=${encodeURIComponent(sandboxId)}`,
        { signal }
      );
      if (!response.ok) {
        throw new Error('Failed to fetch controller telemetry disk usage');
      }
      return response.json() as Promise<AnalyticsEngineResponse<ControllerTelemetryRow>>;
    },
    // An empty id can never match a sandbox; skip the request entirely.
    enabled: !!sandboxId,
    refetchInterval: 60_000,
  });
}
82 changes: 82 additions & 0 deletions apps/web/src/app/admin/api/kiloclaw-controller-telemetry/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { NextRequest } from 'next/server';
import { NextResponse } from 'next/server';
import { getUserFromAuth } from '@/lib/user.server';
import { getEnvVariable } from '@/lib/dotenvx';

/**
 * Returns `true` when `value` is non-empty and consists solely of ASCII letters, digits,
 * `_`, or `-`.
 *
 * This is the allow-list applied before a value is embedded in Analytics Engine SQL:
 * quotes, whitespace, and every other character are rejected.
 */
function isSafeIdentifier(value: string): boolean {
  return /^[A-Za-z0-9_-]+$/.test(value);
}

/**
 * Builds the SQL statement that selects the most recent controller telemetry row
 * (timestamp, sandbox/machine ids, disk used/total) for one sandbox.
 *
 * The statement is sent as raw SQL text, so `sandboxId` ends up inside a quoted string
 * literal. The allow-list is therefore enforced here as well, so the builder stays safe
 * even if a future caller forgets to validate its input.
 *
 * @param sandboxId - Sandbox identifier matched against `index1`.
 * @returns The SQL text, ending in `FORMAT JSON`.
 * @throws {Error} If `sandboxId` is empty or contains characters outside `[A-Za-z0-9_-]`.
 */
function buildQuery(sandboxId: string): string {
  if (!isSafeIdentifier(sandboxId)) {
    throw new Error('Unsafe sandboxId for Analytics Engine query');
  }

  return [
    'SELECT',
    'timestamp,',
    'blob1 AS sandbox_id,',
    'blob8 AS machine_id,',
    'double7 AS disk_used_bytes,',
    'double8 AS disk_total_bytes',
    'FROM kiloclaw_controller_telemetry',
    `WHERE index1 = '${sandboxId}'`,
    'ORDER BY timestamp DESC',
    'LIMIT 1',
    'FORMAT JSON',
  ].join('\n');
}

/**
 * Shape assumed for the JSON body returned by the Analytics Engine SQL endpoint.
 * The handler relays it to the client as-is; rows are left untyped here.
 */
type AnalyticsEngineResponse = {
// Result rows, keyed by the column aliases used in the query.
data: Record<string, unknown>[];
// One `{ name, type }` descriptor per entry; presumably describes the result columns — TODO confirm.
meta: { name: string; type: string }[];
// Presumably the number of rows in `data` — TODO confirm against the upstream API.
rows: number;
};

/**
 * Admin-only route handler that proxies a disk-usage telemetry lookup to the Cloudflare
 * Analytics Engine SQL API.
 *
 * Flow: authenticate (admin only) → validate the `sandboxId` query parameter → read the
 * Cloudflare account id and API token from the environment → POST the SQL text upstream
 * and relay the JSON result.
 *
 * Responses:
 * - the auth helper's failure response when authentication fails,
 * - 400 when `sandboxId` is missing, empty, or not a safe identifier,
 * - 500 when configuration is missing, the upstream call is non-2xx, or the request throws,
 * - otherwise the upstream JSON body.
 */
export async function GET(
  request: NextRequest
): Promise<NextResponse<{ error: string } | AnalyticsEngineResponse>> {
  const auth = await getUserFromAuth({ adminOnly: true });
  if (auth.authFailedResponse) {
    return auth.authFailedResponse;
  }

  const sandboxId = new URL(request.url).searchParams.get('sandboxId');
  if (sandboxId === null || sandboxId === '' || !isSafeIdentifier(sandboxId)) {
    return NextResponse.json({ error: 'Invalid or missing sandboxId' }, { status: 400 });
  }

  const accountId = getEnvVariable('R2_ACCOUNT_ID');
  const token = getEnvVariable('CF_ANALYTICS_ENGINE_TOKEN');
  if (!(accountId && token)) {
    return NextResponse.json(
      { error: 'Missing Cloudflare Analytics Engine configuration' },
      { status: 500 }
    );
  }

  const sqlQuery = buildQuery(sandboxId);
  const endpoint = `https://api.cloudflare.com/client/v4/accounts/${accountId}/analytics_engine/sql`;

  try {
    // The SQL API takes the statement as the raw request body.
    const upstream = await fetch(endpoint, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}` },
      body: sqlQuery,
    });

    if (upstream.ok) {
      const result: AnalyticsEngineResponse = await upstream.json();
      return NextResponse.json(result);
    }

    // Keep the upstream body in server logs only; the client just gets the status code.
    const errorText = await upstream.text();
    console.error('Analytics Engine API error:', upstream.status, errorText);
    return NextResponse.json(
      { error: `Analytics Engine API error: ${upstream.status}` },
      { status: 500 }
    );
  } catch (error) {
    console.error('Analytics Engine request failed:', error);
    return NextResponse.json({ error: 'Failed to query Analytics Engine' }, { status: 500 });
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import {
type KiloclawEventRow,
type KiloclawAllEventRow,
} from '@/app/admin/api/kiloclaw-analytics/hooks';
import { useControllerTelemetryDiskUsage } from '@/app/admin/api/kiloclaw-controller-telemetry/hooks';

function parseTimestamp(timestamp: string): Date {
const normalized = timestamp.includes('T') ? timestamp : timestamp.replace(' ', 'T');
Expand Down Expand Up @@ -1198,6 +1199,12 @@ export function KiloclawInstanceDetail({ instanceId }: { instanceId: string }) {
enabled: !!userId,
});

const sandboxId = data?.sandbox_id;
const aeDiskUsage = useControllerTelemetryDiskUsage(sandboxId ?? '');
const aeRow = aeDiskUsage.data?.data?.[0];
const diskUsed = aeRow && aeRow.disk_used_bytes > 0 ? aeRow.disk_used_bytes : null;
Comment thread
evanjacobson marked this conversation as resolved.
const diskTotal = aeRow && aeRow.disk_total_bytes > 0 ? aeRow.disk_total_bytes : null;

const { mutateAsync: destroyInstance, isPending: isDestroying } = useMutation(
trpc.admin.kiloclawInstances.destroy.mutationOptions({
onSuccess: () => {
Expand Down Expand Up @@ -1687,10 +1694,7 @@ export function KiloclawInstanceDetail({ instanceId }: { instanceId: string }) {
<div className="flex items-center gap-2">
<HardDrive className="text-muted-foreground h-4 w-4 shrink-0" />
<DetailField label="Volume Usage">
{formatVolumeUsageLine(
data.workerStatus?.diskUsedBytes,
data.workerStatus?.diskTotalBytes
)}
{formatVolumeUsageLine(diskUsed, diskTotal)}
</DetailField>
</div>
</CardContent>
Expand Down
2 changes: 0 additions & 2 deletions apps/web/src/lib/kiloclaw/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ export type PlatformDebugStatusResponse = PlatformStatusResponse & {
restoreStartedAt: string | null;
pendingRestoreVolumeId: string | null;
instanceReadyEmailSent: boolean;
diskUsedBytes: number | null;
diskTotalBytes: number | null;
};

export type CleanupRecoveryPreviousVolumeResponse = {
Expand Down
15 changes: 0 additions & 15 deletions services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1674,8 +1674,6 @@ export class KiloClawInstance extends DurableObject<KiloClawEnv> {
restoreStartedAt: string | null;
pendingRestoreVolumeId: string | null;
instanceReadyEmailSent: boolean;
diskUsedBytes: number | null;
diskTotalBytes: number | null;
}> {
await this.loadState();
const alarmScheduledAt = await this.ctx.storage.getAlarm();
Expand Down Expand Up @@ -1729,8 +1727,6 @@ export class KiloClawInstance extends DurableObject<KiloClawEnv> {
restoreStartedAt: this.s.restoreStartedAt,
pendingRestoreVolumeId: this.s.pendingRestoreVolumeId,
instanceReadyEmailSent: this.s.instanceReadyEmailSent,
diskUsedBytes: this.s.diskUsedBytes,
diskTotalBytes: this.s.diskTotalBytes,
};
}

Expand Down Expand Up @@ -1770,17 +1766,6 @@ export class KiloClawInstance extends DurableObject<KiloClawEnv> {
return { shouldNotify: true, userId: this.s.userId };
}

/**
 * Stores the disk usage pair from a controller checkin on the in-memory state and
 * persists it. Passing `null` for a value clears that field.
 *
 * @param usedBytes - Bytes in use, or `null` when not reported.
 * @param totalBytes - Total capacity in bytes, or `null` when not reported.
 */
async recordDiskStats(usedBytes: number | null, totalBytes: number | null): Promise<void> {
  await this.loadState();
  const diskStats = { diskUsedBytes: usedBytes, diskTotalBytes: totalBytes };
  Object.assign(this.s, diskStats);
  await this.persist(diskStats);
}

async listVolumeSnapshots(): Promise<FlyVolumeSnapshot[]> {
await this.loadState();
if (!this.s.flyVolumeId) return [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ export async function loadState(ctx: DurableObjectState, s: InstanceMutableState
// Legacy instances pre-dating this field treat absence as already-sent
// to avoid spurious emails after deploy.
s.instanceReadyEmailSent = 'instanceReadyEmailSent' in raw ? d.instanceReadyEmailSent : true;
s.diskUsedBytes = d.diskUsedBytes;
s.diskTotalBytes = d.diskTotalBytes;
s.customSecretMeta = d.customSecretMeta;
s.streamChatApiKey = d.streamChatApiKey;
s.streamChatBotUserId = d.streamChatBotUserId;
Expand Down Expand Up @@ -311,8 +309,6 @@ export function resetMutableState(s: InstanceMutableState): void {
s.preRestoreStatus = null;
s.pendingRestoreVolumeId = null;
s.instanceReadyEmailSent = false;
s.diskUsedBytes = null;
s.diskTotalBytes = null;
s.streamChatApiKey = null;
s.streamChatBotUserId = null;
s.streamChatBotUserToken = null;
Expand Down Expand Up @@ -391,8 +387,6 @@ export function createMutableState(): InstanceMutableState {
preRestoreStatus: null,
pendingRestoreVolumeId: null,
instanceReadyEmailSent: false,
diskUsedBytes: null,
diskTotalBytes: null,
customSecretMeta: null,
streamChatApiKey: null,
streamChatBotUserId: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ export type InstanceMutableState = {
preRestoreStatus: InstanceStatus | null;
pendingRestoreVolumeId: string | null;
instanceReadyEmailSent: boolean;
diskUsedBytes: number | null;
diskTotalBytes: number | null;
customSecretMeta: PersistedState['customSecretMeta'];
// Stream Chat default channel (auto-provisioned)
streamChatApiKey: string | null;
Expand Down
117 changes: 113 additions & 4 deletions services/kiloclaw/src/routes/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { describe, expect, it, vi, type Mock } from 'vitest';
import { controller } from './controller';
import { deriveGatewayToken } from '../auth/gateway-token';

/**
 * Test-local shape of the payload the controller passes to `writeDataPoint`.
 * The checkin tests in this file read `doubles[6]` / `doubles[7]` as disk used / total bytes.
 */
type AnalyticsEngineDataPoint = {
// String-valued fields of the data point.
blobs: string[];
// Numeric fields of the data point; the tests below expect exactly 8 entries.
doubles: number[];
// Index values of the data point.
indexes: string[];
};

vi.mock('cloudflare:workers', () => ({
waitUntil: (p: Promise<unknown>) => p,
}));
Expand Down Expand Up @@ -30,7 +36,7 @@ const sandboxId = 'dXNlci0x';
function makeEnv(options?: {
gatewayTokenSecret?: string;
kilocodeApiKey?: string;
writeDataPoint?: (payload: unknown) => void;
writeDataPoint?: (payload: AnalyticsEngineDataPoint) => void;
posthogKey?: string;
hyperdriveConnectionString?: string;
workerEnv?: string;
Expand All @@ -47,7 +53,6 @@ function makeEnv(options?: {
botVibe: 'Dry wit',
botEmoji: '🤖',
});
const recordDiskStats = vi.fn().mockResolvedValue(undefined);
const tryMarkInstanceReady =
options?.tryMarkInstanceReady ??
vi.fn().mockResolvedValue({ shouldNotify: false, userId: null });
Expand All @@ -58,7 +63,7 @@ function makeEnv(options?: {
INTERNAL_API_SECRET: options?.internalApiSecret,
KILOCLAW_INSTANCE: {
idFromName: (userId: string) => userId,
get: () => ({ getConfig, getStatus, recordDiskStats, tryMarkInstanceReady }),
get: () => ({ getConfig, getStatus, tryMarkInstanceReady }),
},
KILOCLAW_CONTROLLER_AE: options?.writeDataPoint
? {
Expand Down Expand Up @@ -114,6 +119,17 @@ async function makeAuthHeaders(targetSandboxId = sandboxId) {
};
}

/** Collects the first argument of every recorded `writeDataPoint` call, in call order. */
function analyticsEvents(writeDataPoint: Mock): AnalyticsEngineDataPoint[] {
  const recordedCalls = writeDataPoint.mock.calls as [AnalyticsEngineDataPoint][];
  const events: AnalyticsEngineDataPoint[] = [];
  for (const [payload] of recordedCalls) {
    events.push(payload);
  }
  return events;
}

/**
 * Returns the payload of the first recorded `writeDataPoint` call, asserting (via `expect`)
 * that at least one call was made.
 */
function firstAnalyticsEvent(writeDataPoint: Mock): AnalyticsEngineDataPoint {
  const first = analyticsEvents(writeDataPoint)[0];
  expect(first).toBeDefined();
  return first;
}

describe('POST /checkin', () => {
it('returns 401 when required auth headers are missing', async () => {
const response = await controller.request(
Expand Down Expand Up @@ -148,7 +164,7 @@ describe('POST /checkin', () => {
});

it('returns 204 and writes AE datapoint when both tokens are valid', async () => {
const writeDataPoint = vi.fn();
const writeDataPoint = vi.fn<(payload: AnalyticsEngineDataPoint) => void>();
const env = makeEnv({ writeDataPoint });
const headers = await makeAuthHeaders();

Expand All @@ -160,6 +176,99 @@ describe('POST /checkin', () => {

expect(response.status).toBe(204);
expect(writeDataPoint).toHaveBeenCalledTimes(1);

const call = firstAnalyticsEvent(writeDataPoint);
expect(call.doubles).toHaveLength(8);
expect(call.doubles[6]).toBe(0);
expect(call.doubles[7]).toBe(0);
});

// Disk stats sent in the checkin body should land in doubles[6] (used) and doubles[7] (total).
it('writes disk usage doubles when disk stats are present', async () => {
  const aeWrite = vi.fn<(payload: AnalyticsEngineDataPoint) => void>();
  const env = makeEnv({ writeDataPoint: aeWrite });
  const requestInit = {
    method: 'POST',
    headers: await makeAuthHeaders(),
    body: JSON.stringify(makeBody({ diskUsedBytes: 1024000, diskTotalBytes: 5368709120 })),
  };

  const res = await controller.request('/checkin', requestInit, env);

  expect(res.status).toBe(204);
  expect(aeWrite).toHaveBeenCalledTimes(1);

  const { doubles } = firstAnalyticsEvent(aeWrite);
  expect(doubles).toHaveLength(8);
  expect(doubles[6]).toBe(1024000);
  expect(doubles[7]).toBe(5368709120);
});

// Explicit nulls for both disk fields should be written as 0 in doubles[6] and doubles[7].
it('normalizes null disk usage doubles to zero', async () => {
  const aeWrite = vi.fn<(payload: AnalyticsEngineDataPoint) => void>();
  const env = makeEnv({ writeDataPoint: aeWrite });
  const requestInit = {
    method: 'POST',
    headers: await makeAuthHeaders(),
    body: JSON.stringify(makeBody({ diskUsedBytes: null, diskTotalBytes: null })),
  };

  const res = await controller.request('/checkin', requestInit, env);

  expect(res.status).toBe(204);
  expect(aeWrite).toHaveBeenCalledTimes(1);

  const { doubles } = firstAnalyticsEvent(aeWrite);
  expect(doubles).toHaveLength(8);
  expect(doubles[6]).toBe(0);
  expect(doubles[7]).toBe(0);
});

// Negative disk values are not meaningful; they should be written as 0 in doubles[6] and doubles[7].
it('clamps negative disk usage doubles to zero', async () => {
  const aeWrite = vi.fn<(payload: AnalyticsEngineDataPoint) => void>();
  const env = makeEnv({ writeDataPoint: aeWrite });
  const requestInit = {
    method: 'POST',
    headers: await makeAuthHeaders(),
    body: JSON.stringify(makeBody({ diskUsedBytes: -1, diskTotalBytes: -1 })),
  };

  const res = await controller.request('/checkin', requestInit, env);

  expect(res.status).toBe(204);
  expect(aeWrite).toHaveBeenCalledTimes(1);

  const { doubles } = firstAnalyticsEvent(aeWrite);
  expect(doubles).toHaveLength(8);
  expect(doubles[6]).toBe(0);
  expect(doubles[7]).toBe(0);
});

// A failing Analytics Engine write must not fail the checkin itself.
// NOTE(review): the mock returns a rejected promise rather than throwing synchronously, so a
// synchronous `writeDataPoint` throw is not exercised here — confirm which failure mode the
// controller is expected to tolerate.
it('still returns 204 when AE write throws', async () => {
  const failingWrite = vi
    .fn<(payload: AnalyticsEngineDataPoint) => Promise<void>>()
    .mockRejectedValue(new Error('AE error'));
  const env = makeEnv({ writeDataPoint: failingWrite });
  const requestInit = {
    method: 'POST',
    headers: await makeAuthHeaders(),
    body: JSON.stringify(makeBody()),
  };

  const res = await controller.request('/checkin', requestInit, env);

  expect(res.status).toBe(204);
});

it('does not call PostHog when productTelemetry is absent', async () => {
Expand Down
Loading
Loading