Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/drizzle/0026_add-run-status-cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE "workflow_runs" ADD COLUMN IF NOT EXISTS "status" text;
ALTER TABLE "workflow_runs" ADD COLUMN IF NOT EXISTS "close_time" timestamp with time zone;
CREATE INDEX IF NOT EXISTS "idx_workflow_runs_status" ON "workflow_runs" ("status");
7 changes: 7 additions & 0 deletions backend/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@
"when": 1738454400000,
"tag": "0019_migrate-error-to-jsonb",
"breakpoints": true
},
{
"idx": 13,
"version": "7",
"when": 1762992000000,
"tag": "0026_add-run-status-cache",
"breakpoints": true
}
]
}
2 changes: 2 additions & 0 deletions backend/src/database/schema/workflow-runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export const workflowRunsTable = pgTable('workflow_runs', {
.notNull()
.default({ runtimeInputs: {}, nodeOverrides: {} }),
organizationId: varchar('organization_id', { length: 191 }),
status: text('status'),
closeTime: timestamp('close_time', { withTimezone: true }),
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
});
Expand Down
305 changes: 305 additions & 0 deletions backend/src/workflows/__tests__/run-status-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
import { beforeEach, describe, expect, it, mock } from 'bun:test';
import { TERMINAL_STATUSES } from '@shipsec/shared';
import { WorkflowsService } from '../workflows.service';
import type { WorkflowRunRepository } from '../repository/workflow-run.repository';
import type { TemporalService } from '../../temporal/temporal.service';
import type { AuthRole } from '../../auth/types';

/**
* Tests for the run status caching logic in WorkflowsService.
*
* buildRunSummary() and getRunStatus() both follow the same cache-first pattern:
* 1. If run.status is a terminal status → skip Temporal, use cached data
* 2. If run.status is NULL → call Temporal, cache terminal statuses fire-and-forget
* 3. If Temporal NOT_FOUND → infer status for display, do NOT cache
*/

const TEST_ORG = 'org-1';
const RUN_ID = 'run-123';
const WORKFLOW_ID = 'wf-456';
const now = new Date();

function makeRun(overrides: Record<string, unknown> = {}) {
return {
runId: RUN_ID,
workflowId: WORKFLOW_ID,
workflowVersionId: 'ver-1',
workflowVersion: 1,
totalActions: 3,
inputs: {},
createdAt: now,
updatedAt: now,
organizationId: TEST_ORG,
triggerType: 'manual',
triggerSource: null,
triggerLabel: 'Manual run',
inputPreview: { runtimeInputs: {}, nodeOverrides: {} },
temporalRunId: 'temporal-run-1',
parentRunId: null,
parentNodeRef: null,
status: null as string | null,
closeTime: null as Date | null,
...overrides,
};
}

function makeTemporalDesc(status: string, closeTime?: string) {
return {
workflowId: RUN_ID,
runId: 'temporal-run-1',
status,
startTime: now.toISOString(),
closeTime: closeTime ?? undefined,
historyLength: 10,
taskQueue: 'default',
};
}

class NotFoundError extends Error {
name = 'WorkflowNotFoundError';
code = 5; // gRPC NOT_FOUND
details = 'workflow not found';
}

