Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/__tests__/cli-network.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ test('test command prints suite summary and exits non-zero on failures', async (

assert.equal(result.code, 1);
assert.equal(result.calls.length, 1);
assert.equal(result.calls[0]?.meta?.requestProgress, 'replay-test');
assert.match(result.stderr, /Running replay suite\.\.\./);
assert.doesNotMatch(result.stdout, /PASS \/tmp\/01-pass\.ad/);
assert.match(result.stdout, /FAIL \/tmp\/02-fail\.ad after 2 attempts \(5ms\)/);
Expand Down
23 changes: 22 additions & 1 deletion src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ export async function runCli(argv: string[], deps: CliDeps = DEFAULT_CLI_DEPS):
? startDaemonLogTail(daemonPaths.logPath)
: null;
const client = createAgentDeviceClient(buildClientConfig(effectiveFlags, resolvedRuntime), {
transport: deps.sendToDaemon as AgentDeviceDaemonTransport,
transport: createCliDaemonTransport({
command,
flags: effectiveFlags,
transport: deps.sendToDaemon as AgentDeviceDaemonTransport,
}),
});
if (command === 'batch') {
if (!parsedBatchSteps) {
Expand Down Expand Up @@ -479,6 +483,23 @@ function hasExplicitMetroRuntimeOverrides(explicitFlagKeys: Set<FlagKey>): boole
return false;
}

function createCliDaemonTransport(options: {
command: string;
flags: CliFlags;
transport: AgentDeviceDaemonTransport;
}): AgentDeviceDaemonTransport {
const { command, flags, transport } = options;
if (command !== 'test' || flags.json) return transport;
return async (req) =>
await transport({
...req,
meta: {
...req.meta,
requestProgress: 'replay-test',
},
});
}

function guessSessionFromArgv(argv: string[]): string | null {
for (let i = 0; i < argv.length; i += 1) {
const token = argv[i]!;
Expand Down
39 changes: 22 additions & 17 deletions src/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export type DaemonRequestMeta = {
materializationId?: string;
lockPolicy?: DaemonLockPolicy;
lockPlatform?: 'ios' | 'macos' | 'android' | 'linux' | 'apple';
requestProgress?: 'replay-test';
};

export type DaemonRequest = {
Expand Down Expand Up @@ -403,28 +404,41 @@ function parseLeaseCommon(
input: unknown,
path: string,
): {
token?: string;
session?: string;
record: Record<string, unknown>;
leaseId?: string;
ttlMs?: number;
} {
const record = expectObject(input, path);
return {
token: optionalString(record, 'token', path),
session: optionalString(record, 'session', path),
record,
leaseId: optionalString(record, 'leaseId', path),
ttlMs: optionalInteger(record, 'ttlMs', path),
};
}

export const leaseAllocateSchema = schema<LeaseAllocatePayload>((input, path) => {
const record = expectObject(input, path);
function parseLeaseScope(
record: Record<string, unknown>,
path: string,
): {
token?: string;
session?: string;
tenantId?: string;
tenant?: string;
runId?: string;
} {
return {
token: optionalString(record, 'token', path),
session: optionalString(record, 'session', path),
tenantId: optionalString(record, 'tenantId', path),
tenant: optionalString(record, 'tenant', path),
runId: optionalString(record, 'runId', path),
};
}

export const leaseAllocateSchema = schema<LeaseAllocatePayload>((input, path) => {
const record = expectObject(input, path);
return {
...parseLeaseScope(record, path),
ttlMs: optionalInteger(record, 'ttlMs', path),
backend: optionalEnum(
record,
Expand All @@ -437,13 +451,8 @@ export const leaseAllocateSchema = schema<LeaseAllocatePayload>((input, path) =>

export const leaseHeartbeatSchema = schema<LeaseHeartbeatPayload>((input, path) => {
const parsed = parseLeaseCommon(input, path);
const record = expectObject(input, path);
return {
token: parsed.token,
session: parsed.session,
tenantId: optionalString(record, 'tenantId', path),
tenant: optionalString(record, 'tenant', path),
runId: optionalString(record, 'runId', path),
...parseLeaseScope(parsed.record, path),
leaseId: parsed.leaseId,
ttlMs: parsed.ttlMs,
};
Expand All @@ -455,11 +464,7 @@ export const leaseReleaseSchema = schema<LeaseReleasePayload>((input, path) => {
fail(`${path}.ttlMs`, 'Unexpected field');
}
return {
token: optionalString(record, 'token', path),
session: optionalString(record, 'session', path),
tenantId: optionalString(record, 'tenantId', path),
tenant: optionalString(record, 'tenant', path),
runId: optionalString(record, 'runId', path),
...parseLeaseScope(record, path),
leaseId: optionalString(record, 'leaseId', path),
};
});
Expand Down
113 changes: 113 additions & 0 deletions src/daemon-client-progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import http from 'node:http';
import { AppError } from './utils/errors.ts';
import type { DaemonRequest } from './daemon/types.ts';
import type { RequestProgressEvent } from './daemon/request-progress.ts';
import { consumeTextLines } from './utils/line-stream.ts';
import {
formatRequestProgressEvent,
isDaemonProgressEnvelope,
isDaemonResponseEnvelope,
shouldStreamRequestProgress,
} from './daemon/request-progress-protocol.ts';

export function writeRequestProgressEvent(event: RequestProgressEvent): void {
const line = formatRequestProgressEvent(event);
if (line) process.stderr.write(`${line}\n`);
}

export function shouldReadDaemonProgressStream(
req: DaemonRequest,
contentType: string | string[] | undefined,
): boolean {
return (
shouldStreamRequestProgress(req) &&
String(Array.isArray(contentType) ? contentType.join(',') : (contentType ?? '')).includes(
'application/x-ndjson',
)
);
}

export function readDaemonHttpProgressResponse(
res: http.IncomingMessage,
options: {
req: DaemonRequest;
handleResponseBody: (body: string) => void;
reject: (error: unknown) => void;
clearTimeout: () => void;
},
): void {
const { req, handleResponseBody, reject, clearTimeout } = options;
let buffer = '';
let settled = false;
const rejectInvalidLine = (line: string, error: unknown) => {
settled = true;
clearTimeout();
reject(
new AppError(
'COMMAND_FAILED',
'Invalid daemon response',
{
requestId: req.meta?.requestId,
line,
},
error instanceof Error ? error : undefined,
),
);
};

const handleLine = (line: string): boolean => {
try {
const message = JSON.parse(line) as unknown;
if (isDaemonProgressEnvelope(message)) {
writeRequestProgressEvent(message.event);
return false;
}
if (isDaemonResponseEnvelope<unknown>(message)) {
settled = true;
clearTimeout();
handleResponseBody(JSON.stringify(message.response));
return true;
}
throw new Error('Missing daemon progress response envelope');
} catch (error) {
rejectInvalidLine(line, error);
return true;
}
};

res.setEncoding('utf8');
res.on('data', (chunk) => {
if (settled) return;
const parsed = consumeTextLines(buffer, chunk);
buffer = parsed.buffer;
for (const line of parsed.lines) {
if (line && handleLine(line)) return;
}
});
res.on('end', () => {
if (settled) return;
const line = buffer.trim();
if (line && handleLine(line)) return;
settled = true;
clearTimeout();
reject(
new AppError('COMMAND_FAILED', 'Invalid daemon response', {
requestId: req.meta?.requestId,
line,
}),
);
});
res.on('error', (error) => {
if (settled) return;
settled = true;
clearTimeout();
reject(
new AppError(
'COMMAND_FAILED',
'Failed to read daemon response',
{ requestId: req.meta?.requestId },
error instanceof Error ? error : undefined,
),
);
});
}
Loading
Loading