import type {
  Agent,
  InboundTransport,
  Logger,
  TransportSession,
  EncryptedMessage,
  AgentContext,
} from '@aries-framework/core'

import { AriesFrameworkError, TransportService, utils, MessageReceiver } from '@aries-framework/core'
import WebSocket, { Server } from 'ws'

// Workaround for types (https://github.com/DefinitelyTyped/DefinitelyTyped/issues/20780)
interface ExtWebSocket extends WebSocket {
  // Liveness flag used by the heartbeat: cleared before each ping, set again when a pong arrives.
  isAlive: boolean
}

/**
 * Inbound transport that accepts agent messages over WebSocket connections.
 *
 * Each incoming connection gets its own {@link WebSocketTransportSession}; received messages are
 * JSON-parsed and handed to the agent's `MessageReceiver` together with that session.
 */
export class WsInboundTransport implements InboundTransport {
  private socketServer: Server
  private logger!: Logger

  // Handle of the running heartbeat timer, kept so that `stop()` can cancel it.
  private heartbeatTimer?: ReturnType<typeof setInterval>

  // We're using a `socketId` just for the prevention of calling the connection handler twice.
  private socketIds: Record<string, WebSocket> = {}

  /**
   * @param options either an existing `ws` `Server` to attach to, or a `port` on which a new
   *   `Server` is created.
   */
  public constructor({ server, port }: { server: Server; port?: undefined } | { server?: undefined; port: number }) {
    this.socketServer = server ?? new Server({ port })
  }

  /**
   * Registers the connection handler on the socket server and starts the heartbeat.
   *
   * @param agent agent whose logger, endpoints and dependency manager are used.
   */
  public async start(agent: Agent): Promise<void> {
    const transportService = agent.dependencyManager.resolve(TransportService)

    this.logger = agent.config.logger

    const wsEndpoint = agent.config.endpoints.find((e) => e.startsWith('ws'))
    this.logger.debug(`Starting WS inbound transport`, {
      endpoint: wsEndpoint,
    })

    this.socketServer.on('connection', (socket: WebSocket) => {
      const socketId = utils.uuid()
      this.logger.debug(`Socket connected. Id: ${socketId}`)
      ;(socket as ExtWebSocket).isAlive = true

      if (!this.socketIds[socketId]) {
        this.logger.debug(`Saving new socket with id ${socketId}.`)
        this.socketIds[socketId] = socket

        const session = new WebSocketTransportSession(socketId, socket, this.logger)
        this.listenOnWebSocketMessages(agent, socket, session)

        socket.on('close', () => {
          this.logger.debug(`Socket closed for session ${session.id}. Connection Id: ${session.connectionId}`)
          // Forget the socket so the id map does not grow for the lifetime of the server.
          delete this.socketIds[socketId]
          transportService.removeSession(session)
        })
      } else {
        this.logger.debug(`Socket with id ${socketId} already exists.`)
      }
    })

    this.startHeartBeatPing()
  }

  /**
   * Cancels the heartbeat timer and closes the socket server.
   *
   * @returns a promise that resolves once the server reports it has closed, and rejects with the
   *   error passed to the server's close callback otherwise.
   */
  public async stop(): Promise<void> {
    this.logger.debug('Closing WebSocket Server')

    if (this.heartbeatTimer !== undefined) {
      clearInterval(this.heartbeatTimer)
      this.heartbeatTimer = undefined
    }

    return new Promise<void>((resolve, reject) => {
      this.socketServer.close((error) => {
        if (error) {
          reject(error)
          return
        }
        resolve()
      })
    })
  }

  /**
   * Periodically pings every connected client and closes those that did not answer the previous
   * ping with a pong.
   *
   * @param interval time between heartbeat rounds in milliseconds (defaults to 3000).
   */
  private startHeartBeatPing(interval = 3000) {
    // Never run more than one heartbeat timer, even if `start()` is called again.
    if (this.heartbeatTimer !== undefined) {
      clearInterval(this.heartbeatTimer)
    }

    this.heartbeatTimer = setInterval(() => {
      this.socketServer.clients.forEach((ws: WebSocket) => {
        if (!(ws as ExtWebSocket).isAlive) {
          this.logger.debug(`Client session closed by timeout.`)
          return ws.close()
        }

        // Mark as not alive; the pong handler flips it back before the next round.
        ;(ws as ExtWebSocket).isAlive = false
        ws.ping(null, undefined)
      })
    }, interval)
  }

  /**
   * Wires the pong and message handlers for a single socket.
   *
   * @param agent agent whose `MessageReceiver` processes incoming messages.
   * @param socket socket to listen on.
   * @param session transport session passed along with every received message.
   */
  private listenOnWebSocketMessages(agent: Agent, socket: WebSocket, session: WebSocketTransportSession) {
    const messageReceiver = agent.dependencyManager.resolve(MessageReceiver)

    // Use the EventEmitter API here: the `ws` documentation states that `addEventListener` does
    // nothing for event types other than 'close', 'error', 'message' and 'open', which would leave
    // `isAlive` unset and make the heartbeat close every client.
    socket.on('pong', () => {
      ;(socket as ExtWebSocket).isAlive = true
    })

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    socket.addEventListener('message', async (event: any) => {
      this.logger.debug('WebSocket message event received.', { url: event.target.url })
      try {
        await messageReceiver.receiveMessage(JSON.parse(event.data), { session })
      } catch (error) {
        // Covers both malformed JSON and failures inside the message receiver.
        this.logger.error('Error processing message', { error })
      }
    })
  }
}

/**
 * Transport session bound to a single WebSocket connection.
 */
export class WebSocketTransportSession implements TransportSession {
  public id: string
  public readonly type = 'WebSocket'
  public socket: WebSocket
  public logger: Logger
  public connectionId?: string

  /**
   * @param id identifier of the session.
   * @param socket socket used to send outbound messages.
   * @param logger logger used for session diagnostics.
   */
  public constructor(id: string, socket: WebSocket, logger: Logger) {
    this.id = id
    this.socket = socket
    this.logger = logger
  }

  /**
   * Sends an encrypted message, serialized as JSON, over the session's socket.
   *
   * @param agentContext not used by this implementation.
   * @param encryptedMessage message to serialize and send.
   * @throws AriesFrameworkError when the socket is not in the OPEN state.
   */
  public async send(agentContext: AgentContext, encryptedMessage: EncryptedMessage): Promise<void> {
    if (this.socket.readyState !== WebSocket.OPEN) {
      throw new AriesFrameworkError(`${this.type} transport session has been closed.`)
    }

    this.socket.send(JSON.stringify(encryptedMessage))
  }

  /**
   * Logs the close request; the socket itself is intentionally left open.
   */
  public async close(): Promise<void> {
    this.logger.debug(
      `WebSocketTransport Session close requested. Session Id: ${this.id}. Connection Id: ${this.connectionId}`
    )
    // Do not actually close socket. Leave heartbeat to do its job
  }
}