Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/desktop/electron/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ if (!gotSingleInstanceLock) {

function spawnDetached(command, args) {
return new Promise((resolve, reject) => {
const env = { ...process.env };
delete env.NODE_OPTIONS;
const child = spawn(command, args, {
detached: true,
env,
stdio: "ignore",
});
child.once("error", reject);
Expand Down
83 changes: 53 additions & 30 deletions apps/desktop/electron/workflow-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ function setRunningStep(state, workflow, stepId) {
}
}

function markCompletedBeforeCurrent(state, workflow) {
function syncPhaseStatusesAroundCurrent(state, workflow) {
const order = getWorkflowStepOrder(workflow);
const currentIndex = order.indexOf(state.currentPhase);
for (let i = 0; i < order.length; i += 1) {
Expand All @@ -235,8 +235,15 @@ function markCompletedBeforeCurrent(state, workflow) {
if (!phase) continue;
if (state.overallStatus === "completed" || (currentIndex >= 0 && i < currentIndex)) {
phase.status = "completed";
} else if (currentIndex >= 0 && i > currentIndex) {
phase.status = "pending";
}
}

for (const step of state.steps || []) {
const phase = state.phases.find((item) => item.id === step.id);
if (phase) Object.assign(step, phase);
}
}

async function persistLangGraphState(taskId, runId, langState, workflow) {
Expand All @@ -251,13 +258,14 @@ async function persistLangGraphState(taskId, runId, langState, workflow) {
next.currentStep = stepId;
next.overallStatus = "awaiting_input";
updatePhaseStatus(next, stepId, "awaiting_input");
syncPhaseStatusesAroundCurrent(next, runtimeWorkflow);
} else if (next.overallStatus === "completed") {
next.currentPhase = "completed";
next.currentStep = "completed";
for (const phase of next.phases) updatePhaseStatus(next, phase.id, "completed", phase.sessionId);
} else {
setRunningStep(next, runtimeWorkflow, next.currentPhase);
markCompletedBeforeCurrent(next, runtimeWorkflow);
syncPhaseStatusesAroundCurrent(next, runtimeWorkflow);
}

await writeState(taskId, next);
Expand Down Expand Up @@ -350,7 +358,12 @@ function createGraph(taskId, runId, wf, imagePaths = []) {
const adapter = createAppSdkAgentAdapter({
taskId,
runId,
send: (message) => wf.send?.(message),
send: (message) => {
if (message?.type === "backend_selected" && message.phase) {
wf.currentPhase = message.phase;
}
wf.send?.(message);
},
workFolder: wf.workFolder,
taskRunDir: wf.taskRunDir,
imagePaths,
Expand Down Expand Up @@ -427,13 +440,19 @@ async function ensureCheckpointReady(taskId, runId, state) {

async function markWorkflowFailed(taskId, runId, phase, error, sender) {
const state = await readState(taskId, runId);
const failedPhase = phase || state.currentPhase;
const message = error?.message || "Workflow run failed";
updatePhaseStatus(state, phase || state.currentPhase, "failed");
if (failedPhase) {
state.currentPhase = failedPhase;
state.currentStep = failedPhase;
}
syncPhaseStatusesAroundCurrent(state, getRuntimeWorkflow(activeWorkflows.get(taskId), state));
updatePhaseStatus(state, failedPhase, "failed");
state.overallStatus = "failed";
state.logs = [...(state.logs || []), `failed:${phase || state.currentPhase}:${message}`];
state.logs = [...(state.logs || []), `failed:${failedPhase}:${message}`];
await writeState(taskId, state);
await upsertTask(state.originalWorkFolder || state.workFolder, taskId, "failed", state.runId || runId, { create: false }).catch(() => {});
sender?.({ type: "phase_failed", phase: phase || state.currentPhase, message });
sender?.({ type: "phase_failed", phase: failedPhase, message });
sender?.({ type: "state", state });
return state;
}
Expand All @@ -452,14 +471,20 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image
if (existingState && existingState.overallStatus !== "completed") {
const existingRunId = existingState.runId || runId || "";
const send = createEmitter(sender, taskId, existingRunId);
activeWorkflows.set(taskId, {
workFolder: existingState.workFolder,
taskRunDir: await taskDir(taskId, existingRunId),
send,
abortController: null,
runId: existingRunId,
workflow: existingState.workflowDefinition,
});
const activeWorkflow = activeWorkflows.get(taskId);
if (activeWorkflow && (!activeWorkflow.runId || activeWorkflow.runId === existingRunId)) {
activeWorkflow.send = send;
} else {
activeWorkflows.set(taskId, {
workFolder: existingState.workFolder,
taskRunDir: await taskDir(taskId, existingRunId),
send,
abortController: null,
runId: existingRunId,
workflow: existingState.workflowDefinition,
currentPhase: existingState.currentPhase,
});
}
send({ type: "state", state: existingState });
await emitExistingTaskFiles(taskId, existingState, send);
return;
Expand Down Expand Up @@ -527,27 +552,25 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image
abortController: null,
runId: finalRunId,
workflow,
currentPhase: stepOrder[0],
});
await upsertTask(workFolder, taskId, "in_progress", finalRunId);
send({ type: "state", state });

const safeImages = await getSafeRunImagePaths(taskId, finalRunId, images);
const graph = createGraph(taskId, finalRunId, activeWorkflows.get(taskId), safeImages);
activeGraphs.set(getThreadId(taskId, finalRunId), graph);
try {
await invokeGraph(taskId, finalRunId, {
void invokeGraph(taskId, finalRunId, {
taskId,
runId: finalRunId,
workFolder: runtimeWorkFolder,
taskDir: taskRunDir,
currentStep: stepOrder[0],
overallStatus: "in_progress",
taskInputs: taskInputs || {},
});
} catch (err) {
await markWorkflowFailed(taskId, finalRunId, stepOrder[0], err, send);
throw err;
}
})
.catch((err) => markWorkflowFailed(taskId, finalRunId, activeWorkflows.get(taskId)?.currentPhase || stepOrder[0], err, send))
.catch(() => {});
}

export async function approveWorkflow(taskId, sender, runId = "") {
Expand Down Expand Up @@ -703,11 +726,15 @@ export async function retryWorkflowPhase(taskId, phase, sender, runId = "") {
wf.send({ type: "phase_retried", phase, requestedBy: "user", trigger: "toolbar" });
wf.send({ type: "state", state });
try {
await invokeGraph(taskId, stateRunId, {
...state,
currentStep: phase,
overallStatus: "in_progress",
});
await invokeGraph(taskId, stateRunId, new Command({
update: {
...state,
currentStep: phase,
currentPhase: phase,
overallStatus: "in_progress",
},
goto: phase,
}));
} catch (err) {
await markWorkflowFailed(taskId, stateRunId, phase, err, wf.send);
throw err;
Expand Down Expand Up @@ -741,10 +768,6 @@ export function detachWorkflowSender(taskId, runId = "") {
if (!wf) return;
if (runId && wf.runId && wf.runId !== runId) return;
wf.send = null;
if (wf.abortController) {
try { wf.abortController.abort(); } catch {}
wf.abortController = null;
}
}

export async function cleanupWorkflowWorktree(taskId, options = {}) {
Expand Down
1 change: 1 addition & 0 deletions apps/desktop/renderer/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export default function App() {

useEffect(() => {
useConfigStore.getState().loadAll();
useWorkflowStore.getState().ensureWorkflowEvents();
}, []);

return (
Expand Down
48 changes: 46 additions & 2 deletions apps/desktop/renderer/src/StepDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ function normalizeConversation(interactions) {
if (!interaction) continue;
if (interaction.type === "prompt") continue;

if (interaction.type === "phase_start") {
activeAssistant = null;
items.push(interaction);
continue;
}

if (interaction.type === "assistant_delta") {
const assistant = ensureAssistant(interaction);
const lastSegment = assistant.segments[assistant.segments.length - 1];
Expand Down Expand Up @@ -80,6 +86,28 @@ function normalizeConversation(interactions) {
return items;
}

function getConversationWithRunMarkers(conversation) {
const startMarkers = conversation.filter((item) => item?.type === "phase_start");
const shouldShowMarkers = startMarkers.length > 1 || startMarkers.some((item) => Number(item.runIndex) > 1);
if (!shouldShowMarkers) {
return conversation.filter((item) => item?.type !== "phase_start");
}

const items = [];
for (const item of conversation) {
if (item?.type === "phase_start" && Number(item.runIndex) > 1) {
const triggerMessages = [];
while (items[items.length - 1]?.role === "user") {
triggerMessages.unshift(items.pop());
}
items.push(item, ...triggerMessages);
} else {
items.push(item);
}
}
return items;
}

function getBackendLabel(backend) {
if (String(backend || "").startsWith("ai-api:")) return `AI API: ${String(backend).slice("ai-api:".length)}`;
if (backend === "ai-api") return "AI API";
Expand Down Expand Up @@ -198,6 +226,18 @@ function ConversationItem({ item, t }) {
);
}

function RunDivider({ item, t }) {
return (
<div className="flex items-center gap-3 py-1 text-[11px] font-semibold uppercase text-muted-foreground">
<div className="h-px flex-1 bg-border" />
<span className="shrink-0 rounded-full border border-border bg-background px-2 py-0.5">
{t("stepDetail.runDivider", { count: item.runIndex || "" })}
</span>
<div className="h-px flex-1 bg-border" />
</div>
);
}

function LoadingItem({ t }) {
return (
<div className="rounded-lg border border-border bg-card/72 px-3 py-2.5">
Expand Down Expand Up @@ -230,7 +270,7 @@ export default function StepDetail({ phase, content, artifact, interactions = []
const activeTask = useWorkflowStore((s) => s.activeTask);
const workflowState = useWorkflowStore((s) => s.workflowState);

const conversation = normalizeConversation(interactions);
const conversation = getConversationWithRunMarkers(normalizeConversation(interactions));
const isCheckpoint = phaseTypes?.[phase] === "checkpoint";
const currentBackend = isCheckpoint ? t("stepDetail.checkpoint") : getBackendLabel(getCurrentBackend(activeBackend, interactions));
const showLoading = !isCheckpoint && isWaitingForAssistant(conversation, isRunning, isStreaming);
Expand Down Expand Up @@ -458,7 +498,11 @@ export default function StepDetail({ phase, content, artifact, interactions = []
<div className="flex-1 space-y-3 overflow-y-auto px-4 py-4" ref={conversationRef}>
{conversation.length > 0 ? (
conversation.map((item, idx) => (
<ConversationItem key={item.id || `${item.type}-${idx}`} item={item} t={t} />
item.type === "phase_start" ? (
<RunDivider key={item.id || `${item.type}-${idx}`} item={item} t={t} />
) : (
<ConversationItem key={item.id || `${item.type}-${idx}`} item={item} t={t} />
)
))
) : !showLoading ? (
<div className="rounded-lg border border-dashed border-border bg-background/45 px-4 py-8 text-center text-sm text-muted-foreground">
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/renderer/src/lib/i18n.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ export const messages = {
"stepDetail.user": "You",
"stepDetail.assistant": "AI",
"stepDetail.tool": "Tool",
"stepDetail.runDivider": "Run {count}",
"stepDetail.toolsCount": "Tools ({count})",
"stepDetail.attach": "Attach image",
"stepDetail.attachments": "{count} attachment(s)",
Expand Down Expand Up @@ -549,6 +550,7 @@ export const messages = {
"stepDetail.user": "你",
"stepDetail.assistant": "AI",
"stepDetail.tool": "工具",
"stepDetail.runDivider": "第 {count} 轮",
"stepDetail.toolsCount": "工具({count})",
"stepDetail.attach": "添加图片",
"stepDetail.attachments": "{count} 个附件",
Expand Down
68 changes: 66 additions & 2 deletions apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import { mkdir, readFile, rm, writeFile } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import { EventEmitter } from "events";
import { PassThrough } from "stream";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";

const mocks = vi.hoisted(() => ({
execFile: vi.fn(),
query: vi.fn(),
spawn: vi.fn(),
}));

vi.mock("@anthropic-ai/claude-agent-sdk", () => ({
query: mocks.query,
}));

vi.mock("@openai/codex-sdk", () => ({
Codex: vi.fn(),
vi.mock("child_process", () => ({
execFile: mocks.execFile,
spawn: mocks.spawn,
default: {
execFile: mocks.execFile,
spawn: mocks.spawn,
},
}));

import { createSdkAgentAdapter } from "../../../../../packages/core-lib/langgraph-runtime";
Expand Down Expand Up @@ -56,7 +65,10 @@ function buildRun(overrides = {}) {
describe("createSdkAgentAdapter output artifacts", () => {
beforeEach(async () => {
taskDir = await mkdir(join(tmpdir(), `sdk-agent-adapter-${Date.now()}-`), { recursive: true });
mocks.execFile.mockReset();
mocks.execFile.mockImplementation((_file, _args, _options, callback) => callback(null, "__DEV_WORKFLOW_SHELL_ENV__\0PATH=/usr/bin\0"));
mocks.query.mockReset();
mocks.spawn.mockReset();
});

afterEach(async () => {
Expand Down Expand Up @@ -91,4 +103,56 @@ describe("createSdkAgentAdapter output artifacts", () => {
summary: "Written by the agent tool",
});
});

test("runs read-only Codex steps with a writable task directory", async () => {
const child = new EventEmitter();
child.stdin = new PassThrough();
child.stdout = new PassThrough();
child.stderr = new PassThrough();
mocks.spawn.mockReturnValue(child);
const adapter = createSdkAgentAdapter({ workFolder: "/project", taskDir });

const runPromise = adapter.runAgent(buildRun({
agent: {
backend: "codex",
workspaceAccess: "read",
options: { thread: { modelReasoningEffort: "low" } },
},
state: {
taskId: "task-1",
runId: "run-1",
workFolder: "/project",
taskDir,
taskInputs: {},
stepOutputs: {},
},
}));

await vi.waitFor(() => {
expect(mocks.spawn).toHaveBeenCalled();
});
child.stdout.write(`${JSON.stringify({ type: "thread.started", thread_id: "codex-thread" })}\n`);
child.stdout.write(`${JSON.stringify({ type: "item.completed", item: { id: "msg-1", type: "agent_message", text: "Codex response" } })}\n`);
child.stdout.write(`${JSON.stringify({ type: "turn.completed", usage: null })}\n`);
child.stdout.end();
child.emit("exit", 0, null);

await runPromise;

expect(mocks.spawn).toHaveBeenCalledWith(
expect.stringContaining("@openai/codex"),
expect.arrayContaining([
"exec",
"--json",
"--sandbox",
"workspace-write",
"--cd",
taskDir,
"--skip-git-repo-check",
"--config",
"model_reasoning_effort=\"low\"",
]),
expect.objectContaining({ env: expect.any(Object), signal: expect.any(AbortSignal) }),
);
});
});
Loading
Loading