Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions src/local/entrypoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { intake } from '../product/spec-intake/index.js';
import type { ClarificationQuestion, ExecutionPreference, InputSurface, RawSpecPayload, RouteTarget } from '../product/spec-intake/index.js';
import { defaultRepoDetector, type RepoDetector } from '../product/spec-intake/detect-current-repo.js';
import { LocalCoordinator } from '../runtime/local-coordinator.js';
import { DEFAULT_RUN_TIMEOUT_MS } from '../shared/constants.js';
import { DEFAULT_RUN_TIMEOUT_MS, DEFAULT_RUN_IDLE_TIMEOUT_MS } from '../shared/constants.js';
import { localRunArtifactDir, localRunStateRoot } from '../shared/state-paths.js';
import type {
CommandInvocation,
Expand Down Expand Up @@ -401,6 +401,32 @@ class SdkScriptWorkflowCoordinator implements CoordinatorLauncher {

const abortController = new AbortController();

// Inactivity watchdog: a healthy run constantly emits broker/agent output.
// Total silence for the idle window means the runner is hung (dead broker,
// half-open stdio pipe, a subprocess parked at 0% CPU). Aborting on idle
// makes the run fail fast instead of stalling until DEFAULT_RUN_TIMEOUT_MS.
const idleTimeoutMs = resolveIdleTimeoutMs();
const idleAbortMessage = `Workflow runner aborted after ${Math.round(idleTimeoutMs / 1000)}s of inactivity (suspected hang).`;
let lastOutputMs = Date.now();
let idleAborted = false;
const idleInterval = idleTimeoutMs > 0
? setInterval(() => {
if (Date.now() - lastOutputMs >= idleTimeoutMs) {
idleAborted = true;
// Record the abort reason on stderr + events *before* aborting, so
// it survives the runner promise rejecting and surfaces as the real
// cause in the coordinator result (the post-await path below is
// skipped once abort() makes the awaited promise reject).
stderr.push(idleAbortMessage);
this.onRuntimeOutput?.('stderr', idleAbortMessage);
emit('stderr', idleAbortMessage, { stream: 'stderr', reason: 'idle-timeout' });
abortController.abort();
}
}, Math.min(idleTimeoutMs, 60_000))
: undefined;
idleInterval?.unref?.();
const markActivity = (): void => { lastOutputMs = Date.now(); };

try {
const runnerResult = await withTimeout(
this.runner(request.workflowFile, {
Expand All @@ -411,11 +437,13 @@ class SdkScriptWorkflowCoordinator implements CoordinatorLauncher {
startFrom: retry.startFromStep,
previousRunId: retry.previousRunId,
onStdout: (line) => {
markActivity();
stdout.push(line);
this.onRuntimeOutput?.('stdout', line);
emit('stdout', line, { stream: 'stdout' });
},
onStderr: (line) => {
markActivity();
stderr.push(line);
this.onRuntimeOutput?.('stderr', line);
emit('stderr', line, { stream: 'stderr' });
Expand Down Expand Up @@ -447,9 +475,16 @@ class SdkScriptWorkflowCoordinator implements CoordinatorLauncher {
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
status = message.startsWith('timed out after ') ? 'timed_out' : 'failed';
stderr.push(message);
emit(status === 'timed_out' ? 'timeout' : 'error', message, { error: message });
// An idle-watchdog abort is a timeout, not a generic failure. Its marker
// is already on stderr/events from the watchdog callback, so don't push
// the raw abort error on top of it.
status = idleAborted || message.startsWith('timed out after ') ? 'timed_out' : 'failed';
if (!idleAborted) stderr.push(message);
emit(
status === 'timed_out' ? 'timeout' : 'error',
idleAborted ? idleAbortMessage : message,
idleAborted ? { error: message, reason: 'idle-timeout' } : { error: message },
);
return coordinatorResultFromSdkRun({
request,
runId,
Expand All @@ -465,6 +500,11 @@ class SdkScriptWorkflowCoordinator implements CoordinatorLauncher {
snippetLimit,
error: message,
});
} finally {
// Always clear the watchdog — covers a synchronous throw from
// this.runner() (before withTimeout is even reached) and every other
// exit path, so the interval can never leak.
if (idleInterval) clearInterval(idleInterval);
}
}
}
Expand Down Expand Up @@ -619,6 +659,23 @@ function withTimeout<T>(promise: Promise<T>, timeoutMs: number, onTimeout?: () =
});
}

/**
 * Reads the inactivity-watchdog window, in milliseconds, from the
 * `RICKY_RUN_IDLE_TIMEOUT_MS` environment variable.
 *
 * - unset or blank: {@link DEFAULT_RUN_IDLE_TIMEOUT_MS}
 * - `0`: returned as-is (the caller only arms the watchdog for values > 0)
 * - a positive whole number: returned as the override
 * - non-numeric, negative, or fractional (e.g. `0.5`): {@link DEFAULT_RUN_IDLE_TIMEOUT_MS}
 *
 * Fractions are rejected outright instead of being rounded, so a typo such as
 * `0.5` can never collapse to 0 and quietly switch the watchdog off.
 *
 * @returns The idle window in milliseconds.
 */
function resolveIdleTimeoutMs(): number {
  const envValue = process.env.RICKY_RUN_IDLE_TIMEOUT_MS;
  const isBlank = envValue === undefined || envValue.trim() === '';
  if (!isBlank) {
    const windowMs = Number(envValue);
    // NaN and Infinity both fail the integer test, so they land on the default.
    const isWholeNonNegative = Number.isInteger(windowMs) && windowMs >= 0;
    if (isWholeNonNegative) return windowMs;
  }
  return DEFAULT_RUN_IDLE_TIMEOUT_MS;
}

export function createSdkScriptWorkflowRunner(): ScriptWorkflowRunner {
return async (workflowFile, options) => {
const absoluteWorkflowFile = isAbsolute(workflowFile) ? workflowFile : resolve(options.cwd, workflowFile);
Expand Down Expand Up @@ -1323,7 +1380,11 @@ export function createLocalExecutor(options: LocalExecutorOptions = {}): LocalEx
cwd,
timeoutMs: options.timeoutMs,
route,
env: { AGENT_RELAY_RUN_ID_FILE: runtimeRunIdFile },
// `--input KEY=VALUE` pairs are injected into the workflow runner env so
// workflow scripts can read them via process.env.KEY (e.g. TARGET_SPEC
// for the reusable review/fix workflows). AGENT_RELAY_RUN_ID_FILE wins
// on conflict since it is Ricky-owned runtime state.
env: { ...(activeRequest.inputs ?? {}), AGENT_RELAY_RUN_ID_FILE: runtimeRunIdFile },
...stableRunIdFor(activeRequest),
retry: activeRequest.retry,
metadata: {
Expand Down
11 changes: 10 additions & 1 deletion src/local/request-normalizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export interface BaseHandoff {
* assumptions instead of asking the user before generation.
*/
bestJudgement?: boolean;
/** KEY=VALUE pairs from `--input KEY=VALUE` flags, injected into the workflow runner env. */
inputs?: Record<string, string>;
}

/** Free-form spec string from a direct local caller. */
Expand Down Expand Up @@ -158,6 +160,12 @@ export interface LocalInvocationRequest {
refine?: false | { model?: string };
/** Resolve blocking clarification questions using implementer best judgement. */
bestJudgement?: boolean;
/**
* KEY=VALUE pairs from `--input KEY=VALUE` CLI flags. Injected into the
* workflow runner subprocess env so workflow scripts can read them via
* `process.env.KEY` (e.g. `TARGET_SPEC` for reusable review/fix workflows).
*/
inputs?: Record<string, string>;
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -311,12 +319,13 @@ export async function normalizeRequest(
}
}

function runtimeOptionsFor(raw: BaseHandoff): Pick<LocalInvocationRequest, 'autoFix' | 'retry' | 'refine' | 'bestJudgement'> {
/**
 * Copies the optional runtime knobs from a raw handoff onto the subset of
 * `LocalInvocationRequest` that carries them.
 *
 * A knob is only present in the result when it is set on the handoff:
 * `autoFix`, `retry` and `refine` are copied when truthy, `bestJudgement` is
 * normalised to the literal `true`, and `inputs` is copied only when it has at
 * least one key. Knobs that are not set are left off the result entirely
 * rather than being set to `undefined`.
 */
function runtimeOptionsFor(raw: BaseHandoff): Pick<LocalInvocationRequest, 'autoFix' | 'retry' | 'refine' | 'bestJudgement' | 'inputs'> {
  const options: Pick<LocalInvocationRequest, 'autoFix' | 'retry' | 'refine' | 'bestJudgement' | 'inputs'> = {};
  const { autoFix, retry, refine, bestJudgement, inputs } = raw;

  if (autoFix) options.autoFix = autoFix;
  if (retry) options.retry = retry;
  if (refine) options.refine = refine;
  if (bestJudgement) options.bestJudgement = true;
  // An empty record carries no information, so it is dropped like an unset one.
  if (inputs && Object.keys(inputs).length > 0) options.inputs = inputs;

  return options;
}

Expand Down
9 changes: 9 additions & 0 deletions src/shared/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ export const DEFAULT_RUN_TIMEOUT_MS = 43_200_000; // 12 h

export const DEFAULT_TIMEOUT_MS = DEFAULT_RUN_TIMEOUT_MS;

// Inactivity (idle-output) watchdog for the workflow runner subprocess. A
// healthy run constantly emits broker logs and agent output; total silence for
// this long means the runner is hung (dead broker, half-open stdio pipe, a
// subprocess parked at 0% CPU). When this fires the runner is aborted so the
// run fails fast and the orchestrator can move on instead of stalling for the
// full 12 h DEFAULT_RUN_TIMEOUT_MS.
//
// Override with the RICKY_RUN_IDLE_TIMEOUT_MS environment variable:
//   - `0` disables the watchdog;
//   - any positive whole number of milliseconds retunes it;
//   - an unset, blank, non-numeric, negative, or fractional value is ignored
//     and this default is used instead.
export const DEFAULT_RUN_IDLE_TIMEOUT_MS = 1_800_000; // 30 min of zero output

// Per-step budgets for generated workflows. Tuned so that:
// - implement/fix-loop steps (real codex code-writing) get ~20 min each
// - lead-plan and review steps get ~10 min each
Expand Down
8 changes: 8 additions & 0 deletions src/surfaces/cli/commands/cli-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ export interface ParsedArgs {
bestJudgement?: boolean;
login?: boolean;
connectMissing?: boolean;
/** KEY=VALUE pairs from `--input KEY=VALUE` flags, injected into the workflow runner env. */
inputs?: Record<string, string>;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
workforcePersonaWriterCli?: boolean;
errors?: string[];
}
Expand Down Expand Up @@ -254,6 +256,7 @@ export function parseArgs(argv: string[]): ParsedArgs {
if (parsed.bestJudgement) result.bestJudgement = true;
if (parsed.login) result.login = true;
if (parsed.connectMissing) result.connectMissing = true;
if (parsed.inputs && Object.keys(parsed.inputs).length > 0) result.inputs = parsed.inputs;
if (parsed.workforcePersonaWriterCli !== undefined) result.workforcePersonaWriterCli = parsed.workforcePersonaWriterCli;
if (parsed.errors && parsed.errors.length > 0) result.errors = parsed.errors;
return result;
Expand Down Expand Up @@ -440,6 +443,7 @@ export function renderHelp(): string[] {
' --no-refine Disable refinement; emit only the deterministic artifact',
' --with-llm[=model] Alias for --refine',
' --best-judgement Answer unresolved spec questions with implementer assumptions',
' --input KEY=VALUE Set an env var for the workflow run (repeatable); read via process.env.KEY',
' --workforce-persona Use Workforce personas to author the workflow',
' --no-workforce-persona Disable Workforce persona authoring',
` --auto-fix[=N] Local diagnose/repair/resume loop (default ${DEFAULT_AUTO_FIX_ATTEMPTS} attempts, max 10)`,
Expand Down Expand Up @@ -616,6 +620,7 @@ async function buildCliHandoff(parsed: ParsedArgs, deps: CliMainDeps): Promise<R
...runAutoFix,
...(parsed.refine ? { refine: parsed.refine } : {}),
...(parsed.bestJudgement ? { bestJudgement: true } : {}),
...(parsed.inputs ? { inputs: parsed.inputs } : {}),
...(retry ? { retry } : {}),
metadata: cliMetadataFor(parsed, 'artifact'),
};
Expand All @@ -636,6 +641,7 @@ async function buildCliHandoff(parsed: ParsedArgs, deps: CliMainDeps): Promise<R
...runAutoFix,
...(parsed.refine ? { refine: parsed.refine } : {}),
...(parsed.bestJudgement ? { bestJudgement: true } : {}),
...(parsed.inputs ? { inputs: parsed.inputs } : {}),
...(retry ? { retry } : {}),
cliMetadata: cliMetadataFor(parsed, 'inline-spec'),
};
Expand All @@ -655,6 +661,7 @@ async function buildCliHandoff(parsed: ParsedArgs, deps: CliMainDeps): Promise<R
...runAutoFix,
...(parsed.refine ? { refine: parsed.refine } : {}),
...(parsed.bestJudgement ? { bestJudgement: true } : {}),
...(parsed.inputs ? { inputs: parsed.inputs } : {}),
...(retry ? { retry } : {}),
cliMetadata: cliMetadataFor(parsed, 'spec-file'),
};
Expand All @@ -675,6 +682,7 @@ async function buildCliHandoff(parsed: ParsedArgs, deps: CliMainDeps): Promise<R
...runAutoFix,
...(parsed.refine ? { refine: parsed.refine } : {}),
...(parsed.bestJudgement ? { bestJudgement: true } : {}),
...(parsed.inputs ? { inputs: parsed.inputs } : {}),
...(retry ? { retry } : {}),
cliMetadata: cliMetadataFor(parsed, 'stdin'),
};
Expand Down
48 changes: 48 additions & 0 deletions src/surfaces/cli/flows/power-user-parser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,54 @@ describe('power user parser defaults', () => {
});
});

it('parses --input KEY=VALUE flags into an inputs record', () => {
const parsed = parsePowerUserArgs([
'local', '--spec-file', './_review.md', '--run',
'--input', 'TARGET_SPEC=specs/021-sentry.md',
'--input', 'DIFF_RANGE=abc..HEAD',
]);
expect(parsed.inputs).toEqual({
TARGET_SPEC: 'specs/021-sentry.md',
DIFF_RANGE: 'abc..HEAD',
});
expect(parsed).not.toHaveProperty('errors');
});

it('accepts the --input=KEY=VALUE inline form and an empty value', () => {
const parsed = parsePowerUserArgs([
'local', '--spec-file', './_review.md', '--run',
'--input=TARGET_SPEC=',
]);
expect(parsed.inputs).toEqual({ TARGET_SPEC: '' });
});

it('reports an error for a malformed --input without KEY=VALUE', () => {
const parsed = parsePowerUserArgs([
'local', '--spec-file', './_review.md', '--run',
'--input', 'NOTAPAIR',
]);
expect(parsed.errors).toBeDefined();
expect(parsed.errors?.some((e) => e.includes('KEY=VALUE'))).toBe(true);
});

it('reports an error for an invalid --input env var name', () => {
const parsed = parsePowerUserArgs([
'local', '--spec-file', './_review.md', '--run',
'--input', '1BAD=value',
]);
expect(parsed.errors?.some((e) => e.includes('not a valid environment variable name'))).toBe(true);
});

it('does not consume a following flag when --input has no value (keeps --run intact)', () => {
const parsed = parsePowerUserArgs([
'local', '--spec-file', './_review.md', '--input', '--run',
]);
// --run must still be recognized, not swallowed as the --input value.
expect(parsed.runRequested).toBe(true);
expect(parsed.errors?.some((e) => e.includes('--input requires a KEY=VALUE'))).toBe(true);
expect(parsed.inputs).toBeUndefined();
});

it('parses the workflow one-shot command for local execution and Cloud generation', () => {
expect(parsePowerUserArgs(['workflow', '--spec-file', './SPEC.md', '--run'])).toMatchObject({
command: 'run',
Expand Down
61 changes: 61 additions & 0 deletions src/surfaces/cli/flows/power-user-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ export interface PowerUserParsedArgs {
bestJudgement?: boolean;
login?: boolean;
connectMissing?: boolean;
/**
* KEY=VALUE pairs from repeated `--input KEY=VALUE` flags. Injected into the
* workflow runner subprocess env so workflow scripts can read them via
* `process.env.KEY` (e.g. `TARGET_SPEC` for reusable review/fix workflows).
*/
inputs?: Record<string, string>;
/** Set only when the CLI passes --workforce-persona/--no-workforce-persona; omitted otherwise. */
workforcePersonaWriterCli?: boolean;
errors?: string[];
Expand Down Expand Up @@ -141,6 +147,7 @@ export function parsePowerUserArgs(argv: string[]): PowerUserParsedArgs {
const login = effectiveArgv.includes('--login');
const connectMissing = effectiveArgv.includes('--connect-missing');
const workforcePersonaWriterCli = parseWorkforcePersonaWriterCliFlag(effectiveArgv);
const inputs = parseInputFlags(effectiveArgv);

const errors: string[] = [...(parsed.errors ?? [])];
if (surface === 'workflow' && modeFlagPresent && !explicitMode) {
Expand All @@ -154,6 +161,9 @@ export function parsePowerUserArgs(argv: string[]): PowerUserParsedArgs {
errors.push(`${flag} requires a value.`);
}
}
if (inputs.errors.length > 0) {
errors.push(...inputs.errors);
}
if (artifact && (spec !== undefined || specFile !== undefined || stdin)) {
errors.push('Artifact execution cannot be combined with --spec, --spec-file, --file, or --stdin.');
}
Expand Down Expand Up @@ -181,11 +191,62 @@ export function parsePowerUserArgs(argv: string[]): PowerUserParsedArgs {
...(bestJudgement ? { bestJudgement: true } : {}),
...(login ? { login: true } : {}),
...(connectMissing ? { connectMissing: true } : {}),
...(Object.keys(inputs.values).length > 0 ? { inputs: inputs.values } : {}),
...(workforcePersonaWriterCli !== undefined ? { workforcePersonaWriterCli } : {}),
...(errors.length > 0 ? { errors } : {}),
};
}

/**
 * Parse repeated `--input KEY=VALUE` flags into a record. Both `--input K=V`
 * and `--input=K=V` forms are accepted. The KEY must be a valid env-var name
 * ([A-Za-z_][A-Za-z0-9_]*); anything else is reported as an error. The VALUE
 * may be empty (`--input TARGET_SPEC=`) — callers fill it in via a trailing
 * positional in some scripts, so an empty value is not an error here.
 *
 * Only the first `=` separates KEY from VALUE, so the VALUE may itself contain
 * `=`. When the same KEY is given more than once, the last occurrence wins.
 * Malformed flags are collected in `errors` instead of being thrown, and they
 * contribute nothing to `values`.
 *
 * @param argv Argument tokens, scanned left to right.
 * @returns `values` holds the accepted pairs; `errors` holds one message per
 *   rejected `--input` flag.
 */
function parseInputFlags(argv: string[]): { values: Record<string, string>; errors: string[] } {
const values: Record<string, string> = {};
const errors: string[] = [];
const KEY_RE = /^[A-Za-z_][A-Za-z0-9_]*$/;
for (let index = 0; index < argv.length; index += 1) {
const arg = argv[index];
let pair: string | undefined;
if (arg === '--input') {
const next = argv[index + 1];
// Only consume the next token as the value when it is a real argument,
// not another flag. Advancing the index past a following flag (e.g.
// `--input --run`) would silently drop that flag from parsing.
if (next === undefined || next.startsWith('--')) {
errors.push('--input requires a KEY=VALUE argument.');
continue;
}
pair = next;
index += 1;
} else if (arg.startsWith('--input=')) {
pair = arg.slice('--input='.length);
} else {
continue;
}
// An empty pair comes from `--input=` with nothing after the `=`, or from an
// empty-string token following `--input`.
if (pair === '') {
errors.push('--input requires a KEY=VALUE argument.');
continue;
}
Comment on lines +213 to +233
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If the --input flag is followed by another flag starting with -- (for example, if the user forgets the argument or makes a typo like ricky local --input --run), the parser will consume --run as the pair value and increment index. This causes the next flag (--run) to be completely skipped in the next iteration of the loop, silently ignoring it.

We should validate that the next argument exists and does not start with -- before consuming it and incrementing the index.

Suggested change
let pair: string | undefined;
if (arg === '--input') {
pair = argv[index + 1];
index += 1;
} else if (arg.startsWith('--input=')) {
pair = arg.slice('--input='.length);
} else {
continue;
}
if (pair === undefined || pair.startsWith('--')) {
errors.push('--input requires a KEY=VALUE argument.');
continue;
}
let pair: string | undefined;
if (arg === '--input') {
const next = argv[index + 1];
if (next === undefined || next.startsWith('--')) {
errors.push('--input requires a KEY=VALUE argument.');
continue;
}
pair = next;
index += 1;
} else if (arg.startsWith('--input=')) {
pair = arg.slice('--input='.length);
} else {
continue;
}
if (pair === undefined || pair.startsWith('--')) {
errors.push('--input requires a KEY=VALUE argument.');
continue;
}

// Split on the first `=` only. eq === -1 means there is no `=` at all;
// eq === 0 means the KEY is empty (e.g. `=value`). Both are rejected.
const eq = pair.indexOf('=');
if (eq <= 0) {
errors.push(`--input "${pair}" must be in KEY=VALUE form.`);
continue;
}
const key = pair.slice(0, eq);
const value = pair.slice(eq + 1);
if (!KEY_RE.test(key)) {
errors.push(`--input key "${key}" is not a valid environment variable name.`);
continue;
}
// A later occurrence of the same KEY overwrites the earlier one.
values[key] = value;
}
return { values, errors };
}

function parseConnect(argv: string[]): PowerUserParsedArgs {
const target = argv[0]?.trim().toLowerCase();
const base = withCommonFlags({ command: 'connect', surface: 'connect' }, argv.slice(target ? 1 : 0));
Expand Down
Loading