Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { MigrationInterface, QueryRunner, Table, TableIndex, TableUnique } from 'typeorm';

/**
* Per-user read cursors for agent console environment chat notifications.
*/
export class CreateUserEnvironmentReadStateTable1774000000000 implements MigrationInterface {
name = 'CreateUserEnvironmentReadStateTable1774000000000';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'user_environment_read_state',
columns: [
{
name: 'id',
type: 'uuid',
isPrimary: true,
generationStrategy: 'uuid',
default: 'uuid_generate_v4()',
},
{
name: 'user_id',
type: 'varchar',
length: '64',
isNullable: false,
},
{
name: 'client_id',
type: 'uuid',
isNullable: false,
},
{
name: 'agent_id',
type: 'uuid',
isNullable: false,
},
{
name: 'last_read_at',
type: 'timestamptz',
isNullable: true,
},
{
name: 'last_read_agent_message_id',
type: 'uuid',
isNullable: true,
},
{
name: 'created_at',
type: 'timestamptz',
default: 'CURRENT_TIMESTAMP',
isNullable: false,
},
{
name: 'updated_at',
type: 'timestamptz',
default: 'CURRENT_TIMESTAMP',
isNullable: false,
},
],
}),
true,
);

await queryRunner.createUniqueConstraint(
'user_environment_read_state',
new TableUnique({
name: 'uq_user_environment_read_state_user_client_agent',
columnNames: ['user_id', 'client_id', 'agent_id'],
}),
);

