Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 100 additions & 31 deletions apps/array/src/main/services/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import { logger } from "../lib/logger";

const log = logger.scope("session-manager");

function isAuthError(error: unknown): boolean {
return (
error instanceof Error &&
error.message.startsWith("Authentication required")
);
}

type MessageCallback = (message: unknown) => void;

class NdJsonTap {
Expand Down Expand Up @@ -135,6 +142,7 @@ export interface ManagedSession {
createdAt: number;
lastActivityAt: number;
mockNodeDir: string;
config: SessionConfig;
}

function getClaudeCliPath(): string {
Expand All @@ -146,7 +154,7 @@ function getClaudeCliPath(): string {

export class SessionManager {
private sessions = new Map<string, ManagedSession>();
private sessionTokens = new Map<string, string>();
private currentToken: string | null = null;
private getMainWindow: () => BrowserWindow | null;
private onLog: OnLogCallback;

Expand All @@ -155,23 +163,20 @@ export class SessionManager {
this.onLog = onLog;
}

public updateSessionToken(taskRunId: string, newToken: string): void {
this.sessionTokens.set(taskRunId, newToken);
log.info("Session token updated", { taskRunId });
public updateToken(newToken: string): void {
this.currentToken = newToken;
log.info("Session token updated");
}

private getSessionToken(taskRunId: string, fallback: string): string {
return this.sessionTokens.get(taskRunId) || fallback;
private getToken(fallback: string): string {
return this.currentToken || fallback;
}

private buildMcpServers(
credentials: PostHogCredentials,
taskRunId: string,
): AcpMcpServer[] {
private buildMcpServers(credentials: PostHogCredentials): AcpMcpServer[] {
const servers: AcpMcpServer[] = [];

const mcpUrl = this.getPostHogMcpUrl(credentials.apiHost);
const token = this.getSessionToken(taskRunId, credentials.apiKey);
const token = this.getToken(credentials.apiKey);

servers.push({
name: "posthog",
Expand Down Expand Up @@ -211,6 +216,7 @@ export class SessionManager {
private async getOrCreateSession(
config: SessionConfig,
isReconnect: boolean,
isRetry = false,
): Promise<ManagedSession | null> {
const {
taskId,
Expand All @@ -222,9 +228,11 @@ export class SessionManager {
model,
} = config;

const existing = this.sessions.get(taskRunId);
if (existing) {
return existing;
if (!isRetry) {
const existing = this.sessions.get(taskRunId);
if (existing) {
return existing;
}
}

const channel = `agent-event:${taskRunId}`;
Expand All @@ -234,7 +242,7 @@ export class SessionManager {
const agent = new Agent({
workingDirectory: repoPath,
posthogApiUrl: credentials.apiHost,
posthogApiKey: credentials.apiKey,
getPosthogApiKey: () => this.getToken(credentials.apiKey),
posthogProjectId: credentials.projectId,
debug: !app.isPackaged,
onLog: this.onLog,
Expand All @@ -256,11 +264,9 @@ export class SessionManager {
clientCapabilities: {},
});

const mcpServers = this.buildMcpServers(credentials, taskRunId);
const mcpServers = this.buildMcpServers(credentials);

if (isReconnect) {
// Use our custom extension method to resume without replaying history.
// Client fetches history from S3 directly.
await connection.extMethod("_posthog/session/resume", {
sessionId: taskRunId,
cwd: repoPath,
Expand Down Expand Up @@ -290,38 +296,82 @@ export class SessionManager {
createdAt: Date.now(),
lastActivityAt: Date.now(),
mockNodeDir,
config,
};

this.sessions.set(taskRunId, session);
if (isRetry) {
log.info("Session created after auth retry", { taskRunId });
}
return session;
} catch (err) {
this.cleanupMockNodeEnvironment(mockNodeDir);
if (!isRetry && isAuthError(err)) {
log.warn(
`Auth error during ${isReconnect ? "reconnect" : "create"}, retrying`,
{ taskRunId },
);
return this.getOrCreateSession(config, isReconnect, true);
}
log.error(
`Failed to ${isReconnect ? "reconnect" : "create"} session`,
`Failed to ${isReconnect ? "reconnect" : "create"} session${isRetry ? " after retry" : ""}`,
err,
);
if (isReconnect) return null;
throw err;
}
}

private async recreateSession(taskRunId: string): Promise<ManagedSession> {
const existing = this.sessions.get(taskRunId);
if (!existing) {
throw new Error(`Session not found for recreation: ${taskRunId}`);
}

log.info("Recreating session due to auth error", { taskRunId });

// Store config and cleanup old session
const config = existing.config;
this.cleanupSession(taskRunId);

// Reconnect to preserve Claude context via sdkSessionId
const newSession = await this.getOrCreateSession(config, true);
if (!newSession) {
throw new Error(`Failed to recreate session: ${taskRunId}`);
}

return newSession;
}

async prompt(
taskRunId: string,
prompt: ContentBlock[],
): Promise<{ stopReason: string }> {
const session = this.sessions.get(taskRunId);
let session = this.sessions.get(taskRunId);
if (!session) {
throw new Error(`Session not found: ${taskRunId}`);
}

session.lastActivityAt = Date.now();

const result = await session.connection.prompt({
sessionId: taskRunId, // Use taskRunId as ACP sessionId
prompt,
});

return { stopReason: result.stopReason };
try {
const result = await session.connection.prompt({
sessionId: taskRunId,
prompt,
});
return { stopReason: result.stopReason };
} catch (err) {
if (isAuthError(err)) {
log.warn("Auth error during prompt, recreating session", { taskRunId });
session = await this.recreateSession(taskRunId);
const result = await session.connection.prompt({
sessionId: taskRunId,
prompt,
});
return { stopReason: result.stopReason };
}
throw err;
}
}

async cancelSession(taskRunId: string): Promise<boolean> {
Expand Down Expand Up @@ -398,11 +448,12 @@ export class SessionManager {
credentials: PostHogCredentials,
mockNodeDir: string,
): void {
const token = this.getToken(credentials.apiKey);
const newPath = `${mockNodeDir}:${process.env.PATH || ""}`;
process.env.PATH = newPath;
process.env.POSTHOG_AUTH_HEADER = `Bearer ${credentials.apiKey}`;
process.env.ANTHROPIC_API_KEY = credentials.apiKey;
process.env.ANTHROPIC_AUTH_TOKEN = credentials.apiKey;
process.env.POSTHOG_AUTH_HEADER = `Bearer ${token}`;
process.env.ANTHROPIC_API_KEY = token;
process.env.ANTHROPIC_AUTH_TOKEN = token;

const llmGatewayUrl =
process.env.LLM_GATEWAY_URL ||
Expand Down Expand Up @@ -508,6 +559,24 @@ export class SessionManager {
// No-op: session/update notifications are captured by the stream tap
// and forwarded as acp_message events to avoid duplication
},

extNotification: async (
method: string,
params: Record<string, unknown>,
): Promise<void> => {
if (method === "_posthog/sdk_session") {
const { sessionId, sdkSessionId } = params as {
sessionId: string;
sdkSessionId: string;
};
// Store sdkSessionId in session config for recreation/reconnection
const session = this.sessions.get(sessionId);
if (session) {
session.config.sdkSessionId = sdkSessionId;
log.info("SDK session ID captured", { sessionId, sdkSessionId });
}
}
},
};

// Create client-side connection with tapped streams (bidirectional)
Expand Down Expand Up @@ -665,10 +734,10 @@ export function registerAgentIpc(
"agent-token-refresh",
async (
_event: IpcMainInvokeEvent,
taskRunId: string,
_taskRunId: string,
newToken: string,
): Promise<void> => {
sessionManager.updateSessionToken(taskRunId, newToken);
sessionManager.updateToken(newToken);
},
);

Expand Down
7 changes: 7 additions & 0 deletions apps/array/src/renderer/features/auth/stores/authStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ export const useAuthStore = create<AuthState>()(
...(projectId && { projectId }),
});

// Notify main process of token refresh for active agent sessions
window.electronAPI
.agentTokenRefresh("", tokenResponse.access_token)
.catch((err) => {
log.warn("Failed to update agent token:", err);
});

get().scheduleTokenRefresh();
},

Expand Down
11 changes: 4 additions & 7 deletions packages/agent/example-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import type { SessionPersistenceConfig } from "./src/session-store.js";
import { Logger } from "./src/utils/logger.js";

// PostHog configuration - set via env vars
const POSTHOG_API_KEY = process.env.POSTHOG_API_KEY || "";
const POSTHOG_CONFIG = {
apiUrl: process.env.POSTHOG_API_URL || "",
apiKey: process.env.POSTHOG_API_KEY || "",
getApiKey: () => POSTHOG_API_KEY,
projectId: parseInt(process.env.POSTHOG_PROJECT_ID || "0", 10),
};

Expand Down Expand Up @@ -250,11 +251,7 @@ async function main() {
}
} else if (!existingSessionId) {
// Create new Task/TaskRun for new sessions (only if PostHog is configured)
if (
POSTHOG_CONFIG.apiUrl &&
POSTHOG_CONFIG.apiKey &&
POSTHOG_CONFIG.projectId
) {
if (POSTHOG_CONFIG.apiUrl && POSTHOG_API_KEY && POSTHOG_CONFIG.projectId) {
console.log("🔗 Connecting to PostHog...");
const posthogClient = new PostHogAPIClient(POSTHOG_CONFIG);

Expand Down Expand Up @@ -299,7 +296,7 @@ async function main() {
logger.log(level, message, data, scope);
},
...(POSTHOG_CONFIG.apiUrl && { posthogApiUrl: POSTHOG_CONFIG.apiUrl }),
...(POSTHOG_CONFIG.apiKey && { posthogApiKey: POSTHOG_CONFIG.apiKey }),
...(POSTHOG_API_KEY && { getPosthogApiKey: POSTHOG_CONFIG.getApiKey }),
...(POSTHOG_CONFIG.projectId && {
posthogProjectId: POSTHOG_CONFIG.projectId,
}),
Expand Down
3 changes: 2 additions & 1 deletion packages/agent/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ async function testAgent() {
});
}

const apiKey = process.env.POSTHOG_API_KEY || "";
const agent = new Agent({
workingDirectory: REPO_PATH,
posthogApiUrl: process.env.POSTHOG_API_URL || "http://localhost:8010",
posthogApiKey: process.env.POSTHOG_API_KEY,
getPosthogApiKey: () => apiKey,
posthogProjectId: process.env.POSTHOG_PROJECT_ID
? parseInt(process.env.POSTHOG_PROJECT_ID, 10)
: 1,
Expand Down
8 changes: 4 additions & 4 deletions packages/agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ export class Agent {

// Add auth if API key provided
const headers: Record<string, string> = {};
if (config.posthogApiKey) {
headers.Authorization = `Bearer ${config.posthogApiKey}`;
if (config.getPosthogApiKey) {
headers.Authorization = `Bearer ${config.getPosthogApiKey()}`;
}

const defaultMcpServers = {
Expand Down Expand Up @@ -93,12 +93,12 @@ export class Agent {

if (
config.posthogApiUrl &&
config.posthogApiKey &&
config.getPosthogApiKey &&
config.posthogProjectId
) {
this.posthogAPI = new PostHogAPIClient({
apiUrl: config.posthogApiUrl,
apiKey: config.posthogApiKey,
getApiKey: config.getPosthogApiKey,
projectId: config.posthogProjectId,
});

Expand Down
4 changes: 2 additions & 2 deletions packages/agent/src/posthog-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class PostHogAPIClient {

private get headers(): Record<string, string> {
return {
Authorization: `Bearer ${this.config.apiKey}`,
Authorization: `Bearer ${this.config.getApiKey()}`,
"Content-Type": "application/json",
};
}
Expand Down Expand Up @@ -84,7 +84,7 @@ export class PostHogAPIClient {
}

getApiKey(): string {
return this.config.apiKey;
return this.config.getApiKey();
}

getLlmGatewayUrl(): string {
Expand Down
4 changes: 2 additions & 2 deletions packages/agent/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export interface AgentConfig {

// PostHog API configuration (optional - enables PostHog integration when provided)
posthogApiUrl?: string;
posthogApiKey?: string;
getPosthogApiKey?: () => string;
posthogProjectId?: number;

// PostHog MCP configuration
Expand All @@ -219,7 +219,7 @@ export interface AgentConfig {

export interface PostHogAPIConfig {
apiUrl: string;
apiKey: string;
getApiKey: () => string;
projectId: number;
}

Expand Down