@@ -230,7 +270,7 @@ export default function StepDetail({ phase, content, artifact, interactions = []
const activeTask = useWorkflowStore((s) => s.activeTask);
const workflowState = useWorkflowStore((s) => s.workflowState);
- const conversation = normalizeConversation(interactions);
+ const conversation = getConversationWithRunMarkers(normalizeConversation(interactions));
const isCheckpoint = phaseTypes?.[phase] === "checkpoint";
const currentBackend = isCheckpoint ? t("stepDetail.checkpoint") : getBackendLabel(getCurrentBackend(activeBackend, interactions));
const showLoading = !isCheckpoint && isWaitingForAssistant(conversation, isRunning, isStreaming);
@@ -458,7 +498,11 @@ export default function StepDetail({ phase, content, artifact, interactions = []
{conversation.length > 0 ? (
conversation.map((item, idx) => (
-
+ item.type === "phase_start" ? (
+
+ ) : (
+
+ )
))
) : !showLoading ? (
diff --git a/apps/desktop/renderer/src/lib/i18n.ts b/apps/desktop/renderer/src/lib/i18n.ts
index c6a2cca..67bdcea 100644
--- a/apps/desktop/renderer/src/lib/i18n.ts
+++ b/apps/desktop/renderer/src/lib/i18n.ts
@@ -193,6 +193,7 @@ export const messages = {
"stepDetail.user": "You",
"stepDetail.assistant": "AI",
"stepDetail.tool": "Tool",
+ "stepDetail.runDivider": "Run {count}",
"stepDetail.toolsCount": "Tools ({count})",
"stepDetail.attach": "Attach image",
"stepDetail.attachments": "{count} attachment(s)",
@@ -549,6 +550,7 @@ export const messages = {
"stepDetail.user": "你",
"stepDetail.assistant": "AI",
"stepDetail.tool": "工具",
+ "stepDetail.runDivider": "第 {count} 轮",
"stepDetail.toolsCount": "工具({count})",
"stepDetail.attach": "添加图片",
"stepDetail.attachments": "{count} 个附件",
diff --git a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts
index 3b28051..d080ebd 100644
--- a/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts
+++ b/apps/desktop/renderer/src/lib/sdk-agent-adapter.test.ts
@@ -1,18 +1,27 @@
import { mkdir, readFile, rm, writeFile } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
+import { EventEmitter } from "events";
+import { PassThrough } from "stream";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const mocks = vi.hoisted(() => ({
+ execFile: vi.fn(),
query: vi.fn(),
+ spawn: vi.fn(),
}));
vi.mock("@anthropic-ai/claude-agent-sdk", () => ({
query: mocks.query,
}));
-vi.mock("@openai/codex-sdk", () => ({
- Codex: vi.fn(),
+vi.mock("child_process", () => ({
+ execFile: mocks.execFile,
+ spawn: mocks.spawn,
+ default: {
+ execFile: mocks.execFile,
+ spawn: mocks.spawn,
+ },
}));
import { createSdkAgentAdapter } from "../../../../../packages/core-lib/langgraph-runtime";
@@ -56,7 +65,10 @@ function buildRun(overrides = {}) {
describe("createSdkAgentAdapter output artifacts", () => {
beforeEach(async () => {
taskDir = await mkdir(join(tmpdir(), `sdk-agent-adapter-${Date.now()}-`), { recursive: true });
+ mocks.execFile.mockReset();
+ mocks.execFile.mockImplementation((_file, _args, _options, callback) => callback(null, "__DEV_WORKFLOW_SHELL_ENV__\0PATH=/usr/bin\0"));
mocks.query.mockReset();
+ mocks.spawn.mockReset();
});
afterEach(async () => {
@@ -91,4 +103,56 @@ describe("createSdkAgentAdapter output artifacts", () => {
summary: "Written by the agent tool",
});
});
+
+ test("runs read-only Codex steps with a writable task directory", async () => {
+ const child = new EventEmitter();
+ child.stdin = new PassThrough();
+ child.stdout = new PassThrough();
+ child.stderr = new PassThrough();
+ mocks.spawn.mockReturnValue(child);
+ const adapter = createSdkAgentAdapter({ workFolder: "/project", taskDir });
+
+ const runPromise = adapter.runAgent(buildRun({
+ agent: {
+ backend: "codex",
+ workspaceAccess: "read",
+ options: { thread: { modelReasoningEffort: "low" } },
+ },
+ state: {
+ taskId: "task-1",
+ runId: "run-1",
+ workFolder: "/project",
+ taskDir,
+ taskInputs: {},
+ stepOutputs: {},
+ },
+ }));
+
+ await vi.waitFor(() => {
+ expect(mocks.spawn).toHaveBeenCalled();
+ });
+ child.stdout.write(`${JSON.stringify({ type: "thread.started", thread_id: "codex-thread" })}\n`);
+ child.stdout.write(`${JSON.stringify({ type: "item.completed", item: { id: "msg-1", type: "agent_message", text: "Codex response" } })}\n`);
+ child.stdout.write(`${JSON.stringify({ type: "turn.completed", usage: null })}\n`);
+ child.stdout.end();
+ child.emit("exit", 0, null);
+
+ await runPromise;
+
+ expect(mocks.spawn).toHaveBeenCalledWith(
+ expect.stringContaining("@openai/codex"),
+ expect.arrayContaining([
+ "exec",
+ "--json",
+ "--sandbox",
+ "workspace-write",
+ "--cd",
+ taskDir,
+ "--skip-git-repo-check",
+ "--config",
+ "model_reasoning_effort=\"low\"",
+ ]),
+ expect.objectContaining({ env: expect.any(Object), signal: expect.any(AbortSignal) }),
+ );
+ });
});
diff --git a/apps/desktop/renderer/src/pages/HomePage.tsx b/apps/desktop/renderer/src/pages/HomePage.tsx
index 1b884af..35d5c41 100644
--- a/apps/desktop/renderer/src/pages/HomePage.tsx
+++ b/apps/desktop/renderer/src/pages/HomePage.tsx
@@ -346,7 +346,6 @@ export default function HomePage() {
const activeWorkflowFile = useConfigStore((s) => s.activeWorkflowFile);
const workflowConfig = useConfigStore((s) => s.workflowConfig);
const loadWorkFolders = useConfigStore((s) => s.loadWorkFolders);
- const workflowState = useWorkflowStore((s) => s.workflowState);
const workflowStatesByRun = useWorkflowStore((s) => s.workflowStatesByRun);
const syncTaskSummaries = useWorkflowStore((s) => s.syncTaskSummaries);
const startWorkflow = useWorkflowStore((s) => s.startWorkflow);
@@ -368,13 +367,12 @@ export default function HomePage() {
const tasks = taskSummaries.map((task) => {
const taskRunId = task.runId || task.taskId || "";
const sharedState = workflowStatesByRun[`${task.taskId}:${taskRunId}`];
- const state = sharedState || workflowState;
+ const state = sharedState;
const stateRunId = state?.runId || "";
if (
state &&
task.taskId === state.taskId &&
- taskRunId === stateRunId &&
- state.overallStatus !== "completed"
+ taskRunId === stateRunId
) {
return {
...task,
diff --git a/apps/desktop/renderer/src/pages/TaskPage.tsx b/apps/desktop/renderer/src/pages/TaskPage.tsx
index 9852776..b0e7722 100644
--- a/apps/desktop/renderer/src/pages/TaskPage.tsx
+++ b/apps/desktop/renderer/src/pages/TaskPage.tsx
@@ -12,7 +12,7 @@ import WorkflowDebugPanel from "../components/WorkflowDebugPanel";
import { getAppApi } from "../lib/api-client";
import { buildClientDebugPayload, DEBUG_EVENT_TRIGGERS, DEBUG_EVENT_TYPES } from "../lib/debug-events";
import { cn } from "../lib/utils";
-import { pushClientDebugEvent, useWorkflowStore } from "../stores/workflowStore";
+import { getWorkflowStateKey, pushClientDebugEvent, useWorkflowStore } from "../stores/workflowStore";
import { useConfigStore } from "../stores/configStore";
const appApi = getAppApi();
@@ -384,24 +384,27 @@ export default function TaskPage() {
const activeTask = useWorkflowStore((s) => s.activeTask);
const loadTask = useWorkflowStore((s) => s.loadTask);
const showToast = useWorkflowStore((s) => s.showToast);
- const workflowState = useWorkflowStore((s) => s.workflowState);
+ const workflowStatesByRun = useWorkflowStore((s) => s.workflowStatesByRun);
+ const taskId = urlTaskId || activeTask;
+ const routeStateKey = getWorkflowStateKey(taskId, urlRunId || taskId);
+ const workflowState = workflowStatesByRun[routeStateKey] || null;
useEffect(() => {
if (urlTaskId && (urlTaskId !== activeTask || workflowState?.taskId !== urlTaskId || (urlRunId && workflowState?.runId !== urlRunId))) {
loadTask(urlTaskId, urlRunId);
}
}, [urlTaskId, urlRunId, activeTask, workflowState?.taskId, workflowState?.runId]);
- const selectedPhase = useWorkflowStore((s) => s.selectedPhase);
+ const selectedPhaseByRun = useWorkflowStore((s) => s.selectedPhaseByRun);
const setSelectedPhase = useWorkflowStore((s) => s.setSelectedPhase);
- const phaseMessages = useWorkflowStore((s) => s.phaseMessages);
- const phaseOutputArtifacts = useWorkflowStore((s) => s.phaseOutputArtifacts);
- const phaseInteractions = useWorkflowStore((s) => s.phaseInteractions);
- const isStreaming = useWorkflowStore((s) => s.isStreaming);
- const streamingPhase = useWorkflowStore((s) => s.streamingPhase);
- const debugEvents = useWorkflowStore((s) => s.debugEvents);
- const connectionState = useWorkflowStore((s) => s.connectionState);
- const lastEventAt = useWorkflowStore((s) => s.lastEventAt);
- const lastError = useWorkflowStore((s) => s.lastError);
+ const phaseMessagesByRun = useWorkflowStore((s) => s.phaseMessagesByRun);
+ const phaseOutputArtifactsByRun = useWorkflowStore((s) => s.phaseOutputArtifactsByRun);
+ const phaseInteractionsByRun = useWorkflowStore((s) => s.phaseInteractionsByRun);
+ const isStreamingByRun = useWorkflowStore((s) => s.isStreamingByRun);
+ const streamingPhaseByRun = useWorkflowStore((s) => s.streamingPhaseByRun);
+ const debugEventsByRun = useWorkflowStore((s) => s.debugEventsByRun);
+ const connectionStateByRun = useWorkflowStore((s) => s.connectionStateByRun);
+ const lastEventAtByRun = useWorkflowStore((s) => s.lastEventAtByRun);
+ const lastErrorByRun = useWorkflowStore((s) => s.lastErrorByRun);
const approve = useWorkflowStore((s) => s.approve);
const reject = useWorkflowStore((s) => s.reject);
const sendMessage = useWorkflowStore((s) => s.sendMessage);
@@ -413,7 +416,6 @@ export default function TaskPage() {
const workflowConfig = useConfigStore((s) => s.workflowConfig);
const runWorkflowConfig = workflowState?.workflowConfig || workflowConfig;
- const taskId = urlTaskId || activeTask;
const [cleanedWorktree, setCleanedWorktree] = useState(null);
const visibleWorktree = workflowState?.worktree || cleanedWorktree;
const worktreeDisplayName = getWorktreeDisplayName(visibleWorktree);
@@ -421,6 +423,19 @@ export default function TaskPage() {
const isWorktreeCleaned = Boolean(visibleWorktree?.cleaned);
const hasWorktree = Boolean(visibleWorktree?.enabled);
const currentPhase = workflowState?.currentPhase;
+ const currentStateKey = workflowState
+ ? getWorkflowStateKey(workflowState.taskId, workflowState.runId)
+ : routeStateKey;
+ const selectedPhase = selectedPhaseByRun[currentStateKey] || null;
+ const phaseMessages = phaseMessagesByRun[currentStateKey] || {};
+ const phaseOutputArtifacts = phaseOutputArtifactsByRun[currentStateKey] || {};
+ const phaseInteractions = phaseInteractionsByRun[currentStateKey] || {};
+ const isStreaming = Boolean(isStreamingByRun[currentStateKey]);
+ const streamingPhase = streamingPhaseByRun[currentStateKey] || null;
+ const debugEvents = debugEventsByRun[currentStateKey] || [];
+ const connectionState = connectionStateByRun[currentStateKey] || "disconnected";
+ const lastEventAt = lastEventAtByRun[currentStateKey] || null;
+ const lastError = lastErrorByRun[currentStateKey] || null;
const phases = workflowState?.phases || [];
const selectedRunPhase = selectedPhase;
const activePhase = selectedRunPhase || currentPhase || phases[0]?.id || null;
@@ -742,7 +757,7 @@ export default function TaskPage() {
phases={phases}
workflowConfig={runWorkflowConfig}
selectedPhase={detailPhase}
- onSelect={setSelectedPhase}
+ onSelect={(phase) => setSelectedPhase(phase, currentStateKey)}
/>
diff --git a/apps/desktop/renderer/src/stores/workflowStore.test.ts b/apps/desktop/renderer/src/stores/workflowStore.test.ts
new file mode 100644
index 0000000..0dc4793
--- /dev/null
+++ b/apps/desktop/renderer/src/stores/workflowStore.test.ts
@@ -0,0 +1,50 @@
+import { describe, expect, test } from "vitest";
+import { getPhasesWithRunningPhase, getSelectedPhaseFromWorkflowState } from "./workflowStore";
+
+describe("getSelectedPhaseFromWorkflowState", () => {
+ test("prefers the active phase over a stale currentPhase", () => {
+ expect(getSelectedPhaseFromWorkflowState({
+ currentPhase: "implement",
+ phases: [
+ { id: "implement", status: "completed" },
+ { id: "code_review", status: "in_progress" },
+ ],
+ })).toBe("code_review");
+ });
+
+ test("shows the latest started phase when the run is completed", () => {
+ expect(getSelectedPhaseFromWorkflowState({
+ currentPhase: "completed",
+ phases: [
+ { id: "implement", status: "completed" },
+ { id: "code_review", status: "completed" },
+ ],
+ })).toBe("code_review");
+ });
+
+ test("falls back to currentPhase when there is no active phase", () => {
+ expect(getSelectedPhaseFromWorkflowState({
+ currentPhase: "code_review",
+ phases: [
+ { id: "implement", status: "completed" },
+ { id: "code_review", status: "pending" },
+ ],
+ })).toBe("code_review");
+ });
+
+ test("resets later phases when an earlier phase starts running again", () => {
+ expect(getPhasesWithRunningPhase([
+ { id: "implement", status: "completed" },
+ { id: "code_review", status: "completed" },
+ { id: "risk_gate", status: "completed" },
+ { id: "review", status: "awaiting_input" },
+ { id: "commit", status: "pending" },
+ ], "implement", "now")).toEqual([
+ { id: "implement", status: "in_progress", updated: "now" },
+ { id: "code_review", status: "pending", updated: "now" },
+ { id: "risk_gate", status: "pending", updated: "now" },
+ { id: "review", status: "pending", updated: "now" },
+ { id: "commit", status: "pending", updated: undefined },
+ ]);
+ });
+});
diff --git a/apps/desktop/renderer/src/stores/workflowStore.ts b/apps/desktop/renderer/src/stores/workflowStore.ts
index 9df1f59..e64c168 100644
--- a/apps/desktop/renderer/src/stores/workflowStore.ts
+++ b/apps/desktop/renderer/src/stores/workflowStore.ts
@@ -24,13 +24,21 @@ function playNotificationSound() {
let prevStatusRef = {};
let unsubscribeWorkflowEvents = null;
+const ACTIVE_PHASE_STATUSES = new Set(["in_progress", "awaiting_input", "paused"]);
-function getWorkflowStateKey(taskId, runId = "") {
+export function getWorkflowStateKey(taskId, runId = "") {
const id = taskId || "";
const run = runId || taskId || "";
return `${id}:${run}`;
}
+function getEventStateKey(msg) {
+ const taskId = msg?.taskId || msg?.state?.taskId || "";
+ const runId = msg?.runId || msg?.state?.runId || taskId;
+ if (!taskId) return "";
+ return getWorkflowStateKey(taskId, runId);
+}
+
function getCurrentPhaseFromTask(task) {
if (task.status === "completed") return "completed";
const activePhase = (task.phases || []).find((phase) =>
@@ -39,9 +47,52 @@ function getCurrentPhaseFromTask(task) {
return activePhase?.id || activePhase?.name || task.phases?.[0]?.id || null;
}
+export function getSelectedPhaseFromWorkflowState(state) {
+ const phases = state?.phases || [];
+ const activePhase = phases.find((phase) =>
+ phase.status === "in_progress" || phase.status === "paused" || phase.status === "awaiting_input" || phase.status === "failed"
+ );
+ if (activePhase) return activePhase.id || activePhase.name || null;
+
+ if (state?.currentPhase && state.currentPhase !== "completed") {
+ const currentPhase = phases.find((phase) => (phase.id || phase.name) === state.currentPhase);
+ if (currentPhase) return currentPhase.id || currentPhase.name || null;
+ }
+
+ const latestStartedPhase = phases.findLast?.((phase) => phase.status && phase.status !== "pending")
+ || [...phases].reverse().find((phase) => phase.status && phase.status !== "pending");
+ return latestStartedPhase?.id || latestStartedPhase?.name || phases[0]?.id || phases[0]?.name || null;
+}
+
+function getRunningPhaseFromWorkflowState(state) {
+ const runningPhase = (state?.phases || []).find((phase) => phase.status === "in_progress");
+ return runningPhase?.id || runningPhase?.name || null;
+}
+
+export function getPhasesWithRunningPhase(phases = [], phaseId, now = new Date().toISOString()) {
+ const phaseIndex = phases.findIndex((phase) => (phase.id || phase.name) === phaseId);
+ return phases.map((phase, index) => {
+ const id = phase.id || phase.name;
+ let status = phase.status;
+ if (id === phaseId) {
+ status = "in_progress";
+ } else if (phaseIndex >= 0 && index > phaseIndex) {
+ status = "pending";
+ } else if (ACTIVE_PHASE_STATUSES.has(phase.status)) {
+ status = "completed";
+ }
+ return {
+ ...phase,
+ status,
+ updated: id === phaseId || status !== phase.status ? now : phase.updated,
+ };
+ });
+}
+
function getWorkflowStateFromTask(task) {
const taskId = task.taskId || "";
if (!taskId) return null;
+ if (!Array.isArray(task.phases) || task.phases.length === 0) return null;
const runId = task.runId || taskId;
return {
taskId,
@@ -49,6 +100,7 @@ function getWorkflowStateFromTask(task) {
workFolder: task.workFolderPath || "",
currentPhase: getCurrentPhaseFromTask(task),
overallStatus: task.status || "pending",
+ updated: task.updated || task.updatedAt || "",
workflowConfig: task.workflowConfig || null,
phases: task.phases || [],
};
@@ -102,25 +154,50 @@ function createDebugEvent(payload, source = "workflow") {
};
}
-function appendDebugEvent(set, payload, source = "workflow") {
+function appendDebugEvent(set, get, payload, source = "workflow") {
const event = createDebugEvent(payload, source);
set((state) => {
- const debugEvents = [...state.debugEvents, event];
+ const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId);
+ const debugEvents = stateKey ? [...(state.debugEventsByRun[stateKey] || []), event] : [];
const nextState = {
- debugEvents: debugEvents.slice(-MAX_DEBUG_EVENTS),
- lastEventAt: event.at,
- connectionState: source === "workflow" ? "connected" : state.connectionState,
+ debugEventsByRun: stateKey
+ ? {
+ ...state.debugEventsByRun,
+ [stateKey]: debugEvents.slice(-MAX_DEBUG_EVENTS),
+ }
+ : state.debugEventsByRun,
+ lastEventAtByRun: stateKey
+ ? {
+ ...state.lastEventAtByRun,
+ [stateKey]: event.at,
+ }
+ : state.lastEventAtByRun,
+ connectionStateByRun: stateKey && source === "workflow"
+ ? {
+ ...state.connectionStateByRun,
+ [stateKey]: "connected",
+ }
+ : state.connectionStateByRun,
};
- if (payload?.type === DEBUG_EVENT_TYPES.ERROR || payload?.type === DEBUG_EVENT_TYPES.PHASE_FAILED) {
- nextState.lastError = {
- at: event.at,
- phase: payload.phase || null,
- message: payload.message || "Unknown error",
- payload,
+ if (stateKey && (payload?.type === DEBUG_EVENT_TYPES.ERROR || payload?.type === DEBUG_EVENT_TYPES.PHASE_FAILED)) {
+ nextState.lastErrorByRun = {
+ ...state.lastErrorByRun,
+ [stateKey]: {
+ at: event.at,
+ phase: payload.phase || null,
+ message: payload.message || "Unknown error",
+ payload,
+ },
+ };
+ nextState.isStreamingByRun = {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ };
+ nextState.streamingPhaseByRun = {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
};
- nextState.isStreaming = false;
- nextState.streamingPhase = null;
}
return nextState;
@@ -130,9 +207,17 @@ function appendDebugEvent(set, payload, source = "workflow") {
export function pushClientDebugEvent(payload) {
useWorkflowStore.setState((state) => {
const event = createDebugEvent(payload, "client");
+ const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId);
+ if (!stateKey) return {};
return {
- debugEvents: [...state.debugEvents, event].slice(-MAX_DEBUG_EVENTS),
- lastEventAt: event.at,
+ debugEventsByRun: {
+ ...state.debugEventsByRun,
+ [stateKey]: [...(state.debugEventsByRun[stateKey] || []), event].slice(-MAX_DEBUG_EVENTS),
+ },
+ lastEventAtByRun: {
+ ...state.lastEventAtByRun,
+ [stateKey]: event.at,
+ },
};
});
}
@@ -140,219 +225,400 @@ export function pushClientDebugEvent(payload) {
export function pushClientErrorEvent(payload) {
useWorkflowStore.setState((state) => {
const event = createDebugEvent(payload, "client");
+ const stateKey = getEventStateKey(payload) || getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId);
+ if (!stateKey) return {};
return {
- debugEvents: [...state.debugEvents, event].slice(-MAX_DEBUG_EVENTS),
- lastEventAt: event.at,
- lastError: {
- at: event.at,
- phase: payload.phase || null,
- message: payload.message || "Unknown error",
- payload,
- },
- isStreaming: false,
- streamingPhase: null,
+ debugEventsByRun: {
+ ...state.debugEventsByRun,
+ [stateKey]: [...(state.debugEventsByRun[stateKey] || []), event].slice(-MAX_DEBUG_EVENTS),
+ },
+ lastEventAtByRun: {
+ ...state.lastEventAtByRun,
+ [stateKey]: event.at,
+ },
+ lastErrorByRun: {
+ ...state.lastErrorByRun,
+ [stateKey]: {
+ at: event.at,
+ phase: payload.phase || null,
+ message: payload.message || "Unknown error",
+ payload,
+ },
+ },
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
};
});
}
-function matchesWorkflowEvent(msg, taskId, runId = "") {
- const eventTaskId = msg?.taskId || msg?.state?.taskId || "";
- const eventRunId = msg?.runId || msg?.state?.runId || "";
- if (eventTaskId !== taskId) return false;
- if (runId && eventRunId !== runId) return false;
- return true;
-}
-
-function appendPhaseInteraction(set, phase, interaction) {
- if (!phase || !interaction) return;
- set((state) => ({
- phaseInteractions: {
- ...state.phaseInteractions,
- [phase]: [...(state.phaseInteractions[phase] || []), interaction],
- },
- }));
-}
-
-function markPhaseRunning(set, phaseId) {
+function markPhaseRunning(set, msg, phaseId) {
if (!phaseId) return;
set((state) => {
- if (!state.workflowState) return {};
+ const stateKey = getEventStateKey(msg);
+ if (!stateKey) return {};
+ const existingState = state.workflowStatesByRun[stateKey] || (
+ getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId) === stateKey
+ ? state.workflowState
+ : null
+ );
+ if (!existingState) return {};
+
+ const now = new Date().toISOString();
const workflowState = {
- ...state.workflowState,
+ ...existingState,
currentPhase: phaseId,
overallStatus: "in_progress",
- phases: (state.workflowState.phases || []).map((phase) => ({
- ...phase,
- status: phase.id === phaseId
- ? "in_progress"
- : phase.status === "in_progress"
- ? "completed"
- : phase.status,
- })),
+ updated: now,
+ phases: getPhasesWithRunningPhase(existingState.phases || [], phaseId, now),
};
- const stateKey = getWorkflowStateKey(workflowState.taskId, workflowState.runId);
+ const activeStateKey = getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId);
+ const isActiveState = activeStateKey === stateKey;
return {
- selectedPhase: phaseId,
- workflowState,
+ workflowState: isActiveState ? workflowState : state.workflowState,
workflowStatesByRun: {
...state.workflowStatesByRun,
[stateKey]: workflowState,
},
+ selectedPhaseByRun: {
+ ...state.selectedPhaseByRun,
+ [stateKey]: phaseId,
+ },
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: true,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: phaseId,
+ },
+ connectionStateByRun: {
+ ...state.connectionStateByRun,
+ [stateKey]: "connected",
+ },
};
});
}
function applyDisconnectedState(set, get) {
const state = get().workflowState;
+ const stateKey = getWorkflowStateKey(state?.taskId, state?.runId);
if (state) {
const updated = {
...state,
overallStatus: state.overallStatus,
phases: state.phases.map((phase) => ({ ...phase })),
};
- set({ workflowState: updated, isStreaming: false, streamingPhase: null, connectionState: "disconnected" });
+ set((current) => ({
+ workflowState: updated,
+ isStreamingByRun: {
+ ...current.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...current.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ connectionStateByRun: {
+ ...current.connectionStateByRun,
+ [stateKey]: "disconnected",
+ },
+ }));
} else {
- set({ isStreaming: false, streamingPhase: null, connectionState: "disconnected" });
+ set({});
}
}
-function attachWorkflowEvents(set, get, taskId, runId = "") {
- if (unsubscribeWorkflowEvents) unsubscribeWorkflowEvents();
+function isActiveWorkflowEvent(state, msg) {
+ const eventKey = getEventStateKey(msg);
+ if (!eventKey) return false;
+ return getWorkflowStateKey(state.workflowState?.taskId, state.workflowState?.runId) === eventKey;
+}
+
+function attachWorkflowEvents(set, get) {
+ if (unsubscribeWorkflowEvents) return () => applyDisconnectedState(set, get);
unsubscribeWorkflowEvents = desktopApi.onWorkflowEvent((msg) => {
if (!msg) return;
- if (!matchesWorkflowEvent(msg, taskId, runId)) return;
- appendDebugEvent(set, msg);
+ appendDebugEvent(set, get, msg);
if (msg.type === "phase_artifact") {
if (!msg.outputKey) return;
set((state) => ({
- phaseOutputArtifacts: {
- ...state.phaseOutputArtifacts,
- [msg.phase]: {
- ...(state.phaseOutputArtifacts[msg.phase] || {}),
- [msg.outputKey]: msg.content,
+ phaseOutputArtifactsByRun: {
+ ...state.phaseOutputArtifactsByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseOutputArtifactsByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: {
+ ...(state.phaseOutputArtifactsByRun[getEventStateKey(msg)]?.[msg.phase] || {}),
+ [msg.outputKey]: msg.content,
+ },
},
},
}));
} else if (msg.type === "backend_selected") {
- markPhaseRunning(set, msg.phase);
+ markPhaseRunning(set, msg, msg.phase);
+ set((state) => {
+ const stateKey = getEventStateKey(msg);
+ if (!stateKey || !msg.phase) return {};
+ const existingInteractions = state.phaseInteractionsByRun[stateKey]?.[msg.phase] || [];
+ const lastInteraction = existingInteractions[existingInteractions.length - 1];
+ if (lastInteraction?.type === "phase_start" && lastInteraction.backend === msg.backend) return {};
+ const startIndexes = existingInteractions
+ .filter((item) => item?.type === "phase_start")
+ .map((item) => Number(item.runIndex) || 0);
+ const lastRunIndex = Math.max(0, ...startIndexes);
+ const hasEarlierConversation = existingInteractions.some((item) => item?.type !== "phase_start");
+ const runIndex = lastRunIndex > 0 ? lastRunIndex + 1 : hasEarlierConversation ? 2 : 1;
+ return {
+ phaseInteractionsByRun: {
+ ...state.phaseInteractionsByRun,
+ [stateKey]: {
+ ...(state.phaseInteractionsByRun[stateKey] || {}),
+ [msg.phase]: [
+ ...existingInteractions,
+ {
+ id: `phase-start-${Date.now()}`,
+ at: new Date().toISOString(),
+ phase: msg.phase,
+ role: "system",
+ type: "phase_start",
+ text: "",
+ backend: msg.backend,
+ runIndex,
+ },
+ ],
+ },
+ },
+ };
+ });
} else if (msg.type === "text_delta") {
- markPhaseRunning(set, msg.phase);
+ markPhaseRunning(set, msg, msg.phase);
set((state) => ({
- phaseMessages: { ...state.phaseMessages, [msg.phase]: (state.phaseMessages[msg.phase] || "") + msg.text },
- phaseInteractions: {
- ...state.phaseInteractions,
- [msg.phase]: [
- ...(state.phaseInteractions[msg.phase] || []),
- {
- role: "assistant",
- type: "assistant_delta",
- text: msg.text,
- backend: msg.backend,
- },
- ],
+ phaseMessagesByRun: {
+ ...state.phaseMessagesByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + msg.text,
+ },
+ },
+ phaseInteractionsByRun: {
+ ...state.phaseInteractionsByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: [
+ ...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []),
+ {
+ role: "assistant",
+ type: "assistant_delta",
+ text: msg.text,
+ backend: msg.backend,
+ },
+ ],
+ },
},
- isStreaming: true,
- streamingPhase: msg.phase,
}));
} else if (msg.type === "state") {
const state = msg.state;
const stateKey = getWorkflowStateKey(state.taskId, state.runId);
- set((current) => ({
- workflowState: state,
- workflowStatesByRun: {
- ...current.workflowStatesByRun,
- [stateKey]: state,
- },
- }));
+ const cachedState = get().workflowStatesByRun[stateKey];
+ const displayState = getLatestWorkflowState(state, cachedState);
+ let isActiveState = false;
+ set((current) => {
+ const activeStateKey = getWorkflowStateKey(current.workflowState?.taskId, current.workflowState?.runId);
+ isActiveState = activeStateKey === stateKey || (!current.workflowState && current.activeTask === displayState.taskId);
+ return {
+ workflowState: isActiveState ? displayState : current.workflowState,
+ workflowStatesByRun: {
+ ...current.workflowStatesByRun,
+ [stateKey]: displayState,
+ },
+ selectedPhaseByRun: {
+ ...current.selectedPhaseByRun,
+ [stateKey]: getSelectedPhaseFromWorkflowState(displayState),
+ },
+ };
+ });
- const prevPhase = prevStatusRef._currentPhase;
+ const prevStatus = prevStatusRef[stateKey] || {};
+ const prevPhase = prevStatus._currentPhase;
+ const selectedPhase = getSelectedPhaseFromWorkflowState(displayState);
- if (state.currentPhase && state.currentPhase !== prevPhase) {
- if (state.currentPhase !== "completed") set({ selectedPhase: state.currentPhase });
+ if (isActiveState && displayState.currentPhase && displayState.currentPhase !== prevPhase) {
+ if (selectedPhase) {
+ set((state) => ({
+ selectedPhaseByRun: {
+ ...state.selectedPhaseByRun,
+ [stateKey]: selectedPhase,
+ },
+ }));
+ }
if (prevPhase) playNotificationSound();
}
let streaming = false;
- for (const phase of state.phases) {
- const prevStatus = prevStatusRef[phase.id];
- if (phase.status === "awaiting_input" && prevStatus !== "awaiting_input") {
+ for (const phase of displayState.phases) {
+ const prevPhaseStatus = prevStatus[phase.id];
+ if (isActiveState && phase.status === "awaiting_input" && prevPhaseStatus !== "awaiting_input") {
playNotificationSound();
- set({ selectedPhase: phase.id });
+ set((state) => ({
+ selectedPhaseByRun: {
+ ...state.selectedPhaseByRun,
+ [stateKey]: phase.id,
+ },
+ }));
}
- if (phase.status === "paused" && prevStatus !== "paused") {
- set({ selectedPhase: phase.id });
+ if (isActiveState && phase.status === "paused" && prevPhaseStatus !== "paused") {
+ set((state) => ({
+ selectedPhaseByRun: {
+ ...state.selectedPhaseByRun,
+ [stateKey]: phase.id,
+ },
+ }));
}
if (phase.status === "in_progress") streaming = true;
}
- if (!streaming) {
- set({ isStreaming: false, streamingPhase: null });
+ if (isActiveState && !streaming) {
+ set((state) => ({
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ }));
}
const statusMap = {};
- for (const phase of state.phases) statusMap[phase.id] = phase.status;
- statusMap._currentPhase = state.currentPhase;
- prevStatusRef = statusMap;
+ for (const phase of displayState.phases) statusMap[phase.id] = phase.status;
+ statusMap._currentPhase = displayState.currentPhase;
+ prevStatusRef = {
+ ...prevStatusRef,
+ [stateKey]: statusMap,
+ };
- if (state.overallStatus === "completed") {
+ if (displayState.overallStatus === "completed") {
useConfigStore.getState().loadWorkFolders();
}
} else if (msg.type === "phase_done") {
- set({ isStreaming: false, streamingPhase: null });
+ set((state) => {
+ const stateKey = getEventStateKey(msg);
+ if (!stateKey) return {};
+ return {
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ };
+ });
} else if (msg.type === DEBUG_EVENT_TYPES.PHASE_PAUSED) {
- set({ isStreaming: false, streamingPhase: null });
+ set((state) => {
+ const stateKey = getEventStateKey(msg);
+ if (!stateKey) return {};
+ return {
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ };
+ });
} else if (msg.type === "phase_content") {
- set((state) => ({ phaseMessages: { ...state.phaseMessages, [msg.phase]: msg.content } }));
+ set((state) => ({
+ phaseMessagesByRun: {
+ ...state.phaseMessagesByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: msg.content,
+ },
+ },
+ }));
} else if (msg.type === "phase_interaction") {
- appendPhaseInteraction(set, msg.phase, msg.interaction);
+ set((state) => {
+ if (!msg.phase || !msg.interaction) return {};
+ const nextByRun = {
+ ...state.phaseInteractionsByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: [...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []), msg.interaction],
+ },
+ };
+ return { phaseInteractionsByRun: nextByRun };
+ });
} else if (msg.type === "user_message") {
set((state) => ({
- phaseMessages: {
- ...state.phaseMessages,
- [msg.phase]: (state.phaseMessages[msg.phase] || "") + `\n\n---\n\n**You:** ${msg.text}\n\n`,
+ phaseMessagesByRun: {
+ ...state.phaseMessagesByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + `\n\n---\n\n**You:** ${msg.text}\n\n`,
+ },
},
}));
} else if (msg.type === "tool_use") {
- markPhaseRunning(set, msg.phase);
+ markPhaseRunning(set, msg, msg.phase);
set((state) => ({
- phaseMessages: {
- ...state.phaseMessages,
- [msg.phase]: (state.phaseMessages[msg.phase] || "") + `\n\n*${msg.log || msg.name}*\n\n`,
+ phaseMessagesByRun: {
+ ...state.phaseMessagesByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseMessagesByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: (state.phaseMessagesByRun[getEventStateKey(msg)]?.[msg.phase] || "") + `\n\n*${msg.log || msg.name}*\n\n`,
+ },
},
- phaseInteractions: {
- ...state.phaseInteractions,
- [msg.phase]: [
- ...(state.phaseInteractions[msg.phase] || []),
- {
- role: "tool",
- type: "tool_use",
- text: msg.log || msg.name || "",
- backend: msg.name,
- },
- ],
+ phaseInteractionsByRun: {
+ ...state.phaseInteractionsByRun,
+ [getEventStateKey(msg)]: {
+ ...(state.phaseInteractionsByRun[getEventStateKey(msg)] || {}),
+ [msg.phase]: [
+ ...(state.phaseInteractionsByRun[getEventStateKey(msg)]?.[msg.phase] || []),
+ {
+ role: "tool",
+ type: "tool_use",
+ text: msg.log || msg.name || "",
+ backend: msg.name,
+ },
+ ],
+ },
},
}));
} else if (msg.type === "session_attached" && msg.phase && msg.sessionId) {
- set((state) => ({
- workflowState: state.workflowState
- ? {
- ...state.workflowState,
- phases: state.workflowState.phases.map((phase) =>
- phase.id === msg.phase ? { ...phase, sessionId: msg.sessionId } : phase
- ),
- }
- : state.workflowState,
- }));
+ set((state) => {
+ const stateKey = getEventStateKey(msg);
+ const existingState = state.workflowStatesByRun[stateKey] || (
+ isActiveWorkflowEvent(state, msg) ? state.workflowState : null
+ );
+ if (!existingState) return {};
+ const nextWorkflowState = {
+ ...existingState,
+ phases: existingState.phases.map((phase) =>
+ phase.id === msg.phase ? { ...phase, sessionId: msg.sessionId } : phase
+ ),
+ };
+ return {
+ workflowState: isActiveWorkflowEvent(state, msg) ? nextWorkflowState : state.workflowState,
+ workflowStatesByRun: {
+ ...state.workflowStatesByRun,
+ [stateKey]: nextWorkflowState,
+ },
+ };
+ });
}
});
return () => {
- pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_DETACH, { taskId, runId }));
- if (taskId) desktopApi.detachWorkflow(taskId, runId);
- if (unsubscribeWorkflowEvents) {
- unsubscribeWorkflowEvents();
- unsubscribeWorkflowEvents = null;
- }
applyDisconnectedState(set, get);
};
}
@@ -361,20 +627,27 @@ export const useWorkflowStore = create((set, get) => ({
activeTask: null,
workflowState: null,
workflowStatesByRun: {},
- selectedPhase: null,
- phaseMessages: {},
- phaseOutputArtifacts: {},
- phaseInteractions: {},
- isStreaming: false,
- streamingPhase: null,
- connectionState: "disconnected",
- debugEvents: [],
- lastEventAt: null,
- lastError: null,
+ selectedPhaseByRun: {},
+ phaseMessagesByRun: {},
+ phaseOutputArtifactsByRun: {},
+ phaseInteractionsByRun: {},
+ isStreamingByRun: {},
+ streamingPhaseByRun: {},
+ connectionStateByRun: {},
+ debugEventsByRun: {},
+ lastEventAtByRun: {},
+ lastErrorByRun: {},
toast: null,
- setSelectedPhase(phase) {
- set({ selectedPhase: phase });
+ setSelectedPhase(phase, stateKey = "") {
+ const key = stateKey || getWorkflowStateKey(get().workflowState?.taskId, get().workflowState?.runId);
+ if (!key) return;
+ set((state) => ({
+ selectedPhaseByRun: {
+ ...state.selectedPhaseByRun,
+ [key]: phase,
+ },
+ }));
},
showToast(message, duration = 3000) {
@@ -382,10 +655,18 @@ export const useWorkflowStore = create((set, get) => ({
setTimeout(() => set({ toast: null }), duration);
},
+ ensureWorkflowEvents() {
+ attachWorkflowEvents(set, get);
+ },
+
syncTaskSummaries(tasks) {
if (!Array.isArray(tasks) || tasks.length === 0) return;
set((state) => {
const workflowStatesByRun = { ...state.workflowStatesByRun };
+ const selectedPhaseByRun = { ...state.selectedPhaseByRun };
+ const isStreamingByRun = { ...state.isStreamingByRun };
+ const streamingPhaseByRun = { ...state.streamingPhaseByRun };
+ const connectionStateByRun = { ...state.connectionStateByRun };
let changed = false;
for (const task of tasks) {
@@ -393,9 +674,18 @@ export const useWorkflowStore = create((set, get) => ({
if (!summaryState) continue;
const stateKey = getWorkflowStateKey(summaryState.taskId, summaryState.runId);
const existingState = workflowStatesByRun[stateKey];
- if (existingState?.overallStatus === "completed") continue;
- workflowStatesByRun[stateKey] = mergeWorkflowStates(existingState, summaryState);
- changed = true;
+ const nextState = getLatestWorkflowState(summaryState, existingState);
+ if (nextState !== existingState) {
+ workflowStatesByRun[stateKey] = nextState;
+ selectedPhaseByRun[stateKey] = getSelectedPhaseFromWorkflowState(nextState);
+ const runningPhase = getRunningPhaseFromWorkflowState(nextState);
+ isStreamingByRun[stateKey] = Boolean(runningPhase);
+ streamingPhaseByRun[stateKey] = runningPhase;
+ connectionStateByRun[stateKey] = nextState.overallStatus === "completed"
+ ? "disconnected"
+ : connectionStateByRun[stateKey] || (runningPhase ? "connected" : "connecting");
+ changed = true;
+ }
}
if (!changed) return {};
@@ -403,6 +693,10 @@ export const useWorkflowStore = create((set, get) => ({
const activeKey = getWorkflowStateKey(activeState?.taskId, activeState?.runId);
return {
workflowStatesByRun,
+ selectedPhaseByRun,
+ isStreamingByRun,
+ streamingPhaseByRun,
+ connectionStateByRun,
workflowState: workflowStatesByRun[activeKey] || activeState,
};
});
@@ -413,39 +707,77 @@ export const useWorkflowStore = create((set, get) => ({
const stateKey = getWorkflowStateKey(taskId, runId);
const cachedState = get().workflowStatesByRun[stateKey];
if (cachedState) {
- const selectedPhase = cachedState.currentPhase && cachedState.currentPhase !== "completed"
- ? cachedState.currentPhase
- : cachedState.phases?.[0]?.id || null;
+ const selectedPhase = getSelectedPhaseFromWorkflowState(cachedState);
+ const runningPhase = getRunningPhaseFromWorkflowState(cachedState);
set({
activeTask: taskId,
workflowState: cachedState,
- selectedPhase,
- connectionState: cachedState.overallStatus === "completed" ? "disconnected" : get().connectionState,
+ selectedPhaseByRun: {
+ ...get().selectedPhaseByRun,
+ [stateKey]: selectedPhase,
+ },
+ isStreamingByRun: {
+ ...get().isStreamingByRun,
+ [stateKey]: Boolean(runningPhase),
+ },
+ streamingPhaseByRun: {
+ ...get().streamingPhaseByRun,
+ [stateKey]: runningPhase,
+ },
+ connectionStateByRun: {
+ ...get().connectionStateByRun,
+ [stateKey]: cachedState.overallStatus === "completed"
+ ? "disconnected"
+ : get().connectionStateByRun[stateKey] || (runningPhase ? "connected" : "connecting"),
+ },
});
}
const { state, messages, outputArtifacts, interactions } = await desktopApi.getTaskState(taskId, runId);
const currentState = get().workflowStatesByRun[stateKey];
const displayState = getLatestWorkflowState(state, currentState);
- const selectedPhase = displayState.currentPhase && displayState.currentPhase !== "completed"
- ? displayState.currentPhase
- : displayState.phases?.[0]?.id || null;
+ const displayStateKey = getWorkflowStateKey(displayState.taskId, displayState.runId || runId);
+ const selectedPhase = getSelectedPhaseFromWorkflowState(displayState);
+ const runningPhase = getRunningPhaseFromWorkflowState(displayState);
+ const existingMessages = get().phaseMessagesByRun[displayStateKey] || {};
+ const existingOutputArtifacts = get().phaseOutputArtifactsByRun[displayStateKey] || {};
+ const existingInteractions = get().phaseInteractionsByRun[displayStateKey] || {};
set({
activeTask: taskId,
workflowState: displayState,
- phaseMessages: messages || {},
- phaseOutputArtifacts: outputArtifacts || {},
- phaseInteractions: interactions || {},
- selectedPhase,
- isStreaming: false,
- streamingPhase: null,
- connectionState: displayState.overallStatus === "completed" ? "disconnected" : "connecting",
- debugEvents: [],
- lastEventAt: null,
- lastError: null,
+ selectedPhaseByRun: {
+ ...get().selectedPhaseByRun,
+ [displayStateKey]: selectedPhase,
+ },
+ isStreamingByRun: {
+ ...get().isStreamingByRun,
+ [displayStateKey]: Boolean(runningPhase),
+ },
+ streamingPhaseByRun: {
+ ...get().streamingPhaseByRun,
+ [displayStateKey]: runningPhase,
+ },
+ connectionStateByRun: {
+ ...get().connectionStateByRun,
+ [displayStateKey]: displayState.overallStatus === "completed"
+ ? "disconnected"
+ : get().connectionStateByRun[displayStateKey] || (runningPhase ? "connected" : "connecting"),
+ },
workflowStatesByRun: {
...get().workflowStatesByRun,
- [stateKey]: displayState,
+ [displayStateKey]: displayState,
+ },
+ phaseMessagesByRun: {
+ ...get().phaseMessagesByRun,
+ [displayStateKey]: Object.keys(existingMessages).length ? existingMessages : messages || {},
+ },
+ phaseOutputArtifactsByRun: {
+ ...get().phaseOutputArtifactsByRun,
+ [displayStateKey]: Object.keys(existingOutputArtifacts).length ? existingOutputArtifacts : outputArtifacts || {},
+ },
+ phaseInteractionsByRun: {
+ ...get().phaseInteractionsByRun,
+ [displayStateKey]: Object.keys(existingInteractions).length ? existingInteractions : interactions || {},
},
});
@@ -456,8 +788,14 @@ export const useWorkflowStore = create((set, get) => ({
},
async connectWorkflow(taskId, workFolder, taskInputs, images, runId, worktreeName, workflowFilename) {
- pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_CONNECT, { taskId, workFolder }));
- set({ connectionState: "connecting" });
+ pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.CLIENT_CONNECT, { taskId, runId, workFolder }));
+ const stateKey = getWorkflowStateKey(taskId, runId || taskId);
+ set((state) => ({
+ connectionStateByRun: {
+ ...state.connectionStateByRun,
+ [stateKey]: "connecting",
+ },
+ }));
const detach = attachWorkflowEvents(set, get, taskId, runId);
try {
await desktopApi.startWorkflow({ taskId, workFolder, taskInputs, images, runId, worktreeName, workflowFilename });
@@ -497,19 +835,51 @@ export const useWorkflowStore = create((set, get) => ({
set({
activeTask: id,
- selectedPhase: firstPhase,
- phaseMessages: {},
- phaseOutputArtifacts: {},
- phaseInteractions: {},
workflowState,
workflowStatesByRun: {
...get().workflowStatesByRun,
[stateKey]: workflowState,
},
- connectionState: "connecting",
- debugEvents: [],
- lastEventAt: null,
- lastError: null,
+ selectedPhaseByRun: {
+ ...get().selectedPhaseByRun,
+ [stateKey]: firstPhase,
+ },
+ phaseMessagesByRun: {
+ ...get().phaseMessagesByRun,
+ [stateKey]: {},
+ },
+ phaseOutputArtifactsByRun: {
+ ...get().phaseOutputArtifactsByRun,
+ [stateKey]: {},
+ },
+ phaseInteractionsByRun: {
+ ...get().phaseInteractionsByRun,
+ [stateKey]: {},
+ },
+ isStreamingByRun: {
+ ...get().isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...get().streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ connectionStateByRun: {
+ ...get().connectionStateByRun,
+ [stateKey]: "connecting",
+ },
+ debugEventsByRun: {
+ ...get().debugEventsByRun,
+ [stateKey]: [],
+ },
+ lastEventAtByRun: {
+ ...get().lastEventAtByRun,
+ [stateKey]: null,
+ },
+ lastErrorByRun: {
+ ...get().lastErrorByRun,
+ [stateKey]: null,
+ },
});
prevStatusRef = {};
@@ -566,10 +936,20 @@ export const useWorkflowStore = create((set, get) => ({
const { activeTask, workflowState } = get();
if (!activeTask || !phase) return;
pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_RESUME_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR }));
+ const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId);
set((state) => ({
- isStreaming: true,
- streamingPhase: phase,
- lastError: null,
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: true,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: phase,
+ },
+ lastErrorByRun: {
+ ...state.lastErrorByRun,
+ [stateKey]: null,
+ },
workflowState: state.workflowState
? {
...state.workflowState,
@@ -608,11 +988,26 @@ export const useWorkflowStore = create((set, get) => ({
...state.workflowStatesByRun,
[getWorkflowStateKey(reverted.taskId, reverted.runId)]: reverted,
},
- isStreaming: false,
- streamingPhase: null,
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [getWorkflowStateKey(reverted.taskId, reverted.runId)]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [getWorkflowStateKey(reverted.taskId, reverted.runId)]: null,
+ },
}));
} else {
- set({ isStreaming: false, streamingPhase: null });
+ set((state) => ({
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ }));
}
pushClientErrorEvent({ type: DEBUG_EVENT_TYPES.ERROR, message: err?.message || "Resume failed", phase });
}
@@ -622,10 +1017,20 @@ export const useWorkflowStore = create((set, get) => ({
const { activeTask, workflowState } = get();
if (!activeTask || !phase) return;
pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_RETRY_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR }));
+ const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId);
set((state) => ({
- isStreaming: true,
- streamingPhase: phase,
- lastError: null,
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: true,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: phase,
+ },
+ lastErrorByRun: {
+ ...state.lastErrorByRun,
+ [stateKey]: null,
+ },
workflowState: state.workflowState
? {
...state.workflowState,
@@ -664,11 +1069,26 @@ export const useWorkflowStore = create((set, get) => ({
...state.workflowStatesByRun,
[getWorkflowStateKey(reverted.taskId, reverted.runId)]: reverted,
},
- isStreaming: false,
- streamingPhase: null,
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [getWorkflowStateKey(reverted.taskId, reverted.runId)]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [getWorkflowStateKey(reverted.taskId, reverted.runId)]: null,
+ },
}));
} else {
- set({ isStreaming: false, streamingPhase: null });
+ set((state) => ({
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
+ }));
}
pushClientErrorEvent({ type: DEBUG_EVENT_TYPES.ERROR, message: err?.message || "Retry failed", phase });
}
@@ -678,9 +1098,16 @@ export const useWorkflowStore = create((set, get) => ({
const { activeTask, workflowState } = get();
if (!activeTask || !phase) return;
pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.PHASE_PAUSE_REQUESTED, { phase, trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR }));
+ const stateKey = getWorkflowStateKey(workflowState?.taskId, workflowState?.runId);
set((state) => ({
- isStreaming: false,
- streamingPhase: null,
+ isStreamingByRun: {
+ ...state.isStreamingByRun,
+ [stateKey]: false,
+ },
+ streamingPhaseByRun: {
+ ...state.streamingPhaseByRun,
+ [stateKey]: null,
+ },
workflowState: state.workflowState
? {
...state.workflowState,
@@ -736,10 +1163,6 @@ export const useWorkflowStore = create((set, get) => ({
trigger: DEBUG_EVENT_TRIGGERS.TOOLBAR,
}));
try {
- if (unsubscribeWorkflowEvents) {
- unsubscribeWorkflowEvents();
- unsubscribeWorkflowEvents = null;
- }
desktopApi.detachWorkflow(targetTaskId, targetRunId);
await desktopApi.removeTask(targetTaskId, targetRunId, options);
pushClientDebugEvent(buildClientDebugPayload(DEBUG_EVENT_TYPES.TASK_DELETED, {
@@ -750,19 +1173,30 @@ export const useWorkflowStore = create((set, get) => ({
}));
const stateKey = getWorkflowStateKey(targetTaskId, targetRunId);
const { [stateKey]: _removed, ...workflowStatesByRun } = get().workflowStatesByRun;
+ const { [stateKey]: _removedSelectedPhase, ...selectedPhaseByRun } = get().selectedPhaseByRun;
+ const { [stateKey]: _removedMessages, ...phaseMessagesByRun } = get().phaseMessagesByRun;
+ const { [stateKey]: _removedArtifacts, ...phaseOutputArtifactsByRun } = get().phaseOutputArtifactsByRun;
+ const { [stateKey]: _removedInteractions, ...phaseInteractionsByRun } = get().phaseInteractionsByRun;
+ const { [stateKey]: _removedStreaming, ...isStreamingByRun } = get().isStreamingByRun;
+ const { [stateKey]: _removedStreamingPhase, ...streamingPhaseByRun } = get().streamingPhaseByRun;
+ const { [stateKey]: _removedConnection, ...connectionStateByRun } = get().connectionStateByRun;
+ const { [stateKey]: _removedDebugEvents, ...debugEventsByRun } = get().debugEventsByRun;
+ const { [stateKey]: _removedLastEventAt, ...lastEventAtByRun } = get().lastEventAtByRun;
+ const { [stateKey]: _removedLastError, ...lastErrorByRun } = get().lastErrorByRun;
set({
activeTask: null,
workflowState: null,
workflowStatesByRun,
- phaseMessages: {},
- phaseOutputArtifacts: {},
- phaseInteractions: {},
- isStreaming: false,
- streamingPhase: null,
- connectionState: "disconnected",
- debugEvents: [],
- lastEventAt: null,
- lastError: null,
+ selectedPhaseByRun,
+ phaseMessagesByRun,
+ phaseOutputArtifactsByRun,
+ phaseInteractionsByRun,
+ isStreamingByRun,
+ streamingPhaseByRun,
+ connectionStateByRun,
+ debugEventsByRun,
+ lastEventAtByRun,
+ lastErrorByRun,
});
await useConfigStore.getState().loadWorkFolders();
return true;
diff --git a/docs/todo/todo-agent-activity-terminal-mode.zh.md b/docs/todo/todo-agent-activity-terminal-mode.zh.md
index 6c9319d..454728d 100644
--- a/docs/todo/todo-agent-activity-terminal-mode.zh.md
+++ b/docs/todo/todo-agent-activity-terminal-mode.zh.md
@@ -118,6 +118,8 @@ StepDetail Terminal Tab
- SDK 模式:继续用于结构化 workflow 执行、artifact、state、mobile relay
- CLI/PTY 模式:用于高级 terminal 体验和调试
+TODO:评估并实现用 Codex CLI / Claude Code CLI 直接替换当前 `@openai/codex-sdk` / `@anthropic-ai/claude-agent-sdk` adapter。目标是拿到 CLI 原生能力和最新 flags,例如 Codex `--dangerously-bypass-approvals-and-sandbox`、Claude Code `--dangerously-skip-permissions`,同时保持 workflow state、artifact、session resume 和 debug event 的回写能力。
+
注意:PTY 模式不能绕开 workflow state。即使 terminal 显示原生 CLI 输出,最终 step 状态、artifact、session id、错误信息仍然要回写到当前 task run。
### Resume 策略
diff --git a/package.json b/package.json
index fea1be1..d3f5fd3 100644
--- a/package.json
+++ b/package.json
@@ -11,14 +11,16 @@
"prepare:electron": "tsc -p tsconfig.preload.json",
"dev:electron": "pnpm run prepare:electron && wait-on http://127.0.0.1:${RENDERER_PORT:-5173} && ELECTRON_RENDERER_URL=http://127.0.0.1:${RENDERER_PORT:-5173} NODE_OPTIONS='--import tsx' electronmon .",
"dev:desktop-ui": "concurrently -k --names \"renderer,electron\" --prefix name --prefix-colors \"blue,magenta\" \"pnpm exec tsx scripts/run-if-port-free.ts ${RENDERER_PORT:-5173} pnpm dev:renderer\" \"pnpm dev:electron\"",
- "dev:desktop": "concurrently -k --names \"desktop,server,connector\" --prefix name --prefix-colors \"green,blue,cyan\" \"pnpm run dev:desktop-ui\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm run dev:connector\"",
+ "dev:desktop": "pnpm exec tsx scripts/dev-desktop-instance.ts",
+ "dev:desktop:raw": "concurrently -k --names \"desktop,server,connector\" --prefix name --prefix-colors \"green,blue,cyan\" \"pnpm run dev:desktop-ui\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm run dev:connector\"",
"dev:backend": "pnpm exec tsx watch apps/backend/src/server.ts",
"dev:mobile": "cd apps/mobile && flutter run",
"dev:server": "pnpm exec tsx apps/desktop/server.ts",
"dev:connector": "pnpm exec tsx apps/desktop/connector/src/index.ts",
"dev:desktop-stack": "concurrently -k --names \"server,backend,connector\" --prefix name --prefix-colors \"blue,magenta,cyan\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\" \"pnpm run dev:connector\"",
"dev:mobile-stack": "concurrently -k --names \"server,backend,connector\" --prefix name --prefix-colors \"blue,magenta,cyan\" \"pnpm exec tsx scripts/run-if-port-free.ts ${LOCAL_SERVER_PORT:-3000} pnpm run dev:server\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\" \"pnpm run dev:connector\"",
- "dev:all": "concurrently -k --names \"desktop,backend\" --prefix name --prefix-colors \"green,magenta\" \"pnpm run dev:desktop\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\"",
+ "dev:all": "pnpm exec tsx scripts/dev-desktop-instance.ts",
+ "dev:all:raw": "concurrently -k --names \"desktop,backend\" --prefix name --prefix-colors \"green,magenta\" \"pnpm run dev:desktop:raw\" \"pnpm exec tsx scripts/run-if-port-free.ts ${BACKEND_PORT:-8787} pnpm run dev:backend\"",
"dev:desktop:instance": "pnpm exec tsx scripts/dev-desktop-instance.ts",
"build:desktop": "pnpm run prepare:electron && pnpm --dir apps/desktop/renderer build",
"package:desktop": "pnpm build:desktop && electron-forge package",
@@ -36,7 +38,6 @@
"@fastify/websocket": "^11.2.0",
"@langchain/langgraph": "^1.3.0",
"@openai/codex": "^0.128.0",
- "@openai/codex-sdk": "^0.128.0",
"cors": "^2.8.5",
"express": "^4.21.0",
"fastify": "^5.6.1",
diff --git a/packages/core-lib/langgraph-runtime/app-adapter.ts b/packages/core-lib/langgraph-runtime/app-adapter.ts
index fc1004d..440df85 100644
--- a/packages/core-lib/langgraph-runtime/app-adapter.ts
+++ b/packages/core-lib/langgraph-runtime/app-adapter.ts
@@ -2,7 +2,7 @@ import OpenAI from "openai";
import { mkdir, readFile, writeFile } from "fs/promises";
import { dirname, relative, resolve } from "path";
import { readAiApiProfileSync, readAiApiProfilesSync } from "../../core-models/config";
-import { appendPhaseInteraction, appendToPhaseFile, readState, taskDir, updatePhaseStatus, writeState } from "../../core-models/state";
+import { appendPhaseInteraction, appendToPhaseFile, readPhaseInteractions, readState, taskDir, updatePhaseStatus, writeState } from "../../core-models/state";
import { createSdkAgentAdapter } from "./sdk-agent-adapter";
import { createContentPreview, createContentSummary, createStepOutputMetadata, formatStepOutputForPrompt } from "./artifacts";
@@ -72,6 +72,28 @@ function renderAiApiPrompt(step, state, inputSections = []) {
return parts.filter(Boolean).join("\n\n");
}
+function syncStepsFromPhases(state) {
+ for (const step of state.steps || []) {
+ const phase = state.phases.find((item) => item.id === step.id);
+ if (phase) Object.assign(step, phase);
+ }
+}
+
+function markRunningStepInState(state, phase) {
+ const phaseIndex = (state.phases || []).findIndex((item) => item.id === phase);
+ for (let i = 0; i < (state.phases || []).length; i += 1) {
+ const item = state.phases[i];
+ if (item.id === phase) {
+ updatePhaseStatus(state, item.id, "in_progress", item.sessionId);
+ } else if (phaseIndex >= 0 && i > phaseIndex) {
+ updatePhaseStatus(state, item.id, "pending", item.sessionId);
+ } else if (item.status === "in_progress" || item.status === "awaiting_input" || item.status === "paused") {
+ updatePhaseStatus(state, item.id, "completed", item.sessionId);
+ }
+ }
+ syncStepsFromPhases(state);
+}
+
async function readInputContent(input, state) {
if (!input) return "";
if (input.sourceType === "task_input") return state.taskInputs?.[input.name] || "";
@@ -116,6 +138,38 @@ async function publishCheckpointOutput({ taskId, runId, step, state, rule }) {
}
export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, taskRunDir, imagePaths = [], abortController, aiBackendOverride = "" }) {
+ async function markStepRunning(phase) {
+ if (!phase) return;
+ const state = await readState(taskId, runId);
+ state.currentPhase = phase;
+ state.currentStep = phase;
+ state.overallStatus = "in_progress";
+
+ markRunningStepInState(state, phase);
+
+ await writeState(taskId, state);
+ }
+
+ async function appendPhaseStart(phase, backend) {
+ try {
+ const existingInteractions = await readPhaseInteractions(taskId, phase, runId);
+ const startIndexes = existingInteractions
+ .filter((item) => item?.type === "phase_start")
+ .map((item) => Number(item.runIndex) || 0);
+ const lastRunIndex = Math.max(0, ...startIndexes);
+ const hasEarlierConversation = existingInteractions.some((item) => item?.type !== "phase_start");
+ const runIndex = lastRunIndex > 0 ? lastRunIndex + 1 : hasEarlierConversation ? 2 : 1;
+ const interaction = await appendPhaseInteraction(taskId, phase, {
+ role: "system",
+ type: "phase_start",
+ text: "",
+ backend,
+ runIndex,
+ }, runId);
+ if (interaction) send({ type: "phase_interaction", phase, interaction });
+ } catch {}
+ }
+
const adapter = createSdkAgentAdapter({
workFolder,
taskDir: taskRunDir,
@@ -166,6 +220,8 @@ export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, task
if (!profile.model) throw new Error(`AI API profile ${profile.name} is missing a model`);
const backend = `ai-api:${profile.name || profile.id}`;
+ await markStepRunning(step.id);
+ await appendPhaseStart(step.id, backend);
send({ type: "backend_selected", phase: step.id, backend, mode: "workflow" });
const inputSections = [];
for (const input of step.inputs || []) {
@@ -277,6 +333,8 @@ export function createAppSdkAgentAdapter({ taskId, runId, send, workFolder, task
const useBackendOverride = Boolean(aiBackendOverride && !hasStepBackend && agent.backend !== "ai_api");
const runtimeAgent = useBackendOverride ? { ...agent, backend: aiBackendOverride, model: "" } : agent;
if (runtimeAgent.backend === "ai_api") return runAiApiStep({ step, state });
+ await markStepRunning(step.id);
+ await appendPhaseStart(step.id, runtimeAgent.backend);
send({ type: "backend_selected", phase: step.id, backend: runtimeAgent.backend, mode: useBackendOverride ? "app" : "workflow" });
const result = await adapter.runAgent({
step,
diff --git a/packages/core-lib/langgraph-runtime/file-checkpointer.ts b/packages/core-lib/langgraph-runtime/file-checkpointer.ts
index 446303a..1812bb9 100644
--- a/packages/core-lib/langgraph-runtime/file-checkpointer.ts
+++ b/packages/core-lib/langgraph-runtime/file-checkpointer.ts
@@ -2,6 +2,8 @@ import { readFile, writeFile, mkdir } from "fs/promises";
import { dirname, join } from "path";
import { BaseCheckpointSaver, WRITES_IDX_MAP, TASKS, copyCheckpoint, getCheckpointId, maxChannelVersion } from "@langchain/langgraph-checkpoint";
+const SERIALIZED_UINT8_ARRAY = "__serialized_uint8_array__";
+
function generateKey(threadId, checkpointNamespace, checkpointId) {
return JSON.stringify([threadId, checkpointNamespace || "", checkpointId]);
}
@@ -24,6 +26,33 @@ async function writeJson(file, value) {
await writeFile(file, JSON.stringify(value, null, 2));
}
+function encodeSerializedValue(value) {
+ if (value instanceof Uint8Array) {
+ return {
+ type: SERIALIZED_UINT8_ARRAY,
+ data: Buffer.from(value).toString("base64"),
+ };
+ }
+ return value;
+}
+
+function decodeSerializedValue(value) {
+ if (value?.type === SERIALIZED_UINT8_ARRAY && typeof value.data === "string") {
+ return Uint8Array.from(Buffer.from(value.data, "base64"));
+ }
+
+ if (value && typeof value === "object" && !Array.isArray(value)) {
+ const keys = Object.keys(value);
+ if (keys.length && keys.every((key) => /^\d+$/.test(key))) {
+ const sortedKeys = keys.map(Number).sort((a, b) => a - b);
+ const isContiguous = sortedKeys.every((key, index) => key === index);
+ if (isContiguous) return Uint8Array.from(sortedKeys.map((key) => value[key]));
+ }
+ }
+
+ return value;
+}
+
export class FileCheckpointSaver extends BaseCheckpointSaver {
constructor(rootDir, serde) {
super(serde);
@@ -50,7 +79,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver {
const parentKey = generateKey(threadId, checkpointNs, parentCheckpointId);
const pendingSends = await Promise.all(Object.values(this.writes[parentKey] ?? {})
.filter(([_taskId, channel]) => channel === TASKS)
- .map(async ([_taskId, _channel, writes]) => await this.serde.loadsTyped("json", writes)));
+ .map(async ([_taskId, _channel, writes]) => await this.serde.loadsTyped("json", decodeSerializedValue(writes))));
mutableCheckpoint.channel_values ??= {};
mutableCheckpoint.channel_values[TASKS] = pendingSends;
mutableCheckpoint.channel_versions ??= {};
@@ -62,19 +91,19 @@ export class FileCheckpointSaver extends BaseCheckpointSaver {
async createTuple(threadId, checkpointNamespace, checkpointId, saved, config) {
const [checkpoint, metadata, parentCheckpointId] = saved;
const key = generateKey(threadId, checkpointNamespace, checkpointId);
- const deserializedCheckpoint = await this.serde.loadsTyped("json", checkpoint);
+ const deserializedCheckpoint = await this.serde.loadsTyped("json", decodeSerializedValue(checkpoint));
if (deserializedCheckpoint.v < 4 && parentCheckpointId !== undefined) {
await this.migratePendingSends(deserializedCheckpoint, threadId, checkpointNamespace, parentCheckpointId);
}
const pendingWrites = await Promise.all(Object.values(this.writes[key] || {}).map(async ([taskId, channel, value]) => [
taskId,
channel,
- await this.serde.loadsTyped("json", value),
+ await this.serde.loadsTyped("json", decodeSerializedValue(value)),
]));
const checkpointTuple = {
config,
checkpoint: deserializedCheckpoint,
- metadata: await this.serde.loadsTyped("json", metadata),
+ metadata: await this.serde.loadsTyped("json", decodeSerializedValue(metadata)),
pendingWrites,
};
if (parentCheckpointId !== undefined) {
@@ -130,7 +159,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver {
for (const [checkpointId, saved] of sortedCheckpoints) {
if (configCheckpointId && checkpointId !== configCheckpointId) continue;
if (before?.configurable?.checkpoint_id && checkpointId >= before.configurable.checkpoint_id) continue;
- const metadata = await this.serde.loadsTyped("json", saved[1]);
+ const metadata = await this.serde.loadsTyped("json", decodeSerializedValue(saved[1]));
if (filter && !Object.entries(filter).every(([key, value]) => metadata[key] === value)) continue;
if (limit !== undefined) {
if (limit <= 0) break;
@@ -161,8 +190,8 @@ export class FileCheckpointSaver extends BaseCheckpointSaver {
this.serde.dumpsTyped(metadata),
]);
this.storage[threadId][checkpointNamespace][checkpoint.id] = [
- serializedCheckpoint,
- serializedMetadata,
+ encodeSerializedValue(serializedCheckpoint),
+ encodeSerializedValue(serializedMetadata),
config.configurable?.checkpoint_id,
];
await this.persist();
@@ -190,7 +219,7 @@ export class FileCheckpointSaver extends BaseCheckpointSaver {
const innerKey = [taskId, WRITES_IDX_MAP[channel] || index];
const innerKeyStr = `${innerKey[0]},${innerKey[1]}`;
if (innerKey[1] >= 0 && existingWrites && innerKeyStr in existingWrites) return;
- this.writes[outerKey][innerKeyStr] = [taskId, channel, serializedValue];
+ this.writes[outerKey][innerKeyStr] = [taskId, channel, encodeSerializedValue(serializedValue)];
}));
await this.persist();
}
diff --git a/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts b/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts
index 8d84f11..d5ac6b8 100644
--- a/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts
+++ b/packages/core-lib/langgraph-runtime/sdk-agent-adapter.ts
@@ -1,13 +1,19 @@
import { query } from "@anthropic-ai/claude-agent-sdk";
-import { Codex } from "@openai/codex-sdk";
+import { execFile, spawn } from "child_process";
+import { createRequire } from "module";
+import readline from "readline";
import { mkdir, readFile, writeFile } from "fs/promises";
-import { dirname, extname, join, relative, resolve } from "path";
+import { basename, dirname, extname, join, relative, resolve } from "path";
import { createContentPreview, createContentSummary, formatStepOutputForPrompt } from "./artifacts";
const SDK_BACKENDS = {
CLAUDE: "claude",
CODEX: "codex",
};
+const require = createRequire(import.meta.url);
+const SHELL_ENV_MARKER = "__DEV_WORKFLOW_SHELL_ENV__";
+const SHELL_ENV_TIMEOUT_MS = 5000;
+const shellEnvCache = new Map();
function isPathInside(parent, child) {
const rel = relative(resolve(parent), resolve(child));
@@ -93,6 +99,18 @@ function buildPrompt({ step, state }) {
return parts.filter(Boolean).join("\n\n");
}
+function buildWorkspaceAccessPrompt(workspaceWrite, workFolder, taskDir) {
+ if (workspaceWrite) return "";
+ return [
+ "# Workspace access",
+ "",
+ "The project workspace is read-only for this step.",
+ `You may read files in: ${workFolder || ""}`,
+ `You may write only task artifact files under: ${taskDir || ""}`,
+ "Do not create, edit, move, or delete files in the project workspace.",
+ ].join("\n");
+}
+
function guessImageMediaType(filePath) {
const ext = extname(filePath).toLowerCase();
if (ext === ".jpg" || ext === ".jpeg") return "image/jpeg";
@@ -131,22 +149,70 @@ async function buildClaudePrompt(prompt, imagePaths) {
})();
}
-function buildCodexInput(prompt, imagePaths) {
- if (!imagePaths || imagePaths.length === 0) return prompt;
- const input = [];
- if (prompt) input.push({ type: "text", text: prompt });
- for (const imagePath of imagePaths) input.push({ type: "local_image", path: imagePath });
- return input;
+function getShellArgs(shell, command) {
+ const name = basename(shell || "");
+ if (name.includes("bash") || name.includes("zsh")) return ["-lc", command];
+ if (name.includes("fish")) return ["-lc", command];
+ return ["-lc", command];
}
-function buildCodexEnv() {
+function parseShellEnv(raw) {
+ const entries = String(raw || "").split("\0");
+ const markerIndex = entries.indexOf(SHELL_ENV_MARKER);
+ if (markerIndex < 0) return {};
+ const envEntries = entries.slice(markerIndex + 1);
+ const env = {};
+ for (const entry of envEntries) {
+ const index = entry.indexOf("=");
+ if (index <= 0) continue;
+ env[entry.slice(0, index)] = entry.slice(index + 1);
+ }
+ return env;
+}
+
+function readUserShellEnv(workFolder) {
+ const shell = process.env.SHELL || "/bin/sh";
+ const cwd = workFolder || process.cwd();
+ const cacheKey = `${shell}:${cwd}`;
+ if (shellEnvCache.has(cacheKey)) return shellEnvCache.get(cacheKey);
+
+ const env = { ...process.env };
+ delete env.NODE_OPTIONS;
+ const command = `printf '${SHELL_ENV_MARKER}\\0'; env -0`;
+ const promise = new Promise((resolve) => {
+ execFile(shell, getShellArgs(shell, command), {
+ cwd,
+ env,
+ maxBuffer: 1024 * 1024,
+ timeout: SHELL_ENV_TIMEOUT_MS,
+ }, (error, stdout) => {
+ if (error) {
+ resolve({});
+ return;
+ }
+ resolve(parseShellEnv(stdout));
+ });
+ });
+ shellEnvCache.set(cacheKey, promise);
+ return promise;
+}
+
+async function buildCodexEnv(workFolder) {
const env = {};
for (const [key, value] of Object.entries(process.env)) {
+ if (value !== undefined && key !== "CODEX_API_KEY" && key !== "NODE_OPTIONS") env[key] = value;
+ }
+ const shellEnv = await readUserShellEnv(workFolder);
+ for (const [key, value] of Object.entries(shellEnv)) {
if (value !== undefined && key !== "CODEX_API_KEY") env[key] = value;
}
return env;
}
+function getCodexCliPath() {
+ return require.resolve("@openai/codex/bin/codex.js");
+}
+
function getToolFilePath(input) {
return input?.file_path || input?.path || input?.notebook_path || "";
}
@@ -268,71 +334,98 @@ async function streamClaudeSdk({ prompt, workFolder, taskDir, sessionId, imagePa
return { sessionId: nextSessionId || sessionId || "", fallbackResult };
}
-async function streamCodexSdk({ prompt, workFolder, sessionId, imagePaths, abortController, workspaceWrite, agent, onText, onTool, onSession }) {
- const codex = new Codex({
- env: buildCodexEnv(),
- ...(agent.options?.client || {}),
+async function streamCodexCli({ prompt, workFolder, taskDir, sessionId, imagePaths, abortController, workspaceWrite, agent, onText, onTool, onSession }) {
+ const workingDirectory = workspaceWrite ? workFolder : taskDir;
+ const args = [
+ "exec",
+ "--json",
+ "--sandbox",
+ workspaceWrite ? "danger-full-access" : "workspace-write",
+ "--cd",
+ workingDirectory,
+ "--skip-git-repo-check",
+ "--config",
+ "approval_policy=\"never\"",
+ "--config",
+ "sandbox_workspace_write.network_access=true",
+ ];
+
+ const modelReasoningEffort = agent.options?.thread?.modelReasoningEffort || "";
+ if (agent.model) args.push("--model", agent.model);
+ if (modelReasoningEffort) args.push("--config", `model_reasoning_effort="${modelReasoningEffort}"`);
+ for (const imagePath of imagePaths || []) args.push("--image", imagePath);
+ if (sessionId) args.push("resume", sessionId);
+
+ const child = spawn(getCodexCliPath(), args, {
+ env: await buildCodexEnv(workFolder),
+ signal: abortController.signal,
});
- const threadOptions = {
- workingDirectory: workFolder,
- sandboxMode: workspaceWrite ? "danger-full-access" : "read-only",
- skipGitRepoCheck: true,
- approvalPolicy: "never",
- networkAccessEnabled: true,
- model: agent.model || undefined,
- ...(agent.options?.thread || {}),
- };
- const thread = sessionId
- ? codex.resumeThread(sessionId, threadOptions)
- : codex.startThread(threadOptions);
- const input = buildCodexInput(prompt, imagePaths);
- const { events } = await thread.runStreamed(input, { signal: abortController.signal });
+ let spawnError = null;
+ const stderrChunks = [];
+ child.once("error", (error) => { spawnError = error; });
+ child.stderr?.on("data", (chunk) => stderrChunks.push(chunk));
+ if (!child.stdin) throw new Error("Codex CLI has no stdin");
+ if (!child.stdout) throw new Error("Codex CLI has no stdout");
+
+ child.stdin.write(prompt);
+ child.stdin.end();
+
+ const exitPromise = new Promise((resolve) => {
+ child.once("exit", (code, signal) => resolve({ code, signal }));
+ });
+ const events = readline.createInterface({ input: child.stdout, crlfDelay: Infinity });
const seenItemText = new Map();
const seenToolItems = new Set();
- let completed = false;
- for await (const event of events) {
- if (event.type === "turn.completed") {
- completed = true;
- break;
- }
+ try {
+ for await (const line of events) {
+ if (!String(line || "").trim()) continue;
+ const event = JSON.parse(line);
+ if (event.type === "turn.completed") break;
- if (event.type === "thread.started") {
- await onSession(event.thread_id);
- continue;
- }
+ if (event.type === "thread.started") {
+ await onSession(event.thread_id);
+ continue;
+ }
- if (event.type === "item.started" || event.type === "item.updated" || event.type === "item.completed") {
- const item = event.item;
- if (item.type === "agent_message") {
- const previous = seenItemText.get(item.id) || "";
- const next = item.text || "";
- if (next.startsWith(previous)) {
- const delta = next.slice(previous.length);
- if (delta) await onText(delta);
- } else if (next && next !== previous) {
- await onText(next);
+ if (event.type === "item.started" || event.type === "item.updated" || event.type === "item.completed") {
+ const item = event.item;
+ if (item.type === "agent_message") {
+ const previous = seenItemText.get(item.id) || "";
+ const next = item.text || "";
+ if (next.startsWith(previous)) {
+ const delta = next.slice(previous.length);
+ if (delta) await onText(delta);
+ } else if (next && next !== previous) {
+ await onText(next);
+ }
+ seenItemText.set(item.id, next);
+ continue;
+ }
+
+ if (
+ (item.type === "command_execution" || item.type === "mcp_tool_call" || item.type === "web_search" || item.type === "file_change" || item.type === "todo_list")
+ && !seenToolItems.has(item.id)
+ ) {
+ seenToolItems.add(item.id);
+ await onTool(formatCodexToolLog(item));
}
- seenItemText.set(item.id, next);
continue;
}
- if (
- (item.type === "command_execution" || item.type === "mcp_tool_call" || item.type === "web_search" || item.type === "file_change" || item.type === "todo_list")
- && !seenToolItems.has(item.id)
- ) {
- seenToolItems.add(item.id);
- await onTool(formatCodexToolLog(item));
- }
- continue;
+ if (event.type === "turn.failed") throw new Error(event.error?.message || "Codex CLI run failed");
+ if (event.type === "error") throw new Error(event.message || "Codex CLI run failed");
}
- if (event.type === "turn.failed") throw new Error(event.error?.message || "Codex run failed");
- if (event.type === "error") throw new Error(event.message || "Codex run failed");
+ if (spawnError) throw spawnError;
+ const { code, signal } = await exitPromise;
+ if (code !== 0 || signal) {
+ const detail = signal ? `signal ${signal}` : `code ${code ?? 1}`;
+ throw new Error(`Codex CLI exited with ${detail}: ${Buffer.concat(stderrChunks).toString("utf8")}`);
+ }
+ } finally {
+ events.close();
}
-
- if (thread.id) await onSession(thread.id);
- return { sessionId: thread.id || sessionId || "", completed };
}
function resolveOutputPath(taskDir, step, state) {
@@ -375,7 +468,10 @@ export function createSdkAgentAdapter(options = {}) {
];
const abortController = options.abortController || new AbortController();
const workspaceWrite = canWriteWorkspace(step, agent);
- const prompt = buildPrompt({ step, state });
+ const prompt = [
+ buildWorkspaceAccessPrompt(workspaceWrite, workFolder, taskDir),
+ buildPrompt({ step, state }),
+ ].filter(Boolean).join("\n\n");
let content = "";
let nextSessionId = sessionId || "";
@@ -409,9 +505,10 @@ export function createSdkAgentAdapter(options = {}) {
nextSessionId = result.sessionId || nextSessionId;
if (!content && result.fallbackResult) content = result.fallbackResult;
} else if (backend === SDK_BACKENDS.CODEX) {
- const result = await streamCodexSdk({
+ await streamCodexCli({
prompt,
workFolder,
+ taskDir,
sessionId,
imagePaths,
abortController,
@@ -419,7 +516,6 @@ export function createSdkAgentAdapter(options = {}) {
agent,
...callbacks,
});
- nextSessionId = result.sessionId || nextSessionId;
} else {
throw new Error(`unsupported SDK agent backend ${backend}`);
}
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index b84fddf..768cbe6 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -23,9 +23,6 @@ importers:
'@openai/codex':
specifier: ^0.128.0
version: 0.128.0
- '@openai/codex-sdk':
- specifier: ^0.128.0
- version: 0.128.0
cors:
specifier: ^2.8.5
version: 2.8.6
@@ -1087,10 +1084,6 @@ packages:
'@octokit/types@6.41.0':
resolution: {integrity: sha512-eJ2jbzjdijiL3B4PrSQaSjuF2sPEQPVCPzBvTHJD9Nz+9dw2SGH4K4xeQJ77YfTq5bRQ+bD8wT11JbeDPmxmGg==}
- '@openai/codex-sdk@0.128.0':
- resolution: {integrity: sha512-Eao0LLA5x90qwU6SXYd21h4KxdCef1WpCvHFgKdbqzWMJ79lUvguGDGvx1RheP+zTdKGxJfJ6dulI5wSXoUBhQ==}
- engines: {node: '>=18'}
-
'@openai/codex@0.128.0':
resolution: {integrity: sha512-+xp6ODmFfBNnexIWRHApEaPXot2j6gyM8A5we/5IS/uY4eYHj4arETct4hQ5M4eO+MK7JY3ZU4xhuobhlysr0A==}
engines: {node: '>=16'}
@@ -5859,10 +5852,6 @@ snapshots:
dependencies:
'@octokit/openapi-types': 12.11.0
- '@openai/codex-sdk@0.128.0':
- dependencies:
- '@openai/codex': 0.128.0
-
'@openai/codex@0.128.0':
optionalDependencies:
'@openai/codex-darwin-arm64': '@openai/codex@0.128.0-darwin-arm64'
diff --git a/scripts/dev-desktop-instance.ts b/scripts/dev-desktop-instance.ts
index 8359cec..e9bc177 100644
--- a/scripts/dev-desktop-instance.ts
+++ b/scripts/dev-desktop-instance.ts
@@ -1,18 +1,22 @@
import { spawn } from "child_process";
-import { join } from "path";
+import net from "net";
+import { basename, join } from "path";
import { mkdirSync } from "fs";
type Options = {
name: string;
+ customName: boolean;
portOffset: number;
+ autoPortOffset: boolean;
};
const BASE_RENDERER_PORT = 5173;
const BASE_LOCAL_SERVER_PORT = 3000;
const BASE_BACKEND_PORT = 8787;
+const MAX_PORT_OFFSET = 200;
function printUsage() {
- console.log("usage: pnpm dev:desktop:instance --name
[--port-offset ]");
+ console.log("usage: pnpm dev:desktop:instance [--name ] [--port-offset ]");
}
function normalizeName(value: string) {
@@ -25,7 +29,9 @@ function normalizeName(value: string) {
function parseArgs(argv: string[]): Options {
let name = "";
- let portOffset = Number(process.env.PORT_OFFSET || "0");
+ let customName = false;
+ let portOffset = process.env.PORT_OFFSET ? Number(process.env.PORT_OFFSET) : 0;
+ let autoPortOffset = !process.env.PORT_OFFSET;
for (let index = 0; index < argv.length; index++) {
const arg = argv[index];
@@ -35,42 +41,91 @@ function parseArgs(argv: string[]): Options {
}
if (arg === "--name") {
name = argv[++index] || "";
+ customName = true;
continue;
}
if (arg.startsWith("--name=")) {
name = arg.slice("--name=".length);
+ customName = true;
continue;
}
if (arg === "--port-offset") {
portOffset = Number(argv[++index] || "");
+ autoPortOffset = false;
continue;
}
if (arg.startsWith("--port-offset=")) {
portOffset = Number(arg.slice("--port-offset=".length));
+ autoPortOffset = false;
continue;
}
throw new Error(`unknown argument: ${arg}`);
}
- const normalizedName = normalizeName(name);
+ const normalizedName = normalizeName(name || basename(process.cwd()) || "desktop");
if (!normalizedName) {
- throw new Error("--name is required");
+ throw new Error("instance name is required");
}
if (!Number.isInteger(portOffset) || portOffset < 0) {
throw new Error("--port-offset must be a non-negative integer");
}
- return { name: normalizedName, portOffset };
+ return { name: normalizedName, customName, portOffset, autoPortOffset };
+}
+
+function isPortOpen(port: number, host = "127.0.0.1") {
+ return new Promise((resolve) => {
+ const socket = net.createConnection({ port, host });
+ socket.once("connect", () => {
+ socket.destroy();
+ resolve(true);
+ });
+ socket.once("error", () => {
+ resolve(false);
+ });
+ });
+}
+
+function getPorts(portOffset: number) {
+ return {
+ rendererPort: BASE_RENDERER_PORT + portOffset,
+ localServerPort: BASE_LOCAL_SERVER_PORT + portOffset,
+ backendPort: BASE_BACKEND_PORT + portOffset,
+ };
+}
+
+async function isPortOffsetAvailable(portOffset: number) {
+ const ports = getPorts(portOffset);
+ const occupied = await Promise.all([
+ isPortOpen(ports.rendererPort),
+ isPortOpen(ports.localServerPort),
+ isPortOpen(ports.backendPort),
+ ]);
+ return occupied.every((value) => !value);
+}
+
+async function findAvailablePortOffset(startOffset: number) {
+ for (let offset = startOffset; offset <= MAX_PORT_OFFSET; offset += 1) {
+ if (await isPortOffsetAvailable(offset)) return offset;
+ }
+ throw new Error(`no available port offset found between ${startOffset} and ${MAX_PORT_OFFSET}`);
}
async function main() {
- const { name, portOffset } = parseArgs(process.argv.slice(2));
- const rendererPort = BASE_RENDERER_PORT + portOffset;
- const localServerPort = BASE_LOCAL_SERVER_PORT + portOffset;
- const backendPort = BASE_BACKEND_PORT + portOffset;
- const userDataDir = join(process.cwd(), ".dev-instances", name);
+ const options = parseArgs(process.argv.slice(2));
+ const portOffset = options.autoPortOffset
+ ? await findAvailablePortOffset(options.portOffset)
+ : options.portOffset;
+ if (!options.autoPortOffset && !(await isPortOffsetAvailable(portOffset))) {
+ throw new Error(`port offset ${portOffset} is already in use`);
+ }
+ const { rendererPort, localServerPort, backendPort } = getPorts(portOffset);
+ const name = options.name;
+ const userDataDir = options.customName
+ ? join(process.cwd(), ".dev-instances", name)
+ : process.env.DEV_WORKFLOW_USER_DATA_DIR || "";
- mkdirSync(userDataDir, { recursive: true });
+ if (userDataDir) mkdirSync(userDataDir, { recursive: true });
const env = {
...process.env,
@@ -85,16 +140,17 @@ async function main() {
DEVICE_ID: `desktop-${name}`,
DEVICE_NAME: `Desktop ${name}`,
DEV_WORKFLOW_INSTANCE_NAME: name,
- DEV_WORKFLOW_USER_DATA_DIR: userDataDir,
};
+ if (userDataDir) env.DEV_WORKFLOW_USER_DATA_DIR = userDataDir;
console.log(`[dev-instance] name: ${name}`);
+ console.log(`[dev-instance] port offset: ${portOffset}${options.autoPortOffset ? " (auto)" : ""}`);
console.log(`[dev-instance] renderer: http://127.0.0.1:${rendererPort}`);
console.log(`[dev-instance] local server: http://127.0.0.1:${localServerPort}`);
console.log(`[dev-instance] backend: http://127.0.0.1:${backendPort}`);
- console.log(`[dev-instance] user data: ${userDataDir}`);
+ console.log(`[dev-instance] user data: ${userDataDir || "default worktree-scoped directory"}`);
- const child = spawn("pnpm", ["run", "dev:all"], {
+ const child = spawn("pnpm", ["run", "dev:all:raw"], {
env,
stdio: "inherit",
shell: false,