await queryRunner.createIndex(
'user_environment_read_state',
new TableIndex({
name: 'IDX_user_environment_read_state_user_id',
columnNames: ['user_id'],
}),
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropIndex('user_environment_read_state', 'IDX_user_environment_read_state_user_id');
await queryRunner.dropUniqueConstraint(
'user_environment_read_state',
'uq_user_environment_read_state_user_client_agent',
);
await queryRunner.dropTable('user_environment_read_state');
}
}
2 changes: 2 additions & 0 deletions apps/backend-agent-controller/src/typeorm.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
TicketBodyGenerationSessionEntity,
TicketCommentEntity,
TicketEntity,
UserEnvironmentReadStateEntity,
} from '@forepath/framework/backend';
import { CorrelationAwareTypeOrmLogger } from '@forepath/framework/backend/util-http-context';
import { ClientAgentCredentialEntity, ClientEntity, ClientUserEntity, UserEntity } from '@forepath/identity/backend';
Expand Down Expand Up @@ -95,6 +96,7 @@ export const typeormConfig: DataSourceOptions = {
AgentConsoleRegexFilterRuleEntity,
AgentConsoleRegexFilterRuleClientEntity,
AgentConsoleRegexFilterRuleSyncTargetEntity,
UserEnvironmentReadStateEntity,
],
migrations: [
'src/migrations/*.js',
Expand Down
Binary file not shown.
16 changes: 15 additions & 1 deletion docs/agenstra/features/websocket-communication.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ Agenstra uses WebSocket (Socket.IO) for real-time bidirectional communication. T

- **Frontend ↔ Controller (`clients` namespace)**: Workspace selection (`setClient`), `forward` to remote agent-managers, and controller-originated ticket hints for chat
- **Frontend ↔ Controller (`tickets` namespace)**: Ticket board and automation realtime for subscribers
- **Frontend ↔ Controller (`status` namespace)**: Per-user workspace/environment notification state (git dirty, unread chat) without `setClient`
- **Controller ↔ Manager**: Event forwarding to remote agent-managers
- **Manager ↔ Agent Containers**: Real-time chat and container communication

On the controller, **`clients`** and **`tickets`** share the same TCP port (`WEBSOCKET_PORT`); namespaces are selected in the Socket.IO client path.
On the controller, **`clients`**, **`tickets`**, **`pages`** (knowledge), and **`status`** share the same TCP port (`WEBSOCKET_PORT`); namespaces are selected in the Socket.IO client path.

## Authentication

Expand All @@ -23,6 +24,19 @@ WebSocket connections to the controller require authentication. Pass the `Author

Unauthenticated connections are rejected with `connect_error` "Unauthorized". The `setClient` operation enforces per-client authorization: only users with access to the requested client (global admin, client creator, or client_users entry) can set that client context. Unauthorized attempts emit an `error` event with message "You do not have access to this client".

### Agent console status (`status` namespace)

The agent console opens a dedicated Socket.IO connection to **`status`** (derived from `controller.websocketUrl` by replacing `/clients` with `/status`, or via `controller.statusWebsocketUrl`). Handshake auth matches other controller namespaces.

- **No `setClient`**: the stream is scoped to the authenticated user only.
- **On connect**: server emits **`statusSnapshot`** with all accessible workspaces/environments (git dirty + unread flags).
- **While connected**: server emits **`statusPatch`** for deltas; background polling (`STATUS_POLL_INTERVAL_MS`, default 30s) refreshes git state and catches unread when no `clients` socket is active. Successful VCS mutations proxied through the controller (stage, commit, fetch, pull, push including force, branch operations, conflict resolve, prepare-clean workspace) also emit **`statusPatch`** immediately to every user with access to that workspace.
- **Agent workspace changes**: agent-manager broadcasts **`gitStateChanged`** on the agents namespace after file writes, file-update notifications, workspace-affecting agent tool results, and local VCS/file mutations. The controller **`clients`** gateway listens for **`gitStateChanged`** and **`fileUpdateNotification`**, then pushes **`statusPatch`** on the **`status`** namespace to users with workspace access (same security model as VCS proxy hooks).
- **Client → server**: `markEnvironmentRead` `{ clientId, agentId }`, `setActiveEnvironment` `{ clientId, agentId | null }`.
- **Unread** includes agent chat replies and live ticket automation chat card updates; read cursors persist in `user_environment_read_state` on the controller database.

See `libs/domains/framework/backend/feature-agent-controller/spec/asyncapi.yaml` and `libs/domains/framework/frontend/data-access-agent-console/docs/notifications-state.mmd`.

### Billing manager (dashboard status)

The billing console can open a second Socket.IO connection to the **billing-manager** status gateway (default namespace `/billing`, separate TCP port from REST). Handshake auth matches HTTP (`Bearer` JWT for users or Keycloak). **Static API key** auth does not receive a user-scoped billing stream; `subscribeDashboardStatus` is rejected with an `error` event, consistent with REST returning "User not authenticated" for API-key-only requests.
Expand Down
12 changes: 12 additions & 0 deletions libs/domains/framework/backend/feature-agent-controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,18 @@ See the [OpenAPI specification](./spec/openapi.yaml) for detailed request/respon

## WebSocket Gateway

### Status notifications (`StatusGateway`)

- **Namespace**: `/status` (`STATUS_WEBSOCKET_NAMESPACE`, default `status`)
- **Port**: same as other controller namespaces (`WEBSOCKET_PORT`, default `8081`)
- **Auth**: handshake `Authorization` (no `setClient`)
- **On connect**: `statusSnapshot` (git dirty + unread per environment for all accessible workspaces)
- **Updates**: `statusPatch`; background poll via `STATUS_POLL_INTERVAL_MS` (default `30000`)
- **Client commands**: `markEnvironmentRead`, `setActiveEnvironment`
- **Diagram**: [agent-console-status-realtime.mmd](./docs/agent-console-status-realtime.mmd)

### Clients proxy (`ClientsGateway`)

The `ClientsGateway` provides WebSocket-based real-time event forwarding to remote agent-manager WebSocket endpoints:

- **Namespace**: `/clients`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
sequenceDiagram
participant FE as Frontend_notifications
participant SG as StatusGateway
participant Svc as AgentConsoleStatusService
participant DB as controller_DB
participant Mgr as agent_manager_HTTP

FE->>SG: connect (auth)
SG->>Svc: buildSnapshot(user)
Svc->>DB: read user_environment_read_state
Svc->>Mgr: latest agent message + vcs status
Svc-->>FE: statusSnapshot

Note over SG,Svc: chatMessage / automation upsert hook
Svc-->>FE: statusPatch (unicast)
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ channels:
pagesKnowledgePageActivityCreatedEvent:
$ref: '#/components/messages/KnowledgePageActivityCreatedPayload'
description: Incremental page activity row for currently open page details.
status/statusSnapshot:
address: status/statusSnapshot
description: |
Full per-user notification state on connect (git dirty + unread per environment, client rollups, spacesHasAttention).
Unicast to the connecting socket only. Namespace **status** (STATUS_WEBSOCKET_NAMESPACE, default "status").
status/statusPatch:
address: status/statusPatch
description: Incremental notification deltas after connect (unicast).
status/markEnvironmentRead:
address: status/markEnvironmentRead
description: Client marks an environment chat as read; persists user_environment_read_state.
status/setActiveEnvironment:
address: status/setActiveEnvironment
description: Client reports the environment currently in view (suppresses unread for active user).
operations:
clientSendsSetClient:
action: send
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export * from './lib/entities/ticket-activity.entity';
export * from './lib/entities/ticket-body-generation-session.entity';
export * from './lib/entities/ticket-comment.entity';
export * from './lib/entities/ticket.entity';
export * from './lib/entities/user-environment-read-state.entity';
export * from './lib/entities/ticket.enums';
export * from './lib/entities/statistics-agent.entity';
export * from './lib/entities/statistics-chat-filter-drop.entity';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
export interface EnvironmentStatusPayload {
clientId: string;
agentId: string;
hasUnreadMessages: boolean;
gitDirty: boolean;
gitConflict: boolean;
}

export interface ClientStatusPayload {
clientId: string;
hasUnreadMessages: boolean;
gitDirty: boolean;
}

export interface StatusSnapshotPayload {
generatedAt: string;
environments: EnvironmentStatusPayload[];
clients: ClientStatusPayload[];
spacesHasAttention: boolean;
}

export interface StatusPatchPayload {
generatedAt: string;
environments?: EnvironmentStatusPayload[];
clients?: ClientStatusPayload[];
spacesHasAttention?: boolean;
}

export interface MarkEnvironmentReadPayload {
clientId: string;
agentId: string;
}

export interface SetActiveEnvironmentPayload {
clientId: string | null;
agentId: string | null;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { UserEnvironmentReadStateEntity } from './user-environment-read-state.entity';

describe('UserEnvironmentReadStateEntity', () => {
it('should create an instance', () => {
const entity = new UserEnvironmentReadStateEntity();

expect(entity).toBeDefined();
});

it('should have all required properties', () => {
const entity = new UserEnvironmentReadStateEntity();
const createdAt = new Date('2026-01-01T00:00:00.000Z');
const updatedAt = new Date('2026-01-02T00:00:00.000Z');
const lastReadAt = new Date('2026-01-01T12:00:00.000Z');

entity.id = 'read-state-uuid';
entity.userId = 'user-1';
entity.clientId = 'client-uuid';
entity.agentId = 'agent-uuid';
entity.lastReadAt = lastReadAt;
entity.lastReadAgentMessageId = 'message-uuid';
entity.createdAt = createdAt;
entity.updatedAt = updatedAt;

expect(entity.id).toBe('read-state-uuid');
expect(entity.userId).toBe('user-1');
expect(entity.clientId).toBe('client-uuid');
expect(entity.agentId).toBe('agent-uuid');
expect(entity.lastReadAt).toEqual(lastReadAt);
expect(entity.lastReadAgentMessageId).toBe('message-uuid');
expect(entity.createdAt).toBeInstanceOf(Date);
expect(entity.updatedAt).toBeInstanceOf(Date);
});

it('should allow nullable last-read fields', () => {
const entity = new UserEnvironmentReadStateEntity();

entity.lastReadAt = null;
entity.lastReadAgentMessageId = null;

expect(entity.lastReadAt).toBeNull();
expect(entity.lastReadAgentMessageId).toBeNull();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';

@Entity('user_environment_read_state')
@Index('IDX_user_environment_read_state_user_id', ['userId'])
@Index('uq_user_environment_read_state_user_client_agent', ['userId', 'clientId', 'agentId'], { unique: true })
export class UserEnvironmentReadStateEntity {
@PrimaryGeneratedColumn('uuid', { name: 'id' })
id!: string;

@Column({ type: 'varchar', length: 64, name: 'user_id' })
userId!: string;

@Column({ type: 'uuid', name: 'client_id' })
clientId!: string;

@Column({ type: 'uuid', name: 'agent_id' })
agentId!: string;

@Column({ type: 'timestamptz', name: 'last_read_at', nullable: true })
lastReadAt?: Date | null;

@Column({ type: 'uuid', name: 'last_read_agent_message_id', nullable: true })
lastReadAgentMessageId?: string | null;

@CreateDateColumn({ type: 'timestamptz', name: 'created_at' })
createdAt!: Date;

@UpdateDateColumn({ type: 'timestamptz', name: 'updated_at' })
updatedAt!: Date;
}
Loading
Loading