Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions server/src/__tests__/heartbeat-sticky-error-recovery.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { randomUUID } from "node:crypto";
import { eq } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
import {
activityLog,
agents,
agentRuntimeState,
agentWakeupRequests,
companies,
createDb,
heartbeatRunEvents,
heartbeatRuns,
} from "@paperclipai/db";
import {
getEmbeddedPostgresTestSupport,
startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";
import {
STICKY_ERROR_RECOVERY_MIN_AGE_MS,
heartbeatService,
} from "../services/heartbeat.ts";

const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;

if (!embeddedPostgresSupport.supported) {
console.warn(
`Skipping embedded Postgres sticky-error recovery tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
);
}

describeEmbeddedPostgres("heartbeat sticky-error recovery sweep", () => {
let db!: ReturnType<typeof createDb>;
let heartbeat!: ReturnType<typeof heartbeatService>;
let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;

beforeAll(async () => {
tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-sticky-error-recovery-");
db = createDb(tempDb.connectionString);
heartbeat = heartbeatService(db);
}, 20_000);

afterEach(async () => {
await db.delete(heartbeatRunEvents);
await db.delete(heartbeatRuns);
await db.delete(agentWakeupRequests);
await db.delete(agentRuntimeState);
await db.delete(activityLog);
await db.delete(agents);
await db.delete(companies);
});

afterAll(async () => {
await tempDb?.cleanup();
});

async function seedCompany() {
const id = randomUUID();
await db.insert(companies).values({
id,
name: "Paperclip",
issuePrefix: `T${id.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
requireBoardApprovalForNewAgents: false,
});
return id;
}

async function seedAgent(input: {
companyId: string;
status: "idle" | "error" | "paused" | "terminated" | "running" | "pending_approval";
lastHeartbeatAt: Date | null;
pauseReason?: string | null;
}) {
const id = randomUUID();
await db.insert(agents).values({
id,
companyId: input.companyId,
name: `Agent-${id.slice(0, 8)}`,
role: "engineer",
status: input.status,
adapterType: "claude_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
lastHeartbeatAt: input.lastHeartbeatAt,
pauseReason: input.pauseReason ?? null,
pausedAt: input.pauseReason ? new Date() : null,
});
return id;
}

it("flips error agents older than the recovery floor back to idle", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000));
const stuckId = await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale });

const result = await heartbeat.recoverErroredAgents(now);

expect(result).toEqual({ candidates: 1, recovered: 1 });
const [after] = await db.select().from(agents).where(eq(agents.id, stuckId));
expect(after?.status).toBe("idle");
expect(after?.pauseReason).toBeNull();
expect(after?.pausedAt).toBeNull();
// lastHeartbeatAt is intentionally NOT bumped — preserves the original
// failure timestamp so operators can audit how long the agent was stuck.
expect(after?.lastHeartbeatAt?.toISOString()).toBe(stale.toISOString());
});

it("leaves error agents inside the recovery floor untouched", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const recent = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS - 60_000));
const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: recent });

const result = await heartbeat.recoverErroredAgents(now);

expect(result).toEqual({ candidates: 0, recovered: 0 });
const [after] = await db.select().from(agents).where(eq(agents.id, id));
expect(after?.status).toBe("error");
});

it("does not disturb non-error agents even when they are stale", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 3_600_000));
const pausedId = await seedAgent({
companyId,
status: "paused",
lastHeartbeatAt: stale,
pauseReason: "manual pause",
});
const terminatedId = await seedAgent({ companyId, status: "terminated", lastHeartbeatAt: stale });
const idleId = await seedAgent({ companyId, status: "idle", lastHeartbeatAt: stale });

const result = await heartbeat.recoverErroredAgents(now);

expect(result).toEqual({ candidates: 0, recovered: 0 });
const rows = await db.select().from(agents);
const byId = new Map(rows.map((row) => [row.id, row.status]));
expect(byId.get(pausedId)).toBe("paused");
expect(byId.get(terminatedId)).toBe("terminated");
expect(byId.get(idleId)).toBe("idle");
});

it("treats missing lastHeartbeatAt by falling back to updatedAt", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: null });
// Force updatedAt past the recovery floor so the coalesce branch triggers.
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000));
await db.update(agents).set({ updatedAt: stale }).where(eq(agents.id, id));

const result = await heartbeat.recoverErroredAgents(now);

expect(result).toEqual({ candidates: 1, recovered: 1 });
const [after] = await db.select().from(agents).where(eq(agents.id, id));
expect(after?.status).toBe("idle");
});

it("respects the configured batch limit", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000));
for (let i = 0; i < 5; i += 1) {
await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale });
}

const result = await heartbeat.recoverErroredAgents(now, { limit: 2 });

expect(result).toEqual({ candidates: 2, recovered: 2 });
const remaining = await db.select().from(agents).where(eq(agents.status, "error"));
expect(remaining).toHaveLength(3);
});

it("writes an activity-log entry per recovered agent", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000));
const id = await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale });

await heartbeat.recoverErroredAgents(now);

