Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class CreateAgentMessageEventsTable1768000000000 implements MigrationInterface {
name = 'CreateAgentMessageEventsTable1768000000000';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE IF NOT EXISTS "agent_message_events" (
"id" uuid NOT NULL DEFAULT uuid_generate_v4(),
"agent_id" uuid NOT NULL,
"correlation_id" character varying(64) NOT NULL,
"sequence" integer NOT NULL,
"kind" character varying(64) NOT NULL,
"payload" jsonb NOT NULL,
"event_timestamp" timestamptz NOT NULL,
"created_at" TIMESTAMP NOT NULL DEFAULT now(),
"updated_at" TIMESTAMP NOT NULL DEFAULT now(),
CONSTRAINT "PK_agent_message_events_id" PRIMARY KEY ("id"),
CONSTRAINT "FK_agent_message_events_agent_id" FOREIGN KEY ("agent_id") REFERENCES "agents"("id") ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT "UQ_agent_message_events_agent_corr_seq" UNIQUE ("agent_id", "correlation_id", "sequence")
);
`);

await queryRunner.query(`
CREATE INDEX IF NOT EXISTS "IDX_agent_message_events_agent_corr" ON "agent_message_events" ("agent_id", "correlation_id");
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_agent_message_events_agent_corr";`);
await queryRunner.query(`DROP TABLE IF EXISTS "agent_message_events";`);
}
}
2 changes: 2 additions & 0 deletions apps/backend-agent-manager/src/typeorm.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
AgentEntity,
AgentEnvironmentVariableEntity,
AgentMessageEntity,
AgentMessageEventEntity,
DeploymentConfigurationEntity,
DeploymentRunEntity,
} from '@forepath/framework/backend';
Expand All @@ -26,6 +27,7 @@ export const typeormConfig: DataSourceOptions = {
AgentEntity,
AgentEnvironmentVariableEntity,
AgentMessageEntity,
AgentMessageEventEntity,
DeploymentConfigurationEntity,
DeploymentRunEntity,
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ sequenceDiagram

rect rgb(250, 255, 240)
Note over WSClient,RemoteWS: Forward Event (with Auto-Login)
Note over WSClient: Examples: "chat" (with payload {message, model?}), "fileUpdate", "logout", etc.
Note over WSClient: Examples: "chat" (payload {message, model?, correlationId?, responseMode?}), "fileUpdate", "logout", etc.
WSClient->>Gateway: forward<br/>{event: "chat" | "fileUpdate" | ..., payload: {...}, agentId?}
Gateway->>Gateway: clientId = selectedClientBySocket.get(socket.id)
alt No client selected
Expand Down Expand Up @@ -126,11 +126,11 @@ sequenceDiagram
Note over Gateway: Forwards event to remote agent-manager WebSocket
end
Gateway-->>WSClient: forwardAck<br/>{received: true, event}
Note over RemoteWS: Remote processes event and emits responses<br/>After login, chat history is automatically<br/>restored via chatMessage events<br/>For fileUpdate, broadcasts fileUpdateNotification to all clients
Note over RemoteWS: Remote processes event and emits responses<br/>After login, chat history is automatically<br/>restored via chatMessage events. When streaming/tool/question events are enabled, remote also emits chatEvent frames.<br/>For fileUpdate, broadcasts fileUpdateNotification to all clients
RemoteWS->>Gateway: onAny(event, ...args)
Gateway->>Gateway: socket.emit(event, ...args)
Gateway-->>WSClient: event<br/>{...args}
Note over WSClient: Receives forwarded responses from remote<br/>(e.g., loginSuccess, chatMessage, fileUpdateNotification events)
Note over WSClient: Receives forwarded responses from remote<br/>(e.g., loginSuccess, chatMessage, chatEvent, fileUpdateNotification events)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ components:
When event is "closeTerminal", the payload should contain {sessionId: string}.
When event is "enhanceChat", the payload should contain { message: string, correlationId: string, model?: string }.
When event is "generateTicketBody", the payload should contain { title: string, correlationId: string, model?: string }.
When event is "chat", the payload may include { correlationId?: string, responseMode?: \"single\"|\"stream\" } to enable streaming/tool events.
Response events forwarded from the agent-manager include "containerStats" (payload has status.running and optional stats).
agentId:
type: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Agent events (streaming, tools, questions)

The agent-manager websocket gateway emits **two** parallel streams for chat:

- **`chatMessage`**: legacy transcript events (user message + final assistant message), used for history restore and backwards compatibility.
- **`chatEvent`**: structured event stream for **streaming deltas**, **tool call lifecycles**, and **questions back to the user**.

This design allows existing clients/providers to keep working while enabling OpenCode-style UX where tools and questions are rendered explicitly.

## `chatEvent` envelope

Each `chatEvent` event carries a `SuccessResponse<AgentEventEnvelope>` payload (see `spec/asyncapi.yaml`).

Fields:

- **`eventId`**: UUID for the event.
- **`agentId`**: agent UUID the event belongs to.
- **`correlationId`**: groups events that belong to the same user request.
- **`sequence`**: monotonic integer scoped to `correlationId` (enables deterministic ordering).
- **`timestamp`**: ISO timestamp for the event.
- **`kind`**: one of:
- `userMessage`
- `thinking` (placeholder after the user message until deltas/tools arrive)
- `assistantDelta`
- `assistantMessage`
- `toolCall`
- `toolResult`
- `question`
- `status`
- `error`
- **`payload`**: kind-specific payload (JSON object).

## Persistence

Transcript messages are persisted to `agent_messages` as before.

Structured events are optionally persisted to **`agent_message_events`**:

- Stored: `userMessage`, `thinking`, `assistantMessage`, `toolCall`, `toolResult`, `question`, `status`, `error`
- Skipped by default: `assistantDelta` (high volume)

## Provider support

Providers expose capabilities via `getCapabilities()`:

- Providers like `cursor` and `opencode` can support streaming and structured events.
- Providers like `openclaw` intentionally do **not** support chat and should keep capabilities disabled.

## Mermaid

See `docs/agent-events.mmd` for a per-request event lifecycle diagram.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
sequenceDiagram
participant UI as UI_Client
participant Controller as AgentController_WS
participant Manager as AgentManager_WS
participant Provider as AgentProvider
participant Docker as DockerService

Note over UI,Docker: Structured chat events per correlationId

UI->>Controller: forward {event:"chat", payload:{message, responseMode:"stream", correlationId}}
Controller->>Manager: chat {message, responseMode:"stream", correlationId}

Manager->>Manager: emit chatMessage (user)
Manager-->>UI: chatMessage {from:"user", text, timestamp}

Manager->>Manager: emit chatEvent (userMessage)
Manager-->>UI: chatEvent {correlationId, sequence:0, kind:"userMessage", payload:{text}}

Manager->>Manager: emit chatEvent (thinking)
Manager-->>UI: chatEvent {kind:"thinking", payload:{}}

Manager->>Provider: sendMessageStream(...)
Provider->>Docker: execCommandStream(...)

loop while agent is responding
Docker-->>Provider: stdout chunk(s)
Provider-->>Manager: streamed chunk(s)
Manager->>Manager: parse -> AgentEventEnvelope
alt delta
Manager-->>UI: chatEvent {kind:"assistantDelta", payload:{delta}}
else tool call
Manager-->>UI: chatEvent {kind:"toolCall", payload:{toolCallId,name,args,status}}
Manager-->>UI: chatEvent {kind:"toolResult", payload:{toolCallId,name,result,isError}}
else question
Manager-->>UI: chatEvent {kind:"question", payload:{questionId,prompt,options}}
else final message frame
Manager-->>UI: chatEvent {kind:"assistantMessage", payload:{text}}
end
end

Manager->>Manager: aggregate final assistant transcript
Manager-->>UI: chatMessage {from:"agent", response:{type:"result", result:text}, timestamp}

Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ sequenceDiagram

rect rgb(250, 255, 240)
Note over WSClient,Docker: Chat Message Flow
WSClient->>Gateway: chat<br/>{message: "Hello", model?: "custom-model"}
WSClient->>Gateway: chat<br/>{message: "Hello", model?: "custom-model", correlationId?: "c1", responseMode?: "stream"}
Gateway->>Gateway: uuid = authenticatedClients.get(socket.id)
alt Not authenticated
Gateway-->>WSClient: error<br/>{success: false, error: {message, code}, timestamp}
Expand Down Expand Up @@ -219,6 +219,8 @@ sequenceDiagram

Gateway->>Gateway: server.emit('chatMessage')
Gateway-->>WSClient: chatMessage<br/>{success: true, data: {from: "agent", response: {...}, timestamp}, timestamp}
Note over Gateway: Additionally emits chatEvent frames when streaming/tool/question events are available
Gateway-->>WSClient: chatEvent<br/>{success: true, data: {eventId, correlationId, sequence, kind, payload, ...}, timestamp}
end
else JSON invalid
Note over Gateway,FilterFactory: Apply Outgoing Filters
Expand Down Expand Up @@ -255,6 +257,8 @@ sequenceDiagram

Gateway->>Gateway: server.emit('chatMessage')
Gateway-->>WSClient: chatMessage<br/>{success: true, data: {from: "agent", response: toParse, timestamp}, timestamp}
Note over Gateway: Additionally emits chatEvent frames when streaming/tool/question events are available
Gateway-->>WSClient: chatEvent<br/>{success: true, data: {eventId, correlationId, sequence, kind, payload, ...}, timestamp}
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ channels:
chatBroadcast:
$ref: '#/components/messages/ChatMessage'
description: Server broadcasts chat messages to all connected clients. Also used for chat history restoration after successful login (messages are emitted in chronological order to the authenticated client only).
agents/chatEvent:
address: agents/chatEvent
messages:
chatEventBroadcast:
$ref: '#/components/messages/ChatEvent'
description: >
Server broadcasts structured chat events (streaming deltas, tool calls/results, questions, and final assistant messages)
to all connected clients authenticated to the same agent. This is additive and does not replace chatMessage.
agents/messageFilterResult:
address: agents/messageFilterResult
messages:
Expand Down Expand Up @@ -200,6 +208,12 @@ operations:
$ref: '#/channels/agents~1chatMessage'
messages:
- $ref: '#/channels/agents~1chatMessage/messages/chatBroadcast'
serverBroadcastsChatEvent:
action: receive
channel:
$ref: '#/channels/agents~1chatEvent'
messages:
- $ref: '#/channels/agents~1chatEvent/messages/chatEventBroadcast'
serverBroadcastsMessageFilterResult:
action: receive
channel:
Expand Down Expand Up @@ -368,6 +382,13 @@ components:
model:
type: string
description: Optional model identifier (e.g., gpt-4o) forwarded to the agent container to override the default model for this chat request
correlationId:
type: string
description: Optional client-supplied correlation id for streaming/tool events; if omitted the server will generate one.
responseMode:
type: string
enum: [single, stream]
description: Optional response mode preference. When stream, the server may emit chatEvent deltas before the final chatMessage.
EnhanceChat:
name: EnhanceChat
title: Prompt enhancement command
Expand Down Expand Up @@ -569,6 +590,67 @@ components:
timestamp:
type: string
description: ISO timestamp
ChatEvent:
name: ChatEvent
title: Structured chat event broadcast
contentType: application/json
payload:
type: object
required: [success, data, timestamp]
properties:
success:
type: boolean
enum: [true]
data:
type: object
required:
[
eventId,
agentId,
correlationId,
sequence,
kind,
payload,
timestamp,
]
properties:
eventId:
type: string
description: Unique event id (uuid)
agentId:
type: string
description: Agent UUID
correlationId:
type: string
description: Correlates events that belong to a single user request
sequence:
type: integer
minimum: 0
description: Monotonic sequence number scoped to correlationId
timestamp:
type: string
format: date-time
description: Event timestamp (ISO)
kind:
type: string
enum:
[
userMessage,
thinking,
assistantDelta,
assistantMessage,
toolCall,
toolResult,
question,
status,
error,
]
payload:
type: object
description: Kind-specific payload (see documentation)
timestamp:
type: string
description: ISO timestamp
MessageFilterResult:
name: MessageFilterResult
title: Message filter result event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export * from './lib/dto/update-environment-variable.dto';
export * from './lib/dto/write-file.dto';
export * from './lib/entities/agent-environment-variable.entity';
export * from './lib/entities/agent-message.entity';
export * from './lib/entities/agent-message-event.entity';
export * from './lib/entities/agent.entity';
export * from './lib/entities/deployment-configuration.entity';
export * from './lib/entities/deployment-run.entity';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {
Column,
CreateDateColumn,
Entity,
Index,
JoinColumn,
ManyToOne,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm';
import { AgentEntity } from './agent.entity';

@Entity('agent_message_events')
@Index(['agentId', 'correlationId', 'sequence'], { unique: true })
export class AgentMessageEventEntity {
@PrimaryGeneratedColumn('uuid', { name: 'id' })
id!: string;

@Column({ type: 'uuid', name: 'agent_id' })
agentId!: string;

@ManyToOne(() => AgentEntity, { onDelete: 'CASCADE', onUpdate: 'CASCADE' })
@JoinColumn({ name: 'agent_id' })
agent!: AgentEntity;

@Column({ type: 'varchar', length: 64, name: 'correlation_id' })
correlationId!: string;

@Column({ type: 'int', name: 'sequence' })
sequence!: number;

@Column({ type: 'varchar', length: 64, name: 'kind' })
kind!: string;

@Column({ type: 'jsonb', name: 'payload' })
payload!: unknown;

@Column({ type: 'timestamptz', name: 'event_timestamp' })
eventTimestamp!: Date;

@CreateDateColumn({ name: 'created_at' })
createdAt!: Date;

@UpdateDateColumn({ name: 'updated_at' })
updatedAt!: Date;
}
Loading
Loading