diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index abb1beed63..5ce53ec59d 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -88,7 +88,7 @@ src/ - **Boundary errors** (`extends Error`) — Thrown outside step execution, at the HTTP or Runner layer (e.g. `RunNotFoundError`, `PendingDataNotFoundError`, `ConfigurationError`). Caught by the HTTP server and translated into HTTP status codes (404, 400, etc.). These intentionally do NOT extend `WorkflowExecutorError` to prevent `base-step-executor` from catching them as step failures. - **Dual error messages** — `WorkflowExecutorError` carries two messages: `message` (technical, for dev logs) and `userMessage` (human-readable, surfaced to the Forest Admin UI via `stepOutcome.error`). The mapping happens in a single place: `base-step-executor.ts` uses `error.userMessage` when building the error outcome. When adding a new error subclass, always provide a distinct `userMessage` oriented toward end-users (no collection names, field names, or AI internals). If `userMessage` is omitted in the constructor call, it falls back to `message`. - **displayName in AI tools** — All `DynamicStructuredTool` schemas and system message prompts must use `displayName`, never `fieldName`. `displayName` is a Forest Admin frontend feature that replaces the technical field/relation/action name with a product-oriented label configured by the Forest Admin admin. End users write their workflow prompts using these display names, not the underlying technical names. After an AI tool call returns display names, map them back to `fieldName`/`name` before using them in datasource operations (e.g. filtering record values, calling `getRecord`). -- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` is called before `runWithActivityLog()` so neither cache hits nor uncertain-state errors emit activity log entries. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe. +- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe. - **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations). - **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry. - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when *no* field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. diff --git a/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts b/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts index f72ed8089e..feb270f96f 100644 --- a/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts +++ b/packages/workflow-executor/src/adapters/forestadmin-client-activity-log-port.ts @@ -5,10 +5,7 @@ import type { CreateActivityLogArgs, } from '../ports/activity-log-port'; import type { Logger } from '../ports/logger-port'; -import type { - ActivityLogAction, - ActivityLogsServiceInterface, -} from '@forestadmin/forestadmin-client'; +import type { ActivityLogsServiceInterface } from '@forestadmin/forestadmin-client'; import { serializeRecordId } from './record-id-serializer'; import withRetry from './with-retry'; @@ -30,7 +27,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort this.service.createActivityLog({ forestServerToken: this.forestServerToken, renderingId: String(args.renderingId), - action: args.action as ActivityLogAction, + action: args.action, type: args.type, // The lib writes this value verbatim into relationships.collection.data.id // (JSON:API). The Forest server audit-trail API expects the numeric collectionId. @@ -76,7 +73,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort }); } - async markFailed(handle: ActivityLogHandle, errorMessage: string): Promise { + async markFailed(handle: ActivityLogHandle): Promise { return this.drainer.track(async () => { try { await withRetry( @@ -92,7 +89,6 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort } catch (err) { this.logger.error('activity log mark-as-failed failed', { handleId: handle.id, - stepErrorMessage: errorMessage, error: extractErrorMessage(err), }); } diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 70af3e8abe..3dea5306f6 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -259,6 +259,17 @@ export class AgentPortError extends WorkflowExecutorError { } } +// Invariant guard: the agent port reads a collection's schema (for its primary keys) from the +// cache, which the executor must populate via getCollectionSchema before any record access. +export class SchemaNotCachedError extends WorkflowExecutorError { + constructor(collectionName: string) { + super( + `Collection schema for "${collectionName}" was not loaded before access — call getCollectionSchema first`, + 'An error occurred while accessing your data. Please try again.', + ); + } +} + export class WorkflowPortError extends WorkflowExecutorError { constructor(operation: string, cause: unknown) { super( diff --git a/packages/workflow-executor/src/executors/activity-log.ts b/packages/workflow-executor/src/executors/activity-log.ts new file mode 100644 index 0000000000..4749fb2bd5 --- /dev/null +++ b/packages/workflow-executor/src/executors/activity-log.ts @@ -0,0 +1,47 @@ +import type { ActivityLogPort, CreateActivityLogArgs } from '../ports/activity-log-port'; +import type { StepUser } from '../types/execution-context'; + +// The activity-log target minus renderingId, which track() stamps centrally. +export type AuditTarget = Omit; + +export type TrackOptions = { + operation: () => Promise; + // Runs between createPending and the operation — the executor's write-ahead marker. Optional: + // read operations have no marker to persist. + beforeCall?: () => Promise; +}; + +// Runs an operation while recording an activity-log entry around it (pending → success/failed). +// It both executes `operation` and owns the activity-log transitions, so callers never touch the +// ActivityLogPort directly. `beforeCall` runs after createPending, just before the operation, so +// an audit-creation failure never leaves an orphan write-ahead marker. +export default class ActivityLog { + private readonly activityLogPort: ActivityLogPort; + + private readonly user: StepUser; + + constructor(activityLogPort: ActivityLogPort, user: StepUser) { + this.activityLogPort = activityLogPort; + this.user = user; + } + + async track(target: AuditTarget, { operation, beforeCall }: TrackOptions): Promise { + const handle = await this.activityLogPort.createPending({ + renderingId: this.user.renderingId, + ...target, + }); + + try { + if (beforeCall) await beforeCall(); + const result = await operation(); + void this.activityLogPort.markSucceeded(handle); + + return result; + } catch (err) { + // The step error is logged/surfaced by base-step-executor when rethrown, so the audit + // transition only needs the handle. + void this.activityLogPort.markFailed(handle); + throw err; + } + } +} diff --git a/packages/workflow-executor/src/executors/agent-with-log.ts b/packages/workflow-executor/src/executors/agent-with-log.ts new file mode 100644 index 0000000000..b48dace014 --- /dev/null +++ b/packages/workflow-executor/src/executors/agent-with-log.ts @@ -0,0 +1,106 @@ +import type ActivityLog from './activity-log'; +import type { + AgentPort, + ExecuteActionQuery, + GetActionFormInfoQuery, + GetRecordQuery, + GetRelatedDataQuery, + GetSingleRelatedDataQuery, + UpdateRecordQuery, +} from '../ports/agent-port'; +import type SchemaResolver from '../schema-resolver'; +import type { StepUser } from '../types/execution-context'; +import type { RecordData } from '../types/validated/collection'; + +type WriteOptions = { beforeCall: () => Promise }; + +export interface AgentWithLogDeps { + agentPort: AgentPort; + schemaResolver: SchemaResolver; + user: StepUser; + activityLog: ActivityLog; +} + +// Wraps AgentPort and runs each data-access call through the ActivityLog so it records an +// activity-log entry. The audit target is derived from the call: the numeric collectionId is +// resolved from the call's collection name, the recordId from its id. Idempotency stays in the +// executors: write methods forward a `beforeCall` thunk (the executor's write-ahead marker). +export default class AgentWithLog { + private readonly agentPort: AgentPort; + + private readonly schemaResolver: SchemaResolver; + + private readonly user: StepUser; + + private readonly activityLog: ActivityLog; + + constructor(deps: AgentWithLogDeps) { + this.agentPort = deps.agentPort; + this.schemaResolver = deps.schemaResolver; + this.user = deps.user; + this.activityLog = deps.activityLog; + } + + async getRecord(query: GetRecordQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.activityLog.track( + { action: 'index', type: 'read', collectionId, recordId: query.id }, + { operation: () => this.agentPort.getRecord(query, this.user) }, + ); + } + + async getRelatedData(query: GetRelatedDataQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.activityLog.track( + { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, + { operation: () => this.agentPort.getRelatedData(query, this.user) }, + ); + } + + async getSingleRelatedData(query: GetSingleRelatedDataQuery): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.activityLog.track( + { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, + { operation: () => this.agentPort.getSingleRelatedData(query, this.user) }, + ); + } + + async updateRecord(query: UpdateRecordQuery, opts: WriteOptions): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.activityLog.track( + { action: 'update', type: 'write', collectionId, recordId: query.id }, + { + operation: () => this.agentPort.updateRecord(query, this.user), + beforeCall: opts.beforeCall, + }, + ); + } + + async executeAction(query: ExecuteActionQuery, opts: WriteOptions): Promise { + const collectionId = await this.resolveCollectionId(query.collection); + + return this.activityLog.track( + { action: 'action', type: 'write', collectionId, recordId: query.id }, + { + operation: () => this.agentPort.executeAction(query, this.user), + beforeCall: opts.beforeCall, + }, + ); + } + + // Unaudited passthrough: form-info is a read-only probe (does this action have a form?), + // not a data access, so unlike the methods above it records NO activity-log entry. + getActionFormInfo(query: GetActionFormInfoQuery): Promise<{ hasForm: boolean }> { + return this.agentPort.getActionFormInfo(query, this.user); + } + + private async resolveCollectionId(collectionName: string): Promise { + const schema = await this.schemaResolver.resolve(collectionName); + + return schema.collectionId; + } +} diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index d658897af5..be790f1035 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -1,5 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; -import type { AgentPort } from '../ports/agent-port'; import type { ExecutionContext, IStepExecutor, @@ -35,11 +33,8 @@ export default abstract class BaseStepExecutor; - protected readonly agentPort: AgentPort; - constructor(context: ExecutionContext) { this.context = context; - this.agentPort = context.agentPort; } async execute(): Promise { @@ -51,8 +46,8 @@ export default abstract class BaseStepExecutor { - const args = this.buildActivityLogArgs(); - if (!args) return this.runWithTimeout(); - - const handle = await this.context.activityLogPort.createPending(args); - - let result: StepExecutionResult; - - try { - result = await this.runWithTimeout(); - } catch (err) { - // Use userMessage (not the technical message) — errorMessage is rendered to end-users - // in the Forest Admin UI. Privacy: no collection/field/AI internals in the audit trail. - const errorMessage = - err instanceof WorkflowExecutorError ? err.userMessage : 'Unexpected error'; - void this.context.activityLogPort.markFailed(handle, errorMessage); - throw err; - } - - if (result.stepOutcome.status === 'error') { - void this.context.activityLogPort.markFailed( - handle, - result.stepOutcome.error ?? 'Step failed', - ); - } else { - void this.context.activityLogPort.markSucceeded(handle); - } - - return result; - } - // Promise.race doesn't abort the losing branch — it keeps running in the background. The .catch() // on execPromise must be attached BEFORE the race so a late rejection doesn't trigger // UnhandledPromiseRejection. Late resolutions are silently discarded. diff --git a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts index 540b6b6d52..2996f0ae69 100644 --- a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { LoadRelatedRecordCandidate, @@ -56,16 +55,6 @@ interface RelationTarget extends RelationRef { } export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'listRelatedData', - type: 'read', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected async doExecute(): Promise { // Branch A -- Re-entry after pending execution found in RunStore const pending = await this.patchAndReloadPendingData( @@ -276,16 +265,13 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - return this.agentPort.getRelatedData( - { - collection: target.selectedRecordRef.collectionName, - id: target.selectedRecordRef.recordId, - relation: target.name, - relatedSchema, - limit, - }, - this.context.user, - ); + return this.context.agent.getRelatedData({ + collection: target.selectedRecordRef.collectionName, + id: target.selectedRecordRef.recordId, + relation: target.name, + relatedSchema, + limit, + }); } /** Persists the loaded record ref and returns a success outcome. */ diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 8d7c157b1a..f37cfd4fc2 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { ExecutionContext, StepExecutionResult } from '../types/execution-context'; import type { McpStepExecutionData, McpToolCall } from '../types/step-execution-data'; import type { McpStepDefinition } from '../types/validated/step-definition'; @@ -46,16 +45,6 @@ export default class McpStepExecutor extends BaseStepExecutor }; } - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'action', - type: 'write', - collectionId: this.context.collectionId, - label: this.context.stepDefinition.mcpServerId, - }; - } - protected buildOutcomeResult(outcome: { status: RecordStepStatus; error?: string; @@ -126,20 +115,31 @@ export default class McpStepExecutor extends BaseStepExecutor const tool = tools.find(t => t.base.name === target.name && t.sourceId === target.sourceId); if (!tool) throw new McpToolNotFoundError(target.name); - await this.context.runStore.saveStepExecution(this.context.runId, { - ...existingExecution, - type: 'mcp', - stepIndex: this.context.stepIndex, - idempotencyPhase: 'executing', - }); - - let toolResult: unknown; - - try { - toolResult = await tool.base.invoke(target.input); - } catch (cause) { - throw new McpToolInvocationError(target.name, cause); - } + const toolResult = await this.context.activityLog.track( + { + action: 'action', + type: 'write', + label: this.context.stepDefinition.mcpServerId, + collectionId: this.context.collectionId, + recordId: this.context.baseRecordRef.recordId, + }, + { + operation: async () => { + try { + return await tool.base.invoke(target.input); + } catch (cause) { + throw new McpToolInvocationError(target.name, cause); + } + }, + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + ...existingExecution, + type: 'mcp', + stepIndex: this.context.stepIndex, + idempotencyPhase: 'executing', + }), + }, + ); // 1. Persist raw result immediately — safe state before any further network calls const baseExecutionResult = { success: true as const, toolResult }; @@ -160,12 +160,15 @@ export default class McpStepExecutor extends BaseStepExecutor try { formattedResponse = await this.formatToolResult(target, toolResult); } catch (cause) { - this.context.logger.error('Failed to format MCP tool result, using generic fallback', { - runId: this.context.runId, - stepIndex: this.context.stepIndex, - toolName: target.name, - cause: cause instanceof Error ? cause.message : String(cause), - }); + this.context.logger.error( + 'Failed to format MCP tool result, persisting raw result without summary', + { + runId: this.context.runId, + stepIndex: this.context.stepIndex, + toolName: target.name, + cause: cause instanceof Error ? cause.message : String(cause), + }, + ); } if (formattedResponse) { diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 2e0f0f060c..8451151b8f 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { FieldReadResult } from '../types/step-execution-data'; import type { CollectionSchema, FieldSchema } from '../types/validated/collection'; @@ -19,16 +18,6 @@ Important rules: - Do not refer to yourself as "I" in the response, use a passive formulation instead.`; export default class ReadRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'index', - type: 'read', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected async doExecute(): Promise { const { stepDefinition: step } = this.context; const { preRecordedArgs } = step; @@ -55,14 +44,11 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor s.requested)); } - const recordData = await this.agentPort.getRecord( - { - collection: selectedRecordRef.collectionName, - id: selectedRecordRef.recordId, - fields: resolvedFieldNames, - }, - this.context.user, - ); + const recordData = await this.context.agent.getRecord({ + collection: selectedRecordRef.collectionName, + id: selectedRecordRef.recordId, + fields: resolvedFieldNames, + }); const fieldResults = this.formatFieldResults(recordData.values, selectedFields); await this.context.runStore.saveStepExecution(this.context.runId, { diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 6fc8524a96..6a68f8d7bc 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -71,17 +71,8 @@ export default abstract class RecordStepExecutor< return [this.context.baseRecordRef, ...relatedRecords]; } - protected async getCollectionSchema(collectionName: string): Promise { - const cached = this.context.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema( - collectionName, - this.context.runId, - ); - this.context.schemaCache.set(collectionName, schema); - - return schema; + protected getCollectionSchema(collectionName: string): Promise { + return this.context.schemaResolver.resolve(collectionName); } protected findFieldByTechnicalName( diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 3fd1533df9..446a27dc3c 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -23,6 +23,9 @@ import type { } from '../types/validated/step-definition'; import { StepStateError, causeMessage, extractErrorMessage } from '../errors'; +import SchemaResolver from '../schema-resolver'; +import ActivityLog from './activity-log'; +import AgentWithLog from './agent-with-log'; import ConditionStepExecutor from './condition-step-executor'; import GuidanceStepExecutor from './guidance-step-executor'; import LoadRelatedRecordStepExecutor from './load-related-record-step-executor'; @@ -124,6 +127,9 @@ export default class StepExecutorFactory { activityLogPort: ActivityLogPort, incomingPendingData?: unknown, ): ExecutionContext { + const schemaResolver = new SchemaResolver(cfg.schemaCache, cfg.workflowPort, step.runId); + const activityLog = new ActivityLog(activityLogPort, step.user); + return { runId: step.runId, stepId: step.stepId, @@ -134,15 +140,19 @@ export default class StepExecutorFactory { previousSteps: step.previousSteps, user: step.user, model: cfg.aiModelPort.getModel(step.stepDefinition.aiConfigName), - agentPort: cfg.agentPort, - workflowPort: cfg.workflowPort, + agent: new AgentWithLog({ + agentPort: cfg.agentPort, + schemaResolver, + user: step.user, + activityLog, + }), + activityLog, runStore: cfg.runStore, - schemaCache: cfg.schemaCache, + schemaResolver, logger: cfg.logger, incomingPendingData, stepTimeoutMs: cfg.stepTimeoutMs, aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs, - activityLogPort, }; } } diff --git a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts index 54dfb58564..90af5e8127 100644 --- a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts +++ b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { ActionRef, TriggerRecordActionStepExecutionData } from '../types/step-execution-data'; import type { ActionSchema, CollectionSchema, RecordRef } from '../types/validated/collection'; @@ -29,22 +28,6 @@ interface ActionTarget extends ActionRef { } export default class TriggerRecordActionStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - // Skip when the frontend executes the action itself (non fully-automated mode). - // The front logs on its side via the standard agent activity flow. - if (this.context.stepDefinition.executionType !== StepExecutionMode.FullyAutomated) { - return null; - } - - return { - renderingId: this.context.user.renderingId, - action: 'action', - type: 'write', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected override async checkIdempotency(): Promise { const existing = await this.findPendingExecution( 'trigger-action', @@ -126,14 +109,11 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< // handle forms (no UI to fill them). Reject form-bearing actions here. When the // frontend is in the loop (Branch C), it handles the form natively so no check. if (step.executionType === StepExecutionMode.FullyAutomated) { - const { hasForm } = await this.agentPort.getActionFormInfo( - { - collection: selectedRecordRef.collectionName, - action: target.name, - id: selectedRecordRef.recordId, - }, - this.context.user, - ); + const { hasForm } = await this.context.agent.getActionFormInfo({ + collection: selectedRecordRef.collectionName, + action: target.name, + id: selectedRecordRef.recordId, + }); if (hasForm) throw new UnsupportedActionFormError(target.displayName); return this.executeOnExecutor(target); @@ -150,24 +130,25 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< return this.buildOutcomeResult({ status: 'awaiting-input' }); } - /** Branch B — executor runs the action via agentPort, then persists the result. */ + /** Branch B — executor runs the action via the audited agent, then persists the result. */ private async executeOnExecutor(target: ActionTarget): Promise { const { selectedRecordRef, displayName, name } = target; - await this.context.runStore.saveStepExecution(this.context.runId, { - type: 'trigger-action', - stepIndex: this.context.stepIndex, - selectedRecordRef, - idempotencyPhase: 'executing', - }); - - const actionResult = await this.agentPort.executeAction( + const actionResult = await this.context.agent.executeAction( { collection: selectedRecordRef.collectionName, action: name, id: selectedRecordRef.recordId, }, - this.context.user, + { + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + type: 'trigger-action', + stepIndex: this.context.stepIndex, + selectedRecordRef, + idempotencyPhase: 'executing', + }), + }, ); await this.context.runStore.saveStepExecution(this.context.runId, { diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 9b0f8c28ad..cfff409d62 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -1,4 +1,3 @@ -import type { CreateActivityLogArgs } from '../ports/activity-log-port'; import type { StepExecutionResult } from '../types/execution-context'; import type { FieldWithValue, UpdateRecordStepExecutionData } from '../types/step-execution-data'; import type { CollectionSchema, FieldSchema, RecordRef } from '../types/validated/collection'; @@ -127,16 +126,6 @@ interface UpdateTarget extends FieldWithValue { } export default class UpdateRecordStepExecutor extends RecordStepExecutor { - protected override buildActivityLogArgs(): CreateActivityLogArgs | null { - return { - renderingId: this.context.user.renderingId, - action: 'update', - type: 'write', - collectionId: this.context.collectionId, - recordId: this.context.baseRecordRef.recordId, - }; - } - protected override async checkIdempotency(): Promise { const existing = await this.findPendingExecution( 'update-record', @@ -263,21 +252,22 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { const { selectedRecordRef, displayName, name, value } = target; - await this.context.runStore.saveStepExecution(this.context.runId, { - ...existingExecution, - type: 'update-record', - stepIndex: this.context.stepIndex, - selectedRecordRef, - idempotencyPhase: 'executing', - }); - - const updated = await this.agentPort.updateRecord( + const updated = await this.context.agent.updateRecord( { collection: selectedRecordRef.collectionName, id: selectedRecordRef.recordId, values: { [name]: value }, }, - this.context.user, + { + beforeCall: () => + this.context.runStore.saveStepExecution(this.context.runId, { + ...existingExecution, + type: 'update-record', + stepIndex: this.context.stepIndex, + selectedRecordRef, + idempotencyPhase: 'executing', + }), + }, ); await this.context.runStore.saveStepExecution(this.context.runId, { diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 14435b4850..7fa349a083 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -122,6 +122,7 @@ export { default as Runner } from './runner'; export type { RunnerConfig, RunnerState } from './runner'; export { default as validateSecrets } from './validate-secrets'; export { default as SchemaCache } from './schema-cache'; +export { default as SchemaResolver } from './schema-resolver'; export { default as InMemoryStore } from './stores/in-memory-store'; export { default as DatabaseStore } from './stores/database-store'; export type { DatabaseStoreOptions } from './stores/database-store'; diff --git a/packages/workflow-executor/src/ports/activity-log-port.ts b/packages/workflow-executor/src/ports/activity-log-port.ts index d12ab9ce63..33a9956a51 100644 --- a/packages/workflow-executor/src/ports/activity-log-port.ts +++ b/packages/workflow-executor/src/ports/activity-log-port.ts @@ -1,10 +1,12 @@ import type { RecordId } from '../types/validated/collection'; +import type { ActivityLogAction, ActivityLogType } from '@forestadmin/forestadmin-client'; export interface CreateActivityLogArgs { renderingId: number; - action: string; - type: 'read' | 'write'; - collectionId?: string; + action: ActivityLogAction; + type: ActivityLogType; + // Numeric Forest collection id; the adapter forwards it to the lib's `collectionName` param. + collectionId: string; recordId?: RecordId; label?: string; } @@ -15,11 +17,11 @@ export interface ActivityLogHandle { } // Per-run scoped port: token baked into the adapter's constructor. markSucceeded/markFailed -// retry transient failures internally and are invoked with `void` from base-step-executor. +// retry transient failures internally and are invoked with `void` from AgentWithLog. export interface ActivityLogPort { createPending(args: CreateActivityLogArgs): Promise; markSucceeded(handle: ActivityLogHandle): Promise; - markFailed(handle: ActivityLogHandle, errorMessage: string): Promise; + markFailed(handle: ActivityLogHandle): Promise; } // Produces per-run ActivityLogPort instances and exposes drain() at the process level so the diff --git a/packages/workflow-executor/src/schema-resolver.ts b/packages/workflow-executor/src/schema-resolver.ts new file mode 100644 index 0000000000..61212bd383 --- /dev/null +++ b/packages/workflow-executor/src/schema-resolver.ts @@ -0,0 +1,30 @@ +import type { WorkflowPort } from './ports/workflow-port'; +import type SchemaCache from './schema-cache'; +import type { CollectionSchema } from './types/validated/collection'; + +// Per-run schema resolution: binds the shared SchemaCache, the orchestrator port and the +// current runId once, so callers never thread a loader. Writes into the SAME SchemaCache +// instance AgentClientAgentPort reads from (get/iterate): the resolver always populates an +// entry before the agent-port reads it, so the agent-port's SchemaNotCachedError guard does +// not fire under normal TTLs (a TTL shorter than a single step's round-trip could still evict). +export default class SchemaResolver { + private readonly cache: SchemaCache; + private readonly workflowPort: WorkflowPort; + private readonly runId: string; + + constructor(cache: SchemaCache, workflowPort: WorkflowPort, runId: string) { + this.cache = cache; + this.workflowPort = workflowPort; + this.runId = runId; + } + + async resolve(collectionName: string): Promise { + const cached = this.cache.get(collectionName); + if (cached) return cached; + + const schema = await this.workflowPort.getCollectionSchema(collectionName, this.runId); + this.cache.set(collectionName, schema); + + return schema; + } +} diff --git a/packages/workflow-executor/src/types/execution-context.ts b/packages/workflow-executor/src/types/execution-context.ts index 28be4ce61f..b811af4496 100644 --- a/packages/workflow-executor/src/types/execution-context.ts +++ b/packages/workflow-executor/src/types/execution-context.ts @@ -1,9 +1,8 @@ -import type { ActivityLogPort } from '../ports/activity-log-port'; -import type { AgentPort } from '../ports/agent-port'; +import type ActivityLog from '../executors/activity-log'; +import type AgentWithLog from '../executors/agent-with-log'; import type { Logger } from '../ports/logger-port'; import type { RunStore } from '../ports/run-store'; -import type { WorkflowPort } from '../ports/workflow-port'; -import type SchemaCache from '../schema-cache'; +import type SchemaResolver from '../schema-resolver'; import type { RecordRef } from './validated/collection'; import type { AvailableStepExecution, Step, StepUser } from './validated/execution'; import type { StepDefinition } from './validated/step-definition'; @@ -30,15 +29,14 @@ export interface ExecutionContext readonly baseRecordRef: RecordRef; readonly stepDefinition: TStep; readonly model: BaseChatModel; - readonly agentPort: AgentPort; - readonly workflowPort: WorkflowPort; + readonly agent: AgentWithLog; + readonly activityLog: ActivityLog; readonly runStore: RunStore; readonly user: StepUser; - readonly schemaCache: SchemaCache; + readonly schemaResolver: SchemaResolver; readonly previousSteps: ReadonlyArray>; readonly logger: Logger; readonly incomingPendingData?: unknown; readonly stepTimeoutMs?: number; readonly aiInvokeTimeoutMs?: number; - readonly activityLogPort: ActivityLogPort; } diff --git a/packages/workflow-executor/src/types/validated/collection.ts b/packages/workflow-executor/src/types/validated/collection.ts index ba19381a0d..a972745b19 100644 --- a/packages/workflow-executor/src/types/validated/collection.ts +++ b/packages/workflow-executor/src/types/validated/collection.ts @@ -72,6 +72,7 @@ export type ActionSchema = z.infer; export const CollectionSchemaSchema = z .object({ collectionName: z.string().min(1), + collectionId: z.string().min(1), // null when the rendering has no explicit displayName configured — normalized to collectionName. collectionDisplayName: z.string().nullable(), primaryKeyFields: z.array(z.string().min(1)).min(1), diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts index 7066aefc0e..adb534881c 100644 --- a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -50,6 +50,7 @@ describe('AgentClientAgentPort', () => { const schemaCache = new SchemaCache(); schemaCache.set('users', { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -63,6 +64,7 @@ describe('AgentClientAgentPort', () => { }); schemaCache.set('orders', { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['tenantId', 'orderId'], fields: [ @@ -73,6 +75,7 @@ describe('AgentClientAgentPort', () => { }); schemaCache.set('posts', { collectionName: 'posts', + collectionId: 'col-posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], fields: [ @@ -249,6 +252,7 @@ describe('AgentClientAgentPort', () => { describe('getRelatedData', () => { const postsSchema = { collectionName: 'posts', + collectionId: 'col-posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], fields: [ @@ -458,6 +462,7 @@ describe('AgentClientAgentPort', () => { // that jsonapi-serializer emits as a nested object on the parent. const ordersSchema = { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['id'], fields: [ @@ -730,6 +735,7 @@ describe('AgentClientAgentPort', () => { const schemaCache = new SchemaCache(); schemaCache.set('users', { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ fieldName: 'id', displayName: 'id', isRelationship: false }], diff --git a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts index 30c8a4eaca..56cd44385f 100644 --- a/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts +++ b/packages/workflow-executor/test/adapters/forest-server-workflow-port.test.ts @@ -591,6 +591,7 @@ describe('ForestServerWorkflowPort', () => { describe('getCollectionSchema', () => { const collectionSchema: CollectionSchema = { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [], @@ -626,6 +627,7 @@ describe('ForestServerWorkflowPort', () => { // Shape invalide : fields[0] manque fieldName (violation FieldSchema.fieldName.min(1)). mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ displayName: 'Email', isRelationship: false }], @@ -638,6 +640,7 @@ describe('ForestServerWorkflowPort', () => { it('strips unknown extra fields on the wire (orchestrator drift tolerance)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], referenceField: 'name', @@ -671,6 +674,7 @@ describe('ForestServerWorkflowPort', () => { it('defaults actions to [] when the orchestrator omits it', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [], @@ -684,6 +688,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts a field without type (omitted by the orchestrator)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [{ fieldName: 'email', displayName: 'Email', isRelationship: false }], @@ -700,6 +705,7 @@ describe('ForestServerWorkflowPort', () => { async displayName => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: displayName, primaryKeyFields: ['id'], fields: [], @@ -715,6 +721,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts relationType BelongsToMany (many-to-many relation)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -738,6 +745,7 @@ describe('ForestServerWorkflowPort', () => { it("strips the target key from relatedCollectionName (Forest 'collection.key' reference)", async () => { mockQuery.mockResolvedValue({ collectionName: 'accounts', + collectionId: 'col-accounts', collectionDisplayName: 'Accounts', primaryKeyFields: ['id'], fields: [ @@ -761,6 +769,7 @@ describe('ForestServerWorkflowPort', () => { it('leaves relatedCollectionName unchanged when it carries no target key (no dot)', async () => { mockQuery.mockResolvedValue({ collectionName: 'accounts', + collectionId: 'col-accounts', collectionDisplayName: 'Accounts', primaryKeyFields: ['id'], fields: [ @@ -784,6 +793,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts type File (Forest Admin extension)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -800,6 +810,7 @@ describe('ForestServerWorkflowPort', () => { it('accepts type [File] (array of files)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -821,6 +832,7 @@ describe('ForestServerWorkflowPort', () => { it('rejects enumValues: [] (empty enum is invalid)', async () => { mockQuery.mockResolvedValue({ collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ @@ -1033,6 +1045,7 @@ describe('ForestServerWorkflowPort', () => { it('getCollectionSchema retries on HTTP 408 (timeout)', async () => { const validSchema: CollectionSchema = { collectionName: 'users', + collectionId: 'col-users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], fields: [ diff --git a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts index fd5a89e465..3ad407023f 100644 --- a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts +++ b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port-factory.test.ts @@ -21,7 +21,12 @@ describe('ForestadminClientActivityLogPortFactory', () => { const factory = new ForestadminClientActivityLogPortFactory(service, makeLogger()); const port = factory.forRun('token-42'); - await port.createPending({ renderingId: 1, action: 'update', type: 'write' }); + await port.createPending({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(port).toBeInstanceOf(ForestadminClientActivityLogPort); expect(service.createActivityLog).toHaveBeenCalledWith( diff --git a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts index 428206a2d9..dd11300c76 100644 --- a/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts +++ b/packages/workflow-executor/test/adapters/forestadmin-client-activity-log-port.test.ts @@ -54,7 +54,12 @@ describe('ForestadminClientActivityLogPort', () => { }); const port = makePort(service); - const handle = await port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const handle = await port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(handle).toEqual({ id: 'log-1', index: '0' }); expect(service.createActivityLog).toHaveBeenCalledWith( @@ -89,7 +94,12 @@ describe('ForestadminClientActivityLogPort', () => { const logger = makeLogger(); const port = makePort(service, { logger }); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); await jest.advanceTimersByTimeAsync(100); const handle = await promise; @@ -106,7 +116,12 @@ describe('ForestadminClientActivityLogPort', () => { service.createActivityLog.mockRejectedValue(makeHttpError(502)); const port = makePort(service); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); const settled = promise.catch(err => err); await jest.advanceTimersByTimeAsync(2_600); const err = await settled; @@ -121,7 +136,12 @@ describe('ForestadminClientActivityLogPort', () => { const port = makePort(service); await expect( - port.createPending({ renderingId: 5, action: 'update', type: 'write' }), + port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }), ).rejects.toBeInstanceOf(ActivityLogCreationError); expect(service.createActivityLog).toHaveBeenCalledTimes(1); }); @@ -132,7 +152,12 @@ describe('ForestadminClientActivityLogPort', () => { const port = makePort(service); await expect( - port.createPending({ renderingId: 5, action: 'update', type: 'write' }), + port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }), ).rejects.toBeInstanceOf(ActivityLogCreationError); expect(service.createActivityLog).toHaveBeenCalledTimes(1); }); @@ -145,7 +170,12 @@ describe('ForestadminClientActivityLogPort', () => { .mockResolvedValueOnce({ id: 'log-3', attributes: { index: '2' } }); const port = makePort(service); - const promise = port.createPending({ renderingId: 5, action: 'update', type: 'write' }); + const promise = port.createPending({ + renderingId: 5, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); await jest.advanceTimersByTimeAsync(100); await expect(promise).resolves.toEqual({ id: 'log-3', index: '2' }); }); @@ -158,7 +188,12 @@ describe('ForestadminClientActivityLogPort', () => { }); const port = makePort(service); - await port.createPending({ renderingId: 42, action: 'update', type: 'write' }); + await port.createPending({ + renderingId: 42, + action: 'update', + type: 'write', + collectionId: 'col-1', + }); expect(service.createActivityLog).toHaveBeenCalledWith( expect.objectContaining({ renderingId: '42' }), @@ -232,14 +267,14 @@ describe('ForestadminClientActivityLogPort', () => { }); describe('markFailed', () => { - it('sends status: failed (no errorMessage — server schema rejects unknown fields) and retries on 503', async () => { + it('sends status: failed to the server without the local errorMessage, and retries on 503', async () => { const service = makeService(); service.updateActivityLogStatus .mockRejectedValueOnce(makeHttpError(503)) .mockResolvedValueOnce(undefined); const port = makePort(service); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(100); await promise; @@ -254,21 +289,18 @@ describe('ForestadminClientActivityLogPort', () => { ); }); - it('swallows errors after retries are exhausted (fire-and-forget) and logs with stepErrorMessage', async () => { + it('swallows errors after retries are exhausted (fire-and-forget) and logs the failure', async () => { const service = makeService(); service.updateActivityLogStatus.mockRejectedValue(makeHttpError(503)); const logger = makeLogger(); const port = makePort(service, { logger }); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'step-error-msg'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(2_600); await expect(promise).resolves.toBeUndefined(); expect(logger.error).toHaveBeenCalledWith( 'activity log mark-as-failed failed', - expect.objectContaining({ - handleId: 'log-1', - stepErrorMessage: 'step-error-msg', - }), + expect.objectContaining({ handleId: 'log-1' }), ); }); @@ -279,7 +311,7 @@ describe('ForestadminClientActivityLogPort', () => { .mockResolvedValueOnce(undefined); const port = makePort(service); - const promise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const promise = port.markFailed({ id: 'log-1', index: '0' }); await jest.advanceTimersByTimeAsync(100); await expect(promise).resolves.toBeUndefined(); expect(service.updateActivityLogStatus).toHaveBeenCalledTimes(2); @@ -327,7 +359,7 @@ describe('ForestadminClientActivityLogPort', () => { const drainer = new ActivityLogDrainer(); const port = makePort(service, { drainer }); - const markPromise = port.markFailed({ id: 'log-1', index: '0' }, 'boom'); + const markPromise = port.markFailed({ id: 'log-1', index: '0' }); let drainResolved = false; const drainPromise = drainer.drain().then(() => { diff --git a/packages/workflow-executor/test/executors/activity-log.test.ts b/packages/workflow-executor/test/executors/activity-log.test.ts new file mode 100644 index 0000000000..ae40901dfb --- /dev/null +++ b/packages/workflow-executor/test/executors/activity-log.test.ts @@ -0,0 +1,131 @@ +import type { AuditTarget } from '../../src/executors/activity-log'; +import type { StepUser } from '../../src/types/execution-context'; + +import { NoRecordsError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; + +function makeUser(): StepUser { + return { + id: 1, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + } as StepUser; +} + +function makeActivityLogPort() { + return { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; +} + +const TARGET: AuditTarget = { + action: 'action', + type: 'write', + label: 'my-mcp-server', + collectionId: 'col-1', + recordId: [7], +}; + +describe('ActivityLog', () => { + describe('track', () => { + it('stamps renderingId and emits pending → succeeded around the operation', async () => { + const port = makeActivityLogPort(); + const activityLog = new ActivityLog(port, makeUser()); + + const result = await activityLog.track(TARGET, { operation: async () => 'done' }); + + expect(result).toBe('done'); + expect(port.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + label: 'my-mcp-server', + collectionId: 'col-1', + recordId: [7], + }); + expect(port.markSucceeded).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(port.markFailed).not.toHaveBeenCalled(); + }); + + it('runs beforeCall between createPending and the operation', async () => { + const order: string[] = []; + const port = makeActivityLogPort(); + (port.createPending as jest.Mock).mockImplementation(async () => { + order.push('createPending'); + + return { id: 'log-1', index: '0' }; + }); + const activityLog = new ActivityLog(port, makeUser()); + + await activityLog.track(TARGET, { + operation: async () => { + order.push('operation'); + + return 'x'; + }, + beforeCall: async () => { + order.push('beforeCall'); + }, + }); + + expect(order).toEqual(['createPending', 'beforeCall', 'operation']); + }); + + it('does not run beforeCall or the operation when createPending throws', async () => { + const port = makeActivityLogPort(); + (port.createPending as jest.Mock).mockRejectedValue(new Error('audit down')); + const beforeCall = jest.fn().mockResolvedValue(undefined); + const operation = jest.fn().mockResolvedValue('x'); + const activityLog = new ActivityLog(port, makeUser()); + + await expect(activityLog.track(TARGET, { operation, beforeCall })).rejects.toThrow( + 'audit down', + ); + expect(beforeCall).not.toHaveBeenCalled(); + expect(operation).not.toHaveBeenCalled(); + }); + + it('marks failed and rethrows when beforeCall throws — the operation never runs', async () => { + const port = makeActivityLogPort(); + const operation = jest.fn().mockResolvedValue('x'); + const activityLog = new ActivityLog(port, makeUser()); + + await expect( + activityLog.track(TARGET, { + operation, + beforeCall: async () => { + throw new Error('marker save failed'); + }, + }), + ).rejects.toThrow('marker save failed'); + + expect(operation).not.toHaveBeenCalled(); + expect(port.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(port.markSucceeded).not.toHaveBeenCalled(); + }); + + it('marks failed (not succeeded) and rethrows the original error', async () => { + const port = makeActivityLogPort(); + const activityLog = new ActivityLog(port, makeUser()); + + await expect( + activityLog.track(TARGET, { + operation: async () => { + throw new NoRecordsError(); + }, + }), + ).rejects.toBeInstanceOf(NoRecordsError); + + expect(port.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(port.markSucceeded).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/workflow-executor/test/executors/agent-with-log.test.ts b/packages/workflow-executor/test/executors/agent-with-log.test.ts new file mode 100644 index 0000000000..48608fc270 --- /dev/null +++ b/packages/workflow-executor/test/executors/agent-with-log.test.ts @@ -0,0 +1,190 @@ +import type { AgentWithLogDeps } from '../../src/executors/agent-with-log'; +import type { AgentPort } from '../../src/ports/agent-port'; +import type SchemaResolver from '../../src/schema-resolver'; +import type { StepUser } from '../../src/types/execution-context'; +import type { CollectionSchema } from '../../src/types/validated/collection'; + +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; + +function makeUser(): StepUser { + return { + id: 1, + email: 'test@example.com', + firstName: 'Test', + lastName: 'User', + team: 'admin', + renderingId: 1, + role: 'admin', + permissionLevel: 'admin', + tags: {}, + } as StepUser; +} + +function makeSchema(collectionId = 'col-customers'): CollectionSchema { + return { + collectionName: 'customers', + collectionId, + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + referenceField: null, + fields: [], + actions: [], + }; +} + +function makeActivityLogPort() { + return { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; +} + +function makeDeps(overrides: Partial = {}) { + const activityLogPort = makeActivityLogPort(); + const agentPort = { + getRecord: jest + .fn() + .mockResolvedValue({ collectionName: 'customers', recordId: [42], values: {} }), + updateRecord: jest + .fn() + .mockResolvedValue({ collectionName: 'customers', recordId: [42], values: {} }), + getRelatedData: jest.fn().mockResolvedValue([]), + getSingleRelatedData: jest.fn().mockResolvedValue(null), + executeAction: jest.fn().mockResolvedValue({ ok: true }), + getActionFormInfo: jest.fn().mockResolvedValue({ hasForm: true }), + } as unknown as AgentPort; + const schemaResolver = { + resolve: jest.fn().mockResolvedValue(makeSchema()), + } as unknown as SchemaResolver; + + const deps = { + agentPort, + schemaResolver, + user: makeUser(), + activityLog: new ActivityLog(activityLogPort, makeUser()), + ...overrides, + }; + + return { deps, agentPort, activityLogPort, schemaResolver }; +} + +describe('AgentWithLog', () => { + describe('read methods', () => { + it('logs getRecord as index/read against the call target and returns the data', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + const result = await agent.getRecord({ collection: 'customers', id: [42], fields: ['name'] }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'index', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }); + expect(activityLogPort.markSucceeded).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(agentPort.getRecord).toHaveBeenCalledWith( + { collection: 'customers', id: [42], fields: ['name'] }, + expect.objectContaining({ id: 1 }), + ); + expect(result).toEqual({ collectionName: 'customers', recordId: [42], values: {} }); + }); + + it('logs getRelatedData as listRelatedData/read', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await agent.getRelatedData({ + collection: 'customers', + id: [42], + relation: 'orders', + relatedSchema: makeSchema('col-orders'), + limit: 50, + }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'listRelatedData', type: 'read', recordId: [42] }), + ); + }); + + it('logs getSingleRelatedData as listRelatedData/read (xToOne)', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await agent.getSingleRelatedData({ + collection: 'customers', + id: [42], + relation: 'order', + relatedSchema: makeSchema('col-orders'), + }); + + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'listRelatedData', type: 'read', recordId: [42] }), + ); + }); + }); + + describe('write methods', () => { + it('logs updateRecord as update/write and forwards beforeCall before the side effect', async () => { + const order: string[] = []; + const { deps, agentPort, activityLogPort } = makeDeps(); + (agentPort.updateRecord as jest.Mock).mockImplementation(async () => { + order.push('updateRecord'); + + return { collectionName: 'customers', recordId: [42], values: {} }; + }); + const agent = new AgentWithLog(deps); + + await agent.updateRecord( + { collection: 'customers', id: [42], values: { name: 'X' } }, + { + beforeCall: async () => { + order.push('beforeCall'); + }, + }, + ); + + expect(order).toEqual(['beforeCall', 'updateRecord']); + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'update', type: 'write', recordId: [42] }), + ); + }); + + it('logs executeAction as action/write', async () => { + const { deps, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + await agent.executeAction( + { collection: 'customers', action: 'send-email', id: [42] }, + { beforeCall: async () => undefined }, + ); + + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ action: 'action', type: 'write', recordId: [42] }), + ); + }); + }); + + describe('getActionFormInfo (unaudited passthrough)', () => { + it('forwards to the agent port with the injected user and emits no activity log', async () => { + const { deps, agentPort, activityLogPort } = makeDeps(); + const agent = new AgentWithLog(deps); + + const result = await agent.getActionFormInfo({ + collection: 'customers', + action: 'send-email', + id: [42], + }); + + expect(agentPort.getActionFormInfo).toHaveBeenCalledWith( + { collection: 'customers', action: 'send-email', id: [42] }, + expect.objectContaining({ id: 1 }), + ); + expect(result).toEqual({ hasForm: true }); + expect(activityLogPort.createPending).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 0b210265f7..1e2b3c66ae 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -1,6 +1,9 @@ /* eslint-disable max-classes-per-file */ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; +import type { AgentPort } from '../../src/ports/agent-port'; import type { Logger } from '../../src/ports/logger-port'; import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution-context'; import type { StepExecutionData } from '../../src/types/step-execution-data'; import type { RecordRef } from '../../src/types/validated/collection'; @@ -19,10 +22,12 @@ import { NoRecordsError, RunStorePortError, StepStateError, - WorkflowExecutorError, } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import BaseStepExecutor from '../../src/executors/base-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; /** Concrete subclass that exposes protected methods for testing. */ @@ -96,7 +101,7 @@ function makeMockLogger(): Logger { return { info: jest.fn(), warn: jest.fn(), error: jest.fn() }; } -function makeMockActivityLogPort(): ExecutionContext['activityLogPort'] { +function makeMockActivityLogPort(): ActivityLogPort { return { createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), markSucceeded: jest.fn().mockResolvedValue(undefined), @@ -104,9 +109,20 @@ function makeMockActivityLogPort(): ExecutionContext['activityLogPort'] { }; } -function makeContext(overrides: Partial = {}): ExecutionContext { - return { - runId: 'run-1', +function makeContext( + overrides: Partial & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, +): ExecutionContext { + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as WorkflowPort); + const schemaCache = new SchemaCache(); + + const base: Omit = { + runId, stepId: 'step-0', stepIndex: 0, collectionId: 'col-1', @@ -122,8 +138,6 @@ function makeContext(overrides: Partial = {}): ExecutionContex prompt: 'Pick one', }, model: {} as ExecutionContext['model'], - agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), user: { id: 1, @@ -136,13 +150,28 @@ function makeContext(overrides: Partial = {}): ExecutionContex permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: makeMockLogger(), - - activityLogPort: makeMockActivityLogPort(), ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog(overrides.activityLogPort ?? makeMockActivityLogPort(), base.user); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? ({} as AgentPort), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } describe('BaseStepExecutor', () => { @@ -644,181 +673,6 @@ describe('BaseStepExecutor', () => { }); }); - describe('activity log lifecycle', () => { - class LoggedExecutor extends BaseStepExecutor { - constructor(context: ExecutionContext, private readonly errorToThrow?: unknown) { - super(context); - } - - protected override buildActivityLogArgs() { - return { - renderingId: 1, - action: 'update', - type: 'write' as const, - collectionId: 'col-1', - recordId: [42], - }; - } - - protected async doExecute(): Promise { - if (this.errorToThrow !== undefined) throw this.errorToThrow; - - return this.buildOutcomeResult({ status: 'success' }); - } - - protected buildOutcomeResult(outcome: { - status: BaseStepStatus; - error?: string; - }): StepExecutionResult { - return { - stepOutcome: { - type: 'record', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: outcome.status, - ...(outcome.error !== undefined && { error: outcome.error }), - }, - }; - } - } - - it('creates pending log, runs doExecute, then marks succeeded on success', async () => { - const context = makeContext(); - const executor = new LoggedExecutor(context); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('success'); - expect(context.activityLogPort.createPending).toHaveBeenCalledWith( - expect.objectContaining({ - action: 'update', - type: 'write', - collectionId: 'col-1', - }), - ); - expect(context.activityLogPort.markSucceeded).toHaveBeenCalledWith({ - id: 'log-1', - index: '0', - }); - expect(context.activityLogPort.markFailed).not.toHaveBeenCalled(); - }); - - it('marks failed when doExecute throws a WorkflowExecutorError', async () => { - const context = makeContext(); - const executor = new LoggedExecutor(context, new NoRecordsError()); - - await executor.execute(); - - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'No records available', - ); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - - it('fails the step and does NOT run doExecute when createPending throws ActivityLogCreationError', async () => { - // eslint-disable-next-line @typescript-eslint/no-var-requires, global-require - const { ActivityLogCreationError } = require('../../src/errors'); - const context = makeContext(); - (context.activityLogPort.createPending as jest.Mock).mockRejectedValue( - new ActivityLogCreationError(new Error('net')), - ); - const doExecuteSpy = jest.fn().mockResolvedValue({ - stepOutcome: { type: 'record', stepId: 'x', stepIndex: 0, status: 'success' }, - }); - - class NeverRunExecutor extends LoggedExecutor { - protected override async doExecute(): Promise { - return doExecuteSpy(); - } - } - - const executor = new NeverRunExecutor(context); - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(result.stepOutcome.error).toBe( - 'Could not record this step in the audit log. Please try again, or contact your administrator if the problem persists.', - ); - expect(doExecuteSpy).not.toHaveBeenCalled(); - }); - - it('does NOT create pending log when buildActivityLogArgs returns null (default)', async () => { - const context = makeContext(); - const executor = new TestableExecutor(context); - - await executor.execute(); - - expect(context.activityLogPort.createPending).not.toHaveBeenCalled(); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - - it('calls markFailed with userMessage (not the technical message) on WorkflowExecutorError', async () => { - class DualMessageError extends WorkflowExecutorError { - constructor() { - super( - 'Internal: datasource "customers" returned no record for pk=42', - 'The record no longer exists.', - ); - } - } - const context = makeContext(); - const executor = new LoggedExecutor(context, new DualMessageError()); - - await executor.execute(); - - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'The record no longer exists.', - ); - }); - - it('marks failed when doExecute returns an error outcome without throwing', async () => { - class ErrorOutcomeExecutor extends BaseStepExecutor { - protected override buildActivityLogArgs() { - return { - renderingId: 1, - action: 'update', - type: 'write' as const, - collectionName: 'customers', - recordId: [42], - }; - } - - protected async doExecute(): Promise { - return this.buildOutcomeResult({ status: 'error', error: 'soft failure' }); - } - - protected buildOutcomeResult(outcome: { - status: BaseStepStatus; - error?: string; - }): StepExecutionResult { - return { - stepOutcome: { - type: 'record', - stepId: this.context.stepId, - stepIndex: this.context.stepIndex, - status: outcome.status, - ...(outcome.error !== undefined && { error: outcome.error }), - }, - }; - } - } - - const context = makeContext(); - const executor = new ErrorOutcomeExecutor(context); - - const result = await executor.execute(); - - expect(result.stepOutcome.status).toBe('error'); - expect(context.activityLogPort.markFailed).toHaveBeenCalledWith( - { id: 'log-1', index: '0' }, - 'soft failure', - ); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - }); - }); - describe('invokeWithTool', () => { function makeMockModel(response: unknown) { const invoke = jest.fn().mockResolvedValue(response); diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index 83135f3378..63df5b8b11 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -1,12 +1,18 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; +import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { RecordRef } from '../../src/types/validated/collection'; import type { ConditionStepDefinition } from '../../src/types/validated/step-definition'; import type { ConditionStepOutcome } from '../../src/types/validated/step-outcome'; import { RunStorePortError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import ConditionStepExecutor from '../../src/executors/condition-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): ConditionStepDefinition { @@ -42,10 +48,19 @@ function makeMockModel(toolCallArgs?: Record) { } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as WorkflowPort); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'cond-1', stepIndex: 0, collectionId: 'col-1', @@ -56,8 +71,6 @@ function makeContext( } as RecordRef, stepDefinition: makeStep(), model: makeMockModel().model, - agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), user: { id: 1, @@ -70,17 +83,35 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? ({} as AgentPort), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } describe('ConditionStepExecutor', () => { diff --git a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts index aa5d0b09ee..6a836d0fa4 100644 --- a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts @@ -1,11 +1,17 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; +import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; +import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { RecordRef } from '../../src/types/validated/collection'; import type { GuidanceStepDefinition } from '../../src/types/validated/step-definition'; import type { GuidanceStepOutcome } from '../../src/types/validated/step-outcome'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import GuidanceStepExecutor from '../../src/executors/guidance-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeMockRunStore(overrides: Partial = {}): RunStore { @@ -19,10 +25,19 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? ({} as WorkflowPort); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'guidance-1', stepIndex: 0, collectionId: 'col-1', @@ -33,8 +48,6 @@ function makeContext( } as RecordRef, stepDefinition: { type: StepType.Guidance, executionType: StepExecutionMode.Manual }, model: {} as ExecutionContext['model'], - agentPort: {} as ExecutionContext['agentPort'], - workflowPort: {} as ExecutionContext['workflowPort'], runStore: makeMockRunStore(), user: { id: 1, @@ -47,17 +60,35 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? ({} as AgentPort), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } describe('GuidanceStepExecutor', () => { diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index b02648e9e3..643f782ac5 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -1,3 +1,4 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; @@ -8,8 +9,11 @@ import type { Step } from '../../src/types/validated/execution'; import type { LoadRelatedRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import LoadRelatedRecordStepExecutor from '../../src/executors/load-related-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep( @@ -73,6 +77,7 @@ function makeMockAgentPort(relatedData: RecordData[] = [makeRelatedRecordData()] function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -139,18 +144,25 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'selec } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'load-1', stepIndex: 0, collectionId: 'col-1', baseRecordRef: makeRecordRef(), stepDefinition: makeStep(), model: makeMockModel({ relationName: 'Order', reasoning: 'User requested order' }).model, - agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), user: { id: 1, @@ -163,17 +175,35 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? makeMockAgentPort(), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } /** Builds a valid pending execution for Branch A tests. */ @@ -742,6 +772,58 @@ describe('LoadRelatedRecordStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs listRelatedData against the source record and its collection, not the trigger', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const { model } = makeMockModel({ relationName: 'Order', reasoning: 'r' }); + const context = makeContext({ + model, + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new LoadRelatedRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'listRelatedData', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('logs the relation read once on the awaiting-input (Branch C) path', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const { model } = makeMockModel({ relationName: 'Order', reasoning: 'r' }); + const context = makeContext({ model, runStore, activityLogPort }); + + const result = await new LoadRelatedRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); + expect(activityLogPort.createPending).toHaveBeenCalledWith( + expect.objectContaining({ + action: 'listRelatedData', + type: 'read', + collectionId: 'col-customers', + recordId: [42], + }), + ); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves AI suggestion in pendingData and returns awaiting-input (single record — no field/record AI calls)', async () => { const agentPort = makeMockAgentPort(); // returns 1 record: orders #99 diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index b057b6394f..c44ab7a5fc 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -1,3 +1,5 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; +import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; @@ -7,8 +9,11 @@ import type { McpStepDefinition } from '../../src/types/validated/step-definitio import RemoteTool from '@forestadmin/ai-proxy/src/remote-tool'; import { RunStorePortError, StepStateError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import McpStepExecutor from '../../src/executors/mcp-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; // --------------------------------------------------------------------------- @@ -85,23 +90,25 @@ function makeMockModel(toolName: string, toolArgs: Record) { } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'mcp-1', stepIndex: 0, collectionId: 'col-1', baseRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, stepDefinition: makeStep(), model: makeMockModel('send_notification', { message: 'Hello' }).model, - agentPort: { - getRecord: jest.fn(), - updateRecord: jest.fn(), - getRelatedData: jest.fn(), - executeAction: jest.fn(), - } as unknown as ExecutionContext['agentPort'], - workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), user: { id: 1, @@ -114,17 +121,42 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: + overrides.agentPort ?? + ({ + getRecord: jest.fn(), + updateRecord: jest.fn(), + getRelatedData: jest.fn(), + executeAction: jest.fn(), + } as unknown as AgentPort), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } // --------------------------------------------------------------------------- @@ -267,7 +299,7 @@ describe('McpStepExecutor', () => { }), ); expect(logger.error).toHaveBeenCalledWith( - 'Failed to format MCP tool result, using generic fallback', + 'Failed to format MCP tool result, persisting raw result without summary', expect.objectContaining({ toolName: 'send_notification' }), ); }); @@ -951,7 +983,7 @@ describe('McpStepExecutor', () => { }); describe('activity log', () => { - it('creates activity log with collectionId, renderingId, action, type and mcpServerId as label', async () => { + it('logs against the run base record with collectionId, renderingId, action, type and mcpServerId as label', async () => { const tool = new MockRemoteTool({ name: 'send_notification', sourceId: 'mcp-server-1' }); const { model } = makeMockModel('send_notification', { message: 'Hello' }); const activityLogPort = { @@ -977,6 +1009,7 @@ describe('McpStepExecutor', () => { action: 'action', type: 'write', collectionId: 'col-1', + recordId: [42], label: 'my-mcp-server', }); }); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index bf71c0ff8b..bc0c3cf88b 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -1,3 +1,4 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; @@ -7,8 +8,11 @@ import type { Step } from '../../src/types/validated/execution'; import type { ReadRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, NoRecordsError, RecordNotFoundError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import ReadRecordStepExecutor from '../../src/executors/read-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): ReadRecordStepDefinition { @@ -49,6 +53,7 @@ function makeMockAgentPort( function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -105,19 +110,36 @@ function makeMockModel( return { model, bindTools, invoke }; } +function makeMockActivityLogPort(): ActivityLogPort { + return { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; +} + function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + const schemaResolver = + overrides.schemaResolver ?? new SchemaResolver(schemaCache, workflowPort, runId); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'read-1', stepIndex: 0, collectionId: 'col-1', baseRecordRef: makeRecordRef(), stepDefinition: makeStep(), model: makeMockModel({ fieldNames: ['email'] }).model, - agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), user: { id: 1, @@ -130,17 +152,28 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver, previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog(overrides.activityLogPort ?? makeMockActivityLogPort(), base.user); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? makeMockAgentPort(), + schemaResolver, + user: base.user, + activityLog, + }), + }; } function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { @@ -478,6 +511,7 @@ describe('ReadRecordStepExecutor', () => { const ordersSchema = makeCollectionSchema({ collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], }); @@ -525,12 +559,18 @@ describe('ReadRecordStepExecutor', () => { const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } }, }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort, + activityLogPort, previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new ReadRecordStepExecutor(context); @@ -550,6 +590,13 @@ describe('ReadRecordStepExecutor', () => { }), }), ); + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'index', + type: 'read', + collectionId: 'col-orders', + recordId: [99], + }); }); it('includes step index in select-record tool schema when records have stepIndex', async () => { diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index b30aae0c90..5d41864336 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -1,3 +1,4 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; @@ -8,8 +9,11 @@ import type { Step } from '../../src/types/validated/execution'; import type { TriggerActionStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import TriggerRecordActionStepExecutor from '../../src/executors/trigger-record-action-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep( @@ -45,6 +49,7 @@ function makeMockAgentPort(): AgentPort { function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -105,10 +110,19 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'selec } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'trigger-1', stepIndex: 0, collectionId: 'col-1', @@ -118,8 +132,6 @@ function makeContext( actionName: 'Send Welcome Email', reasoning: 'User requested welcome email', }).model, - agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), user: { id: 1, @@ -132,17 +144,35 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? makeMockAgentPort(), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { @@ -206,6 +236,118 @@ describe('TriggerRecordActionStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs the action against the acted record and its collection, not the trigger', async () => { + const agentPort = makeMockAgentPort(); + (agentPort.executeAction as jest.Mock).mockResolvedValue({ ok: true }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + agentPort, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new TriggerRecordActionStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('logs against a related record in another collection (cross-collection)', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionId: 'col-orders', + collectionDisplayName: 'Orders', + actions: [ + { + name: 'cancel-order', + displayName: 'Cancel Order', + endpoint: '/forest/actions/cancel-order', + }, + ], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'select-action', + args: { actionName: 'Cancel Order', reasoning: 'r' }, + id: 'c2', + }, + ], + }); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRecord, + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const agentPort = makeMockAgentPort(); + (agentPort.executeAction as jest.Mock).mockResolvedValue({ ok: true }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + baseRecordRef, + model, + runStore, + agentPort, + activityLogPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }), + previousSteps: [makeLoadRelatedPreviousStep(2)], + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new TriggerRecordActionStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'action', + type: 'write', + collectionId: 'col-orders', + recordId: [99], + }); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves pendingAction and returns awaiting-input', async () => { const mockModel = makeMockModel({ @@ -244,8 +386,14 @@ describe('TriggerRecordActionStepExecutor', () => { actionName: 'Send Welcome Email', reasoning: 'User requested welcome email', }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; const context = makeContext({ model: mockModel.model, + activityLogPort, stepDefinition: makeStep({ executionType: StepExecutionMode.AutomatedWithConfirmation, }), @@ -254,9 +402,9 @@ describe('TriggerRecordActionStepExecutor', () => { await executor.execute(); - expect(context.activityLogPort.createPending).not.toHaveBeenCalled(); - expect(context.activityLogPort.markSucceeded).not.toHaveBeenCalled(); - expect(context.activityLogPort.markFailed).not.toHaveBeenCalled(); + expect(activityLogPort.createPending).not.toHaveBeenCalled(); + expect(activityLogPort.markSucceeded).not.toHaveBeenCalled(); + expect(activityLogPort.markFailed).not.toHaveBeenCalled(); }); }); @@ -1075,9 +1223,11 @@ describe('TriggerRecordActionStepExecutor', () => { it('skips AI action selection when actionName is pre-recorded', async () => { const mockModel = makeMockModel(); const runStore = makeMockRunStore(); + const agentPort = makeMockAgentPort(); const context = makeContext({ model: mockModel.model, runStore, + agentPort, stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated, preRecordedArgs: { actionName: 'send-welcome-email' }, @@ -1089,7 +1239,7 @@ describe('TriggerRecordActionStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); expect(mockModel.bindTools).not.toHaveBeenCalled(); - expect(context.agentPort.executeAction).toHaveBeenCalledWith( + expect(agentPort.executeAction).toHaveBeenCalledWith( expect.objectContaining({ action: 'send-welcome-email' }), context.user, ); @@ -1157,9 +1307,11 @@ describe('TriggerRecordActionStepExecutor', () => { ], }), }); + const agentPort = makeMockAgentPort(); const context = makeContext({ runStore, workflowPort, + agentPort, stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated, preRecordedArgs: { actionName: 'archive' }, @@ -1171,7 +1323,7 @@ describe('TriggerRecordActionStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); // Triggers action A ('archive'), not action B ('send' / displayName 'archive'). - expect(context.agentPort.executeAction).toHaveBeenCalledWith( + expect(agentPort.executeAction).toHaveBeenCalledWith( expect.objectContaining({ action: 'archive' }), context.user, ); diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index d0b0f44043..cfa10fae8b 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -1,3 +1,4 @@ +import type { ActivityLogPort } from '../../src/ports/activity-log-port'; import type { AgentPort } from '../../src/ports/agent-port'; import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; @@ -7,9 +8,17 @@ import type { CollectionSchema, RecordRef } from '../../src/types/validated/coll import type { Step } from '../../src/types/validated/execution'; import type { UpdateRecordStepDefinition } from '../../src/types/validated/step-definition'; -import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; +import { + ActivityLogCreationError, + AgentPortError, + RunStorePortError, + StepStateError, +} from '../../src/errors'; +import ActivityLog from '../../src/executors/activity-log'; +import AgentWithLog from '../../src/executors/agent-with-log'; import UpdateRecordStepExecutor from '../../src/executors/update-record-step-executor'; import SchemaCache from '../../src/schema-cache'; +import SchemaResolver from '../../src/schema-resolver'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; function makeStep(overrides: Partial = {}): UpdateRecordStepDefinition { @@ -48,6 +57,7 @@ function makeMockAgentPort( function makeCollectionSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -103,10 +113,19 @@ function makeMockModel(toolCallArgs?: Record, toolName = 'updat } function makeContext( - overrides: Partial> = {}, + overrides: Partial> & { + agentPort?: AgentPort; + activityLogPort?: ActivityLogPort; + activityLog?: ActivityLog; + workflowPort?: WorkflowPort; + } = {}, ): ExecutionContext { - return { - runId: 'run-1', + const runId = overrides.runId ?? 'run-1'; + const workflowPort = overrides.workflowPort ?? makeMockWorkflowPort(); + const schemaCache = new SchemaCache(); + + const base: Omit, 'agent' | 'activityLog'> = { + runId, stepId: 'update-1', stepIndex: 0, collectionId: 'col-1', @@ -115,8 +134,6 @@ function makeContext( model: makeMockModel({ input: { fieldName: 'Status', value: 'active', reasoning: 'User requested status change' }, }).model, - agentPort: makeMockAgentPort(), - workflowPort: makeMockWorkflowPort(), runStore: makeMockRunStore(), user: { id: 1, @@ -129,17 +146,35 @@ function makeContext( permissionLevel: 'admin', tags: {}, }, - schemaCache: new SchemaCache(), + schemaResolver: new SchemaResolver(schemaCache, workflowPort, runId), previousSteps: [], logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, - - activityLogPort: { - createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), - markSucceeded: jest.fn().mockResolvedValue(undefined), - markFailed: jest.fn().mockResolvedValue(undefined), - }, ...overrides, }; + + const activityLog = + overrides.activityLog ?? + new ActivityLog( + overrides.activityLogPort ?? { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }, + base.user, + ); + + return { + ...base, + activityLog, + agent: + overrides.agent ?? + new AgentWithLog({ + agentPort: overrides.agentPort ?? makeMockAgentPort(), + schemaResolver: base.schemaResolver, + user: base.user, + activityLog, + }), + }; } function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { @@ -199,6 +234,159 @@ describe('UpdateRecordStepExecutor', () => { }); }); + describe('operation activity log (PRD-442 #1)', () => { + it('logs the update against the acted record and its collection, not the trigger', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + model: makeMockModel({ input: { fieldName: 'Status', value: 'active', reasoning: 'r' } }) + .model, + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new UpdateRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-customers', + recordId: [42], + }); + }); + + it('does not log the update while only awaiting confirmation', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + model: makeMockModel({ input: { fieldName: 'Status', value: 'active', reasoning: 'r' } }) + .model, + runStore, + activityLogPort, + }); + + const result = await new UpdateRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).not.toHaveBeenCalled(); + }); + + it('logs against a related record in another collection (cross-collection)', async () => { + const baseRecordRef = makeRecordRef({ stepIndex: 1 }); + const relatedRecord = makeRecordRef({ + stepIndex: 2, + recordId: [99], + collectionName: 'orders', + }); + const ordersSchema = makeCollectionSchema({ + collectionName: 'orders', + collectionId: 'col-orders', + collectionDisplayName: 'Orders', + fields: [ + { fieldName: 'total', displayName: 'Total', isRelationship: false, type: 'Number' }, + ], + }); + + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { + name: 'update-record-field', + args: { input: { fieldName: 'Total', value: 200, reasoning: 'r' } }, + id: 'c2', + }, + ], + }); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRecord, + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema, + }), + activityLogPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + await new UpdateRecordStepExecutor(context).execute(); + + expect(activityLogPort.createPending).toHaveBeenCalledWith({ + renderingId: 1, + action: 'update', + type: 'write', + collectionId: 'col-orders', + recordId: [99], + }); + }); + + it('does not persist the executing marker when the activity log cannot be created', async () => { + const runStore = makeMockRunStore(); + const activityLogPort = { + createPending: jest + .fn() + .mockRejectedValue(new ActivityLogCreationError(new Error('audit down'))), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ + runStore, + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + const result = await new UpdateRecordStepExecutor(context).execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'Could not record this step in the audit log. Please try again, or contact your administrator if the problem persists.', + ); + expect(runStore.saveStepExecution).not.toHaveBeenCalledWith( + 'run-1', + expect.objectContaining({ idempotencyPhase: 'executing' }), + ); + }); + }); + describe('without executionType=FullyAutomated: awaiting-input (Branch C)', () => { it('saves execution and returns awaiting-input', async () => { const mockModel = makeMockModel({ @@ -413,13 +601,20 @@ describe('UpdateRecordStepExecutor', () => { const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([execution]), }); - const context = makeContext({ agentPort, runStore }); + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const context = makeContext({ agentPort, runStore, activityLogPort }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); expect(result.stepOutcome.status).toBe('success'); expect(agentPort.updateRecord).not.toHaveBeenCalled(); + // No side effect happened → no audit-log entry (PRD-442 #2: no premature/duplicate log). + expect(activityLogPort.createPending).not.toHaveBeenCalled(); expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ @@ -1179,9 +1374,11 @@ describe('UpdateRecordStepExecutor', () => { it('skips AI field selection when fieldName and value are pre-recorded', async () => { const mockModel = makeMockModel(); const runStore = makeMockRunStore(); + const agentPort = makeMockAgentPort(); const context = makeContext({ model: mockModel.model, runStore, + agentPort, stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated, preRecordedArgs: { fieldName: 'status', value: 'active' }, @@ -1193,7 +1390,7 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); expect(mockModel.bindTools).not.toHaveBeenCalled(); - expect(context.agentPort.updateRecord).toHaveBeenCalledWith( + expect(agentPort.updateRecord).toHaveBeenCalledWith( expect.objectContaining({ values: { status: 'active' } }), context.user, ); @@ -1299,9 +1496,11 @@ describe('UpdateRecordStepExecutor', () => { fields: [{ fieldName: 'age', displayName: 'Age', isRelationship: false, type: 'Number' }], }), }); + const agentPort = makeMockAgentPort(); const context = makeContext({ runStore, workflowPort, + agentPort, stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated, preRecordedArgs: { fieldName: 'age', value: 42 }, @@ -1312,7 +1511,7 @@ describe('UpdateRecordStepExecutor', () => { const result = await executor.execute(); expect(result.stepOutcome.status).toBe('success'); - expect(context.agentPort.updateRecord).toHaveBeenCalledWith( + expect(agentPort.updateRecord).toHaveBeenCalledWith( expect.objectContaining({ values: { age: 42 } }), context.user, ); @@ -1335,9 +1534,11 @@ describe('UpdateRecordStepExecutor', () => { ], }), }); + const agentPort = makeMockAgentPort(); const context = makeContext({ runStore, workflowPort, + agentPort, stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated, preRecordedArgs: { fieldName: 'status', value: 'active' }, @@ -1349,7 +1550,7 @@ describe('UpdateRecordStepExecutor', () => { expect(result.stepOutcome.status).toBe('success'); // Writes field A ('status'), not field B ('note' / displayName 'status'). - expect(context.agentPort.updateRecord).toHaveBeenCalledWith( + expect(agentPort.updateRecord).toHaveBeenCalledWith( expect.objectContaining({ values: { status: 'active' } }), context.user, ); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 2089b18bb4..08f8b99514 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -36,6 +36,7 @@ const STEP_USER: StepUser = { const COLLECTION_SCHEMA: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -48,6 +49,7 @@ const COLLECTION_SCHEMA: CollectionSchema = { const COLLECTION_SCHEMA_WITH_STATUS: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -59,6 +61,7 @@ const COLLECTION_SCHEMA_WITH_STATUS: CollectionSchema = { const COLLECTION_SCHEMA_WITH_ACTIONS: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [{ fieldName: 'id', displayName: 'Id', isRelationship: false, type: 'Number' }], @@ -69,6 +72,7 @@ const COLLECTION_SCHEMA_WITH_ACTIONS: CollectionSchema = { const COLLECTION_SCHEMA_WITH_RELATION: CollectionSchema = { collectionName: 'customers', + collectionId: 'col-customers', collectionDisplayName: 'Customers', primaryKeyFields: ['id'], fields: [ @@ -86,6 +90,7 @@ const COLLECTION_SCHEMA_WITH_RELATION: CollectionSchema = { const ORDERS_SCHEMA: CollectionSchema = { collectionName: 'orders', + collectionId: 'col-orders', collectionDisplayName: 'Orders', primaryKeyFields: ['id'], fields: [ diff --git a/packages/workflow-executor/test/schema-cache.test.ts b/packages/workflow-executor/test/schema-cache.test.ts index 216721640d..a65c152176 100644 --- a/packages/workflow-executor/test/schema-cache.test.ts +++ b/packages/workflow-executor/test/schema-cache.test.ts @@ -5,6 +5,7 @@ import SchemaCache from '../src/schema-cache'; function makeSchema(collectionName: string): CollectionSchema { return { collectionName, + collectionId: `col-${collectionName}`, collectionDisplayName: collectionName, primaryKeyFields: ['id'], fields: [], diff --git a/packages/workflow-executor/test/schema-resolver.test.ts b/packages/workflow-executor/test/schema-resolver.test.ts new file mode 100644 index 0000000000..dd338d766a --- /dev/null +++ b/packages/workflow-executor/test/schema-resolver.test.ts @@ -0,0 +1,65 @@ +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { CollectionSchema } from '../src/types/validated/collection'; + +import SchemaCache from '../src/schema-cache'; +import SchemaResolver from '../src/schema-resolver'; + +function makeSchema(collectionName: string): CollectionSchema { + return { + collectionName, + collectionId: `col-${collectionName}`, + collectionDisplayName: collectionName, + primaryKeyFields: ['id'], + fields: [], + actions: [], + }; +} + +function makeWorkflowPort(schema: CollectionSchema) { + return { + getCollectionSchema: jest.fn().mockResolvedValue(schema), + } as unknown as WorkflowPort & { getCollectionSchema: jest.Mock }; +} + +describe('SchemaResolver', () => { + it('returns the cached schema without calling the orchestrator on a hit', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('customers'); + cache.set('customers', schema); + const workflowPort = makeWorkflowPort(makeSchema('other')); + const resolver = new SchemaResolver(cache, workflowPort, 'run-1'); + + const result = await resolver.resolve('customers'); + + expect(result).toBe(schema); + expect(workflowPort.getCollectionSchema).not.toHaveBeenCalled(); + }); + + it('fetches with the bound runId, caches, and skips the fetch on a subsequent hit', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('orders'); + const workflowPort = makeWorkflowPort(schema); + const resolver = new SchemaResolver(cache, workflowPort, 'run-42'); + + const result = await resolver.resolve('orders'); + + expect(result).toBe(schema); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('orders', 'run-42'); + + // second call hits the cache — orchestrator not queried again + await resolver.resolve('orders'); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + + it('writes the fetched schema into the shared cache (read back by other consumers)', async () => { + const cache = new SchemaCache(); + const schema = makeSchema('products'); + const resolver = new SchemaResolver(cache, makeWorkflowPort(schema), 'run-1'); + + await resolver.resolve('products'); + + // The same shared SchemaCache instance is what AgentClientAgentPort reads via .get(). + expect(cache.get('products')).toBe(schema); + }); +});