const entries = await db
.select()
.from(activityLog)
.where(eq(activityLog.agentId, id));
expect(entries).toHaveLength(1);
expect(entries[0]).toMatchObject({
action: "agent.recovered_from_error",
actorType: "system",
actorId: "sticky_error_recovery",
entityType: "agent",
entityId: id,
});
});

it("is idempotent across back-to-back sweeps", async () => {
const companyId = await seedCompany();
const now = new Date("2026-05-30T20:00:00.000Z");
const stale = new Date(now.getTime() - (STICKY_ERROR_RECOVERY_MIN_AGE_MS + 60_000));
await seedAgent({ companyId, status: "error", lastHeartbeatAt: stale });

const first = await heartbeat.recoverErroredAgents(now);
const second = await heartbeat.recoverErroredAgents(now);

expect(first).toEqual({ candidates: 1, recovered: 1 });
expect(second).toEqual({ candidates: 0, recovered: 0 });
});
});
18 changes: 18 additions & 0 deletions server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,24 @@ export async function startServer(): Promise<StartedServer> {
.catch((err) => {
logger.error({ err }, "periodic heartbeat recovery failed");
});

// Un-stick agents that flipped to `status: "error"` (typically on an
// upstream quota window) and have not been touched for at least the
// recovery floor. Runs independently of the run/issue-driven chain
// above so a slow reaper can't starve quota-recovered agents.
void heartbeat
.recoverErroredAgents(new Date())
.then((result) => {
if (result.recovered > 0) {
logger.warn(
{ ...result },
"periodic sticky-error recovery flipped agents back to idle",
);
}
})
.catch((err) => {
logger.error({ err }, "periodic sticky-error recovery failed");
});
}, config.heartbeatSchedulerIntervalMs);
}

Expand Down
87 changes: 87 additions & 0 deletions server/src/services/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ const MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP = 10;
const MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS = 1_000;
const MAX_TURN_CONTINUATION_MAX_DELAY_MS = 5 * 60 * 1000;
const MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES = ["scheduled_retry", "queued", "running"] as const;

// How long an agent must sit in `status: "error"` before the periodic recovery
// sweep flips it back to `idle`. Two hours comfortably outruns any single
// upstream quota window (Anthropic shared-subscription resets daily) while
// still being long enough that real (non-quota) failures stay visible.
export const STICKY_ERROR_RECOVERY_MIN_AGE_MS = 2 * 60 * 60 * 1000;
const STICKY_ERROR_RECOVERY_DEFAULT_BATCH_LIMIT = 50;
const STICKY_ERROR_RECOVERY_ACTOR_ID = "sticky_error_recovery";
type CodexTransientFallbackMode =
| "same_session"
| "safer_invocation"
Expand Down Expand Up @@ -10194,6 +10202,85 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})

buildRunOutputSilence,

recoverErroredAgents: async (
now: Date = new Date(),
opts: { minAgeMs?: number; limit?: number } = {},
) => {
const minAgeMs = Math.max(0, opts.minAgeMs ?? STICKY_ERROR_RECOVERY_MIN_AGE_MS);
const limit = Math.max(1, Math.min(opts.limit ?? STICKY_ERROR_RECOVERY_DEFAULT_BATCH_LIMIT, 500));
const cutoff = new Date(now.getTime() - minAgeMs);

const candidates = await db
.select({
id: agents.id,
companyId: agents.companyId,
lastHeartbeatAt: agents.lastHeartbeatAt,
updatedAt: agents.updatedAt,
})
.from(agents)
.where(
and(
eq(agents.status, "error"),
// Use lastHeartbeatAt as the staleness baseline; fall back to
// updatedAt for agents that errored before ever heartbeating.
sql`coalesce(${agents.lastHeartbeatAt}, ${agents.updatedAt}) <= ${cutoff.toISOString()}::timestamptz`,
),
)
.limit(limit);

let recovered = 0;
for (const candidate of candidates) {
const [updated] = await db
.update(agents)
.set({
status: "idle",
pauseReason: null,
pausedAt: null,
updatedAt: now,
})
// Recheck status in the WHERE clause so a concurrent /resume or
// reverse-quota-window heartbeat can't be clobbered by this sweep.
.where(and(eq(agents.id, candidate.id), eq(agents.status, "error")))
.returning();
if (!updated) continue;
recovered += 1;

await logActivity(db, {
companyId: updated.companyId,
actorType: "system",
actorId: STICKY_ERROR_RECOVERY_ACTOR_ID,
agentId: updated.id,
runId: null,
action: "agent.recovered_from_error",
entityType: "agent",
entityId: updated.id,
details: {
source: "heartbeat.recoverErroredAgents",
minAgeMs,
cutoffIso: cutoff.toISOString(),
previousLastHeartbeatAt: candidate.lastHeartbeatAt
? new Date(candidate.lastHeartbeatAt).toISOString()
: null,
},
});

publishLiveEvent({
companyId: updated.companyId,
type: "agent.status",
payload: {
agentId: updated.id,
status: updated.status,
lastHeartbeatAt: updated.lastHeartbeatAt
? new Date(updated.lastHeartbeatAt).toISOString()
: null,
outcome: "sticky_error_recovered",
},
});
}

return { candidates: candidates.length, recovered };
},

tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;
Expand Down
Loading