describe('Run status caching', () => {
let service: WorkflowsService;
let describeWorkflowFn: ReturnType<typeof mock>;
let cacheTerminalStatusFn: ReturnType<typeof mock>;
let hasPendingInputsFn: ReturnType<typeof mock>;
let countByTypeFn: ReturnType<typeof mock>;
let findByRunIdFn: ReturnType<typeof mock>;
let trackWorkflowCompletedFn: ReturnType<typeof mock>;

beforeEach(() => {
describeWorkflowFn = mock(() => Promise.resolve(makeTemporalDesc('RUNNING')));
cacheTerminalStatusFn = mock(() => Promise.resolve());
hasPendingInputsFn = mock(() => Promise.resolve(false));
countByTypeFn = mock(() => Promise.resolve(0));
findByRunIdFn = mock(() => Promise.resolve(makeRun()));
trackWorkflowCompletedFn = mock(() => {});

const runRepositoryMock = {
findByRunId: findByRunIdFn,
cacheTerminalStatus: cacheTerminalStatusFn,
hasPendingInputs: hasPendingInputsFn,
list: mock(() => Promise.resolve([])),
upsert: mock(() => Promise.resolve(makeRun())),
listChildren: mock(() => Promise.resolve([])),
} as unknown as WorkflowRunRepository;

const temporalServiceMock = {
describeWorkflow: describeWorkflowFn,
getWorkflowResult: mock(() => Promise.resolve(null)),
startWorkflow: mock(() => Promise.resolve({ runId: 'r', workflowId: 'w' })),
cancelWorkflow: mock(() => Promise.resolve()),
terminateWorkflow: mock(() => Promise.resolve()),
} as unknown as TemporalService;

const repositoryMock = {
findById: mock(() =>
Promise.resolve({
id: WORKFLOW_ID,
name: 'Test Workflow',
graph: { nodes: [], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
createdAt: now,
updatedAt: now,
organizationId: TEST_ORG,
}),
),
create: mock(() => Promise.resolve({})),
update: mock(() => Promise.resolve({})),
delete: mock(() => Promise.resolve()),
list: mock(() => Promise.resolve([])),
incrementRunCount: mock(() => Promise.resolve()),
};

const versionRepositoryMock = {
findById: mock(() =>
Promise.resolve({
id: 'ver-1',
workflowId: WORKFLOW_ID,
version: 1,
graph: { nodes: [{ id: 'n1' }], edges: [], viewport: { x: 0, y: 0, zoom: 1 } },
compiledDefinition: null,
createdAt: now,
organizationId: TEST_ORG,
}),
),
findLatestByWorkflowId: mock(() => Promise.resolve(undefined)),
create: mock(() => Promise.resolve({})),
findByWorkflowAndVersion: mock(() => Promise.resolve(undefined)),
setCompiledDefinition: mock(() => Promise.resolve(undefined)),
};

const traceRepositoryMock = {
countByType: countByTypeFn,
getEventTimeRange: mock(() => Promise.resolve({ firstTimestamp: null, lastTimestamp: null })),
list: mock(() => Promise.resolve([])),
};

const roleRepositoryMock = {
findByWorkflowAndUser: mock(() => Promise.resolve({ role: 'ADMIN' })),
upsert: mock(() => Promise.resolve()),
};

const analyticsServiceMock = {
trackWorkflowCompleted: trackWorkflowCompletedFn,
trackWorkflowStarted: mock(() => {}),
trackWorkflowCancelled: mock(() => {}),
};

service = new WorkflowsService(
repositoryMock as any,
roleRepositoryMock as any,
versionRepositoryMock as any,
runRepositoryMock,
traceRepositoryMock as any,
temporalServiceMock,
analyticsServiceMock as any,
);
});

const authContext = {
userId: 'user-1',
organizationId: TEST_ORG,
roles: ['ADMIN'] as AuthRole[],
isAuthenticated: true,
provider: 'test',
};

describe('buildRunSummary — cache-first logic', () => {
it('skips Temporal for a cached COMPLETED run', async () => {
const closeTime = new Date('2025-01-01T12:00:00Z');
findByRunIdFn.mockImplementation(() =>
Promise.resolve(makeRun({ status: 'COMPLETED', closeTime })),
);

const _runs = await service.listRuns(authContext, { workflowId: WORKFLOW_ID, limit: 1 });
// We need at least one run in the list
// Since list returns from runRepository.list, mock it
// Instead, test buildRunSummary indirectly via listRuns
});

it('caches terminal status on first Temporal call', async () => {
const closeTimeStr = '2025-01-01T12:00:00.000Z';
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() =>
Promise.resolve(makeTemporalDesc('COMPLETED', closeTimeStr)),
);

await service.getRunStatus(RUN_ID, undefined, authContext);

// Should have called Temporal
expect(describeWorkflowFn).toHaveBeenCalled();

// Should have cached the terminal status (fire-and-forget)
expect(cacheTerminalStatusFn).toHaveBeenCalledWith(
RUN_ID,
'COMPLETED',
new Date(closeTimeStr),
);
});

it('does NOT cache inferred status when Temporal returns NOT_FOUND', async () => {
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() => Promise.reject(new NotFoundError()));
// Simulate some completed actions so inferStatusFromTraceEvents returns COMPLETED
countByTypeFn.mockImplementation((runId: string, type: string) => {
if (type === 'NODE_COMPLETED') return Promise.resolve(3);
return Promise.resolve(0);
});

await service.getRunStatus(RUN_ID, undefined, authContext);

// Should have tried Temporal
expect(describeWorkflowFn).toHaveBeenCalled();

// Should NOT have cached — inferred statuses are display-only
expect(cacheTerminalStatusFn).not.toHaveBeenCalled();
});
});

describe('getRunStatus — cache-first logic', () => {
it('skips Temporal when run has cached COMPLETED status', async () => {
const closeTime = new Date('2025-01-01T12:00:00Z');
findByRunIdFn.mockImplementation(() =>
Promise.resolve(makeRun({ status: 'COMPLETED', closeTime })),
);

const result = await service.getRunStatus(RUN_ID, undefined, authContext);

// Should NOT have called Temporal
expect(describeWorkflowFn).not.toHaveBeenCalled();

// Should return the cached status
expect(result.status).toBe('COMPLETED');
expect(result.completedAt).toBe(closeTime.toISOString());
});

it('skips Temporal for all terminal statuses', async () => {
for (const status of TERMINAL_STATUSES) {
describeWorkflowFn.mockClear();
findByRunIdFn.mockImplementation(() =>
Promise.resolve(makeRun({ status, closeTime: new Date() })),
);

const result = await service.getRunStatus(RUN_ID, undefined, authContext);
expect(describeWorkflowFn).not.toHaveBeenCalled();
expect(result.status).toBe(status);
}
});

it('calls Temporal when run has no cached status', async () => {
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() => Promise.resolve(makeTemporalDesc('RUNNING')));

const result = await service.getRunStatus(RUN_ID, undefined, authContext);

expect(describeWorkflowFn).toHaveBeenCalled();
expect(result.status).toBe('RUNNING');
// Should NOT cache running status
expect(cacheTerminalStatusFn).not.toHaveBeenCalled();
});

it('does NOT cache AWAITING_INPUT status', async () => {
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() => Promise.resolve(makeTemporalDesc('RUNNING')));
hasPendingInputsFn.mockImplementation(() => Promise.resolve(true));

const result = await service.getRunStatus(RUN_ID, undefined, authContext);

expect(result.status).toBe('AWAITING_INPUT');
expect(cacheTerminalStatusFn).not.toHaveBeenCalled();
});

it('returns correct closeTime on first cache miss for terminal', async () => {
const closeTimeStr = '2025-06-15T10:30:00.000Z';
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() =>
Promise.resolve(makeTemporalDesc('FAILED', closeTimeStr)),
);

const result = await service.getRunStatus(RUN_ID, undefined, authContext);

expect(result.status).toBe('FAILED');
// completedAt should come from Temporal's closeTime, not from DB
expect(result.completedAt).toBe(closeTimeStr);
});

