From 2c6204bd7dbf4ffbb2f2165406854b620d1d4b14 Mon Sep 17 00:00:00 2001 From: lei Date: Mon, 18 May 2026 23:26:05 +0800 Subject: [PATCH 1/2] refactor: task --- apps/desktop/electron/main.ts | 3 + apps/desktop/electron/workflow-runtime.ts | 83 +- apps/desktop/renderer/src/App.tsx | 1 + apps/desktop/renderer/src/StepDetail.tsx | 48 +- apps/desktop/renderer/src/lib/i18n.ts | 2 + .../src/lib/sdk-agent-adapter.test.ts | 60 +- apps/desktop/renderer/src/pages/HomePage.tsx | 6 +- apps/desktop/renderer/src/pages/TaskPage.tsx | 43 +- .../renderer/src/stores/workflowStore.test.ts | 34 + .../renderer/src/stores/workflowStore.ts | 839 +++++++++++++----- .../todo-agent-activity-terminal-mode.zh.md | 2 + package.json | 7 +- .../core-lib/langgraph-runtime/app-adapter.ts | 44 +- .../langgraph-runtime/file-checkpointer.ts | 45 +- .../langgraph-runtime/sdk-agent-adapter.ts | 224 +++-- pnpm-lock.yaml | 11 - scripts/dev-desktop-instance.ts | 86 +- 17 files changed, 1175 insertions(+), 363 deletions(-) create mode 100644 apps/desktop/renderer/src/stores/workflowStore.test.ts diff --git a/apps/desktop/electron/main.ts b/apps/desktop/electron/main.ts index f92142e..704c3a6 100644 --- a/apps/desktop/electron/main.ts +++ b/apps/desktop/electron/main.ts @@ -64,8 +64,11 @@ if (!gotSingleInstanceLock) { function spawnDetached(command, args) { return new Promise((resolve, reject) => { + const env = { ...process.env }; + delete env.NODE_OPTIONS; const child = spawn(command, args, { detached: true, + env, stdio: "ignore", }); child.once("error", reject); diff --git a/apps/desktop/electron/workflow-runtime.ts b/apps/desktop/electron/workflow-runtime.ts index 2852cc1..d83f303 100644 --- a/apps/desktop/electron/workflow-runtime.ts +++ b/apps/desktop/electron/workflow-runtime.ts @@ -226,7 +226,7 @@ function setRunningStep(state, workflow, stepId) { } } -function markCompletedBeforeCurrent(state, workflow) { +function syncPhaseStatusesAroundCurrent(state, workflow) { const order = getWorkflowStepOrder(workflow); const currentIndex = order.indexOf(state.currentPhase); for (let i = 0; i < order.length; i += 1) { @@ -235,8 +235,15 @@ function markCompletedBeforeCurrent(state, workflow) { if (!phase) continue; if (state.overallStatus === "completed" || (currentIndex >= 0 && i < currentIndex)) { phase.status = "completed"; + } else if (currentIndex >= 0 && i > currentIndex) { + phase.status = "pending"; } } + + for (const step of state.steps || []) { + const phase = state.phases.find((item) => item.id === step.id); + if (phase) Object.assign(step, phase); + } } async function persistLangGraphState(taskId, runId, langState, workflow) { @@ -251,13 +258,14 @@ async function persistLangGraphState(taskId, runId, langState, workflow) { next.currentStep = stepId; next.overallStatus = "awaiting_input"; updatePhaseStatus(next, stepId, "awaiting_input"); + syncPhaseStatusesAroundCurrent(next, runtimeWorkflow); } else if (next.overallStatus === "completed") { next.currentPhase = "completed"; next.currentStep = "completed"; for (const phase of next.phases) updatePhaseStatus(next, phase.id, "completed", phase.sessionId); } else { setRunningStep(next, runtimeWorkflow, next.currentPhase); - markCompletedBeforeCurrent(next, runtimeWorkflow); + syncPhaseStatusesAroundCurrent(next, runtimeWorkflow); } await writeState(taskId, next); @@ -350,7 +358,12 @@ function createGraph(taskId, runId, wf, imagePaths = []) { const adapter = createAppSdkAgentAdapter({ taskId, runId, - send: (message) => wf.send?.(message), + send: (message) => { + if (message?.type === "backend_selected" && message.phase) { + wf.currentPhase = message.phase; + } + wf.send?.(message); + }, workFolder: wf.workFolder, taskRunDir: wf.taskRunDir, imagePaths, @@ -427,13 +440,19 @@ async function ensureCheckpointReady(taskId, runId, state) { async function markWorkflowFailed(taskId, runId, phase, error, sender) { const state = await readState(taskId, runId); + const failedPhase = phase || state.currentPhase; const message = error?.message || "Workflow run failed"; - updatePhaseStatus(state, phase || state.currentPhase, "failed"); + if (failedPhase) { + state.currentPhase = failedPhase; + state.currentStep = failedPhase; + } + syncPhaseStatusesAroundCurrent(state, getRuntimeWorkflow(activeWorkflows.get(taskId), state)); + updatePhaseStatus(state, failedPhase, "failed"); state.overallStatus = "failed"; - state.logs = [...(state.logs || []), `failed:${phase || state.currentPhase}:${message}`]; + state.logs = [...(state.logs || []), `failed:${failedPhase}:${message}`]; await writeState(taskId, state); await upsertTask(state.originalWorkFolder || state.workFolder, taskId, "failed", state.runId || runId, { create: false }).catch(() => {}); - sender?.({ type: "phase_failed", phase: phase || state.currentPhase, message }); + sender?.({ type: "phase_failed", phase: failedPhase, message }); sender?.({ type: "state", state }); return state; } @@ -452,14 +471,20 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image if (existingState && existingState.overallStatus !== "completed") { const existingRunId = existingState.runId || runId || ""; const send = createEmitter(sender, taskId, existingRunId); - activeWorkflows.set(taskId, { - workFolder: existingState.workFolder, - taskRunDir: await taskDir(taskId, existingRunId), - send, - abortController: null, - runId: existingRunId, - workflow: existingState.workflowDefinition, - }); + const activeWorkflow = activeWorkflows.get(taskId); + if (activeWorkflow && (!activeWorkflow.runId || activeWorkflow.runId === existingRunId)) { + activeWorkflow.send = send; + } else { + activeWorkflows.set(taskId, { + workFolder: existingState.workFolder, + taskRunDir: await taskDir(taskId, existingRunId), + send, + abortController: null, + runId: existingRunId, + workflow: existingState.workflowDefinition, + currentPhase: existingState.currentPhase, + }); + } send({ type: "state", state: existingState }); await emitExistingTaskFiles(taskId, existingState, send); return; @@ -527,6 +552,7 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image abortController: null, runId: finalRunId, workflow, + currentPhase: stepOrder[0], }); await upsertTask(workFolder, taskId, "in_progress", finalRunId); send({ type: "state", state }); @@ -534,8 +560,7 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image const safeImages = await getSafeRunImagePaths(taskId, finalRunId, images); const graph = createGraph(taskId, finalRunId, activeWorkflows.get(taskId), safeImages); activeGraphs.set(getThreadId(taskId, finalRunId), graph); - try { - await invokeGraph(taskId, finalRunId, { + void invokeGraph(taskId, finalRunId, { taskId, runId: finalRunId, workFolder: runtimeWorkFolder, @@ -543,11 +568,9 @@ export async function startWorkflowSession(taskId, workFolder, taskInputs, image currentStep: stepOrder[0], overallStatus: "in_progress", taskInputs: taskInputs || {}, - }); - } catch (err) { - await markWorkflowFailed(taskId, finalRunId, stepOrder[0], err, send); - throw err; - } + }) + .catch((err) => markWorkflowFailed(taskId, finalRunId, activeWorkflows.get(taskId)?.currentPhase || stepOrder[0], err, send)) + .catch(() => {}); } export async function approveWorkflow(taskId, sender, runId = "") { @@ -703,11 +726,15 @@ export async function retryWorkflowPhase(taskId, phase, sender, runId = "") { wf.send({ type: "phase_retried", phase, requestedBy: "user", trigger: "toolbar" }); wf.send({ type: "state", state }); try { - await invokeGraph(taskId, stateRunId, { - ...state, - currentStep: phase, - overallStatus: "in_progress", - }); + await invokeGraph(taskId, stateRunId, new Command({ + update: { + ...state, + currentStep: phase, + currentPhase: phase, + overallStatus: "in_progress", + }, + goto: phase, + })); } catch (err) { await markWorkflowFailed(taskId, stateRunId, phase, err, wf.send); throw err; @@ -741,10 +768,6 @@ export function detachWorkflowSender(taskId, runId = "") { if (!wf) return; if (runId && wf.runId && wf.runId !== runId) return; wf.send = null; - if (wf.abortController) { - try { wf.abortController.abort(); } catch {} - wf.abortController = null; - } } export async function cleanupWorkflowWorktree(taskId, options = {}) { diff --git a/apps/desktop/renderer/src/App.tsx b/apps/desktop/renderer/src/App.tsx index e8c4687..18714a2 100644 --- a/apps/desktop/renderer/src/App.tsx +++ b/apps/desktop/renderer/src/App.tsx @@ -11,6 +11,7 @@ export default function App() { useEffect(() => { useConfigStore.getState().loadAll(); + useWorkflowStore.getState().ensureWorkflowEvents(); }, []); return ( diff --git a/apps/desktop/renderer/src/StepDetail.tsx b/apps/desktop/renderer/src/StepDetail.tsx index 1cde56d..216f711 100644 --- a/apps/desktop/renderer/src/StepDetail.tsx +++ b/apps/desktop/renderer/src/StepDetail.tsx @@ -46,6 +46,12 @@ function normalizeConversation(interactions) { if (!interaction) continue; if (interaction.type === "prompt") continue; + if (interaction.type === "phase_start") { + activeAssistant = null; + items.push(interaction); + continue; + } + if (interaction.type === "assistant_delta") { const assistant = ensureAssistant(interaction); const lastSegment = assistant.segments[assistant.segments.length - 1]; @@ -80,6 +86,28 @@ function normalizeConversation(interactions) { return items; } +function getConversationWithRunMarkers(conversation) { + const startMarkers = conversation.filter((item) => item?.type === "phase_start"); + const shouldShowMarkers = startMarkers.length > 1 || startMarkers.some((item) => Number(item.runIndex) > 1); + if (!shouldShowMarkers) { + return conversation.filter((item) => item?.type !== "phase_start"); + } + + const items = []; + for (const item of conversation) { + if (item?.type === "phase_start" && Number(item.runIndex) > 1) { + const triggerMessages = []; + while (items[items.length - 1]?.role === "user") { + triggerMessages.unshift(items.pop()); + } + items.push(item, ...triggerMessages); + } else { + items.push(item); + } + } + return items; +} + function getBackendLabel(backend) { if (String(backend || "").startsWith("ai-api:")) return `AI API: ${String(backend).slice("ai-api:".length)}`; if (backend === "ai-api") return "AI API"; @@ -198,6 +226,18 @@ function ConversationItem({ item, t }) { ); } +function RunDivider({ item, t }) { + return ( +
+
+ + {t("stepDetail.runDivider", { count: item.runIndex || "" })} + +
+
+ ); +} + function LoadingItem({ t }) { return (
@@ -230,7 +270,7 @@ export default function StepDetail({ phase, content, artifact, interactions = [] const activeTask = useWorkflowStore((s) => s.activeTask); const workflowState = useWorkflowStore((s) => s.workflowState); - const conversation = normalizeConversation(interactions); + const conversation = getConversationWithRunMarkers(normalizeConversation(interactions)); const isCheckpoint = phaseTypes?.[phase] === "checkpoint"; const currentBackend = isCheckpoint ? t("stepDetail.checkpoint") : getBackendLabel(getCurrentBackend(activeBackend, interactions)); const showLoading = !isCheckpoint && isWaitingForAssistant(conversation, isRunning, isStreaming); @@ -458,7 +498,11 @@ export default function StepDetail({ phase, content, artifact, interactions = []
{conversation.length > 0 ? ( conversation.map((item, idx) => ( - + item.type === "phase_start" ? ( + + ) : ( + + ) )) ) : !showLoading ? (
diff --git a/apps/desktop/renderer/src/lib/i18n.ts b/apps/desktop/renderer/src/lib/i18n.ts index c6a2cca..67bdcea 100644 --- a/apps/desktop/renderer/src/lib/i18n.ts +++ b/apps/desktop/renderer/src/lib/i18n.ts @@ -193,6 +193,7 @@ export const messages = { "stepDetail.user": "You", "stepDetail.assistant": "AI", "stepDetail.tool": "Tool", + "stepDetail.runDivider": "Run {count}", "stepDetail.toolsCount": "Tools ({count})", "stepDetail.attach": "Attach image", "stepDetail.attachments": "{count} attachment(s)", @@ -549,6 +550,7 @@ export const messages = { "stepDetail.user": "你", "stepDetail.assistant": "AI", "stepDetail.tool": "工具", + "stepDetail.runDivider": "第 {count} 轮", "stepDetail.toolsCount": "工具({count})", "stepDetail.attach": "添加图片", "stepDetail.attachments": "{count} 个附件", diff --git a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts index 3b28051..62b1a4a 100644 --- a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts +++ b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts @@ -1,18 +1,24 @@ import { mkdir, readFile, rm, writeFile } from "fs/promises"; import { tmpdir } from "os"; import { join } from "path"; +import { EventEmitter } from "events"; +import { PassThrough } from "stream"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; const mocks = vi.hoisted(() => ({ query: vi.fn(), + spawn: vi.fn(), })); vi.mock("@anthropic-ai/claude-agent-sdk", () => ({ query: mocks.query, })); -vi.mock("@openai/codex-sdk", () => ({ - Codex: vi.fn(), +vi.mock("child_process", () => ({ + spawn: mocks.spawn, + default: { + spawn: mocks.spawn, + }, })); import { createSdkAgentAdapter } from "../../../../../packages/core-lib/langgraph-runtime"; @@ -57,6 +63,7 @@ describe("createSdkAgentAdapter output artifacts", () => { beforeEach(async () => { taskDir = await mkdir(join(tmpdir(), `sdk-agent-adapter-${Date.now()}-`), { recursive: true }); mocks.query.mockReset(); + mocks.spawn.mockReset(); }); afterEach(async () => { @@ -91,4 +98,53 @@ describe("createSdkAgentAdapter output artifacts", () => { summary: "Written by the agent tool", }); }); + + test("runs read-only Codex steps with a writable task directory", async () => { + const child = new EventEmitter(); + child.stdin = new PassThrough(); + child.stdout = new PassThrough(); + child.stderr = new PassThrough(); + mocks.spawn.mockReturnValue(child); + const adapter = createSdkAgentAdapter({ workFolder: "/project", taskDir }); + + const runPromise = adapter.runAgent(buildRun({ + agent: { + backend: "codex", + workspaceAccess: "read", + options: { thread: { modelReasoningEffort: "low" } }, + }, + state: { + taskId: "task-1", + runId: "run-1", + workFolder: "/project", + taskDir, + taskInputs: {}, + stepOutputs: {}, + }, + })); + + child.stdout.write(`${JSON.stringify({ type: "thread.started", thread_id: "codex-thread" })}\n`); + child.stdout.write(`${JSON.stringify({ type: "item.completed", item: { id: "msg-1", type: "agent_message", text: "Codex response" } })}\n`); + child.stdout.write(`${JSON.stringify({ type: "turn.completed", usage: null })}\n`); + child.stdout.end(); + child.emit("exit", 0, null); + + await runPromise; + + expect(mocks.spawn).toHaveBeenCalledWith( + expect.stringContaining("@openai/codex"), + expect.arrayContaining([ + "exec", + "--json", + "--sandbox", + "workspace-write", + "--cd", + taskDir, + "--skip-git-repo-check", + "--config", + "model_reasoning_effort=\"low\"", + ]), + expect.objectContaining({ env: expect.any(Object), signal: expect.any(AbortSignal) }), + ); + }); }); diff --git a/apps/desktop/renderer/src/pages/HomePage.tsx b/apps/desktop/renderer/src/pages/HomePage.tsx index 1b884af..35d5c41 100644 --- a/apps/desktop/renderer/src/pages/HomePage.tsx +++ b/apps/desktop/renderer/src/pages/HomePage.tsx @@ -346,7 +346,6 @@ export default function HomePage() { const activeWorkflowFile = useConfigStore((s) => s.activeWorkflowFile); const workflowConfig = useConfigStore((s) => s.workflowConfig); const loadWorkFolders = useConfigStore((s) => s.loadWorkFolders); - const workflowState = useWorkflowStore((s) => s.workflowState); const workflowStatesByRun = useWorkflowStore((s) => s.workflowStatesByRun); const syncTaskSummaries = useWorkflowStore((s) => s.syncTaskSummaries); const startWorkflow = useWorkflowStore((s) => s.startWorkflow); @@ -368,13 +367,12 @@ export default function HomePage() { const tasks = taskSummaries.map((task) => { const taskRunId = task.runId || task.taskId || ""; const sharedState = workflowStatesByRun[`${task.taskId}:${taskRunId}`]; - const state = sharedState || workflowState; + const state = sharedState; const stateRunId = state?.runId || ""; if ( state && task.taskId === state.taskId && - taskRunId === stateRunId && - state.overallStatus !== "completed" + taskRunId === stateRunId ) { return { ...task, diff --git a/apps/desktop/renderer/src/pages/TaskPage.tsx b/apps/desktop/renderer/src/pages/TaskPage.tsx index 9852776..b0e7722 100644 --- a/apps/desktop/renderer/src/pages/TaskPage.tsx +++ b/apps/desktop/renderer/src/pages/TaskPage.tsx @@ -12,7 +12,7 @@ import WorkflowDebugPanel from "../components/WorkflowDebugPanel"; import { getAppApi } from "../lib/api-client"; import { buildClientDebugPayload, DEBUG_EVENT_TRIGGERS, DEBUG_EVENT_TYPES } from "../lib/debug-events"; import { cn } from "../lib/utils"; -import { pushClientDebugEvent, useWorkflowStore } from "../stores/workflowStore"; +import { getWorkflowStateKey, pushClientDebugEvent, useWorkflowStore } from "../stores/workflowStore"; import { useConfigStore } from "../stores/configStore"; const appApi = getAppApi(); @@ -384,24 +384,27 @@ export default function TaskPage() { const activeTask = useWorkflowStore((s) => s.activeTask); const loadTask = useWorkflowStore((s) => s.loadTask); const showToast = useWorkflowStore((s) => s.showToast); - const workflowState = useWorkflowStore((s) => s.workflowState); + const workflowStatesByRun = useWorkflowStore((s) => s.workflowStatesByRun); + const taskId = urlTaskId || activeTask; + const routeStateKey = getWorkflowStateKey(taskId, urlRunId || taskId); + const workflowState = workflowStatesByRun[routeStateKey] || null; useEffect(() => { if (urlTaskId && (urlTaskId !== activeTask || workflowState?.taskId !== urlTaskId || (urlRunId && workflowState?.runId !== urlRunId))) { loadTask(urlTaskId, urlRunId); } }, [urlTaskId, urlRunId, activeTask, workflowState?.taskId, workflowState?.runId]); - const selectedPhase = useWorkflowStore((s) => s.selectedPhase); + const selectedPhaseByRun = useWorkflowStore((s) => s.selectedPhaseByRun); const setSelectedPhase = useWorkflowStore((s) => s.setSelectedPhase); - const phaseMessages = useWorkflowStore((s) => s.phaseMessages); - const phaseOutputArtifacts = useWorkflowStore((s) => s.phaseOutputArtifacts); - const phaseInteractions = useWorkflowStore((s) => s.phaseInteractions); - const isStreaming = useWorkflowStore((s) => s.isStreaming); - const streamingPhase = useWorkflowStore((s) => s.streamingPhase); - const debugEvents = useWorkflowStore((s) => s.debugEvents); - const connectionState = useWorkflowStore((s) => s.connectionState); - const lastEventAt = useWorkflowStore((s) => s.lastEventAt); - const lastError = useWorkflowStore((s) => s.lastError); + const phaseMessagesByRun = useWorkflowStore((s) => s.phaseMessagesByRun); + const phaseOutputArtifactsByRun = useWorkflowStore((s) => s.phaseOutputArtifactsByRun); + const phaseInteractionsByRun = useWorkflowStore((s) => s.phaseInteractionsByRun); + const isStreamingByRun = useWorkflowStore((s) => s.isStreamingByRun); + const streamingPhaseByRun = useWorkflowStore((s) => s.streamingPhaseByRun); + const debugEventsByRun = useWorkflowStore((s) => s.debugEventsByRun); + const connectionStateByRun = useWorkflowStore((s) => s.connectionStateByRun); + const lastEventAtByRun = useWorkflowStore((s) => s.lastEventAtByRun); + const lastErrorByRun = useWorkflowStore((s) => s.lastErrorByRun); const approve = useWorkflowStore((s) => s.approve); const reject = useWorkflowStore((s) => s.reject); const sendMessage = useWorkflowStore((s) => s.sendMessage); @@ -413,7 +416,6 @@ export default function TaskPage() { const workflowConfig = useConfigStore((s) => s.workflowConfig); const runWorkflowConfig = workflowState?.workflowConfig || workflowConfig; - const taskId = urlTaskId || activeTask; const [cleanedWorktree, setCleanedWorktree] = useState(null); const visibleWorktree = workflowState?.worktree || cleanedWorktree; const worktreeDisplayName = getWorktreeDisplayName(visibleWorktree); @@ -421,6 +423,19 @@ export default function TaskPage() { const isWorktreeCleaned = Boolean(visibleWorktree?.cleaned); const hasWorktree = Boolean(visibleWorktree?.enabled); const currentPhase = workflowState?.currentPhase; + const currentStateKey = workflowState + ? getWorkflowStateKey(workflowState.taskId, workflowState.runId) + : routeStateKey; + const selectedPhase = selectedPhaseByRun[currentStateKey] || null; + const phaseMessages = phaseMessagesByRun[currentStateKey] || {}; + const phaseOutputArtifacts = phaseOutputArtifactsByRun[currentStateKey] || {}; + const phaseInteractions = phaseInteractionsByRun[currentStateKey] || {}; + const isStreaming = Boolean(isStreamingByRun[currentStateKey]); + const streamingPhase = streamingPhaseByRun[currentStateKey] || null; + const debugEvents = debugEventsByRun[currentStateKey] || []; + const connectionState = connectionStateByRun[currentStateKey] || "disconnected"; + const lastEventAt = lastEventAtByRun[currentStateKey] || null; + const lastError = lastErrorByRun[currentStateKey] || null; const phases = workflowState?.phases || []; const selectedRunPhase = selectedPhase; const activePhase = selectedRunPhase || currentPhase || phases[0]?.id || null; @@ -742,7 +757,7 @@ export default function TaskPage() { phases={phases} workflowConfig={runWorkflowConfig} selectedPhase={detailPhase} - onSelect={setSelectedPhase} + onSelect={(phase) => setSelectedPhase(phase, currentStateKey)} />
diff --git a/apps/desktop/renderer/src/stores/workflowStore.test.ts b/apps/desktop/renderer/src/stores/workflowStore.test.ts new file mode 100644 index 0000000..d3343c3 --- /dev/null +++ b/apps/desktop/renderer/src/stores/workflowStore.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, test } from "vitest"; +import { getSelectedPhaseFromWorkflowState } from "./workflowStore"; + +describe("getSelectedPhaseFromWorkflowState", () => { + test("prefers the active phase over a stale currentPhase", () => { + expect(getSelectedPhaseFromWorkflowState({ + currentPhase: "implement", + phases: [ + { id: "implement", status: "completed" }, + { id: "code_review", status: "in_progress" }, + ], + })).toBe("code_review"); + }); + + test("shows the latest started phase when the run is completed", () => { + expect(getSelectedPhaseFromWorkflowState({ + currentPhase: "completed", + phases: [ + { id: "implement", status: "completed" }, + { id: "code_review", status: "completed" }, + ], + })).toBe("code_review"); + }); + + test("falls back to currentPhase when there is no active phase", () => { + expect(getSelectedPhaseFromWorkflowState({ + currentPhase: "code_review", + phases: [ + { id: "implement", status: "completed" }, + { id: "code_review", status: "pending" }, + ], + })).toBe("code_review"); + }); +}); diff --git a/apps/desktop/renderer/src/stores/workflowStore.ts b/apps/desktop/renderer/src/stores/workflowStore.ts index 9df1f59..2450011 100644 --- a/apps/desktop/renderer/src/stores/workflowStore.ts +++ b/apps/desktop/renderer/src/stores/workflowStore.ts @@ -25,12 +25,19 @@ function playNotificationSound() { let prevStatusRef = {}; let unsubscribeWorkflowEvents = null; -function getWorkflowStateKey(taskId, runId = "") { +export function getWorkflowStateKey(taskId, runId = "") { const id = taskId || ""; const run = runId || taskId || ""; return `${id}:${run}`; } +function getEventStateKey(msg) { + const taskId = msg?.taskId || msg?.state?.taskId || ""; + const runId = msg?.runId || msg?.state?.runId || taskId; + if (!taskId) return ""; + return getWorkflowStateKey(taskId, runId); +} + function getCurrentPhaseFromTask(task) { if (task.status === "completed") return "completed"; const activePhase = (task.phases || []).find((phase) => @@ -39,9 +46,32 @@ function getCurrentPhaseFromTask(task) { return activePhase?.id || activePhase?.name || task.phases?.[0]?.id || null; } +export function getSelectedPhaseFromWorkflowState(state) { + const phases = state?.phases || []; + const activePhase = phases.find((phase) => + phase.status === "in_progress" || phase.status === "paused" || phase.status === "awaiting_input" || phase.status === "failed" + ); + if (activePhase) return activePhase.id || activePhase.name || null; + + if (state?.currentPhase && state.currentPhase !== "completed") { + const currentPhase = phases.find((phase) => (phase.id || phase.name) === state.currentPhase); + if (currentPhase) return currentPhase.id || currentPhase.name || null; + } + + const latestStartedPhase = phases.findLast?.((phase) => phase.status && phase.status !== "pending") + || [...phases].reverse().find((phase) => phase.status && phase.status !== "pending"); + return latestStartedPhase?.id || latestStartedPhase?.name || phases[0]?.id || phases[0]?.name || null; +} + +function getRunningPhaseFromWorkflowState(state) { + const runningPhase = (state?.phases || []).find((phase) => phase.status === "in_progress"); + return runningPhase?.id || runningPhase?.name || null; +} + function getWorkflowStateFromTask(task) { const taskId = task.taskId || ""; if (!taskId) return null; + if (!Array.isArray(task.phases) || task.phases.length === 0) return null; const runId = task.runId || taskId; return { taskId, @@ -49,6 +79,7 @@ function getWorkflowStateFromTask(task) { workFolder: task.workFolderPath || "", currentPhase: getCurrentPhaseFromTask(task), overallStatus: task.status || "pending", + updated: task.updated || task.updatedAt || "", workflowConfig: task.workflowConfig || null, phases: task.phases || [], }; @@ -102,25 +133,50 @@ function createDebugEvent(payload, source = "workflow") { }; } -function appendDebugEvent(set, payload, source = "workflow") { +function appendDebugEvent(set, get, payload, source = "workflow") { const event = createDebugEvent(payload, source); set((state) => { - const debugEvents = [...state.debugEvents, event]; + const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId); + const debugEvents = stateKey ? [...(state.debugEventsByRun[stateKey] || []), event] : []; const nextState = { - debugEvents: debugEvents.slice(-MAX_DEBUG_EVENTS), - lastEventAt: event.at, - connectionState: source === "workflow" ? "connected" : state.connectionState, + debugEventsByRun: stateKey + ? { + ...state.debugEventsByRun, + [stateKey]: debugEvents.slice(-MAX_DEBUG_EVENTS), + } + : state.debugEventsByRun, + lastEventAtByRun: stateKey + ? { + ...state.lastEventAtByRun, + [stateKey]: event.at, + } + : state.lastEventAtByRun, + connectionStateByRun: stateKey && source === "workflow" + ? { + ...state.connectionStateByRun, + [stateKey]: "connected", + } + : state.connectionStateByRun, }; - if (payload?.type === DEBUG_EVENT_TYPES.ERROR || payload?.type === DEBUG_EVENT_TYPES.PHASE_FAILED) { - nextState.lastError = { - at: event.at, - phase: payload.phase || null, - message: payload.message || "Unknown error", - payload, + if (stateKey && (payload?.type === DEBUG_EVENT_TYPES.ERROR || payload?.type === DEBUG_EVENT_TYPES.PHASE_FAILED)) { + nextState.lastErrorByRun = { + ...state.lastErrorByRun, + [stateKey]: { + at: event.at, + phase: payload.phase || null, + message: payload.message || "Unknown error", + payload, + }, + }; + nextState.isStreamingByRun = { + ...state.isStreamingByRun, + [stateKey]: false, + }; + nextState.streamingPhaseByRun = { + ...state.streamingPhaseByRun, + [stateKey]: null, }; - nextState.isStreaming = false; - nextState.streamingPhase = null; } return nextState; @@ -130,9 +186,17 @@ function appendDebugEvent(set, payload, source = "workflow") { export function pushClientDebugEvent(payload) { useWorkflowStore.setState((state) => { const event = createDebugEvent(payload, "client"); + const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId); + if (!stateKey) return {}; return { - debugEvents: [...state.debugEvents, event].slice(-MAX_DEBUG_EVENTS), - lastEventAt: event.at, + debugEventsByRun: { + ...state.debugEventsByRun, + [stateKey]: [...(state.debugEventsByRun[stateKey] || []), event].slice(-MAX_DEBUG_EVENTS), + }, + lastEventAtByRun: { + ...state.lastEventAtByRun, + [stateKey]: event.at, + }, }; }); } @@ -140,219 +204,408 @@ export function pushClientDebugEvent(payload) { export function pushClientErrorEvent(payload) { useWorkflowStore.setState((state) => { const event = createDebugEvent(payload, "client"); + const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId); + if (!stateKey) return {}; return { - debugEvents: [...state.debugEvents, event].slice(-MAX_DEBUG_EVENTS), - lastEventAt: event.at, - lastError: { - at: event.at, - phase: payload.phase || null, - message: payload.message || "Unknown error", - payload, - }, - isStreaming: false, - streamingPhase: null, + debugEventsByRun: { + ...state.debugEventsByRun, + [stateKey]: [...(state.debugEventsByRun[stateKey] || []), event].slice(-MAX_DEBUG_EVENTS), + }, + lastEventAtByRun: { + ...state.lastEventAtByRun, + [stateKey]: event.at, + }, + lastErrorByRun: { + ...state.lastErrorByRun, + [stateKey]: { + at: event.at, + phase: payload.phase || null, + message: payload.message || "Unknown error", + payload, + }, + }, + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, }; }); } -function matchesWorkflowEvent(msg, taskId, runId = "") { - const eventTaskId = msg?.taskId || msg?.state?.taskId || ""; - const eventRunId = msg?.runId || msg?.state?.runId || ""; - if (eventTaskId !== taskId) return false; - if (runId && eventRunId !== runId) return false; - return true; -} - -function appendPhaseInteraction(set, phase, interaction) { - if (!phase || !interaction) return; - set((state) => ({ - phaseInteractions: { - ...state.phaseInteractions, - [phase]: [...(state.phaseInteractions[phase] || []), interaction], - }, - })); -} - -function markPhaseRunning(set, phaseId) { +function markPhaseRunning(set, msg, phaseId) { if (!phaseId) return; set((state) => { - if (!state.workflowState) return {}; + const stateKey = getEventStateKey(msg); + if (!stateKey) return {}; + const existingState = state.workflowStatesByRun[stateKey] || ( + getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId) === stateKey + ? state.workflowState + : null + ); + if (!existingState) return {}; + + const now = new Date().toISOString(); const workflowState = { - ...state.workflowState, + ...existingState, currentPhase: phaseId, overallStatus: "in_progress", - phases: (state.workflowState.phases || []).map((phase) => ({ + updated: now, + phases: (existingState.phases || []).map((phase) => ({ ...phase, status: phase.id === phaseId ? "in_progress" : phase.status === "in_progress" ? "completed" : phase.status, + updated: phase.id === phaseId || phase.status === "in_progress" ? now : phase.updated, })), }; - const stateKey = getWorkflowStateKey(workflowState.taskId, workflowState.runId); + const activeStateKey = getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId); + const isActiveState = activeStateKey === stateKey; return { - selectedPhase: phaseId, - workflowState, + workflowState: isActiveState ? workflowState : state.workflowState, workflowStatesByRun: { ...state.workflowStatesByRun, [stateKey]: workflowState, }, + selectedPhaseByRun: { + ...state.selectedPhaseByRun, + [stateKey]: phaseId, + }, + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: true, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: phaseId, + }, + connectionStateByRun: { + ...state.connectionStateByRun, + [stateKey]: "connected", + }, }; }); } function applyDisconnectedState(set, get) { const state = get().workflowState; + const stateKey = getWorkflowStateKey(state?.taskId, state?.runId); if (state) { const updated = { ...state, overallStatus: state.overallStatus, phases: state.phases.map((phase) => ({ ...phase })), }; - set({ workflowState: updated, isStreaming: false, streamingPhase: null, connectionState: "disconnected" }); + set((current) => ({ + workflowState: updated, + isStreamingByRun: { + ...current.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...current.streamingPhaseByRun, + [stateKey]: null, + }, + connectionStateByRun: { + ...current.connectionStateByRun, + [stateKey]: "disconnected", + }, + })); } else { - set({ isStreaming: false, streamingPhase: null, connectionState: "disconnected" }); + set({}); } } -function attachWorkflowEvents(set, get, taskId, runId = "") { - if (unsubscribeWorkflowEvents) unsubscribeWorkflowEvents(); +function isActiveWorkflowEvent(state, msg) { + const eventKey = getEventStateKey(msg); + if (!eventKey) return false; + return getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId) === eventKey; +} + +function attachWorkflowEvents(set, get) { + if (unsubscribeWorkflowEvents) return () => applyDisconnectedState(set, get); unsubscribeWorkflowEvents = desktopApi.onWorkflowEvent((msg) => { if (!msg) return; - if (!matchesWorkflowEvent(msg, taskId, runId)) return; - appendDebugEvent(set, msg); + appendDebugEvent(set, get, msg); if (msg.type === "phase_artifact") { if (!msg.outputKey) return; set((state) => ({ - phaseOutputArtifacts: { - ...state.phaseOutputArtifacts, - [msg.phase]: { - ...(state.phaseOutputArtifacts[msg.phase] || {}), - [msg.outputKey]: msg.content, + phaseOutputArtifactsByRun: { + ...state.phaseOutputArtifactsByRun, + [getEventStateKey(msg)]: { + ...(state.phaseOutputArtifactsByRun[getEventStateKey(msg)] || {}), + [msg.phase]: { + ...(state.phaseOutputArtifactsByRun[getEventStateKey(msg)]?.[msg.phase] || {}), + [msg.outputKey]: msg.content, + }, }, }, })); } else if (msg.type === "backend_selected") { - markPhaseRunning(set, msg.phase); + markPhaseRunning(set, msg, msg.phase); + set((state) => { + const stateKey = getEventStateKey(msg); + if (!stateKey || !msg.phase) return {}; + const existingInteractions = state.phaseInteractionsByRun[stateKey]?.[msg.phase] || []; + const lastInteraction = existingInteractions[existingInteractions.length - 1]; + if (lastInteraction?.type === "phase_start" && lastInteraction.backend === msg.backend) return {}; + const startIndexes = existingInteractions + .filter((item) => item?.type === "phase_start") + .map((item) => Number(item.runIndex) || 0); + const lastRunIndex = Math.max(0, ...startIndexes); + const hasEarlierConversation = existingInteractions.some((item) => item?.type !== "phase_start"); + const runIndex = lastRunIndex > 0 ? lastRunIndex + 1 : hasEarlierConversation ? 2 : 1; + return { + phaseInteractionsByRun: { + ...state.phaseInteractionsByRun, + [stateKey]: { + ...(state.phaseInteractionsByRun[stateKey] || {}), + [msg.phase]: [ + ...existingInteractions, + { + id: `phase-start-${Date.now()}`, + at: new Date().toISOString(), + phase: msg.phase, + role: "system", + type: "phase_start", + text: "", + backend: msg.backend, + runIndex, + }, + ], + }, + }, + }; + }); } else if (msg.type === "text_delta") { - markPhaseRunning(set, msg.phase); + markPhaseRunning(set, msg, msg.phase); set((state) => ({ - phaseMessages: { ...state.phaseMessages, [msg.phase]: (state.phaseMessages[msg.phase] || "") + msg.text }, - phaseInteractions: { - ...state.phaseInteractions, - [msg.phase]: [ - ...(state.phaseInteractions[msg.phase] || []), - { - role: "assistant", - type: "assistant_delta", - text: msg.text, - backend: msg.backend, - }, - ], + phaseMessagesByRun: { + ...state.phaseMessagesByRun, + [getEventStateKey(msg)]: { + ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}), + [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + msg.text, + }, + }, + phaseInteractionsByRun: { + ...state.phaseInteractionsByRun, + [getEventStateKey(msg)]: { + ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}), + [msg.phase]: [ + ...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []), + { + role: "assistant", + type: "assistant_delta", + text: msg.text, + backend: msg.backend, + }, + ], + }, }, - isStreaming: true, - streamingPhase: msg.phase, })); } else if (msg.type === "state") { const state = msg.state; const stateKey = getWorkflowStateKey(state.taskId, state.runId); - set((current) => ({ - workflowState: state, - workflowStatesByRun: { - ...current.workflowStatesByRun, - [stateKey]: state, - }, - })); + const cachedState = get().workflowStatesByRun[stateKey]; + const displayState = getLatestWorkflowState(state, cachedState); + let isActiveState = false; + set((current) => { + const activeStateKey = getWorkflowStateKey(current.workflowState?.taskId, current.workflowState?.runId); + isActiveState = activeStateKey === stateKey || (!current.workflowState && current.activeTask === displayState.taskId); + return { + workflowState: isActiveState ? displayState : current.workflowState, + workflowStatesByRun: { + ...current.workflowStatesByRun, + [stateKey]: displayState, + }, + selectedPhaseByRun: { + ...current.selectedPhaseByRun, + [stateKey]: getSelectedPhaseFromWorkflowState(displayState), + }, + }; + }); - const prevPhase = prevStatusRef._currentPhase; + const prevStatus = prevStatusRef[stateKey] || {}; + const prevPhase = prevStatus._currentPhase; + const selectedPhase = getSelectedPhaseFromWorkflowState(displayState); - if (state.currentPhase && state.currentPhase !== prevPhase) { - if (state.currentPhase !== "completed") set({ selectedPhase: state.currentPhase }); + if (isActiveState && displayState.currentPhase && displayState.currentPhase !== prevPhase) { + if (selectedPhase) { + set((state) => ({ + selectedPhaseByRun: { + ...state.selectedPhaseByRun, + [stateKey]: selectedPhase, + }, + })); + } if (prevPhase) playNotificationSound(); } let streaming = false; - for (const phase of state.phases) { - const prevStatus = prevStatusRef[phase.id]; - if (phase.status === "awaiting_input" && prevStatus !== "awaiting_input") { + for (const phase of displayState.phases) { + const prevPhaseStatus = prevStatus[phase.id]; + if (isActiveState && phase.status === "awaiting_input" && prevPhaseStatus !== "awaiting_input") { playNotificationSound(); - set({ selectedPhase: phase.id }); + set((state) => ({ + selectedPhaseByRun: { + ...state.selectedPhaseByRun, + [stateKey]: phase.id, + }, + })); } - if (phase.status === "paused" && prevStatus !== "paused") { - set({ selectedPhase: phase.id }); + if (isActiveState && phase.status === "paused" && prevPhaseStatus !== "paused") { + set((state) => ({ + selectedPhaseByRun: { + ...state.selectedPhaseByRun, + [stateKey]: phase.id, + }, + })); } if (phase.status === "in_progress") streaming = true; } - if (!streaming) { - set({ isStreaming: false, streamingPhase: null }); + if (isActiveState && !streaming) { + set((state) => ({ + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, + })); } const statusMap = {}; - for (const phase of state.phases) statusMap[phase.id] = phase.status; - statusMap._currentPhase = state.currentPhase; - prevStatusRef = statusMap; + for (const phase of displayState.phases) statusMap[phase.id] = phase.status; + statusMap._currentPhase = displayState.currentPhase; + prevStatusRef = { + ...prevStatusRef, + [stateKey]: statusMap, + }; - if (state.overallStatus === "completed") { + if (displayState.overallStatus === "completed") { useConfigStore.getState().loadWorkFolders(); } } else if (msg.type === "phase_done") { - set({ isStreaming: false, streamingPhase: null }); + set((state) => { + const stateKey = getEventStateKey(msg); + if (!stateKey) return {}; + return { + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, + }; + }); } else if (msg.type === DEBUG_EVENT_TYPES.PHASE_PAUSED) { - set({ isStreaming: false, streamingPhase: null }); + set((state) => { + const stateKey = getEventStateKey(msg); + if (!stateKey) return {}; + return { + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, + }; + }); } else if (msg.type === "phase_content") { - set((state) => ({ phaseMessages: { ...state.phaseMessages, [msg.phase]: msg.content } })); + set((state) => ({ + phaseMessagesByRun: { + ...state.phaseMessagesByRun, + [getEventStateKey(msg)]: { + ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}), + [msg.phase]: msg.content, + }, + }, + })); } else if (msg.type === "phase_interaction") { - appendPhaseInteraction(set, msg.phase, msg.interaction); + set((state) => { + if (!msg.phase || !msg.interaction) return {}; + const nextByRun = { + ...state.phaseInteractionsByRun, + [getEventStateKey(msg)]: { + ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}), + [msg.phase]: [...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []), msg.interaction], + }, + }; + return { phaseInteractionsByRun: nextByRun }; + }); } else if (msg.type === "user_message") { set((state) => ({ - phaseMessages: { - ...state.phaseMessages, - [msg.phase]: (state.phaseMessages[msg.phase] || "") + `\n\n---\n\n**You:** ${msg.text}\n\n`, + phaseMessagesByRun: { + ...state.phaseMessagesByRun, + [getEventStateKey(msg)]: { + ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}), + [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + `\n\n---\n\n**You:** ${msg.text}\n\n`, + }, }, })); } else if (msg.type === "tool_use") { - markPhaseRunning(set, msg.phase); + markPhaseRunning(set, msg, msg.phase); set((state) => ({ - phaseMessages: { - ...state.phaseMessages, - [msg.phase]: (state.phaseMessages[msg.phase] || "") + `\n\n*${msg.log || msg.name}*\n\n`, + phaseMessagesByRun: { + ...state.phaseMessagesByRun, + [getEventStateKey(msg)]: { + ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}), + [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + `\n\n*${msg.log || msg.name}*\n\n`, + }, }, - phaseInteractions: { - ...state.phaseInteractions, - [msg.phase]: [ - ...(state.phaseInteractions[msg.phase] || []), - { - role: "tool", - type: "tool_use", - text: msg.log || msg.name || "", - backend: msg.name, - }, - ], + phaseInteractionsByRun: { + ...state.phaseInteractionsByRun, + [getEventStateKey(msg)]: { + ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}), + [msg.phase]: [ + ...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []), + { + role: "tool", + type: "tool_use", + text: msg.log || msg.name || "", + backend: msg.name, + }, + ], + }, }, })); } else if (msg.type === "session_attached" && msg.phase && msg.sessionId) { - set((state) => ({ - workflowState: state.workflowState - ? { - ...state.workflowState, - phases: state.workflowState.phases.map((phase) => - phase.id === msg.phase ? { ...phase, sessionId: msg.sessionId } : phase - ), - } - : state.workflowState, - })); + set((state) => { + const stateKey = getEventStateKey(msg); + const existingState = state.workflowStatesByRun[stateKey] || ( + isActiveWorkflowEvent(state, msg) ? state.workflowState : null + ); + if (!existingState) return {}; + const nextWorkflowState = { + ...existingState, + phases: existingState.phases.map((phase) => + phase.id === msg.phase ? { ...phase, sessionId: msg.sessionId } : phase + ), + }; + return { + workflowState: isActiveWorkflowEvent(state, msg) ? nextWorkflowState : state.workflowState, + workflowStatesByRun: { + ...state.workflowStatesByRun, + [stateKey]: nextWorkflowState, + }, + }; + }); } }); return () => { - pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_DETACH, { taskId, runId })); - if (taskId) desktopApi.detachWorkflow(taskId, runId); - if (unsubscribeWorkflowEvents) { - unsubscribeWorkflowEvents(); - unsubscribeWorkflowEvents = null; - } applyDisconnectedState(set, get); }; } @@ -361,20 +614,27 @@ export const useWorkflowStore = create((set, get) => ({ activeTask: null, workflowState: null, workflowStatesByRun: {}, - selectedPhase: null, - phaseMessages: {}, - phaseOutputArtifacts: {}, - phaseInteractions: {}, - isStreaming: false, - streamingPhase: null, - connectionState: "disconnected", - debugEvents: [], - lastEventAt: null, - lastError: null, + selectedPhaseByRun: {}, + phaseMessagesByRun: {}, + phaseOutputArtifactsByRun: {}, + phaseInteractionsByRun: {}, + isStreamingByRun: {}, + streamingPhaseByRun: {}, + connectionStateByRun: {}, + debugEventsByRun: {}, + lastEventAtByRun: {}, + lastErrorByRun: {}, toast: null, - setSelectedPhase(phase) { - set({ selectedPhase: phase }); + setSelectedPhase(phase, stateKey = "") { + const key = stateKey || getWorkflowStateKey(get().workflowState?.taskId, get().workflowState?.runId); + if (!key) return; + set((state) => ({ + selectedPhaseByRun: { + ...state.selectedPhaseByRun, + [key]: phase, + }, + })); }, showToast(message, duration = 3000) { @@ -382,10 +642,18 @@ export const useWorkflowStore = create((set, get) => ({ setTimeout(() => set({ toast: null }), duration); }, + ensureWorkflowEvents() { + attachWorkflowEvents(set, get); + }, + syncTaskSummaries(tasks) { if (!Array.isArray(tasks) || tasks.length === 0) return; set((state) => { const workflowStatesByRun = { ...state.workflowStatesByRun }; + const selectedPhaseByRun = { ...state.selectedPhaseByRun }; + const isStreamingByRun = { ...state.isStreamingByRun }; + const streamingPhaseByRun = { ...state.streamingPhaseByRun }; + const connectionStateByRun = { ...state.connectionStateByRun }; let changed = false; for (const task of tasks) { @@ -393,9 +661,18 @@ export const useWorkflowStore = create((set, get) => ({ if (!summaryState) continue; const stateKey = getWorkflowStateKey(summaryState.taskId, summaryState.runId); const existingState = workflowStatesByRun[stateKey]; - if (existingState?.overallStatus === "completed") continue; - workflowStatesByRun[stateKey] = mergeWorkflowStates(existingState, summaryState); - changed = true; + const nextState = getLatestWorkflowState(summaryState, existingState); + if (nextState !== existingState) { + workflowStatesByRun[stateKey] = nextState; + selectedPhaseByRun[stateKey] = getSelectedPhaseFromWorkflowState(nextState); + const runningPhase = getRunningPhaseFromWorkflowState(nextState); + isStreamingByRun[stateKey] = Boolean(runningPhase); + streamingPhaseByRun[stateKey] = runningPhase; + connectionStateByRun[stateKey] = nextState.overallStatus === "completed" + ? "disconnected" + : connectionStateByRun[stateKey] || (runningPhase ? "connected" : "connecting"); + changed = true; + } } if (!changed) return {}; @@ -403,6 +680,10 @@ export const useWorkflowStore = create((set, get) => ({ const activeKey = getWorkflowStateKey(activeState?.taskId, activeState?.runId); return { workflowStatesByRun, + selectedPhaseByRun, + isStreamingByRun, + streamingPhaseByRun, + connectionStateByRun, workflowState: workflowStatesByRun[activeKey] || activeState, }; }); @@ -413,39 +694,77 @@ export const useWorkflowStore = create((set, get) => ({ const stateKey = getWorkflowStateKey(taskId, runId); const cachedState = get().workflowStatesByRun[stateKey]; if (cachedState) { - const selectedPhase = cachedState.currentPhase && cachedState.currentPhase !== "completed" - ? cachedState.currentPhase - : cachedState.phases?.[0]?.id || null; + const selectedPhase = getSelectedPhaseFromWorkflowState(cachedState); + const runningPhase = getRunningPhaseFromWorkflowState(cachedState); set({ activeTask: taskId, workflowState: cachedState, - selectedPhase, - connectionState: cachedState.overallStatus === "completed" ? "disconnected" : get().connectionState, + selectedPhaseByRun: { + ...get().selectedPhaseByRun, + [stateKey]: selectedPhase, + }, + isStreamingByRun: { + ...get().isStreamingByRun, + [stateKey]: Boolean(runningPhase), + }, + streamingPhaseByRun: { + ...get().streamingPhaseByRun, + [stateKey]: runningPhase, + }, + connectionStateByRun: { + ...get().connectionStateByRun, + [stateKey]: cachedState.overallStatus === "completed" + ? "disconnected" + : get().connectionStateByRun[stateKey] || (runningPhase ? "connected" : "connecting"), + }, }); } const { state, messages, outputArtifacts, interactions } = await desktopApi.getTaskState(taskId, runId); const currentState = get().workflowStatesByRun[stateKey]; const displayState = getLatestWorkflowState(state, currentState); - const selectedPhase = displayState.currentPhase && displayState.currentPhase !== "completed" - ? displayState.currentPhase - : displayState.phases?.[0]?.id || null; + const displayStateKey = getWorkflowStateKey(displayState.taskId, displayState.runId || runId); + const selectedPhase = getSelectedPhaseFromWorkflowState(displayState); + const runningPhase = getRunningPhaseFromWorkflowState(displayState); + const existingMessages = get().phaseMessagesByRun[displayStateKey] || {}; + const existingOutputArtifacts = get().phaseOutputArtifactsByRun[displayStateKey] || {}; + const existingInteractions = get().phaseInteractionsByRun[displayStateKey] || {}; set({ activeTask: taskId, workflowState: displayState, - phaseMessages: messages || {}, - phaseOutputArtifacts: outputArtifacts || {}, - phaseInteractions: interactions || {}, - selectedPhase, - isStreaming: false, - streamingPhase: null, - connectionState: displayState.overallStatus === "completed" ? "disconnected" : "connecting", - debugEvents: [], - lastEventAt: null, - lastError: null, + selectedPhaseByRun: { + ...get().selectedPhaseByRun, + [displayStateKey]: selectedPhase, + }, + isStreamingByRun: { + ...get().isStreamingByRun, + [displayStateKey]: Boolean(runningPhase), + }, + streamingPhaseByRun: { + ...get().streamingPhaseByRun, + [displayStateKey]: runningPhase, + }, + connectionStateByRun: { + ...get().connectionStateByRun, + [displayStateKey]: displayState.overallStatus === "completed" + ? "disconnected" + : get().connectionStateByRun[displayStateKey] || (runningPhase ? "connected" : "connecting"), + }, workflowStatesByRun: { ...get().workflowStatesByRun, - [stateKey]: displayState, + [displayStateKey]: displayState, + }, + phaseMessagesByRun: { + ...get().phaseMessagesByRun, + [displayStateKey]: Object.keys(existingMessages).length ? existingMessages : messages || {}, + }, + phaseOutputArtifactsByRun: { + ...get().phaseOutputArtifactsByRun, + [displayStateKey]: Object.keys(existingOutputArtifacts).length ? existingOutputArtifacts : outputArtifacts || {}, + }, + phaseInteractionsByRun: { + ...get().phaseInteractionsByRun, + [displayStateKey]: Object.keys(existingInteractions).length ? existingInteractions : interactions || {}, }, }); @@ -456,8 +775,14 @@ export const useWorkflowStore = create((set, get) => ({ }, async connectWorkflow(taskId, workFolder, taskInputs, images, runId, worktreeName, workflowFilename) { - pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_CONNECT, { taskId, workFolder })); - set({ connectionState: "connecting" }); + pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_CONNECT, { taskId, runId, workFolder })); + const stateKey = getWorkflowStateKey(taskId, runId || taskId); + set((state) => ({ + connectionStateByRun: { + ...state.connectionStateByRun, + [stateKey]: "connecting", + }, + })); const detach = attachWorkflowEvents(set, get, taskId, runId); try { await desktopApi.startWorkflow({ taskId, workFolder, taskInputs, images, runId, worktreeName, workflowFilename }); @@ -497,19 +822,51 @@ export const useWorkflowStore = create((set, get) => ({ set({ activeTask: id, - selectedPhase: firstPhase, - phaseMessages: {}, - phaseOutputArtifacts: {}, - phaseInteractions: {}, workflowState, workflowStatesByRun: { ...get().workflowStatesByRun, [stateKey]: workflowState, }, - connectionState: "connecting", - debugEvents: [], - lastEventAt: null, - lastError: null, + selectedPhaseByRun: { + ...get().selectedPhaseByRun, + [stateKey]: firstPhase, + }, + phaseMessagesByRun: { + ...get().phaseMessagesByRun, + [stateKey]: {}, + }, + phaseOutputArtifactsByRun: { + ...get().phaseOutputArtifactsByRun, + [stateKey]: {}, + }, + phaseInteractionsByRun: { + ...get().phaseInteractionsByRun, + [stateKey]: {}, + }, + isStreamingByRun: { + ...get().isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...get().streamingPhaseByRun, + [stateKey]: null, + }, + connectionStateByRun: { + ...get().connectionStateByRun, + [stateKey]: "connecting", + }, + debugEventsByRun: { + ...get().debugEventsByRun, + [stateKey]: [], + }, + lastEventAtByRun: { + ...get().lastEventAtByRun, + [stateKey]: null, + }, + lastErrorByRun: { + ...get().lastErrorByRun, + [stateKey]: null, + }, }); prevStatusRef = {}; @@ -566,10 +923,20 @@ export const useWorkflowStore = create((set, get) => ({ const { activeTask, workflowState } = get(); if (!activeTask || !phase) return; pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_RESUME_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR })); + const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId); set((state) => ({ - isStreaming: true, - streamingPhase: phase, - lastError: null, + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: true, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: phase, + }, + lastErrorByRun: { + ...state.lastErrorByRun, + [stateKey]: null, + }, workflowState: state.workflowState ? { ...state.workflowState, @@ -608,11 +975,26 @@ export const useWorkflowStore = create((set, get) => ({ ...state.workflowStatesByRun, [getWorkflowStateKey(reverted.taskId, reverted.runId)]: reverted, }, - isStreaming: false, - streamingPhase: null, + isStreamingByRun: { + ...state.isStreamingByRun, + [getWorkflowStateKey(reverted.taskId, reverted.runId)]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [getWorkflowStateKey(reverted.taskId, reverted.runId)]: null, + }, })); } else { - set({ isStreaming: false, streamingPhase: null }); + set((state) => ({ + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, + })); } pushClientErrorEvent({ type: DEBUG_EVENT_TYPES.ERROR, message: err?.message || "Resume failed", phase }); } @@ -622,10 +1004,20 @@ export const useWorkflowStore = create((set, get) => ({ const { activeTask, workflowState } = get(); if (!activeTask || !phase) return; pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_RETRY_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR })); + const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId); set((state) => ({ - isStreaming: true, - streamingPhase: phase, - lastError: null, + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: true, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: phase, + }, + lastErrorByRun: { + ...state.lastErrorByRun, + [stateKey]: null, + }, workflowState: state.workflowState ? { ...state.workflowState, @@ -664,11 +1056,26 @@ export const useWorkflowStore = create((set, get) => ({ ...state.workflowStatesByRun, [getWorkflowStateKey(reverted.taskId, reverted.runId)]: reverted, }, - isStreaming: false, - streamingPhase: null, + isStreamingByRun: { + ...state.isStreamingByRun, + [getWorkflowStateKey(reverted.taskId, reverted.runId)]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [getWorkflowStateKey(reverted.taskId, reverted.runId)]: null, + }, })); } else { - set({ isStreaming: false, streamingPhase: null }); + set((state) => ({ + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, + })); } pushClientErrorEvent({ type: DEBUG_EVENT_TYPES.ERROR, message: err?.message || "Retry failed", phase }); } @@ -678,9 +1085,16 @@ export const useWorkflowStore = create((set, get) => ({ const { activeTask, workflowState } = get(); if (!activeTask || !phase) return; pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_PAUSE_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR })); + const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId); set((state) => ({ - isStreaming: false, - streamingPhase: null, + isStreamingByRun: { + ...state.isStreamingByRun, + [stateKey]: false, + }, + streamingPhaseByRun: { + ...state.streamingPhaseByRun, + [stateKey]: null, + }, workflowState: state.workflowState ? { ...state.workflowState, @@ -736,10 +1150,6 @@ export const useWorkflowStore = create((set, get) => ({ trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR, })); try { - if (unsubscribeWorkflowEvents) { - unsubscribeWorkflowEvents(); - unsubscribeWorkflowEvents = null; - } desktopApi.detachWorkflow(targetTaskId, targetRunId); await desktopApi.removeTask(targetTaskId, targetRunId, options); pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.TASK_DELETED, { @@ -750,19 +1160,30 @@ export const useWorkflowStore = create((set, get) => ({ })); const stateKey = getWorkflowStateKey(targetTaskId, targetRunId); const { [stateKey]: _removed, ...workflowStatesByRun } = get().workflowStatesByRun; + const { [stateKey]: _removedSelectedPhase, ...selectedPhaseByRun } = get().selectedPhaseByRun; + const { [stateKey]: _removedMessages, ...phaseMessagesByRun } = get().phaseMessagesByRun; + const { [stateKey]: _removedArtifacts, ...phaseOutputArtifactsByRun } = get().phaseOutputArtifactsByRun; + const { [stateKey]: _removedInteractions, ...phaseInteractionsByRun } = get().phaseInteractionsByRun; + const { [stateKey]: _removedStreaming, ...isStreamingByRun } = get().isStreamingByRun; + const { [stateKey]: _removedStreamingPhase, ...streamingPhaseByRun } = get().streamingPhaseByRun; + const { [stateKey]: _removedConnection, ...connectionStateByRun } = get().connectionStateByRun; + const { [stateKey]: _removedDebugEvents, ...debugEventsByRun } = get().debugEventsByRun; + const { [stateKey]: _removedLastEventAt, ...lastEventAtByRun } = get().lastEventAtByRun; + const { [stateKey]: _removedLastError, ...lastErrorByRun } = get().lastErrorByRun; set({ activeTask: null, workflowState: null, workflowStatesByRun, - phaseMessages: {}, - phaseOutputArtifacts: {}, - phaseInteractions: {}, - isStreaming: false, - streamingPhase: null, - connectionState: "disconnected", - debugEvents: [], - lastEventAt: null, - lastError: null, + selectedPhaseByRun, + phaseMessagesByRun, + phaseOutputArtifactsByRun, + phaseInteractionsByRun, + isStreamingByRun, + streamingPhaseByRun, + connectionStateByRun, + debugEventsByRun, + lastEventAtByRun, + lastErrorByRun, }); await useConfigStore.getState().loadWorkFolders(); return true; diff --git a/docs/todo/todo-agent-activity-terminal-mode.zh.md b/docs/todo/todo-agent-activity-terminal-mode.zh.md index 6c9319d..454728d 100644 --- a/docs/todo/todo-agent-activity-terminal-mode.zh.md +++ b/docs/todo/todo-agent-activity-terminal-mode.zh.md @@ -118,6 +118,8 @@ StepDetail Terminal Tab - SDK 模式:继续用于结构化 workflow 执行、artifact、state、mobile relay - CLI/PTY 模式:用于高级 terminal 体验和调试 +TODO:评估并实现用 Codex CLI / Claude Code CLI 直接替换当前 `@openai/codex-sdk` / `@anthropic-ai/claude-agent-sdk` adapter。目标是拿到 CLI 原生能力和最新 flags,例如 Codex `--dangerously-bypass-approvals-and-sandbox`、Claude Code `--dangerously-skip-permissions`,同时保持 workflow state、artifact、session resume 和 debug event 的回写能力。 + 注意:PTY 模式不能绕开 workflow state。即使 terminal 显示原生 CLI 输出,最终 step 状态、artifact、session id、错误信息仍然要回写到当前 task run。 ### Resume 策略 diff --git a/package.json b/package.json index fea1be1..d3f5fd3 100644 --- a/package.json +++ b/package.json @@ -11,14 +11,16 @@ "prepare:electron": "tsc -p tsconfig.preload.json", "dev:electron": "pnpm run prepare:electron && wait-on http://127.0.0.1:${RENDERER_PORT:-5173} && ELECTRON_RENDERER_URL=http://127.0.0.1:${RENDERER_PORT:-5173} NODE_OPTIONS='--import tsx' electronmon .", "dev:desktop-ui": "concurrently -k --names \"renderer,electron\" --prefix name --prefix-colors \"blue,magenta\" \"pnpm exec tsx scripts/run-if-port-free.ts ${RENDERER_PORT:-5173} pnpm dev:renderer\" \"pnpm dev:electron\"", - "dev:desktop": "concurrently -k --names \"desktop,server,connector\" --prefix name --prefix-colors \"green,blue,cyan\" \"pnpm run dev:desktop-ui\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm run dev:connector\"", + "dev:desktop": "pnpm exec tsx scripts/dev-desktop-instance.ts", + "dev:desktop:raw": "concurrently -k --names \"desktop,server,connector\" --prefix name --prefix-colors \"green,blue,cyan\" \"pnpm run dev:desktop-ui\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm run dev:connector\"", "dev:backend": "pnpm exec tsx watch apps/backend/src/server.ts", "dev:mobile": "cd apps/mobile && flutter run", "dev:server": "pnpm exec tsx apps/desktop/server.ts", "dev:connector": "pnpm exec tsx apps/desktop/connector/src/index.ts", "dev:desktop-stack": "concurrently -k --names \"server,backend,connector\" --prefix name --prefix-colors \"blue,magenta,cyan\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\" \"pnpm run dev:connector\"", "dev:mobile-stack": "concurrently -k --names \"server,backend,connector\" --prefix name --prefix-colors \"blue,magenta,cyan\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\" \"pnpm run dev:connector\"", - "dev:all": "concurrently -k --names \"desktop,backend\" --prefix name --prefix-colors \"green,magenta\" \"pnpm run dev:desktop\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\"", + "dev:all": "pnpm exec tsx scripts/dev-desktop-instance.ts", + "dev:all:raw": "concurrently -k --names \"desktop,backend\" --prefix name --prefix-colors \"green,magenta\" \"pnpm run dev:desktop:raw\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\"", "dev:desktop:instance": "pnpm exec tsx scripts/dev-desktop-instance.ts", "build:desktop": "pnpm run prepare:electron && pnpm --dir apps/desktop/renderer build", "package:desktop": "pnpm build:desktop && electron-forge package", @@ -36,7 +38,6 @@ "@fastify/websocket": "^11.2.0", "@langchain/langgraph": "^1.3.0", "@openai/codex": "^0.128.0", - "@openai/codex-sdk": "^0.128.0", "cors": "^2.8.5", "express": "^4.21.0", "fastify": "^5.6.1", diff --git a/packages/core-lib/langgraph-runtime/app-adapter.ts b/packages/core-lib/langgraph-runtime/app-adapter.ts index fc1004d..8d66d7c 100644 --- a/packages/core-lib/langgraph-runtime/app-adapter.ts +++ b/packages/core-lib/langgraph-runtime/app-adapter.ts @@ -2,7 +2,7 @@ import OpenAI from "openai"; import { mkdir, readFile, writeFile } from "fs/promises"; import { dirname, relative, resolve } from "path"; import { readAiApiProfileSync, readAiApiProfilesSync } from "../../core-models/config"; -import { appendPhaseInteraction, appendToPhaseFile, readState, taskDir, updatePhaseStatus, writeState } from "../../core-models/state"; +import { appendPhaseInteraction, appendToPhaseFile, readPhaseInteractions, readState, taskDir, updatePhaseStatus, writeState } from "../../core-models/state"; import { createSdkAgentAdapter } from "./sdk-agent-adapter"; import { createContentPreview, createContentSummary, createStepOutputMetadata, formatStepOutputForPrompt } from "./artifacts"; @@ -116,6 +116,44 @@ async function publishCheckpointOutput({ taskId, runId, step, state, rule }) { } export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, taskRunDir, imagePaths = [], abortController, aiBackendOverride = "" }) { + async function markStepRunning(phase) { + if (!phase) return; + const state = await readState(taskId, runId); + state.currentPhase = phase; + state.currentStep = phase; + state.overallStatus = "in_progress"; + + for (const item of state.phases || []) { + if (item.id === phase) { + updatePhaseStatus(state, item.id, "in_progress", item.sessionId); + } else if (item.status === "in_progress") { + updatePhaseStatus(state, item.id, "completed", item.sessionId); + } + } + + await writeState(taskId, state); + } + + async function appendPhaseStart(phase, backend) { + try { + const existingInteractions = await readPhaseInteractions(taskId, phase, runId); + const startIndexes = existingInteractions + .filter((item) => item?.type === "phase_start") + .map((item) => Number(item.runIndex) || 0); + const lastRunIndex = Math.max(0, ...startIndexes); + const hasEarlierConversation = existingInteractions.some((item) => item?.type !== "phase_start"); + const runIndex = lastRunIndex > 0 ? lastRunIndex + 1 : hasEarlierConversation ? 2 : 1; + const interaction = await appendPhaseInteraction(taskId, phase, { + role: "system", + type: "phase_start", + text: "", + backend, + runIndex, + }, runId); + if (interaction) send({ type: "phase_interaction", phase, interaction }); + } catch {} + } + const adapter = createSdkAgentAdapter({ workFolder, taskDir: taskRunDir, @@ -166,6 +204,8 @@ export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, task if (!profile.model) throw new Error(`AI API profile ${profile.name} is missing a model`); const backend = `ai-api:${profile.name || profile.id}`; + await markStepRunning(step.id); + await appendPhaseStart(step.id, backend); send({ type: "backend_selected", phase: step.id, backend, mode: "workflow" }); const inputSections = []; for (const input of step.inputs || []) { @@ -277,6 +317,8 @@ export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, task const useBackendOverride = Boolean(aiBackendOverride && !hasStepBackend && agent.backend !== "ai_api"); const runtimeAgent = useBackendOverride ? { ...agent, backend: aiBackendOverride, model: "" } : agent; if (runtimeAgent.backend === "ai_api") return runAiApiStep({ step, state }); + await markStepRunning(step.id); + await appendPhaseStart(step.id, runtimeAgent.backend); send({ type: "backend_selected", phase: step.id, backend: runtimeAgent.backend, mode: useBackendOverride ? "app" : "workflow" }); const result = await adapter.runAgent({ step, diff --git a/packages/core-lib/langgraph-runtime/file-checkpointer.ts b/packages/core-lib/langgraph-runtime/file-checkpointer.ts index 446303a..1812bb9 100644 --- a/packages/core-lib/langgraph-runtime/file-checkpointer.ts +++ b/packages/core-lib/langgraph-runtime/file-checkpointer.ts @@ -2,6 +2,8 @@ import { readFile, writeFile, mkdir } from "fs/promises"; import { dirname, join } from "path"; import { BaseCheckpointSaver, WRITES_IDX_MAP, TASKS, copyCheckpoint, getCheckpointId, maxChannelVersion } from "@langchain/langgraph-checkpoint"; +const SERIALIZED_UINT8_ARRAY = "__serialized_uint8_array__"; + function generateKey(threadId, checkpointNamespace, checkpointId) { return JSON.stringify([threadId, checkpointNamespace || "", checkpointId]); } @@ -24,6 +26,33 @@ async function writeJson(file, value) { await writeFile(file, JSON.stringify(value, null, 2)); } +function encodeSerializedValue(value) { + if (value instanceof Uint8Array) { + return { + type: SERIALIZED_UINT8_ARRAY, + data: Buffer.from(value).toString("base64"), + }; + } + return value; +} + +function decodeSerializedValue(value) { + if (value?.type === SERIALIZED_UINT8_ARRAY && typeof value.data === "string") { + return Uint8Array.from(Buffer.from(value.data, "base64")); + } + + if (value && typeof value === "object" && !Array.isArray(value)) { + const keys = Object.keys(value); + if (keys.length && keys.every((key) => /^\d+$/.test(key))) { + const sortedKeys = keys.map(Number).sort((a, b) => a - b); + const isContiguous = sortedKeys.every((key, index) => key === index); + if (isContiguous) return Uint8Array.from(sortedKeys.map((key) => value[key])); + } + } + + return value; +} + export class FileCheckpointSaver extends BaseCheckpointSaver { constructor(rootDir, serde) { super(serde); @@ -50,7 +79,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver { const parentKey = generateKey(threadId, checkpointNs, parentCheckpointId); const pendingSends = await Promise.all(Object.values(this.writes[parentKey] ?? {}) .filter(([_taskId, channel]) => channel === TASKS) - .map(async ([_taskId, _channel, writes]) => await this.serde.loadsTyped("json", writes))); + .map(async ([_taskId, _channel, writes]) => await this.serde.loadsTyped("json", decodeSerializedValue(writes)))); mutableCheckpoint.channel_values ??= {}; mutableCheckpoint.channel_values[TASKS] = pendingSends; mutableCheckpoint.channel_versions ??= {}; @@ -62,19 +91,19 @@ export class FileCheckpointSaver extends BaseCheckpointSaver { async createTuple(threadId, checkpointNamespace, checkpointId, saved, config) { const [checkpoint, metadata, parentCheckpointId] = saved; const key = generateKey(threadId, checkpointNamespace, checkpointId); - const deserializedCheckpoint = await this.serde.loadsTyped("json", checkpoint); + const deserializedCheckpoint = await this.serde.loadsTyped("json", decodeSerializedValue(checkpoint)); if (deserializedCheckpoint.v < 4 && parentCheckpointId !== undefined) { await this.migratePendingSends(deserializedCheckpoint, threadId, checkpointNamespace, parentCheckpointId); } const pendingWrites = await Promise.all(Object.values(this.writes[key] || {}).map(async ([taskId, channel, value]) => [ taskId, channel, - await this.serde.loadsTyped("json", value), + await this.serde.loadsTyped("json", decodeSerializedValue(value)), ])); const checkpointTuple = { config, checkpoint: deserializedCheckpoint, - metadata: await this.serde.loadsTyped("json", metadata), + metadata: await this.serde.loadsTyped("json", decodeSerializedValue(metadata)), pendingWrites, }; if (parentCheckpointId !== undefined) { @@ -130,7 +159,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver { for (const [checkpointId, saved] of sortedCheckpoints) { if (configCheckpointId && checkpointId !== configCheckpointId) continue; if (before?.configurable?.checkpoint_id && checkpointId >= before.configurable.checkpoint_id) continue; - const metadata = await this.serde.loadsTyped("json", saved[1]); + const metadata = await this.serde.loadsTyped("json", decodeSerializedValue(saved[1])); if (filter && !Object.entries(filter).every(([key, value]) => metadata[key] === value)) continue; if (limit !== undefined) { if (limit <= 0) break; @@ -161,8 +190,8 @@ export class FileCheckpointSaver extends BaseCheckpointSaver { this.serde.dumpsTyped(metadata), ]); this.storage[threadId][checkpointNamespace][checkpoint.id] = [ - serializedCheckpoint, - serializedMetadata, + encodeSerializedValue(serializedCheckpoint), + encodeSerializedValue(serializedMetadata), config.configurable?.checkpoint_id, ]; await this.persist(); @@ -190,7 +219,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver { const innerKey = [taskId, WRITES_IDX_MAP[channel] || index]; const innerKeyStr = `${innerKey[0]},${innerKey[1]}`; if (innerKey[1] >= 0 && existingWrites && innerKeyStr in existingWrites) return; - this.writes[outerKey][innerKeyStr] = [taskId, channel, serializedValue]; + this.writes[outerKey][innerKeyStr] = [taskId, channel, encodeSerializedValue(serializedValue)]; })); await this.persist(); } diff --git a/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts b/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts index 8d84f11..d5ac6b8 100644 --- a/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts +++ b/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts @@ -1,13 +1,19 @@ import { query } from "@anthropic-ai/claude-agent-sdk"; -import { Codex } from "@openai/codex-sdk"; +import { execFile, spawn } from "child_process"; +import { createRequire } from "module"; +import readline from "readline"; import { mkdir, readFile, writeFile } from "fs/promises"; -import { dirname, extname, join, relative, resolve } from "path"; +import { basename, dirname, extname, join, relative, resolve } from "path"; import { createContentPreview, createContentSummary, formatStepOutputForPrompt } from "./artifacts"; const SDK_BACKENDS = { CLAUDE: "claude", CODEX: "codex", }; +const require = createRequire(import.meta.url); +const SHELL_ENV_MARKER = "__DEV_WORKFLOW_SHELL_ENV__"; +const SHELL_ENV_TIMEOUT_MS = 5000; +const shellEnvCache = new Map(); function isPathInside(parent, child) { const rel = relative(resolve(parent), resolve(child)); @@ -93,6 +99,18 @@ function buildPrompt({ step, state }) { return parts.filter(Boolean).join("\n\n"); } +function buildWorkspaceAccessPrompt(workspaceWrite, workFolder, taskDir) { + if (workspaceWrite) return ""; + return [ + "# Workspace access", + "", + "The project workspace is read-only for this step.", + `You may read files in: ${workFolder || ""}`, + `You may write only task artifact files under: ${taskDir || ""}`, + "Do not create, edit, move, or delete files in the project workspace.", + ].join("\n"); +} + function guessImageMediaType(filePath) { const ext = extname(filePath).toLowerCase(); if (ext === ".jpg" || ext === ".jpeg") return "image/jpeg"; @@ -131,22 +149,70 @@ async function buildClaudePrompt(prompt, imagePaths) { })(); } -function buildCodexInput(prompt, imagePaths) { - if (!imagePaths || imagePaths.length === 0) return prompt; - const input = []; - if (prompt) input.push({ type: "text", text: prompt }); - for (const imagePath of imagePaths) input.push({ type: "local_image", path: imagePath }); - return input; +function getShellArgs(shell, command) { + const name = basename(shell || ""); + if (name.includes("bash") || name.includes("zsh")) return ["-lc", command]; + if (name.includes("fish")) return ["-lc", command]; + return ["-lc", command]; } -function buildCodexEnv() { +function parseShellEnv(raw) { + const entries = String(raw || "").split("\0"); + const markerIndex = entries.indexOf(SHELL_ENV_MARKER); + if (markerIndex < 0) return {}; + const envEntries = entries.slice(markerIndex + 1); + const env = {}; + for (const entry of envEntries) { + const index = entry.indexOf("="); + if (index <= 0) continue; + env[entry.slice(0, index)] = entry.slice(index + 1); + } + return env; +} + +function readUserShellEnv(workFolder) { + const shell = process.env.SHELL || "/bin/sh"; + const cwd = workFolder || process.cwd(); + const cacheKey = `${shell}:${cwd}`; + if (shellEnvCache.has(cacheKey)) return shellEnvCache.get(cacheKey); + + const env = { ...process.env }; + delete env.NODE_OPTIONS; + const command = `printf '${SHELL_ENV_MARKER}\\0'; env -0`; + const promise = new Promise((resolve) => { + execFile(shell, getShellArgs(shell, command), { + cwd, + env, + maxBuffer: 1024 * 1024, + timeout: SHELL_ENV_TIMEOUT_MS, + }, (error, stdout) => { + if (error) { + resolve({}); + return; + } + resolve(parseShellEnv(stdout)); + }); + }); + shellEnvCache.set(cacheKey, promise); + return promise; +} + +async function buildCodexEnv(workFolder) { const env = {}; for (const [key, value] of Object.entries(process.env)) { + if (value !== undefined && key !== "CODEX_API_KEY" && key !== "NODE_OPTIONS") env[key] = value; + } + const shellEnv = await readUserShellEnv(workFolder); + for (const [key, value] of Object.entries(shellEnv)) { if (value !== undefined && key !== "CODEX_API_KEY") env[key] = value; } return env; } +function getCodexCliPath() { + return require.resolve("@openai/codex/bin/codex.js"); +} + function getToolFilePath(input) { return input?.file_path || input?.path || input?.notebook_path || ""; } @@ -268,71 +334,98 @@ async function streamClaudeSdk({ prompt, workFolder, taskDir, sessionId, imagePa return { sessionId: nextSessionId || sessionId || "", fallbackResult }; } -async function streamCodexSdk({ prompt, workFolder, sessionId, imagePaths, abortController, workspaceWrite, agent, onText, onTool, onSession }) { - const codex = new Codex({ - env: buildCodexEnv(), - ...(agent.options?.client || {}), +async function streamCodexCli({ prompt, workFolder, taskDir, sessionId, imagePaths, abortController, workspaceWrite, agent, onText, onTool, onSession }) { + const workingDirectory = workspaceWrite ? workFolder : taskDir; + const args = [ + "exec", + "--json", + "--sandbox", + workspaceWrite ? "danger-full-access" : "workspace-write", + "--cd", + workingDirectory, + "--skip-git-repo-check", + "--config", + "approval_policy=\"never\"", + "--config", + "sandbox_workspace_write.network_access=true", + ]; + + const modelReasoningEffort = agent.options?.thread?.modelReasoningEffort || ""; + if (agent.model) args.push("--model", agent.model); + if (modelReasoningEffort) args.push("--config", `model_reasoning_effort="${modelReasoningEffort}"`); + for (const imagePath of imagePaths || []) args.push("--image", imagePath); + if (sessionId) args.push("resume", sessionId); + + const child = spawn(getCodexCliPath(), args, { + env: await buildCodexEnv(workFolder), + signal: abortController.signal, }); - const threadOptions = { - workingDirectory: workFolder, - sandboxMode: workspaceWrite ? "danger-full-access" : "read-only", - skipGitRepoCheck: true, - approvalPolicy: "never", - networkAccessEnabled: true, - model: agent.model || undefined, - ...(agent.options?.thread || {}), - }; - const thread = sessionId - ? codex.resumeThread(sessionId, threadOptions) - : codex.startThread(threadOptions); - const input = buildCodexInput(prompt, imagePaths); - const { events } = await thread.runStreamed(input, { signal: abortController.signal }); + let spawnError = null; + const stderrChunks = []; + child.once("error", (error) => { spawnError = error; }); + child.stderr?.on("data", (chunk) => stderrChunks.push(chunk)); + if (!child.stdin) throw new Error("Codex CLI has no stdin"); + if (!child.stdout) throw new Error("Codex CLI has no stdout"); + + child.stdin.write(prompt); + child.stdin.end(); + + const exitPromise = new Promise((resolve) => { + child.once("exit", (code, signal) => resolve({ code, signal })); + }); + const events = readline.createInterface({ input: child.stdout, crlfDelay: Infinity }); const seenItemText = new Map(); const seenToolItems = new Set(); - let completed = false; - for await (const event of events) { - if (event.type === "turn.completed") { - completed = true; - break; - } + try { + for await (const line of events) { + if (!String(line || "").trim()) continue; + const event = JSON.parse(line); + if (event.type === "turn.completed") break; - if (event.type === "thread.started") { - await onSession(event.thread_id); - continue; - } + if (event.type === "thread.started") { + await onSession(event.thread_id); + continue; + } - if (event.type === "item.started" || event.type === "item.updated" || event.type === "item.completed") { - const item = event.item; - if (item.type === "agent_message") { - const previous = seenItemText.get(item.id) || ""; - const next = item.text || ""; - if (next.startsWith(previous)) { - const delta = next.slice(previous.length); - if (delta) await onText(delta); - } else if (next && next !== previous) { - await onText(next); + if (event.type === "item.started" || event.type === "item.updated" || event.type === "item.completed") { + const item = event.item; + if (item.type === "agent_message") { + const previous = seenItemText.get(item.id) || ""; + const next = item.text || ""; + if (next.startsWith(previous)) { + const delta = next.slice(previous.length); + if (delta) await onText(delta); + } else if (next && next !== previous) { + await onText(next); + } + seenItemText.set(item.id, next); + continue; + } + + if ( + (item.type === "command_execution" || item.type === "mcp_tool_call" || item.type === "web_search" || item.type === "file_change" || item.type === "todo_list") + && !seenToolItems.has(item.id) + ) { + seenToolItems.add(item.id); + await onTool(formatCodexToolLog(item)); } - seenItemText.set(item.id, next); continue; } - if ( - (item.type === "command_execution" || item.type === "mcp_tool_call" || item.type === "web_search" || item.type === "file_change" || item.type === "todo_list") - && !seenToolItems.has(item.id) - ) { - seenToolItems.add(item.id); - await onTool(formatCodexToolLog(item)); - } - continue; + if (event.type === "turn.failed") throw new Error(event.error?.message || "Codex CLI run failed"); + if (event.type === "error") throw new Error(event.message || "Codex CLI run failed"); } - if (event.type === "turn.failed") throw new Error(event.error?.message || "Codex run failed"); - if (event.type === "error") throw new Error(event.message || "Codex run failed"); + if (spawnError) throw spawnError; + const { code, signal } = await exitPromise; + if (code !== 0 || signal) { + const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`; + throw new Error(`Codex CLI exited with ${detail}: ${Buffer.concat(stderrChunks).toString("utf8")}`); + } + } finally { + events.close(); } - - if (thread.id) await onSession(thread.id); - return { sessionId: thread.id || sessionId || "", completed }; } function resolveOutputPath(taskDir, step, state) { @@ -375,7 +468,10 @@ export function createSdkAgentAdapter(options = {}) { ]; const abortController = options.abortController || new AbortController(); const workspaceWrite = canWriteWorkspace(step, agent); - const prompt = buildPrompt({ step, state }); + const prompt = [ + buildWorkspaceAccessPrompt(workspaceWrite, workFolder, taskDir), + buildPrompt({ step, state }), + ].filter(Boolean).join("\n\n"); let content = ""; let nextSessionId = sessionId || ""; @@ -409,9 +505,10 @@ export function createSdkAgentAdapter(options = {}) { nextSessionId = result.sessionId || nextSessionId; if (!content && result.fallbackResult) content = result.fallbackResult; } else if (backend === SDK_BACKENDS.CODEX) { - const result = await streamCodexSdk({ + await streamCodexCli({ prompt, workFolder, + taskDir, sessionId, imagePaths, abortController, @@ -419,7 +516,6 @@ export function createSdkAgentAdapter(options = {}) { agent, ...callbacks, }); - nextSessionId = result.sessionId || nextSessionId; } else { throw new Error(`unsupported SDK agent backend ${backend}`); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b84fddf..768cbe6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -23,9 +23,6 @@ importers: '@openai/codex': specifier: ^0.128.0 version: 0.128.0 - '@openai/codex-sdk': - specifier: ^0.128.0 - version: 0.128.0 cors: specifier: ^2.8.5 version: 2.8.6 @@ -1087,10 +1084,6 @@ packages: '@octokit/types@6.41.0': resolution: {integrity: sha512-eJ2jbzjdijiL3B4PrSQaSjuF2sPEQPVCPzBvTHJD9Nz+9dw2SGH4K4xeQJ77YfTq5bRQ+bD8wT11JbeDPmxmGg==} - '@openai/codex-sdk@0.128.0': - resolution: {integrity: sha512-Eao0LLA5x90qwU6SXYd21h4KxdCef1WpCvHFgKdbqzWMJ79lUvguGDGvx1RheP+zTdKGxJfJ6dulI5wSXoUBhQ==} - engines: {node: '>=18'} - '@openai/codex@0.128.0': resolution: {integrity: sha512-+xp6ODmFfBNnexIWRHApEaPXot2j6gyM8A5we/5IS/uY4eYHj4arETct4hQ5M4eO+MK7JY3ZU4xhuobhlysr0A==} engines: {node: '>=16'} @@ -5859,10 +5852,6 @@ snapshots: dependencies: '@octokit/openapi-types': 12.11.0 - '@openai/codex-sdk@0.128.0': - dependencies: - '@openai/codex': 0.128.0 - '@openai/codex@0.128.0': optionalDependencies: '@openai/codex-darwin-arm64': '@openai/codex@0.128.0-darwin-arm64' diff --git a/scripts/dev-desktop-instance.ts b/scripts/dev-desktop-instance.ts index 8359cec..e9bc177 100644 --- a/scripts/dev-desktop-instance.ts +++ b/scripts/dev-desktop-instance.ts @@ -1,18 +1,22 @@ import { spawn } from "child_process"; -import { join } from "path"; +import net from "net"; +import { basename, join } from "path"; import { mkdirSync } from "fs"; type Options = { name: string; + customName: boolean; portOffset: number; + autoPortOffset: boolean; }; const BASE_RENDERER_PORT = 5173; const BASE_LOCAL_SERVER_PORT = 3000; const BASE_BACKEND_PORT = 8787; +const MAX_PORT_OFFSET = 200; function printUsage() { - console.log("usage: pnpm dev:desktop:instance --name [--port-offset ]"); + console.log("usage: pnpm dev:desktop:instance [--name ] [--port-offset ]"); } function normalizeName(value: string) { @@ -25,7 +29,9 @@ function normalizeName(value: string) { function parseArgs(argv: string[]): Options { let name = ""; - let portOffset = Number(process.env.PORT_OFFSET || "0"); + let customName = false; + let portOffset = process.env.PORT_OFFSET ? Number(process.env.PORT_OFFSET) : 0; + let autoPortOffset = !process.env.PORT_OFFSET; for (let index = 0; index < argv.length; index++) { const arg = argv[index]; @@ -35,42 +41,91 @@ function parseArgs(argv: string[]): Options { } if (arg === "--name") { name = argv[++index] || ""; + customName = true; continue; } if (arg.startsWith("--name=")) { name = arg.slice("--name=".length); + customName = true; continue; } if (arg === "--port-offset") { portOffset = Number(argv[++index] || ""); + autoPortOffset = false; continue; } if (arg.startsWith("--port-offset=")) { portOffset = Number(arg.slice("--port-offset=".length)); + autoPortOffset = false; continue; } throw new Error(`unknown argument: ${arg}`); } - const normalizedName = normalizeName(name); + const normalizedName = normalizeName(name || basename(process.cwd()) || "desktop"); if (!normalizedName) { - throw new Error("--name is required"); + throw new Error("instance name is required"); } if (!Number.isInteger(portOffset) || portOffset < 0) { throw new Error("--port-offset must be a non-negative integer"); } - return { name: normalizedName, portOffset }; + return { name: normalizedName, customName, portOffset, autoPortOffset }; +} + +function isPortOpen(port: number, host = "127.0.0.1") { + return new Promise((resolve) => { + const socket = net.createConnection({ port, host }); + socket.once("connect", () => { + socket.destroy(); + resolve(true); + }); + socket.once("error", () => { + resolve(false); + }); + }); +} + +function getPorts(portOffset: number) { + return { + rendererPort: BASE_RENDERER_PORT + portOffset, + localServerPort: BASE_LOCAL_SERVER_PORT + portOffset, + backendPort: BASE_BACKEND_PORT + portOffset, + }; +} + +async function isPortOffsetAvailable(portOffset: number) { + const ports = getPorts(portOffset); + const occupied = await Promise.all([ + isPortOpen(ports.rendererPort), + isPortOpen(ports.localServerPort), + isPortOpen(ports.backendPort), + ]); + return occupied.every((value) => !value); +} + +async function findAvailablePortOffset(startOffset: number) { + for (let offset = startOffset; offset <= MAX_PORT_OFFSET; offset += 1) { + if (await isPortOffsetAvailable(offset)) return offset; + } + throw new Error(`no available port offset found between ${startOffset} and ${MAX_PORT_OFFSET}`); } async function main() { - const { name, portOffset } = parseArgs(process.argv.slice(2)); - const rendererPort = BASE_RENDERER_PORT + portOffset; - const localServerPort = BASE_LOCAL_SERVER_PORT + portOffset; - const backendPort = BASE_BACKEND_PORT + portOffset; - const userDataDir = join(process.cwd(), ".dev-instances", name); + const options = parseArgs(process.argv.slice(2)); + const portOffset = options.autoPortOffset + ? await findAvailablePortOffset(options.portOffset) + : options.portOffset; + if (!options.autoPortOffset && !(await isPortOffsetAvailable(portOffset))) { + throw new Error(`port offset ${portOffset} is already in use`); + } + const { rendererPort, localServerPort, backendPort } = getPorts(portOffset); + const name = options.name; + const userDataDir = options.customName + ? join(process.cwd(), ".dev-instances", name) + : process.env.DEV_WORKFLOW_USER_DATA_DIR || ""; - mkdirSync(userDataDir, { recursive: true }); + if (userDataDir) mkdirSync(userDataDir, { recursive: true }); const env = { ...process.env, @@ -85,16 +140,17 @@ async function main() { DEVICE_ID: `desktop-${name}`, DEVICE_NAME: `Desktop ${name}`, DEV_WORKFLOW_INSTANCE_NAME: name, - DEV_WORKFLOW_USER_DATA_DIR: userDataDir, }; + if (userDataDir) env.DEV_WORKFLOW_USER_DATA_DIR = userDataDir; console.log(`[dev-instance] name: ${name}`); + console.log(`[dev-instance] port offset: ${portOffset}${options.autoPortOffset ? " (auto)" : ""}`); console.log(`[dev-instance] renderer: http://127.0.0.1:${rendererPort}`); console.log(`[dev-instance] local server: http://127.0.0.1:${localServerPort}`); console.log(`[dev-instance] backend: http://127.0.0.1:${backendPort}`); - console.log(`[dev-instance] user data: ${userDataDir}`); + console.log(`[dev-instance] user data: ${userDataDir || "default worktree-scoped directory"}`); - const child = spawn("pnpm", ["run", "dev:all"], { + const child = spawn("pnpm", ["run", "dev:all:raw"], { env, stdio: "inherit", shell: false, From e81645b892e5cf89dc5a16d20b0b7e9629acbef1 Mon Sep 17 00:00:00 2001 From: lei Date: Mon, 18 May 2026 23:58:18 +0800 Subject: [PATCH 2/2] fix: reset workflow phases when rerunning earlier step --- .../src/lib/sdk-agent-adapter.test.ts | 8 +++++ .../renderer/src/stores/workflowStore.test.ts | 18 ++++++++++- .../renderer/src/stores/workflowStore.ts | 31 +++++++++++++------ .../core-lib/langgraph-runtime/app-adapter.ts | 30 +++++++++++++----- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts index 62b1a4a..d080ebd 100644 --- a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts +++ b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts @@ -6,6 +6,7 @@ import { PassThrough } from "stream"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; const mocks = vi.hoisted(() => ({ + execFile: vi.fn(), query: vi.fn(), spawn: vi.fn(), })); @@ -15,8 +16,10 @@ vi.mock("@anthropic-ai/claude-agent-sdk", () => ({ })); vi.mock("child_process", () => ({ + execFile: mocks.execFile, spawn: mocks.spawn, default: { + execFile: mocks.execFile, spawn: mocks.spawn, }, })); @@ -62,6 +65,8 @@ function buildRun(overrides = {}) { describe("createSdkAgentAdapter output artifacts", () => { beforeEach(async () => { taskDir = await mkdir(join(tmpdir(), `sdk-agent-adapter-${Date.now()}-`), { recursive: true }); + mocks.execFile.mockReset(); + mocks.execFile.mockImplementation((_file, _args, _options, callback) => callback(null, "__DEV_WORKFLOW_SHELL_ENV__\0PATH=/usr/bin\0")); mocks.query.mockReset(); mocks.spawn.mockReset(); }); @@ -123,6 +128,9 @@ describe("createSdkAgentAdapter output artifacts", () => { }, })); + await vi.waitFor(() => { + expect(mocks.spawn).toHaveBeenCalled(); + }); child.stdout.write(`${JSON.stringify({ type: "thread.started", thread_id: "codex-thread" })}\n`); child.stdout.write(`${JSON.stringify({ type: "item.completed", item: { id: "msg-1", type: "agent_message", text: "Codex response" } })}\n`); child.stdout.write(`${JSON.stringify({ type: "turn.completed", usage: null })}\n`); diff --git a/apps/desktop/renderer/src/stores/workflowStore.test.ts b/apps/desktop/renderer/src/stores/workflowStore.test.ts index d3343c3..0dc4793 100644 --- a/apps/desktop/renderer/src/stores/workflowStore.test.ts +++ b/apps/desktop/renderer/src/stores/workflowStore.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from "vitest"; -import { getSelectedPhaseFromWorkflowState } from "./workflowStore"; +import { getPhasesWithRunningPhase, getSelectedPhaseFromWorkflowState } from "./workflowStore"; describe("getSelectedPhaseFromWorkflowState", () => { test("prefers the active phase over a stale currentPhase", () => { @@ -31,4 +31,20 @@ describe("getSelectedPhaseFromWorkflowState", () => { ], })).toBe("code_review"); }); + + test("resets later phases when an earlier phase starts running again", () => { + expect(getPhasesWithRunningPhase([ + { id: "implement", status: "completed" }, + { id: "code_review", status: "completed" }, + { id: "risk_gate", status: "completed" }, + { id: "review", status: "awaiting_input" }, + { id: "commit", status: "pending" }, + ], "implement", "now")).toEqual([ + { id: "implement", status: "in_progress", updated: "now" }, + { id: "code_review", status: "pending", updated: "now" }, + { id: "risk_gate", status: "pending", updated: "now" }, + { id: "review", status: "pending", updated: "now" }, + { id: "commit", status: "pending", updated: undefined }, + ]); + }); }); diff --git a/apps/desktop/renderer/src/stores/workflowStore.ts b/apps/desktop/renderer/src/stores/workflowStore.ts index 2450011..e64c168 100644 --- a/apps/desktop/renderer/src/stores/workflowStore.ts +++ b/apps/desktop/renderer/src/stores/workflowStore.ts @@ -24,6 +24,7 @@ function playNotificationSound() { let prevStatusRef = {}; let unsubscribeWorkflowEvents = null; +const ACTIVE_PHASE_STATUSES = new Set(["in_progress", "awaiting_input", "paused"]); export function getWorkflowStateKey(taskId, runId = "") { const id = taskId || ""; @@ -68,6 +69,26 @@ function getRunningPhaseFromWorkflowState(state) { return runningPhase?.id || runningPhase?.name || null; } +export function getPhasesWithRunningPhase(phases = [], phaseId, now = new Date().toISOString()) { + const phaseIndex = phases.findIndex((phase) => (phase.id || phase.name) === phaseId); + return phases.map((phase, index) => { + const id = phase.id || phase.name; + let status = phase.status; + if (id === phaseId) { + status = "in_progress"; + } else if (phaseIndex >= 0 && index > phaseIndex) { + status = "pending"; + } else if (ACTIVE_PHASE_STATUSES.has(phase.status)) { + status = "completed"; + } + return { + ...phase, + status, + updated: id === phaseId || status !== phase.status ? now : phase.updated, + }; + }); +} + function getWorkflowStateFromTask(task) { const taskId = task.taskId || ""; if (!taskId) return null; @@ -254,15 +275,7 @@ function markPhaseRunning(set, msg, phaseId) { currentPhase: phaseId, overallStatus: "in_progress", updated: now, - phases: (existingState.phases || []).map((phase) => ({ - ...phase, - status: phase.id === phaseId - ? "in_progress" - : phase.status === "in_progress" - ? "completed" - : phase.status, - updated: phase.id === phaseId || phase.status === "in_progress" ? now : phase.updated, - })), + phases: getPhasesWithRunningPhase(existingState.phases || [], phaseId, now), }; const activeStateKey = getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId); const isActiveState = activeStateKey === stateKey; diff --git a/packages/core-lib/langgraph-runtime/app-adapter.ts b/packages/core-lib/langgraph-runtime/app-adapter.ts index 8d66d7c..440df85 100644 --- a/packages/core-lib/langgraph-runtime/app-adapter.ts +++ b/packages/core-lib/langgraph-runtime/app-adapter.ts @@ -72,6 +72,28 @@ function renderAiApiPrompt(step, state, inputSections = []) { return parts.filter(Boolean).join("\n\n"); } +function syncStepsFromPhases(state) { + for (const step of state.steps || []) { + const phase = state.phases.find((item) => item.id === step.id); + if (phase) Object.assign(step, phase); + } +} + +function markRunningStepInState(state, phase) { + const phaseIndex = (state.phases || []).findIndex((item) => item.id === phase); + for (let i = 0; i < (state.phases || []).length; i += 1) { + const item = state.phases[i]; + if (item.id === phase) { + updatePhaseStatus(state, item.id, "in_progress", item.sessionId); + } else if (phaseIndex >= 0 && i > phaseIndex) { + updatePhaseStatus(state, item.id, "pending", item.sessionId); + } else if (item.status === "in_progress" || item.status === "awaiting_input" || item.status === "paused") { + updatePhaseStatus(state, item.id, "completed", item.sessionId); + } + } + syncStepsFromPhases(state); +} + async function readInputContent(input, state) { if (!input) return ""; if (input.sourceType === "task_input") return state.taskInputs?.[input.name] || ""; @@ -123,13 +145,7 @@ export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, task state.currentStep = phase; state.overallStatus = "in_progress"; - for (const item of state.phases || []) { - if (item.id === phase) { - updatePhaseStatus(state, item.id, "in_progress", item.sessionId); - } else if (item.status === "in_progress") { - updatePhaseStatus(state, item.id, "completed", item.sessionId); - } - } + markRunningStepInState(state, phase); await writeState(taskId, state); }