Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions apps/twig/src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,23 +431,10 @@ process.on("SIGTERM", () => handleShutdownSignal("SIGTERM"));
process.on("SIGINT", () => handleShutdownSignal("SIGINT"));
process.on("SIGHUP", () => handleShutdownSignal("SIGHUP"));

// Handle uncaught exceptions to attempt cleanup before crash
process.on("uncaughtException", async (_error) => {
try {
const lifecycleService = container.get<AppLifecycleService>(
MAIN_TOKENS.AppLifecycleService,
);
await lifecycleService.shutdown();
} catch (_cleanupErr) {}
process.exit(1);
process.on("uncaughtException", (_error) => {
// TODO: Report to error tracking service
});

process.on("unhandledRejection", async (_reason) => {
try {
const lifecycleService = container.get<AppLifecycleService>(
MAIN_TOKENS.AppLifecycleService,
);
await lifecycleService.shutdown();
} catch (_cleanupErr) {}
process.exit(1);
process.on("unhandledRejection", (_reason, _promise) => {
// TODO: Report to error tracking service
});
55 changes: 41 additions & 14 deletions apps/twig/src/main/services/agent/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,25 @@ function createTappedReadableStream(

return new ReadableStream<Uint8Array>({
async pull(controller) {
const { value, done } = await reader.read();
if (done) {
try {
const { value, done } = await reader.read();
if (done) {
controller.close();
return;
}
tap.process(value);
controller.enqueue(value);
} catch (err) {
// Stream may be closed if subprocess crashed - close gracefully
log.warn("Stream read failed (subprocess may have crashed)", {
error: err,
});
controller.close();
return;
}
tap.process(value);
controller.enqueue(value);
},
cancel() {
// Release the reader when stream is cancelled
reader.releaseLock();
},
});
}
Expand All @@ -91,19 +103,34 @@ function createTappedWritableStream(
return new WritableStream<Uint8Array>({
async write(chunk) {
tap.process(chunk);
const writer = underlying.getWriter();
await writer.write(chunk);
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.write(chunk);
writer.releaseLock();
} catch (err) {
// Stream may be closed if subprocess crashed - log but don't throw
log.warn("Stream write failed (subprocess may have crashed)", {
error: err,
});
}
},
async close() {
const writer = underlying.getWriter();
await writer.close();
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.close();
writer.releaseLock();
} catch {
// Stream may already be closed
}
},
async abort(reason) {
const writer = underlying.getWriter();
await writer.abort(reason);
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.abort(reason);
writer.releaseLock();
} catch {
// Stream may already be closed
}
},
});
}
Expand Down
149 changes: 118 additions & 31 deletions apps/twig/src/renderer/features/sessions/stores/sessionStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ interface SessionActions {
customInput?: string,
) => Promise<void>;
cancelPermission: (taskId: string, toolCallId: string) => Promise<void>;
clearSessionError: (taskId: string) => void;
clearSessionError: (taskId: string) => Promise<void>;
}

interface AuthCredentials {
Expand Down Expand Up @@ -651,38 +651,52 @@ const useStore = create<SessionStore>()(
addSession(session);
subscribeToChannel(taskRunId);

const result = await trpcVanilla.agent.reconnect.mutate({
taskId,
taskRunId,
repoPath,
apiKey: auth.apiKey,
apiHost: auth.apiHost,
projectId: auth.projectId,
logUrl,
sdkSessionId,
});
try {
const result = await trpcVanilla.agent.reconnect.mutate({
taskId,
taskRunId,
repoPath,
apiKey: auth.apiKey,
apiHost: auth.apiHost,
projectId: auth.projectId,
logUrl,
sdkSessionId,
});

if (result) {
updateSession(taskRunId, { status: "connected" });
if (persistedMode) {
try {
await trpcVanilla.agent.setMode.mutate({
sessionId: taskRunId,
modeId: persistedMode,
});
} catch (error) {
log.warn("Failed to restore persisted mode after reconnect", {
taskId,
error,
});
if (result) {
updateSession(taskRunId, { status: "connected" });
if (persistedMode) {
try {
await trpcVanilla.agent.setMode.mutate({
sessionId: taskRunId,
modeId: persistedMode,
});
} catch (error) {
log.warn("Failed to restore persisted mode after reconnect", {
taskId,
error,
});
}
}
} else {
unsubscribeFromChannel(taskRunId);
updateSession(taskRunId, {
status: "error",
errorMessage:
"Failed to reconnect to the agent. Please restart the task.",
});
}
} else {
} catch (error) {
// Handle reconnection errors - session already added, just update status
unsubscribeFromChannel(taskRunId);
const errorMessage =
error instanceof Error ? error.message : String(error);
log.error("Failed to reconnect to session", { taskId, error });
updateSession(taskRunId, {
status: "error",
errorMessage:
"Failed to reconnect to the agent. Please restart the task.",
errorMessage ||
"Failed to reconnect to the agent. Please try again.",
});
}
};
Expand Down Expand Up @@ -790,11 +804,51 @@ const useStore = create<SessionStore>()(
sessionId: session.taskRunId,
prompt: blocks,
});
} catch (error) {
// Check if this is a fatal error that means the session is dead
const errorMessage =
error instanceof Error ? error.message : String(error);
const errorDetails = (error as { data?: { details?: string } }).data
?.details;

const isFatalError =
errorMessage.includes("Internal error") ||
errorDetails?.includes("process exited") ||
errorDetails?.includes("Session did not end") ||
errorDetails?.includes("not ready for writing") ||
errorDetails?.includes("Session not found");

if (isFatalError) {
log.error("Fatal prompt error, setting session to error state", {
taskRunId: session.taskRunId,
errorMessage,
errorDetails,
});
updateSession(session.taskRunId, {
status: "error",
errorMessage:
errorDetails ||
"Session connection lost. Please retry or start a new task.",
isPromptPending: false,
promptStartedAt: null,
});
} else {
updateSession(session.taskRunId, {
isPromptPending: false,
promptStartedAt: null,
});
}

throw error;
} finally {
updateSession(session.taskRunId, {
isPromptPending: false,
promptStartedAt: null,
});
// Only clear pending state if not already done in catch
const currentSession = get().sessions[session.taskRunId];
if (currentSession?.isPromptPending) {
updateSession(session.taskRunId, {
isPromptPending: false,
promptStartedAt: null,
});
}
}
};

Expand Down Expand Up @@ -960,6 +1014,22 @@ const useStore = create<SessionStore>()(
const session = getSessionByTaskId(taskId);
if (!session) throw new Error("No active session for task");

// Don't send if session is not connected
if (session.status !== "connected") {
if (session.status === "error") {
throw new Error(
session.errorMessage ||
"Session is in error state. Please retry or start a new task.",
);
}
if (session.status === "connecting") {
throw new Error(
"Session is still connecting. Please wait and try again.",
);
}
throw new Error(`Session is not ready (status: ${session.status})`);
}

let blocks: ContentBlock[] =
typeof prompt === "string"
? [{ type: "text", text: prompt }]
Expand Down Expand Up @@ -1234,9 +1304,26 @@ const useStore = create<SessionStore>()(
}
},

clearSessionError: (taskId: string) => {
clearSessionError: async (taskId: string) => {
const session = getSessionByTaskId(taskId);
if (session) {
// Cancel the agent session on the main process to clean up the dead subprocess
try {
await trpcVanilla.agent.cancel.mutate({
sessionId: session.taskRunId,
});
log.info("Cancelled agent session for retry", {
taskId,
taskRunId: session.taskRunId,
});
} catch (error) {
// Ignore errors - session may already be cleaned up
log.warn("Failed to cancel agent session during error clear", {
taskId,
error,
});
}
unsubscribeFromChannel(session.taskRunId);
removeSession(session.taskRunId);
}
connectAttempts.delete(taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {

const { appendUserShellExecute } = useSessionActions();

const handleRetry = useCallback(() => {
const handleRetry = useCallback(async () => {
if (!repoPath) return;
clearSessionError(taskId);
await clearSessionError(taskId);
connectToTask({ task, repoPath });
}, [taskId, repoPath, task, clearSessionError, connectToTask]);

Expand Down
31 changes: 22 additions & 9 deletions packages/agent/src/utils/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,33 @@ export function createTappedWritableStream(
onMessage(line);
}

const writer = underlying.getWriter();
await writer.write(chunk);
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.write(chunk);
writer.releaseLock();
} catch (err) {
// Stream may be closed if subprocess crashed - log but don't throw
logger?.error("ACP write error", err);
}
},
async close() {
const writer = underlying.getWriter();
await writer.close();
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.close();
writer.releaseLock();
} catch {
// Stream may already be closed
}
},
async abort(reason: unknown) {
logger?.warn("Tapped stream aborted", { reason });
const writer = underlying.getWriter();
await writer.abort(reason);
writer.releaseLock();
try {
const writer = underlying.getWriter();
await writer.abort(reason);
writer.releaseLock();
} catch {
// Stream may already be closed
}
},
});
}
Loading