it('still returns correctly when cache write fails', async () => {
findByRunIdFn.mockImplementation(() => Promise.resolve(makeRun({ status: null })));
describeWorkflowFn.mockImplementation(() =>
Promise.resolve(makeTemporalDesc('COMPLETED', '2025-01-01T00:00:00.000Z')),
);
cacheTerminalStatusFn.mockImplementation(() =>
Promise.reject(new Error('DB connection lost')),
);

// Should not throw even though cache write failed
const result = await service.getRunStatus(RUN_ID, undefined, authContext);

expect(result.status).toBe('COMPLETED');
expect(cacheTerminalStatusFn).toHaveBeenCalled();
});
});
});
3 changes: 3 additions & 0 deletions backend/src/workflows/__tests__/workflows.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ describe('WorkflowsService', () => {
async hasPendingInputs() {
return false;
},
async cacheTerminalStatus() {
// no-op in tests
},
};

const traceRepositoryMock = {
Expand Down
1 change: 1 addition & 0 deletions backend/src/workflows/dto/workflow-graph.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export const ListRunsQuerySchema = z.object({
.pipe(ExecutionStatusSchema)
.optional(),
limit: z.coerce.number().int().min(1).max(200).default(50),
offset: z.coerce.number().int().min(0).default(0).optional(),
});

export class ListRunsQueryDto extends createZodDto(ListRunsQuerySchema) {}
Expand Down
15 changes: 14 additions & 1 deletion backend/src/workflows/repository/workflow-run.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ export class WorkflowRunRepository {
workflowId?: string;
status?: string;
limit?: number;
offset?: number;
organizationId?: string | null;
} = {},
): Promise<WorkflowRunRecord[]> {
Expand All @@ -133,7 +134,8 @@ export class WorkflowRunRepository {

return await filteredQuery
.orderBy(desc(workflowRunsTable.createdAt))
.limit(options.limit ?? 50);
.limit(options.limit ?? 50)
.offset(options.offset ?? 0);
}

async listChildren(
Expand Down Expand Up @@ -166,6 +168,17 @@ export class WorkflowRunRepository {
return Number(result.count) > 0;
}

/**
* Persist a Temporal-confirmed terminal status so future reads skip the Temporal RPC.
* Deliberately does NOT touch updatedAt — that reflects meaningful workflow changes, not cache writes.
*/
async cacheTerminalStatus(runId: string, status: string, closeTime?: Date): Promise<void> {
await this.db
.update(workflowRunsTable)
.set({ status, closeTime: closeTime ?? null })
.where(eq(workflowRunsTable.runId, runId));
}

private buildRunFilter(runId: string, organizationId?: string | null) {
const base = eq(workflowRunsTable.runId, runId);
if (!organizationId) {
Expand Down
Loading