-
Notifications
You must be signed in to change notification settings - Fork 10
fix(workflow-executor): operation activity log targets the acted record (PRD-442 #1) #1628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Scra3
merged 21 commits into
feat/prd-214-server-step-mapper
from
fix/prd-442-activity-log-target
Jun 8, 2026
+1,495
−523
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
948d33e
fix(workflow-executor): operation activity log targets the acted reco…
fb7662d
refactor(workflow): extract OperationStepExecutor and migrate MCP to …
0ef60ae
refactor(workflow): address review findings on activity-log operation
4632412
refactor(workflow): extract activity-log audit into AgentWithLog
981cdd3
chore(workflow): address AgentWithLog review nits
63318a2
fix(workflow): fail loud when a collection schema is not cached
d9f5c31
refactor(workflow): fold schema cache-or-fetch into getOrLoad
8ba46f1
refactor(workflow): replace SchemaCache.getOrLoad with per-run Schema…
6d1bae2
refactor(workflow): drop errorMessage from ActivityLogPort.markFailed
f3fdeb4
test(workflow): cover beforeCall-throws in AgentWithLog audit
b69167d
refactor(workflow): drop dead schemaCache field from ExecutionContext
8ac4573
docs(workflow): drop markFailed comment
3eb8f30
refactor(workflow): build AgentWithLog in the factory, hide raw agent…
3b44f36
docs(workflow): drop comments leaking AgentWithLog internals
68e07c5
Merge remote-tracking branch 'origin/feat/prd-214-server-step-mapper'…
a7f536a
refactor(workflow): drop dead raw ports from ExecutionContext
6efee0f
docs(workflow): flag getActionFormInfo as unaudited, fix stale agentP…
ec949b6
Merge remote-tracking branch 'origin/feat/prd-214-server-step-mapper'…
e4e4186
refactor(workflow): thread WriteOptions through AgentWithLog audit
37f7048
refactor(workflow): extract ActivityLogger, decouple audit from Agent…
e41c488
refactor(workflow): rename ActivityLogger→ActivityLog, run→track, nam…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| import type { ActivityLogPort, CreateActivityLogArgs } from '../ports/activity-log-port'; | ||
| import type { StepUser } from '../types/execution-context'; | ||
|
|
||
| // The activity-log target minus renderingId, which track() stamps centrally. | ||
| export type AuditTarget = Omit<CreateActivityLogArgs, 'renderingId'>; | ||
|
|
||
| export type TrackOptions<T> = { | ||
| operation: () => Promise<T>; | ||
| // Runs between createPending and the operation — the executor's write-ahead marker. Optional: | ||
| // read operations have no marker to persist. | ||
| beforeCall?: () => Promise<void>; | ||
| }; | ||
|
|
||
| // Runs an operation while recording an activity-log entry around it (pending → success/failed). | ||
| // It both executes `operation` and owns the activity-log transitions, so callers never touch the | ||
| // ActivityLogPort directly. `beforeCall` runs after createPending, just before the operation, so | ||
| // an audit-creation failure never leaves an orphan write-ahead marker. | ||
| export default class ActivityLog { | ||
| private readonly activityLogPort: ActivityLogPort; | ||
|
|
||
| private readonly user: StepUser; | ||
|
|
||
| constructor(activityLogPort: ActivityLogPort, user: StepUser) { | ||
| this.activityLogPort = activityLogPort; | ||
| this.user = user; | ||
| } | ||
|
|
||
| async track<T>(target: AuditTarget, { operation, beforeCall }: TrackOptions<T>): Promise<T> { | ||
| const handle = await this.activityLogPort.createPending({ | ||
| renderingId: this.user.renderingId, | ||
| ...target, | ||
| }); | ||
|
|
||
| try { | ||
| if (beforeCall) await beforeCall(); | ||
| const result = await operation(); | ||
| void this.activityLogPort.markSucceeded(handle); | ||
|
|
||
| return result; | ||
| } catch (err) { | ||
| // The step error is logged/surfaced by base-step-executor when rethrown, so the audit | ||
| // transition only needs the handle. | ||
| void this.activityLogPort.markFailed(handle); | ||
| throw err; | ||
| } | ||
| } | ||
| } |
106 changes: 106 additions & 0 deletions
106
packages/workflow-executor/src/executors/agent-with-log.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| import type ActivityLog from './activity-log'; | ||
| import type { | ||
| AgentPort, | ||
| ExecuteActionQuery, | ||
| GetActionFormInfoQuery, | ||
| GetRecordQuery, | ||
| GetRelatedDataQuery, | ||
| GetSingleRelatedDataQuery, | ||
| UpdateRecordQuery, | ||
| } from '../ports/agent-port'; | ||
| import type SchemaResolver from '../schema-resolver'; | ||
| import type { StepUser } from '../types/execution-context'; | ||
| import type { RecordData } from '../types/validated/collection'; | ||
|
|
||
| type WriteOptions = { beforeCall: () => Promise<void> }; | ||
|
|
||
| export interface AgentWithLogDeps { | ||
| agentPort: AgentPort; | ||
| schemaResolver: SchemaResolver; | ||
| user: StepUser; | ||
| activityLog: ActivityLog; | ||
| } | ||
|
|
||
| // Wraps AgentPort and runs each data-access call through the ActivityLog so it records an | ||
| // activity-log entry. The audit target is derived from the call: the numeric collectionId is | ||
| // resolved from the call's collection name, the recordId from its id. Idempotency stays in the | ||
| // executors: write methods forward a `beforeCall` thunk (the executor's write-ahead marker). | ||
| export default class AgentWithLog { | ||
| private readonly agentPort: AgentPort; | ||
|
|
||
| private readonly schemaResolver: SchemaResolver; | ||
|
|
||
| private readonly user: StepUser; | ||
|
|
||
| private readonly activityLog: ActivityLog; | ||
|
|
||
| constructor(deps: AgentWithLogDeps) { | ||
| this.agentPort = deps.agentPort; | ||
| this.schemaResolver = deps.schemaResolver; | ||
| this.user = deps.user; | ||
| this.activityLog = deps.activityLog; | ||
| } | ||
|
|
||
| async getRecord(query: GetRecordQuery): Promise<RecordData> { | ||
| const collectionId = await this.resolveCollectionId(query.collection); | ||
|
|
||
| return this.activityLog.track( | ||
| { action: 'index', type: 'read', collectionId, recordId: query.id }, | ||
| { operation: () => this.agentPort.getRecord(query, this.user) }, | ||
| ); | ||
| } | ||
|
|
||
| async getRelatedData(query: GetRelatedDataQuery): Promise<RecordData[]> { | ||
| const collectionId = await this.resolveCollectionId(query.collection); | ||
|
|
||
| return this.activityLog.track( | ||
| { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, | ||
| { operation: () => this.agentPort.getRelatedData(query, this.user) }, | ||
| ); | ||
| } | ||
|
|
||
| async getSingleRelatedData(query: GetSingleRelatedDataQuery): Promise<RecordData | null> { | ||
| const collectionId = await this.resolveCollectionId(query.collection); | ||
|
|
||
| return this.activityLog.track( | ||
| { action: 'listRelatedData', type: 'read', collectionId, recordId: query.id }, | ||
| { operation: () => this.agentPort.getSingleRelatedData(query, this.user) }, | ||
| ); | ||
| } | ||
|
|
||
| async updateRecord(query: UpdateRecordQuery, opts: WriteOptions): Promise<RecordData> { | ||
| const collectionId = await this.resolveCollectionId(query.collection); | ||
|
|
||
| return this.activityLog.track( | ||
| { action: 'update', type: 'write', collectionId, recordId: query.id }, | ||
| { | ||
| operation: () => this.agentPort.updateRecord(query, this.user), | ||
| beforeCall: opts.beforeCall, | ||
| }, | ||
| ); | ||
| } | ||
|
|
||
| async executeAction(query: ExecuteActionQuery, opts: WriteOptions): Promise<unknown> { | ||
| const collectionId = await this.resolveCollectionId(query.collection); | ||
|
|
||
| return this.activityLog.track( | ||
| { action: 'action', type: 'write', collectionId, recordId: query.id }, | ||
| { | ||
| operation: () => this.agentPort.executeAction(query, this.user), | ||
| beforeCall: opts.beforeCall, | ||
| }, | ||
| ); | ||
| } | ||
|
|
||
| // Unaudited passthrough: form-info is a read-only probe (does this action have a form?), | ||
| // not a data access, so unlike the methods above it records NO activity-log entry. | ||
| getActionFormInfo(query: GetActionFormInfoQuery): Promise<{ hasForm: boolean }> { | ||
| return this.agentPort.getActionFormInfo(query, this.user); | ||
| } | ||
|
|
||
| private async resolveCollectionId(collectionName: string): Promise<string> { | ||
| const schema = await this.schemaResolver.resolve(collectionName); | ||
|
|
||
| return schema.collectionId; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useless