From 1357d543f01de1fb9f7d6d9ffa795ab39c26d309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emirhan=20Durmu=C5=9F?= Date: Thu, 20 Nov 2025 22:48:27 +0300 Subject: [PATCH 1/2] multi replica controller handle user/agent message forwarding on websocket sessions via amqp queues over default-router --- package-lock.json | 44 +- package.json | 7 +- src/services/router-connection-service.js | 311 ++++++++++++ src/services/websocket-queue-service.js | 391 ++++++++++++++++ src/websocket/server.js | 547 ++++++++++++++++++---- 5 files changed, 1210 insertions(+), 90 deletions(-) create mode 100644 src/services/router-connection-service.js create mode 100644 src/services/websocket-queue-service.js diff --git a/package-lock.json b/package-lock.json index 7116378e..e6d7aaac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@datasance/iofogcontroller", - "version": "3.5.8", + "version": "3.5.9", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@datasance/iofogcontroller", - "version": "3.5.8", + "version": "3.5.9", "hasInstallScript": true, "license": "EPL-2.0", "dependencies": { @@ -40,7 +40,7 @@ "https": "1.0.0", "is-elevated": "3.0.0", "jose": "^4.15.9", - "js-yaml": "4.1.0", + "js-yaml": "4.1.1", "jsonschema": "1.4.1", "keycloak-connect": "^26.1.1", "minimatch": "10.0.1", @@ -60,6 +60,7 @@ "portscanner": "2.2.0", "qs": "6.12.1", "retry-as-promised": "7.0.4", + "rhea": "^3.0.4", "sequelize": "6.37.7", "sqlite3": "^5.1.7", "string-format": "2.0.0", @@ -80,7 +81,7 @@ "chai-http": "4.4.0", "eslint": "9.28.0", "eslint-config-google": "0.14.0", - "js-yaml": "^4.1.0", + "js-yaml": "^4.1.1", "mocha": "10.6.0", "mocha-junit-reporter": "2.2.1", "newman": "^6.2.1", @@ -8204,9 +8205,9 @@ "dev": true }, "node_modules/js-yaml": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.0.tgz", - "integrity": "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==", + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-4.1.1.tgz", + "integrity": "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==", "dependencies": { "argparse": "^2.0.1" }, @@ -11857,6 +11858,35 @@ "resolved": "https://registry.npmjs.org/rfc4648/-/rfc4648-1.5.4.tgz", "integrity": "sha512-rRg/6Lb+IGfJqO05HZkN50UtY7K/JhxJag1kP23+zyMfrvoB0B7RWv06MbOzoc79RgCdNTiUaNsTT1AJZ7Z+cg==" }, + "node_modules/rhea": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/rhea/-/rhea-3.0.4.tgz", + "integrity": "sha512-n3kw8syCdrsfJ72w3rohpoHHlmv/RZZEP9VY5BVjjo0sEGIt4YSKypBgaiA+OUSgJAzLjOECYecsclG5xbYtZw==", + "dependencies": { + "debug": "^4.3.3" + } + }, + "node_modules/rhea/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/rhea/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, "node_modules/rimraf": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-3.0.2.tgz", diff --git a/package.json b/package.json index 
50e83052..9057a930 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@datasance/iofogcontroller", - "version": "3.5.8", + "version": "3.5.9", "description": "ioFog Controller project for Datasance PoT @ datasance.com \\nCopyright (c) 2023 Datasance Teknoloji A.S.", "main": "./src/main.js", "author": "Emirhan Durmus", @@ -85,7 +85,7 @@ "https": "1.0.0", "is-elevated": "3.0.0", "jose": "^4.15.9", - "js-yaml": "4.1.0", + "js-yaml": "4.1.1", "jsonschema": "1.4.1", "keycloak-connect": "^26.1.1", "minimatch": "10.0.1", @@ -104,6 +104,7 @@ "pino-std-serializers": "7.0.0", "portscanner": "2.2.0", "qs": "6.12.1", + "rhea": "^3.0.4", "retry-as-promised": "7.0.4", "sequelize": "6.37.7", "sqlite3": "^5.1.7", @@ -132,7 +133,7 @@ "sinon-chai": "3.7.0", "snyk": "^1.1291.0", "standard": "12.0.1", - "js-yaml": "^4.1.0" + "js-yaml": "^4.1.1" }, "files": [ "/scripts", diff --git a/src/services/router-connection-service.js b/src/services/router-connection-service.js new file mode 100644 index 00000000..724e9fc3 --- /dev/null +++ b/src/services/router-connection-service.js @@ -0,0 +1,311 @@ +const rhea = require('rhea') +const config = require('../config') +const logger = require('../logger') +const RouterManager = require('../data/managers/router-manager') +const CertificateService = require('./certificate-service') +const SecretService = require('./secret-service') +const os = require('os') + +const CONTROLLER_CERT_NAME = 'controller-exec-session-client' +const DEFAULT_ROUTER_SERVICE = 'router' +const AMQP_DEFAULT_PORT = 5671 + +class RouterConnectionService { + constructor () { + this.connection = null + this.connectionPromise = null + this.certificatePromise = null + this.cachedCertificate = null + this.connectionOptions = null + this.cachedRouterRecord = null + this.fakeTransaction = { fakeTransaction: true } + this.container = rhea.create_container({ + id: 'controller-exec-session-client', + enable_sasl_external: true + }) + } + + async getConnection () { + if (this.connection && this.connection.is_open && this.connection.is_open()) { + return this.connection + } + if (this.connectionPromise) { + return this.connectionPromise + } + this.connectionPromise = this._createConnection() + return this.connectionPromise + } + + async _createConnection () { + try { + const options = await this._buildConnectionOptions() + return await new Promise((resolve, reject) => { + const connection = this.container.connect(options) + + const cleanupPromise = () => { + this.connection = null + this.connectionPromise = null + } + + connection.once('connection_open', () => { + logger.info('[AMQP] Router connection established') + this.connection = connection + this.connectionPromise = null + connection.on('connection_error', (context) => { + logger.error({ + err: context.error, + transport: 'amqp', + msg: '[AMQP] Connection error event' + }) + }) + connection.on('connection_close', () => { + logger.warn('[AMQP] Router connection closed') + cleanupPromise() + }) + connection.on('disconnected', (context) => { + logger.warn('[AMQP] Router connection disconnected', { + error: context.error ? 
context.error.message : 'unknown' + }) + cleanupPromise() + }) + resolve(connection) + }) + + connection.once('connection_close', (context) => { + logger.error({ + err: context.error, + transport: 'amqp', + msg: '[AMQP] Unable to open router connection (closed before open)' + }) + cleanupPromise() + reject(new Error('Router connection closed before opening')) + }) + + connection.once('disconnected', (context) => { + logger.error({ + err: context.error, + transport: 'amqp', + msg: '[AMQP] Unable to connect to router' + }) + cleanupPromise() + reject(context.error || new Error('Router disconnected during connect')) + }) + }) + } catch (error) { + this.connectionPromise = null + logger.error('[AMQP] Failed to create router connection', { + error: error.message, + stack: error.stack + }) + throw error + } + } + + async _buildConnectionOptions () { + if (this.connectionOptions && this.cachedCertificate) { + return { + ...this.connectionOptions, + cert: this.cachedCertificate.cert, + key: this.cachedCertificate.key, + ca: [this.cachedCertificate.ca] + } + } + + logger.debug({ msg: '[AMQP] Preparing router connection options' }) + + const { host, port } = await this._resolveRouterEndpoint() + logger.debug({ msg: '[AMQP] Router endpoint resolved', host, port }) + const certBundle = await this._ensureControllerCertificate() + + this.connectionOptions = { + transport: 'tls', + host, + hostname: host, + port, + rejectUnauthorized: true, + idle_time_out: 300000, + reconnect: true, + reconnect_limit: 100, + username: '', + password: '', + container_id: 'controller-exec-session-client' + } + this.cachedCertificate = certBundle + + logger.debug({ msg: '[AMQP] Router connection options built', host, port }) + + return { + ...this.connectionOptions, + cert: certBundle.cert, + key: certBundle.key, + ca: [certBundle.ca] + } + } + + async _resolveRouterEndpoint () { + logger.debug({ msg: '[AMQP] Resolving default router endpoint' }) + try { + const router = await this._getDefaultRouterRecord() + const port = router.messagingPort || AMQP_DEFAULT_PORT + let host = router.host && router.host.trim().length > 0 ? router.host.trim() : '' + + if (this._isKubernetes()) { + const namespace = process.env.CONTROLLER_NAMESPACE + if (namespace && namespace.trim().length > 0) { + host = `${DEFAULT_ROUTER_SERVICE}.${namespace}.svc.cluster.local` + } else if (!host) { + host = DEFAULT_ROUTER_SERVICE + } + } else { + if (!host) { + host = 'localhost' + } + } + logger.debug({ + msg: '[AMQP] Default router resolved', + routerHost: router.host, + computedHost: host, + port, + routerUuid: router.iofogUuid + }) + return { + host, + port, + routerUuid: router.iofogUuid + } + } catch (error) { + logger.error({ err: error, msg: '[AMQP] Failed while resolving router endpoint' }) + throw error + } + } + + async _getDefaultRouterRecord () { + if (this.cachedRouterRecord) { + return this.cachedRouterRecord + } + const router = await RouterManager.findOne({ isDefault: true }, this.fakeTransaction) + if (!router) { + throw new Error('Default router not found. 
Please ensure default router is provisioned.') + } + this.cachedRouterRecord = router + return router + } + + _isKubernetes () { + const controlPlane = process.env.CONTROL_PLANE || config.get('app.ControlPlane') + return controlPlane && controlPlane.toLowerCase() === 'kubernetes' + } + + async _ensureControllerCertificate () { + if (this.cachedCertificate) { + return this.cachedCertificate + } + if (this.certificatePromise) { + return this.certificatePromise + } + this.certificatePromise = (async () => { + try { + const bundle = await this._createControllerCertificate() + this.cachedCertificate = bundle + return bundle + } finally { + this.certificatePromise = null + } + })() + return this.certificatePromise + } + + async _createControllerCertificate () { + logger.debug('[AMQP] Ensuring controller certificate secret exists', { name: CONTROLLER_CERT_NAME }) + const existingSecret = await this._safeGetSecret(CONTROLLER_CERT_NAME) + const caName = await this._resolveCaName() + if (existingSecret) { + const caSecret = await this._safeGetSecret(caName) + const bundle = this._decodeCertificate(existingSecret, caSecret) + logger.debug({ msg: '[AMQP] Using existing controller-exec-session-client certificate', ca: caName }) + return bundle + } + + const hosts = this._buildControllerHosts() + logger.debug({ msg: '[AMQP] Generating controller-exec-session-client certificate', hosts, ca: caName }) + + try { + await CertificateService.createCertificateEndpoint({ + name: CONTROLLER_CERT_NAME, + subject: CONTROLLER_CERT_NAME, + hosts: hosts.join(','), + ca: { + type: 'direct', + secretName: caName + }, + expiration: 36 // months + }) + } catch (error) { + logger.error({ err: error, ca: caName, msg: '[AMQP] Failed to create controller certificate' }) + throw error + } + + const certSecret = await this._safeGetSecret(CONTROLLER_CERT_NAME) + const caSecret = await this._safeGetSecret(caName) + if (!certSecret || !caSecret) { + throw new Error('Controller certificate creation succeeded but secret not found') + } + logger.debug({ msg: '[AMQP] controller-exec-session-client certificate generated successfully', ca: caName }) + return this._decodeCertificate(certSecret, caSecret) + } + + async _resolveCaName () { + if (this._isKubernetes()) { + return 'default-router-local-ca' + } + const router = await this._getDefaultRouterRecord() + return `${router.iofogUuid}-local-ca` + } + + _buildControllerHosts () { + const hosts = new Set(['localhost', '127.0.0.1']) + const hostname = process.env.HOSTNAME || os.hostname() + if (hostname) hosts.add(hostname) + if (process.env.CONTROLLER_HOST) hosts.add(process.env.CONTROLLER_HOST) + return Array.from(hosts) + } + + _decodeCertificate (certSecret, caSecret) { + if (!certSecret || !certSecret.data) { + throw new Error(`Secret ${CONTROLLER_CERT_NAME} is empty or missing.`) + } + if (!caSecret || !caSecret.data) { + throw new Error('CA secret not found for router connection.') + } + const decode = (value, label) => { + if (!value) { + throw new Error(`Missing ${label} in certificate secret`) + } + return Buffer.from(value, 'base64') + } + return { + cert: decode(certSecret.data['tls.crt'], 'tls.crt'), + key: decode(certSecret.data['tls.key'], 'tls.key'), + ca: decode(caSecret.data['tls.crt'], 'ca.crt') + } + } + + async _safeGetSecret (name) { + try { + return await SecretService.getSecretEndpoint(name) + } catch (error) { + if (error.name === 'NotFoundError') { + logger.debug('[AMQP] Secret not found', { secret: name }) + return null + } + logger.error('[AMQP] Unexpected 
error while fetching secret', { + secret: name, + error: error.message, + stack: error.stack + }) + throw error + } + } +} + +module.exports = new RouterConnectionService() diff --git a/src/services/websocket-queue-service.js b/src/services/websocket-queue-service.js new file mode 100644 index 00000000..8c962a14 --- /dev/null +++ b/src/services/websocket-queue-service.js @@ -0,0 +1,391 @@ +const WebSocket = require('ws') +const logger = require('../logger') +const RouterConnectionService = require('./router-connection-service') + +const MESSAGE_TYPES = { + STDIN: 0, + STDOUT: 1, + STDERR: 2, + CONTROL: 3, + CLOSE: 4, + ACTIVATION: 5 +} + +const MESSAGE_QUEUE_PREFIX = { + agent: 'agent', + user: 'user' +} + +function buildQueueName (prefix, execId) { + return `${prefix}-${execId}` +} + +function getBufferFromBody (body) { + if (!body) return Buffer.alloc(0) + if (Buffer.isBuffer(body)) return body + if (body.type === 'Buffer' && Array.isArray(body.data)) { + return Buffer.from(body.data) + } + if (typeof body === 'string') { + return Buffer.from(body, 'utf8') + } + return Buffer.from(body) +} + +class WebSocketQueueService { + constructor () { + this.execBridges = new Map() + } + + async enableForSession (session, cleanupCallback) { + const execId = session.execId + if (!execId) { + logger.warn('[AMQP][QUEUE] Missing execId for session, skipping queue bridge enablement') + return false + } + + const bridge = this.execBridges.get(execId) || { + execId, + senders: {}, + receivers: {}, + cleanupCallback: null + } + + // Store cleanup callback for CLOSE message handling + if (cleanupCallback) { + bridge.cleanupCallback = cleanupCallback + } + + if (session.user) { + await this._ensureReceiver(bridge, 'user', session.user, session) + } + if (session.agent) { + await this._ensureReceiver(bridge, 'agent', session.agent, session) + } + this.execBridges.set(execId, bridge) + return true + } + + shouldUseQueue (execId) { + return this.execBridges.has(execId) + } + + async publishToAgent (execId, buffer, options = {}) { + await this._send(execId, 'agent', buffer, options) + } + + async publishToUser (execId, buffer, options = {}) { + await this._send(execId, 'user', buffer, options) + } + + async cleanup (execId) { + const bridge = this.execBridges.get(execId) + if (!bridge) return + + const closeLink = (link) => { + if (!link) return + try { + if (link.receiver) { + link.receiver.close() + } else if (link.sender) { + link.sender.close() + } + } catch (error) { + logger.debug('[AMQP][QUEUE] Failed to close link during cleanup', { execId, error: error.message }) + } + } + + closeLink(bridge.receivers.agent) + closeLink(bridge.receivers.user) + closeLink(bridge.senders.agent) + closeLink(bridge.senders.user) + this.execBridges.delete(execId) + } + + detachSocket (execId, side) { + const bridge = this.execBridges.get(execId) + if (!bridge || !bridge.receivers[side]) return + bridge.receivers[side].socket = null + } + + async _send (execId, side, buffer, options = {}) { + const bridge = await this._ensureSender(execId, side) + if (!bridge) { + throw new Error('Queue bridge missing for execId=' + execId) + } + try { + const message = { + body: buffer, + content_type: 'application/octet-stream' + } + + const applicationProperties = { ...((options && options.applicationProperties) || {}) } + const hasMessageType = options && Object.prototype.hasOwnProperty.call(options, 'messageType') + const messageType = hasMessageType ? 
options.messageType : null + if (messageType !== null) { + applicationProperties.messageType = messageType + } + if (Object.keys(applicationProperties).length > 0) { + message.application_properties = applicationProperties + } + + bridge.sender.send(message) + logger.debug('[AMQP][QUEUE] Published message to queue', { + execId, + side, + messageSize: buffer.length, + messageType: messageType !== null ? messageType : 'normal' + }) + } catch (error) { + logger.error('[AMQP][QUEUE] Failed to publish message', { execId, side, error: error.message }) + throw error + } + } + + async _ensureSender (execId, side) { + const bridge = this.execBridges.get(execId) + if (!bridge) return null + if (bridge.senders[side]) { + return bridge.senders[side] + } + + const queueName = buildQueueName( + side === 'agent' ? MESSAGE_QUEUE_PREFIX.agent : MESSAGE_QUEUE_PREFIX.user, + execId + ) + const connection = await RouterConnectionService.getConnection() + const sender = await new Promise((resolve, reject) => { + const link = connection.open_sender({ + target: { + address: queueName, + durable: 0, + expiry_policy: 'link-detach' + }, + autosettle: true + }) + + link.once('sender_open', () => resolve(link)) + link.once('sender_close', (context) => reject(context.error || new Error('Sender closed before open'))) + link.once('error', reject) + }) + + sender.on('sender_close', () => { + bridge.senders[side] = null + }) + + bridge.senders[side] = { sender } + return bridge.senders[side] + } + + async _ensureReceiver (bridge, side, socket, session) { + if (!socket) return + if (bridge.receivers[side]) { + // Update socket reference if receiver already exists + bridge.receivers[side].socket = socket + logger.debug('[AMQP][QUEUE] Updated socket reference for existing receiver', { + execId: session.execId, + side, + socketState: socket.readyState + }) + return + } + + const queueName = buildQueueName( + side === 'agent' ? MESSAGE_QUEUE_PREFIX.agent : MESSAGE_QUEUE_PREFIX.user, + session.execId + ) + logger.info('[AMQP][QUEUE] Setting up receiver for queue', { + execId: session.execId, + side, + queueName + }) + const connection = await RouterConnectionService.getConnection() + + const receiver = await new Promise((resolve, reject) => { + const link = connection.open_receiver({ + source: { + address: queueName, + durable: 0, + expiry_policy: 'link-detach' + }, + credit_window: 50 + }) + link.once('receiver_open', () => { + logger.info('[AMQP][QUEUE] Receiver opened successfully', { + execId: session.execId, + side, + queueName + }) + resolve(link) + }) + link.once('receiver_close', (context) => reject(context.error || new Error('Receiver closed before open'))) + link.once('error', reject) + }) + + receiver.on('message', async (context) => { + try { + // Always get the latest socket reference from the bridge + const currentBridge = this.execBridges.get(session.execId) + const ws = currentBridge && currentBridge.receivers[side] ? currentBridge.receivers[side].socket : null + const body = getBufferFromBody(context.message.body) + const msgType = context.message.application_properties + ? 
context.message.application_properties.messageType + : null + + // Handle CLOSE messages (works for both user and agent sides) + if (msgType === MESSAGE_TYPES.CLOSE) { + await this._handleCloseMessage({ + bridge: currentBridge, + session, + side, + ws, + context, + body + }) + return + } + + // Forward message to socket (normal message or non-CLOSE message) + if (ws && ws.readyState === WebSocket.OPEN) { + try { + ws.send(body, { + binary: true, + compress: false, + mask: false, + fin: true + }) + context.delivery.accept() + logger.debug('[AMQP][QUEUE] Delivered message to socket', { + execId: session.execId, + side, + messageSize: body.length + }) + } catch (error) { + logger.error('[AMQP][QUEUE] Failed to deliver message to socket', { + execId: session.execId, + side, + error: error.message + }) + context.delivery.release() + } + } else { + logger.debug('[AMQP][QUEUE] No socket available for message delivery', { + execId: session.execId, + side, + hasSocket: !!ws, + socketState: ws ? ws.readyState : 'N/A', + hasBridge: !!currentBridge, + hasReceiver: currentBridge && !!currentBridge.receivers[side] + }) + context.delivery.release() + } + } catch (error) { + logger.error('[AMQP][QUEUE] Error handling queued message', { + execId: session.execId, + side, + error: error.message + }) + try { + context.delivery.release() + } catch (releaseError) { + logger.warn('[AMQP][QUEUE] Failed to release delivery after error', { + execId: session.execId, + error: releaseError.message + }) + } + } + }) + + receiver.on('receiver_close', () => { + logger.info('[AMQP][QUEUE] Receiver closed', { + execId: session.execId, + side + }) + bridge.receivers[side] = null + }) + + bridge.receivers[side] = { receiver, socket } + logger.info('[AMQP][QUEUE] Receiver setup complete', { + execId: session.execId, + side, + queueName, + socketState: socket.readyState + }) + } + + async _handleCloseMessage ({ bridge, session, side, ws, context, body }) { + const execId = session.execId + const closeInitiator = side === 'user' ? 'agent' : 'user' + const closeAck = Boolean( + context.message.application_properties && + context.message.application_properties.closeAck + ) + logger.info('[AMQP][QUEUE] Received CLOSE message via queue', { + execId, + side, + closeInitiator, + closeAck + }) + + // Attempt to close the socket gracefully (if present) + if (ws && ws.readyState === WebSocket.OPEN) { + try { + const reason = closeInitiator === 'agent' ? 'Agent closed connection' : 'User closed connection' + ws.close(1000, reason) + logger.debug('[AMQP][QUEUE] Closed WebSocket with code 1000 after CLOSE message', { + execId, + side + }) + } catch (error) { + logger.warn('[AMQP][QUEUE] Failed to close WebSocket after CLOSE message', { + execId, + side, + error: error.message + }) + } + } else { + logger.debug('[AMQP][QUEUE] No active socket while handling CLOSE message', { + execId, + side, + hasSocket: !!ws, + socketState: ws ? ws.readyState : 'N/A' + }) + } + + context.delivery.accept() + + if (!closeAck && this.execBridges.has(execId)) { + const ackSide = side === 'user' ? 
'agent' : 'user' + try { + await this._send(execId, ackSide, body, { + messageType: MESSAGE_TYPES.CLOSE, + applicationProperties: { closeAck: true } + }) + logger.debug('[AMQP][QUEUE] Sent CLOSE acknowledgement', { + execId, + ackSide + }) + } catch (error) { + logger.warn('[AMQP][QUEUE] Failed to send CLOSE acknowledgement', { + execId, + ackSide, + error: error.message + }) + } + } + + // Invoke cleanup callback to remove session/queue resources + if (bridge && bridge.cleanupCallback) { + try { + await bridge.cleanupCallback(execId) + } catch (error) { + logger.error('[AMQP][QUEUE] Error in cleanup callback during CLOSE handling', { + execId, + error: error.message + }) + } + } + } +} + +module.exports = new WebSocketQueueService() diff --git a/src/websocket/server.js b/src/websocket/server.js index a44c8be4..fc694bb6 100644 --- a/src/websocket/server.js +++ b/src/websocket/server.js @@ -13,6 +13,7 @@ const keycloak = require('../config/keycloak.js').initKeycloak() const AuthDecorator = require('../decorators/authorization-decorator') const TransactionDecorator = require('../decorators/transaction-decorator') const msgpack = require('@msgpack/msgpack') +const WebSocketQueueService = require('../services/websocket-queue-service') const MESSAGE_TYPES = { STDIN: 0, @@ -31,13 +32,32 @@ class WebSocketServer { this.connectionLimits = new Map() this.rateLimits = new Map() this.sessionManager = new SessionManager(config.get('server.webSocket')) + this.queueService = WebSocketQueueService + this.pendingCloseTimeouts = new Map() // Track pending CLOSE messages in cross-replica scenarios this.config = { pingInterval: process.env.WS_PING_INTERVAL || config.get('server.webSocket.pingInterval'), pongTimeout: process.env.WS_PONG_TIMEOUT || config.get('server.webSocket.pongTimeout'), maxPayload: process.env.WS_MAX_PAYLOAD || config.get('server.webSocket.maxPayload'), sessionTimeout: process.env.WS_SESSION_TIMEOUT || config.get('server.webSocket.session.timeout'), cleanupInterval: process.env.WS_CLEANUP_INTERVAL || config.get('server.webSocket.session.cleanupInterval'), - sessionMaxConnections: process.env.WS_SESSION_MAX_CONNECTIONS || config.get('server.webSocket.session.maxConnections') + sessionMaxConnections: process.env.WS_SESSION_MAX_CONNECTIONS || config.get('server.webSocket.session.maxConnections'), + closeResponseTimeout: process.env.WS_CLOSE_RESPONSE_TIMEOUT || 5000 // 5 seconds timeout for agent CLOSE response + } + + this.ensureSocketPongHandler = (ws) => { + if (!ws || ws._hasPingListener) { + return + } + ws._hasPingListener = true + ws.on('ping', () => { + if (ws.readyState === WebSocket.OPEN) { + try { + ws.pong() + } catch (error) { + logger.debug('[RELAY] Failed to respond to ping frame', { error: error.message }) + } + } + }) } } @@ -353,6 +373,7 @@ class WebSocketServer { async handleAgentConnection (ws, req, token, microserviceUuid, transaction) { try { + this.ensureSocketPongHandler(ws) logger.debug('[WS-CONN] Processing agent connection:' + JSON.stringify({ url: req.url, microserviceUuid, @@ -416,18 +437,57 @@ class WebSocketServer { execId, microserviceUuid: msgMicroserviceUuid })) - this.setupMessageForwarding(execId, transaction) + await this.setupMessageForwarding(execId, transaction) } else { - await this.sessionManager.addPendingAgent(msgMicroserviceUuid, execId, ws, transaction) - await MicroserviceExecStatusManager.update( - { microserviceUuid: microserviceUuid }, - { execSessionId: execId, status: microserviceExecState.PENDING }, - transaction - ) - 
logger.info('[WS-SESSION] No pending user found for agent, added to pending list:' + JSON.stringify({ - execId, - microserviceUuid: msgMicroserviceUuid - })) + this.attachPendingKeepAliveHandler(ws) + try { + await MicroserviceExecStatusManager.update( + { microserviceUuid: microserviceUuid }, + { execSessionId: execId, status: microserviceExecState.PENDING }, + transaction + ) + logger.debug('[WS-SESSION] Updated microservice exec status to PENDING', { + execId, + microserviceUuid: microserviceUuid + }) + } catch (error) { + logger.error('[WS-SESSION] Failed to update microservice exec status to PENDING', { + execId, + microserviceUuid: microserviceUuid, + error: error.message, + stack: error.stack + }) + // Continue anyway - the in-memory state is correct + } + // Create session with agent only and enable queue bridge for cross-replica support + // This allows the agent to receive messages from users on other replicas via AMQP queues + const agentOnlySession = this.sessionManager.createSession(execId, msgMicroserviceUuid, ws, null, transaction) + try { + // Pass cleanup callback so queue service can notify us when CLOSE is received + await this.queueService.enableForSession(agentOnlySession, (execId) => { + // Clear timeout if it exists (agent responded to CLOSE) + const timeout = this.pendingCloseTimeouts.get(execId) + if (timeout) { + clearTimeout(timeout) + this.pendingCloseTimeouts.delete(execId) + logger.debug('[WS-SESSION] Cleared pending CLOSE timeout - agent responded', { execId }) + } + this.cleanupSession(execId, transaction) + }) + agentOnlySession.queueBridgeEnabled = true + logger.info('[WS-SESSION] No pending user found for agent, added to pending list and enabled queue bridge for cross-replica support:' + JSON.stringify({ + execId, + microserviceUuid: msgMicroserviceUuid + })) + await this.setupMessageForwarding(execId, transaction) + } catch (error) { + logger.warn('[WS-SESSION] Failed to enable queue bridge for pending agent, will use direct relay when user connects:', { + execId, + microserviceUuid: msgMicroserviceUuid, + error: error.message + }) + agentOnlySession.queueBridgeEnabled = false + } } } @@ -443,9 +503,34 @@ class WebSocketServer { })) // Handle connection close - ws.on('close', () => { + ws.on('close', async () => { for (const [execId, session] of this.sessionManager.sessions) { if (session.agent === ws) { + // In cross-replica scenarios, send CLOSE message to user via queue + // Note: session.user is null on agent's replica in cross-replica scenarios + const queueEnabled = this.queueService.shouldUseQueue(execId) + if (queueEnabled) { + try { + const closeMsg = { + type: MESSAGE_TYPES.CLOSE, + execId: execId, + microserviceUuid: session.microserviceUuid, + timestamp: Date.now(), + data: Buffer.from('Agent closed connection') + } + const encoded = this.encodeMessage(closeMsg) + await this.queueService.publishToUser(execId, encoded, { messageType: MESSAGE_TYPES.CLOSE }) + logger.info('[WS-CLOSE] Sent CLOSE message to user via queue after agent disconnect', { + execId, + microserviceUuid: session.microserviceUuid + }) + } catch (error) { + logger.error('[WS-CLOSE] Failed to send CLOSE message to user via queue', { + execId, + error: error.message + }) + } + } this.cleanupSession(execId, transaction) } } @@ -506,6 +591,7 @@ class WebSocketServer { async handleUserConnection (ws, req, token, microserviceUuid, transaction) { try { + this.ensureSocketPongHandler(ws) await this.validateUserConnection(token, microserviceUuid, transaction) logger.info('User 
connection validated successfully for microservice:' + microserviceUuid) @@ -535,7 +621,7 @@ class WebSocketServer { const pendingAgent = this.sessionManager.findPendingAgentForExecId(microserviceUuid, availableExecId) if (pendingAgent) { - // Activate session using agent's execId + // Activate session using agent's execId (agent is on same replica) const session = this.sessionManager.tryActivateSession(microserviceUuid, availableExecId, ws, false, transaction) if (session) { logger.info('Session activated for user:', { @@ -544,9 +630,29 @@ class WebSocketServer { userState: ws.readyState, agentState: pendingAgent.readyState }) - this.setupMessageForwarding(availableExecId, transaction) + await this.setupMessageForwarding(availableExecId, transaction) return } + } else { + // Agent is on a different replica - create session with just user and enable queue bridge + // The AMQP queues will handle message relay between replicas + logger.info('Found PENDING execId in DB but agent is on different replica, activating session with user only:', { + execId: availableExecId, + microserviceUuid + }) + this.sessionManager.createSession(availableExecId, microserviceUuid, null, ws, transaction) + await MicroserviceExecStatusManager.update( + { microserviceUuid: microserviceUuid }, + { execSessionId: availableExecId, status: microserviceExecState.ACTIVE }, + transaction + ) + await this.setupMessageForwarding(availableExecId, transaction) + logger.info('Cross-replica session activated with user only:', { + execId: availableExecId, + microserviceUuid, + userState: ws.readyState + }) + return } } @@ -558,6 +664,7 @@ class WebSocketServer { pendingAgentCount: pendingAgentExecIds.length })) this.sessionManager.addPendingUser(microserviceUuid, ws) + this.attachPendingKeepAliveHandler(ws) // IMMEDIATE RE-CHECK: Look for any newly available agents after adding user (database query) const retryPendingAgents = await this.getPendingAgentExecIdsFromDB(microserviceUuid, transaction) @@ -572,6 +679,8 @@ class WebSocketServer { const pendingAgent = this.sessionManager.findPendingAgentForExecId(microserviceUuid, availableExecId) if (pendingAgent) { + // Remove user from pending first since we're activating + this.sessionManager.removePendingUser(microserviceUuid, ws) const session = this.sessionManager.tryActivateSession(microserviceUuid, availableExecId, ws, false, transaction) if (session) { logger.info('Session activated immediately after re-check:' + JSON.stringify({ @@ -581,9 +690,30 @@ class WebSocketServer { agentState: pendingAgent.readyState })) - this.setupMessageForwarding(availableExecId, transaction) + await this.setupMessageForwarding(availableExecId, transaction) return // Exit early, session activated successfully } + } else { + // Agent is on different replica - activate with user only + logger.info('Found PENDING execId in retry but agent is on different replica, activating session with user only:', { + execId: availableExecId, + microserviceUuid + }) + // Remove user from pending first + this.sessionManager.removePendingUser(microserviceUuid, ws) + this.sessionManager.createSession(availableExecId, microserviceUuid, null, ws, transaction) + await MicroserviceExecStatusManager.update( + { microserviceUuid: microserviceUuid }, + { execSessionId: availableExecId, status: microserviceExecState.ACTIVE }, + transaction + ) + await this.setupMessageForwarding(availableExecId, transaction) + logger.info('Cross-replica session activated with user only (retry):', { + execId: availableExecId, + 
microserviceUuid, + userState: ws.readyState + }) + return } } @@ -641,6 +771,8 @@ class WebSocketServer { const pendingAgent = this.sessionManager.findPendingAgentForExecId(microserviceUuid, availableExecId) if (pendingAgent) { + // Remove user from pending first + this.sessionManager.removePendingUser(microserviceUuid, ws) const session = this.sessionManager.tryActivateSession(microserviceUuid, availableExecId, ws, false, transaction) if (session) { logger.info('Session activated via periodic retry:' + JSON.stringify({ @@ -650,10 +782,32 @@ class WebSocketServer { agentState: pendingAgent.readyState })) - this.setupMessageForwarding(availableExecId, transaction) + await this.setupMessageForwarding(availableExecId, transaction) clearInterval(retryTimer) // Stop retry timer return // Exit early, session activated successfully } + } else { + // Agent is on different replica - activate with user only + logger.info('Periodic retry found PENDING execId but agent is on different replica, activating session with user only:', { + execId: availableExecId, + microserviceUuid + }) + // Remove user from pending first + this.sessionManager.removePendingUser(microserviceUuid, ws) + this.sessionManager.createSession(availableExecId, microserviceUuid, null, ws, transaction) + await MicroserviceExecStatusManager.update( + { microserviceUuid: microserviceUuid }, + { execSessionId: availableExecId, status: microserviceExecState.ACTIVE }, + transaction + ) + await this.setupMessageForwarding(availableExecId, transaction) + logger.info('Cross-replica session activated with user only (periodic retry):', { + execId: availableExecId, + microserviceUuid, + userState: ws.readyState + }) + clearInterval(retryTimer) // Stop retry timer + return } } } catch (retryError) { @@ -777,7 +931,7 @@ class WebSocketServer { // return noisePatterns.some(pattern => pattern.test(output)) // } - setupMessageForwarding (execId, transaction) { + async setupMessageForwarding (execId, transaction) { const session = this.sessionManager.getSession(execId) if (!session) { logger.error('[RELAY] Failed to setup message forwarding: No session found for execId=' + execId) @@ -793,40 +947,81 @@ class WebSocketServer { agentState: agent ? agent.readyState : 'N/A', userState: user ? 
user.readyState : 'N/A' })) + this.detachPendingKeepAliveHandler(user) + this.detachPendingKeepAliveHandler(agent) + try { + // Pass cleanup callback so queue service can notify us when CLOSE is received + await this.queueService.enableForSession(session, (execId) => { + // Clear timeout if it exists (agent responded to CLOSE) + const timeout = this.pendingCloseTimeouts.get(execId) + if (timeout) { + clearTimeout(timeout) + this.pendingCloseTimeouts.delete(execId) + logger.debug('[RELAY] Cleared pending CLOSE timeout - agent responded', { execId }) + } + const currentTransaction = session.transaction + this.cleanupSession(execId, currentTransaction) + }) + session.queueBridgeEnabled = true + logger.info('[RELAY] AMQP queue bridge enabled for exec session', { + execId, + microserviceUuid: session.microserviceUuid + }) + } catch (error) { + session.queueBridgeEnabled = false + logger.warn('[RELAY] Failed to enable AMQP queue bridge, falling back to direct WebSocket relay', { + execId, + error: error.message + }) + } - // Send activation message to agent - if (agent) { - const activationMsg = { - type: MESSAGE_TYPES.ACTIVATION, - data: Buffer.from(JSON.stringify({ - execId: execId, - microserviceUuid: session.microserviceUuid, - timestamp: Date.now() - })), - microserviceUuid: session.microserviceUuid, + // Send activation message to agent (works for both direct WebSocket and queue-based forwarding) + const activationMsg = { + type: MESSAGE_TYPES.ACTIVATION, + data: Buffer.from(JSON.stringify({ execId: execId, + microserviceUuid: session.microserviceUuid, timestamp: Date.now() - } + })), + microserviceUuid: session.microserviceUuid, + execId: execId, + timestamp: Date.now() + } - this.sendMessageToAgent(agent, activationMsg, execId, session.microserviceUuid) - .then(success => { - if (success) { - logger.info('[RELAY] Session activation complete:' + JSON.stringify({ - execId, - microserviceUuid: session.microserviceUuid, - agentState: agent.readyState - })) - } else { - logger.error('[RELAY] Session activation failed:' + JSON.stringify({ - execId, - microserviceUuid: session.microserviceUuid, - agentState: agent.readyState - })) - // Cleanup the session if activation fails + // sendMessageToAgent handles queue-based forwarding when agent is null + this.sendMessageToAgent(session.agent, activationMsg, execId, session.microserviceUuid) + .then(success => { + if (success) { + logger.info('[RELAY] Session activation complete:' + JSON.stringify({ + execId, + microserviceUuid: session.microserviceUuid, + agentState: session.agent ? session.agent.readyState : 'N/A (cross-replica)', + queueEnabled: this.queueService.shouldUseQueue(execId) + })) + } else { + logger.error('[RELAY] Session activation failed:' + JSON.stringify({ + execId, + microserviceUuid: session.microserviceUuid, + agentState: session.agent ? 
session.agent.readyState : 'N/A', + queueEnabled: this.queueService.shouldUseQueue(execId) + })) + // Only cleanup if we have a direct agent connection (not queue-based) + if (session.agent) { this.cleanupSession(execId, transaction) } - }) - } + } + }) + .catch(error => { + logger.error('[RELAY] Session activation error:' + JSON.stringify({ + execId, + microserviceUuid: session.microserviceUuid, + error: error.message + })) + // Only cleanup if we have a direct agent connection (not queue-based) + if (session.agent) { + this.cleanupSession(execId, transaction) + } + }) // Remove any previous message handlers to avoid duplicates if (user) { @@ -838,8 +1033,8 @@ class WebSocketServer { agent.removeAllListeners('message') } - // Forward user -> agent - if (user && agent) { + // Forward user -> agent (works for both direct WebSocket and queue-based forwarding) + if (user) { logger.debug('[RELAY] Setting up user->agent message forwarding for execId=' + execId) user.on('message', async (data, isBinary) => { logger.debug('[RELAY] User message received:' + JSON.stringify({ @@ -848,7 +1043,8 @@ class WebSocketServer { dataType: typeof data, dataLength: data.length, userState: user.readyState, - agentState: agent.readyState + agentState: agent ? agent.readyState : 'N/A (cross-replica)', + queueEnabled: this.queueService.shouldUseQueue(execId) })) if (!isBinary) { @@ -859,7 +1055,8 @@ class WebSocketServer { text, length: text.length, userState: user.readyState, - agentState: agent.readyState + agentState: session.agent ? session.agent.readyState : 'N/A (cross-replica)', + queueEnabled: this.queueService.shouldUseQueue(execId) })) // Convert text to binary message in agent's expected format @@ -871,7 +1068,8 @@ class WebSocketServer { timestamp: Date.now() } - await this.sendMessageToAgent(agent, msg, execId, session.microserviceUuid) + // sendMessageToAgent handles queue-based forwarding when agent is null + await this.sendMessageToAgent(session.agent, msg, execId, session.microserviceUuid) return } @@ -885,11 +1083,72 @@ class WebSocketServer { if (msg.type === MESSAGE_TYPES.CLOSE) { logger.info(`[RELAY] User sent CLOSE for execId=${execId}`) - await this.sendMessageToAgent(agent, msg, execId, session.microserviceUuid) - // Get current transaction from the session - const currentTransaction = session.transaction - this.cleanupSession(execId, currentTransaction) - return + + const queueEnabled = this.queueService.shouldUseQueue(execId) + + // Forward CLOSE to agent first + await this.sendMessageToAgent(session.agent, msg, execId, session.microserviceUuid) + + if (queueEnabled) { + // Cross-replica scenario: Don't close socket immediately + // Wait for agent's CLOSE response via queue + // The queue service will handle closing the socket in _handleCloseMessage + // when it receives the agent's CLOSE response + logger.debug('[RELAY] Cross-replica CLOSE: waiting for agent response via queue', { + execId, + microserviceUuid: session.microserviceUuid + }) + + // Set timeout in case agent doesn't respond + const timeout = setTimeout(() => { + const currentSession = this.sessionManager.getSession(execId) + if (currentSession && currentSession.user && currentSession.user.readyState === WebSocket.OPEN) { + logger.warn('[RELAY] Agent did not respond to CLOSE within timeout, closing user socket', { + execId, + microserviceUuid: session.microserviceUuid, + timeout: this.config.closeResponseTimeout + }) + try { + currentSession.user.close(1000, 'Session closed (timeout)') + const currentTransaction = 
currentSession.transaction + this.cleanupSession(execId, currentTransaction) + } catch (error) { + logger.error('[RELAY] Failed to close user socket on timeout', { + execId, + error: error.message + }) + } + } + this.pendingCloseTimeouts.delete(execId) + }, this.config.closeResponseTimeout) + + this.pendingCloseTimeouts.set(execId, timeout) + // Don't cleanup yet - queue service will call cleanup callback when agent responds + return + } else { + // Same replica: Close immediately (existing behavior) + // Close user WebSocket with code 1000 so client's onclose handler shows "Successfully closed" + // The client expects code 1000 (normal closure) to display the success message + if (user && user.readyState === WebSocket.OPEN) { + try { + user.close(1000, 'Session closed') + logger.debug('[RELAY] Closed user WebSocket with code 1000:' + JSON.stringify({ + execId, + microserviceUuid: session.microserviceUuid + })) + } catch (error) { + logger.warn('[RELAY] Failed to close user WebSocket:' + JSON.stringify({ + execId, + error: error.message + })) + } + } + + // Get current transaction from the session and cleanup + const currentTransaction = session.transaction + this.cleanupSession(execId, currentTransaction) + return + } } if (msg.type === MESSAGE_TYPES.CONTROL) { @@ -919,7 +1178,8 @@ class WebSocketServer { } } - await this.sendMessageToAgent(agent, msg, execId, session.microserviceUuid) + // sendMessageToAgent handles queue-based forwarding when agent is null + await this.sendMessageToAgent(session.agent, msg, execId, session.microserviceUuid) } catch (error) { logger.error('[RELAY] Failed to process binary message:' + JSON.stringify({ execId, @@ -927,12 +1187,14 @@ class WebSocketServer { stack: error.stack, bufferLength: buffer.length, userState: user.readyState, - agentState: agent.readyState + agentState: session.agent ? session.agent.readyState : 'N/A (cross-replica)' })) } }) + } - // Forward agent -> user + // Forward agent -> user (works for both direct WebSocket and queue-based forwarding) + if (agent) { logger.debug('[RELAY] Setting up agent->user message forwarding for execId=' + execId) agent.on('message', async (data, isBinary) => { logger.debug('[RELAY] Agent message received:' + JSON.stringify({ @@ -940,8 +1202,9 @@ class WebSocketServer { isBinary, dataType: typeof data, dataLength: data.length, - userState: user.readyState, - agentState: agent.readyState + userState: session.user ? 
session.user.readyState : 'N/A (cross-replica)', + agentState: agent.readyState, + queueEnabled: this.queueService.shouldUseQueue(execId) })) try { @@ -956,16 +1219,50 @@ class WebSocketServer { if (msg.type === MESSAGE_TYPES.CLOSE) { logger.info(`[RELAY] Agent sent CLOSE for execId=${execId}`) - if (user.readyState === WebSocket.OPEN) { - user.close(1000, 'Agent closed connection') + + const queueEnabled = this.queueService.shouldUseQueue(execId) + + // In cross-replica scenarios, publish CLOSE to queue so user's replica can handle it + if (queueEnabled) { + try { + // Pass message type so queue receiver can detect CLOSE without decoding + await this.queueService.publishToUser(execId, buffer, { messageType: MESSAGE_TYPES.CLOSE }) + logger.debug('[RELAY] Forwarded agent CLOSE message to user via queue:' + JSON.stringify({ + execId, + type: msg.type + })) + } catch (error) { + logger.error('[RELAY] Failed to enqueue CLOSE message for user', { + execId, + error: error.message + }) + } + } else if (session.user && session.user.readyState === WebSocket.OPEN) { + // Direct connection - close user WebSocket immediately + session.user.close(1000, 'Agent closed connection') } + // Get current transaction from the session const currentTransaction = session.transaction this.cleanupSession(execId, currentTransaction) return } - if (user.readyState === WebSocket.OPEN) { + const queueEnabled = this.queueService.shouldUseQueue(execId) + if (queueEnabled) { + try { + await this.queueService.publishToUser(execId, buffer) + logger.debug('[RELAY] Forwarded agent message to user via queue:' + JSON.stringify({ + execId, + type: msg.type + })) + } catch (error) { + logger.error('[RELAY] Failed to enqueue message for user', { + execId, + error: error.message + }) + } + } else if (session.user && session.user.readyState === WebSocket.OPEN) { if (msg.type === MESSAGE_TYPES.STDOUT || msg.type === MESSAGE_TYPES.STDERR) { if (msg.data && msg.data.length > 0) { // Create MessagePack message for user @@ -978,7 +1275,7 @@ class WebSocketServer { } // Encode and send as binary const encoded = this.encodeMessage(userMsg) - user.send(encoded, { + session.user.send(encoded, { binary: true, compress: false, mask: false, @@ -993,7 +1290,7 @@ class WebSocketServer { })) } } else if (msg.type === MESSAGE_TYPES.CONTROL) { - user.send(data, { + session.user.send(data, { binary: true, compress: false, mask: false, @@ -1001,10 +1298,11 @@ class WebSocketServer { }) } } else { - logger.error('[RELAY] User not ready to receive message:' + JSON.stringify({ + logger.debug('[RELAY] User not available (cross-replica), message should be delivered via queue:' + JSON.stringify({ execId, - userState: user.readyState, - messageType: msg.type + userState: session.user ? 
session.user.readyState : 'N/A', + messageType: msg.type, + queueEnabled })) } } catch (error) { @@ -1187,6 +1485,14 @@ class WebSocketServer { const session = this.sessionManager.getSession(execId) if (!session) return + // Clear any pending CLOSE timeout + const timeout = this.pendingCloseTimeouts.get(execId) + if (timeout) { + clearTimeout(timeout) + this.pendingCloseTimeouts.delete(execId) + logger.debug('[RELAY] Cleared pending CLOSE timeout during cleanup', { execId }) + } + // Send CLOSE message to agent if it's still connected if (session.agent && session.agent.readyState === WebSocket.OPEN) { const closeMsg = { @@ -1215,7 +1521,8 @@ class WebSocketServer { } } - // Close the connections + // Close the connections (only if not already closed) + // Note: User connection may already be closed if user initiated the close if (session.user && session.user.readyState === WebSocket.OPEN) { session.user.close(1000, 'Session closed') } @@ -1225,6 +1532,13 @@ class WebSocketServer { this.sessionManager.removeSession(execId, transaction) logger.info('[RELAY] Session cleaned up for execId=' + execId) + this.queueService.cleanup(execId) + .catch(error => { + logger.warn('[RELAY] Failed to cleanup queue bridge during session cleanup', { + execId, + error: error.message + }) + }) } // Utility to extract microserviceUuid from path @@ -1243,18 +1557,32 @@ class WebSocketServer { // Helper method for sending messages to agent async sendMessageToAgent (agent, message, execId, microserviceUuid) { - if (!agent || agent.readyState !== WebSocket.OPEN) { - logger.error('[RELAY] Cannot send message - agent not ready:' + JSON.stringify({ - execId, - microserviceUuid, - agentState: agent ? agent.readyState : 'N/A', - messageType: message.type - })) - return false - } - try { const encoded = this.encodeMessage(message) + const isQueueEnabled = this.queueService.shouldUseQueue(execId) + const messageType = typeof message.type === 'number' ? message.type : null + + if (isQueueEnabled) { + await this.queueService.publishToAgent(execId, encoded, { messageType }) + logger.debug('[RELAY] Queued message for agent via AMQP:' + JSON.stringify({ + execId, + microserviceUuid, + messageType: message.type, + encodedLength: encoded.length + })) + return true + } + + if (!agent || agent.readyState !== WebSocket.OPEN) { + logger.error('[RELAY] Cannot send message - agent not ready:' + JSON.stringify({ + execId, + microserviceUuid, + agentState: agent ? agent.readyState : 'N/A', + messageType: message.type + })) + return false + } + agent.send(encoded, { binary: true, compress: false, @@ -1280,6 +1608,65 @@ class WebSocketServer { } } + attachPendingKeepAliveHandler (ws) { + if (!ws || ws._pendingKeepAliveHandler) { + return + } + ws._pendingKeepAliveHandler = (data, isBinary) => { + if (!isBinary) return + let msg + try { + msg = this.decodeMessage(Buffer.from(data)) + } catch (error) { + return + } + if (msg.type === MESSAGE_TYPES.CONTROL) { + const controlData = msg.data ? 
msg.data.toString() : '' + if (controlData === 'keepalive') { + this._sendKeepAliveResponse(ws, msg.execId || 'pending', msg.microserviceUuid || null) + } + } + } + ws.on('message', ws._pendingKeepAliveHandler) + ws.on('ping', () => { + if (ws.readyState === WebSocket.OPEN) { + try { + ws.pong() + } catch (error) { + logger.debug('[RELAY] Failed to send pong on pending connection', { error: error.message }) + } + } + }) + } + + detachPendingKeepAliveHandler (ws) { + if (ws && ws._pendingKeepAliveHandler) { + ws.removeListener('message', ws._pendingKeepAliveHandler) + ws._pendingKeepAliveHandler = null + } + } + + _sendKeepAliveResponse (ws, execId, microserviceUuid) { + try { + const keepAliveResponse = { + type: MESSAGE_TYPES.CONTROL, + data: Buffer.from('keepalive'), + microserviceUuid, + execId, + timestamp: Date.now() + } + const encoded = this.encodeMessage(keepAliveResponse) + ws.send(encoded, { + binary: true, + compress: false, + mask: false, + fin: true + }) + } catch (error) { + logger.debug('[RELAY] Failed to send keepalive response', { error: error.message }) + } + } + // Helper method to check if auth is configured isAuthConfigured () { const requiredConfigs = [ From c15890b594dfd0728fd4c0afe06571d46dac9547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emirhan=20Durmu=C5=9F?= Date: Fri, 21 Nov 2025 21:20:48 +0300 Subject: [PATCH 2/2] events track middleware introduced for endpoints auditing, viewer version upgraded --- docs/swagger.yaml | 221 +++++- package-lock.json | 51 +- package.json | 3 +- src/config/controller.yaml | 4 + src/config/env-mapping.js | 4 + src/controllers/event-controller.js | 37 + src/data/managers/event-manager.js | 152 +++++ ....0.5.sql => db_migration_mysql_v1.0.6.sql} | 28 + ..._v1.0.5.sql => db_migration_pg_v1.0.6.sql} | 30 + ...0.5.sql => db_migration_sqlite_v1.0.6.sql} | 30 + src/data/models/event.js | 108 +++ src/data/providers/database-provider.js | 12 +- src/jobs/event-cleanup-job.js | 74 ++ src/middlewares/event-audit-middleware.js | 72 ++ src/routes/event.js | 88 +++ src/schemas/event.js | 64 ++ src/server.js | 5 + src/services/event-service.js | 646 ++++++++++++++++++ src/services/iofog-service.js | 2 +- src/websocket/server.js | 282 +++++++- 20 files changed, 1854 insertions(+), 59 deletions(-) create mode 100644 src/controllers/event-controller.js create mode 100644 src/data/managers/event-manager.js rename src/data/migrations/mysql/{db_migration_mysql_v1.0.5.sql => db_migration_mysql_v1.0.6.sql} (96%) rename src/data/migrations/postgres/{db_migration_pg_v1.0.5.sql => db_migration_pg_v1.0.6.sql} (96%) rename src/data/migrations/sqlite/{db_migration_sqlite_v1.0.5.sql => db_migration_sqlite_v1.0.6.sql} (96%) create mode 100644 src/data/models/event.js create mode 100644 src/jobs/event-cleanup-job.js create mode 100644 src/middlewares/event-audit-middleware.js create mode 100644 src/routes/event.js create mode 100644 src/schemas/event.js create mode 100644 src/services/event-service.js diff --git a/docs/swagger.yaml b/docs/swagger.yaml index f38adde9..de8343a3 100755 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1,6 +1,6 @@ openapi : "3.0.0" info: - version: 3.5.0 + version: 3.5.9 title: Datasance PoT Controller paths: /status: @@ -4935,6 +4935,123 @@ paths: description: Not Found "500": description: Internal Server Error + /events: + get: + tags: + - Events + summary: List audit events + description: Retrieve audit events with optional filters and pagination. 
+ operationId: listEvents + security: + - authToken: [] + parameters: + - in: query + name: limit + description: Maximum number of events to return (default 200, max 1000) + schema: + type: integer + minimum: 1 + maximum: 1000 + required: false + - in: query + name: offset + description: Number of events to skip before collecting results + schema: + type: integer + minimum: 0 + required: false + - in: query + name: startTime + description: Start of time range (Unix timestamp in ms or ISO 8601) + schema: + type: string + required: false + - in: query + name: endTime + description: End of time range (Unix timestamp in ms or ISO 8601) + schema: + type: string + required: false + - in: query + name: endpointType + description: Filter by endpoint type (agent or user) + schema: + type: string + required: false + - in: query + name: resourceType + description: Filter by resource type (agent, microservice, etc.) + schema: + type: string + required: false + - in: query + name: status + description: Filter by status (SUCCESS or FAILED) + schema: + type: string + required: false + - in: query + name: method + description: Filter by HTTP/WS method(s) + schema: + type: array + items: + type: string + style: form + explode: true + required: false + - in: query + name: actorId + description: Filter by actor identifier (username or fog UUID) + schema: + type: string + required: false + - in: query + name: eventType + description: Filter by event type (HTTP, WS_CONNECT, WS_DISCONNECT) + schema: + type: string + required: false + responses: + "200": + description: Events list + content: + application/json: + schema: + $ref: "#/components/schemas/EventListResponse" + "400": + description: Bad Request + "401": + description: Not Authorized + "500": + description: Internal Server Error + delete: + tags: + - Events + summary: Delete audit events + description: Delete all events when `days` is 0, or delete events older than the provided number of days. 
+ operationId: deleteEvents + security: + - authToken: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/EventDeleteRequest" + responses: + "200": + description: Deletion summary + content: + application/json: + schema: + $ref: "#/components/schemas/EventDeleteResponse" + "400": + description: Bad Request + "401": + description: Not Authorized + "500": + description: Internal Server Error tags: - name: Controller description: Manage your controller @@ -4972,6 +5089,8 @@ tags: description: Manage your volume mounts - name: ConfigMap description: Manage your config maps + - name: Events + description: Manage audit events servers: - url: http://localhost:51121/api/v3 components: @@ -5014,6 +5133,106 @@ components: description: New Flow Info required: true schemas: + EventRecord: + type: object + properties: + id: + type: integer + timestamp: + type: integer + format: int64 + eventType: + type: string + endpointType: + type: string + actorId: + type: string + nullable: true + method: + type: string + nullable: true + resourceType: + type: string + nullable: true + resourceId: + type: string + nullable: true + endpointPath: + type: string + ipAddress: + type: string + nullable: true + status: + type: string + statusCode: + type: integer + nullable: true + statusMessage: + type: string + nullable: true + requestId: + type: string + nullable: true + createdAt: + type: string + format: date-time + updatedAt: + type: string + format: date-time + required: + - id + - timestamp + - eventType + - endpointType + - endpointPath + - status + - createdAt + - updatedAt + EventListResponse: + type: object + properties: + events: + type: array + items: + $ref: "#/components/schemas/EventRecord" + total: + type: integer + limit: + type: integer + offset: + type: integer + required: + - events + - total + - limit + - offset + EventDeleteRequest: + type: object + properties: + days: + type: integer + minimum: 0 + maximum: 365 + required: + - days + EventDeleteResponse: + type: object + properties: + deletedCount: + type: integer + deletedBefore: + type: string + format: date-time + nullable: true + deletedAt: + type: string + format: date-time + deletedAll: + type: boolean + required: + - deletedCount + - deletedAt + - deletedAll EdgeResourcesListResponse: type: object properties: diff --git a/package-lock.json b/package-lock.json index e6d7aaac..adc65ed0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "hasInstallScript": true, "license": "EPL-2.0", "dependencies": { - "@datasance/ecn-viewer": "1.2.3", + "@datasance/ecn-viewer": "1.2.6", "@kubernetes/client-node": "^0.22.3", "@msgpack/msgpack": "^3.1.2", "@opentelemetry/api": "^1.9.0", @@ -51,7 +51,6 @@ "nconf": "0.12.1", "node-fetch-npm": "^2.0.4", "node-forge": "^1.3.1", - "node-schedule": "^2.1.1", "os": "0.1.2", "path": "0.12.7", "pg": "8.12.0", @@ -427,9 +426,9 @@ } }, "node_modules/@datasance/ecn-viewer": { - "version": "1.2.3", - "resolved": "https://registry.npmjs.org/@datasance/ecn-viewer/-/ecn-viewer-1.2.3.tgz", - "integrity": "sha512-Q2fc4cCpzBrISo97itDuTgZnBfnx5vg+wKBG8IdkvbFIG253u2Su+DKyd6lJpf0kKZK8Ic6vei0XaTDBubNYrw==" + "version": "1.2.6", + "resolved": "https://registry.npmjs.org/@datasance/ecn-viewer/-/ecn-viewer-1.2.6.tgz", + "integrity": "sha512-/NV1ll6Vt97P3Fdb8oNDZLHGNNNoUW8zF+VVB0RFEEZpUBZqq5vWx72RyNoTP6Jdr7NqnZUb6PA/1Z9GTN3fvw==" }, "node_modules/@eslint-community/eslint-utils": { "version": "4.7.0", @@ -4139,17 +4138,6 @@ "node": ">= 0.10" } }, - 
"node_modules/cron-parser": { - "version": "4.9.0", - "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", - "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", - "dependencies": { - "luxon": "^3.2.1" - }, - "engines": { - "node": ">=12.0.0" - } - }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -8533,11 +8521,6 @@ "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==" }, - "node_modules/long-timeout": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/long-timeout/-/long-timeout-0.1.1.tgz", - "integrity": "sha512-BFRuQUqc7x2NWxfJBCyUrN8iYUYznzL9JROmRz1gZ6KlOIgmoD+njPVbb+VNn2nGMKggMsK79iUNErillsrx7w==" - }, "node_modules/loose-envify": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", @@ -8573,14 +8556,6 @@ "es5-ext": "~0.10.2" } }, - "node_modules/luxon": { - "version": "3.6.1", - "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.6.1.tgz", - "integrity": "sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==", - "engines": { - "node": ">=12" - } - }, "node_modules/make-dir": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz", @@ -9645,19 +9620,6 @@ "integrity": "sha512-xxOWJsBKtzAq7DY0J+DTzuz58K8e7sJbdgwkbMWQe8UYB6ekmsQ45q0M/tJDsGaZmbC+l7n57UV8Hl5tHxO9uw==", "dev": true }, - "node_modules/node-schedule": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/node-schedule/-/node-schedule-2.1.1.tgz", - "integrity": "sha512-OXdegQq03OmXEjt2hZP33W2YPs/E5BcFQks46+G2gAxs4gHOIVD1u7EqlYLYSKsaIpyKCK9Gbk0ta1/gjRSMRQ==", - "dependencies": { - "cron-parser": "^4.2.0", - "long-timeout": "0.1.1", - "sorted-array-functions": "^1.3.0" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/nopt": { "version": "7.2.1", "resolved": "https://registry.npmjs.org/nopt/-/nopt-7.2.1.tgz", @@ -12747,11 +12709,6 @@ "atomic-sleep": "^1.0.0" } }, - "node_modules/sorted-array-functions": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/sorted-array-functions/-/sorted-array-functions-1.3.0.tgz", - "integrity": "sha512-2sqgzeFlid6N4Z2fUQ1cvFmTOLRi/sEDzSQ0OKYchqgoPmQBVyM3959qYx3fpS6Esef80KjmpgPeEr028dP3OA==" - }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", diff --git a/package.json b/package.json index 9057a930..3d8e4ce8 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "iofog-controller": "src/main.js" }, "dependencies": { - "@datasance/ecn-viewer": "1.2.3", + "@datasance/ecn-viewer": "1.2.6", "@kubernetes/client-node": "^0.22.3", "@msgpack/msgpack": "^3.1.2", "@opentelemetry/api": "^1.9.0", @@ -96,7 +96,6 @@ "nconf": "0.12.1", "node-fetch-npm": "^2.0.4", "node-forge": "^1.3.1", - "node-schedule": "^2.1.1", "os": "0.1.2", "path": "0.12.7", "pg": "8.12.0", diff --git a/src/config/controller.yaml b/src/config/controller.yaml index cd93ad6d..252c752b 100644 --- a/src/config/controller.yaml +++ b/src/config/controller.yaml @@ -51,6 +51,10 @@ settings: fogStatusUpdateInterval: 30 # Fog status update interval in seconds fogStatusUpdateTolerance: 3 # Fog status update tolerance fogExpiredTokenCleanupInterval: 300 # Fog expired token cleanup interval in seconds + 
eventRetentionDays: 30 # Days to retain events (default: 30) + eventCleanupInterval: 86400 # Cleanup job interval in seconds (default: 24 hours) + eventAuditEnabled: true # Enable/disable event auditing + eventCaptureIpAddress: true # Capture IP address (default: true, set false for privacy compliance) # Database Configuration database: diff --git a/src/config/env-mapping.js b/src/config/env-mapping.js index d4365fb5..4a097817 100644 --- a/src/config/env-mapping.js +++ b/src/config/env-mapping.js @@ -39,6 +39,10 @@ module.exports = { 'FOG_STATUS_UPDATE_INTERVAL': 'settings.fogStatusUpdateInterval', 'FOG_STATUS_UPDATE_TOLERANCE': 'settings.fogStatusUpdateTolerance', 'FOG_EXPIRED_TOKEN_CLEANUP_INTERVAL': 'settings.fogExpiredTokenCleanupInterval', + 'EVENT_RETENTION_DAYS': 'settings.eventRetentionDays', + 'EVENT_CLEANUP_INTERVAL': 'settings.eventCleanupInterval', + 'EVENT_AUDIT_ENABLED': 'settings.eventAuditEnabled', + 'EVENT_CAPTURE_IP_ADDRESS': 'settings.eventCaptureIpAddress', // Database Configuration 'DB_PROVIDER': 'database.provider', diff --git a/src/controllers/event-controller.js b/src/controllers/event-controller.js new file mode 100644 index 00000000..62f49107 --- /dev/null +++ b/src/controllers/event-controller.js @@ -0,0 +1,37 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const EventService = require('../services/event-service') + +/** + * List events with query filters and pagination + * @param {object} req - Express request object + * @returns {Promise} Events list with pagination info + */ +async function listEventsEndpoint (req) { + return EventService.listEvents({ query: req.query }, { req }) +} + +/** + * Delete old events manually + * @param {object} req - Express request object + * @returns {Promise} Deletion result + */ +async function deleteOldEventsEndpoint (req) { + return EventService.deleteEvents({ body: req.body }, { req }) +} + +module.exports = { + listEventsEndpoint, + deleteOldEventsEndpoint +} diff --git a/src/data/managers/event-manager.js b/src/data/managers/event-manager.js new file mode 100644 index 00000000..7b32bdff --- /dev/null +++ b/src/data/managers/event-manager.js @@ -0,0 +1,152 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 
2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const BaseManager = require('./base-manager') +const models = require('../models') +const Event = models.Event +const { Op } = require('sequelize') +const AppHelper = require('../../helpers/app-helper') +const logger = require('../../logger') + +class EventManager extends BaseManager { + getEntity () { + return Event + } + + async findAllWithFilters (filters, transaction) { + AppHelper.checkTransaction(transaction) + + const where = {} + + // Time range filters + if (filters.startTime || filters.endTime) { + where.timestamp = {} + if (filters.startTime) { + where.timestamp[Op.gte] = filters.startTime + } + if (filters.endTime) { + where.timestamp[Op.lte] = filters.endTime + } + } + + // Endpoint type filter + if (filters.endpointType) { + where.endpointType = filters.endpointType + } + + // Resource type filter + if (filters.resourceType) { + where.resourceType = filters.resourceType + } + + // Status filter + if (filters.status) { + where.status = filters.status + } + + // Method filter (can be array for multiple methods) + if (filters.method) { + if (Array.isArray(filters.method)) { + where.method = { [Op.in]: filters.method } + } else { + where.method = filters.method + } + } + + // Actor ID filter + if (filters.actorId) { + where.actorId = filters.actorId + } + + // Event type filter + if (filters.eventType) { + where.eventType = filters.eventType + } + + // Ensure limit is a valid positive integer + let limit = 200 // Default + if (filters.limit !== undefined && filters.limit !== null) { + const parsedLimit = parseInt(filters.limit) + if (!isNaN(parsedLimit) && parsedLimit > 0) { + limit = Math.min(parsedLimit, 1000) + } + } + + // Ensure offset is a valid non-negative integer + let offset = 0 + if (filters.offset !== undefined && filters.offset !== null) { + const parsedOffset = parseInt(filters.offset) + if (!isNaN(parsedOffset) && parsedOffset >= 0) { + offset = parsedOffset + } + } + + const options = { + where: where, + order: [['timestamp', 'DESC']], + limit: Number(limit), // Ensure it's a number + offset: Number(offset) // Ensure it's a number + } + + if (!transaction.fakeTransaction) { + options.transaction = transaction + } + + const { count, rows } = await Event.findAndCountAll(options) + + // CRITICAL: Always enforce the limit, even if Sequelize returns more rows + // Sequelize's findAndCountAll sometimes doesn't respect limit in certain scenarios + const limitedRows = rows.slice(0, limit) + + if (rows.length > limit) { + logger.warn(`Sequelize returned ${rows.length} rows but limit was ${limit}. 
Limiting to ${limit} rows.`) + } + + logger.debug(`Event query final - returning ${limitedRows.length} events`) + + return { + events: limitedRows, + total: count, + limit: limit, + offset: offset + } + } + + async deleteEventsOlderThanDays (days, transaction) { + AppHelper.checkTransaction(transaction) + + const options = { + where: {} + } + + // Special case: days = 0 means delete ALL events (no timestamp filter) + // days > 0 means delete events older than that many days + if (days > 0) { + const cutoffTimestamp = Date.now() - (days * 24 * 60 * 60 * 1000) + options.where.timestamp = { + [Op.lt]: cutoffTimestamp + } + } + // If days = 0, where clause is empty, so all events will be deleted + + if (!transaction.fakeTransaction) { + options.transaction = transaction + } + + const deletedCount = await Event.destroy(options) + return deletedCount + } +} + +const instance = new EventManager() +module.exports = instance diff --git a/src/data/migrations/mysql/db_migration_mysql_v1.0.5.sql b/src/data/migrations/mysql/db_migration_mysql_v1.0.6.sql similarity index 96% rename from src/data/migrations/mysql/db_migration_mysql_v1.0.5.sql rename to src/data/migrations/mysql/db_migration_mysql_v1.0.6.sql index b0e86802..27868646 100644 --- a/src/data/migrations/mysql/db_migration_mysql_v1.0.5.sql +++ b/src/data/migrations/mysql/db_migration_mysql_v1.0.6.sql @@ -813,4 +813,32 @@ ALTER TABLE Microservices ADD COLUMN host_network_mode BOOLEAN DEFAULT false; ALTER TABLE Microservices ADD COLUMN is_privileged BOOLEAN DEFAULT false; ALTER TABLE Microservices DROP COLUMN root_host_access; +CREATE TABLE IF NOT EXISTS Events ( + id INT AUTO_INCREMENT PRIMARY KEY NOT NULL, + timestamp BIGINT NOT NULL, + event_type VARCHAR(20) NOT NULL, + endpoint_type VARCHAR(10) NOT NULL, + actor_id VARCHAR(255), + method VARCHAR(10), + resource_type VARCHAR(50), + resource_id VARCHAR(255), + endpoint_path TEXT NOT NULL, + ip_address VARCHAR(45), + status VARCHAR(20) NOT NULL, + status_code INT, + status_message TEXT, + request_id VARCHAR(255), + created_at DATETIME, + updated_at DATETIME +); + +CREATE INDEX idx_events_timestamp ON Events (timestamp); +CREATE INDEX idx_events_endpoint_type ON Events (endpoint_type); +CREATE INDEX idx_events_actor_id ON Events (actor_id); +CREATE INDEX idx_events_resource_type ON Events (resource_type); +CREATE INDEX idx_events_status ON Events (status); +CREATE INDEX idx_events_method ON Events (method); +CREATE INDEX idx_events_event_type ON Events (event_type); +CREATE INDEX idx_events_created_at ON Events (created_at); + COMMIT; \ No newline at end of file diff --git a/src/data/migrations/postgres/db_migration_pg_v1.0.5.sql b/src/data/migrations/postgres/db_migration_pg_v1.0.6.sql similarity index 96% rename from src/data/migrations/postgres/db_migration_pg_v1.0.5.sql rename to src/data/migrations/postgres/db_migration_pg_v1.0.6.sql index 09d6404b..405d70d2 100644 --- a/src/data/migrations/postgres/db_migration_pg_v1.0.5.sql +++ b/src/data/migrations/postgres/db_migration_pg_v1.0.6.sql @@ -812,3 +812,33 @@ ALTER TABLE "Microservices" ADD COLUMN is_activated BOOLEAN DEFAULT true; ALTER TABLE "Microservices" ADD COLUMN host_network_mode BOOLEAN DEFAULT false; ALTER TABLE "Microservices" ADD COLUMN is_privileged BOOLEAN DEFAULT false; ALTER TABLE "Microservices" DROP COLUMN root_host_access; + +CREATE TABLE IF NOT EXISTS "Events" ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY NOT NULL, + timestamp BIGINT NOT NULL, + event_type VARCHAR(20) NOT NULL, + endpoint_type VARCHAR(10) NOT NULL, + 
actor_id VARCHAR(255), + method VARCHAR(10), + resource_type VARCHAR(50), + resource_id VARCHAR(255), + endpoint_path TEXT NOT NULL, + ip_address VARCHAR(45), + status VARCHAR(20) NOT NULL, + status_code INT, + status_message TEXT, + request_id VARCHAR(255), + created_at TIMESTAMP(0), + updated_at TIMESTAMP(0) +); + + +CREATE INDEX idx_events_timestamp ON "Events" (timestamp); +CREATE INDEX idx_events_endpoint_type ON "Events" (endpoint_type); +CREATE INDEX idx_events_actor_id ON "Events" (actor_id); +CREATE INDEX idx_events_resource_type ON "Events" (resource_type); +CREATE INDEX idx_events_status ON "Events" (status); +CREATE INDEX idx_events_method ON "Events" (method); +CREATE INDEX idx_events_event_type ON "Events" (event_type); +CREATE INDEX idx_events_created_at ON "Events" (created_at); + diff --git a/src/data/migrations/sqlite/db_migration_sqlite_v1.0.5.sql b/src/data/migrations/sqlite/db_migration_sqlite_v1.0.6.sql similarity index 96% rename from src/data/migrations/sqlite/db_migration_sqlite_v1.0.5.sql rename to src/data/migrations/sqlite/db_migration_sqlite_v1.0.6.sql index ab609d82..8e3ca8e1 100644 --- a/src/data/migrations/sqlite/db_migration_sqlite_v1.0.5.sql +++ b/src/data/migrations/sqlite/db_migration_sqlite_v1.0.6.sql @@ -798,3 +798,33 @@ ALTER TABLE Microservices ADD COLUMN is_activated BOOLEAN DEFAULT true; ALTER TABLE Microservices ADD COLUMN host_network_mode BOOLEAN DEFAULT false; ALTER TABLE Microservices ADD COLUMN is_privileged BOOLEAN DEFAULT false; + +CREATE TABLE IF NOT EXISTS Events ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + timestamp BIGINT NOT NULL, + event_type VARCHAR(20) NOT NULL, + endpoint_type VARCHAR(10) NOT NULL, + actor_id VARCHAR(255), + method VARCHAR(10), + resource_type VARCHAR(50), + resource_id VARCHAR(255), + endpoint_path TEXT NOT NULL, + ip_address VARCHAR(45), + status VARCHAR(20) NOT NULL, + status_code INTEGER, + status_message TEXT, + request_id VARCHAR(255), + created_at DATETIME, + updated_at DATETIME +); + + +CREATE INDEX IF NOT EXISTS idx_events_timestamp ON Events (timestamp); +CREATE INDEX IF NOT EXISTS idx_events_endpoint_type ON Events (endpoint_type); +CREATE INDEX IF NOT EXISTS idx_events_actor_id ON Events (actor_id); +CREATE INDEX IF NOT EXISTS idx_events_resource_type ON Events (resource_type); +CREATE INDEX IF NOT EXISTS idx_events_status ON Events (status); +CREATE INDEX IF NOT EXISTS idx_events_method ON Events (method); +CREATE INDEX IF NOT EXISTS idx_events_event_type ON Events (event_type); +CREATE INDEX IF NOT EXISTS idx_events_created_at ON Events (created_at); + diff --git a/src/data/models/event.js b/src/data/models/event.js new file mode 100644 index 00000000..27a095e7 --- /dev/null +++ b/src/data/models/event.js @@ -0,0 +1,108 @@ +'use strict' +module.exports = (sequelize, DataTypes) => { + const Event = sequelize.define('Event', { + id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + allowNull: false, + field: 'id' + }, + timestamp: { + type: DataTypes.BIGINT, + allowNull: false, + field: 'timestamp' + }, + eventType: { + type: DataTypes.STRING(20), + allowNull: false, + field: 'event_type' + }, + endpointType: { + type: DataTypes.STRING(10), + allowNull: false, + field: 'endpoint_type' + }, + actorId: { + type: DataTypes.STRING(255), + allowNull: true, + field: 'actor_id' + }, + method: { + type: DataTypes.STRING(10), + allowNull: true, + field: 'method' + }, + resourceType: { + type: DataTypes.STRING(50), + allowNull: true, + field: 'resource_type' + }, + resourceId: { + 
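+      // Identifier of the affected resource when one can be derived from the request
+      // (e.g. a fog UUID, microservice UUID, or a name / name:version pair); null otherwise.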
type: DataTypes.STRING(255), + allowNull: true, + field: 'resource_id' + }, + endpointPath: { + type: DataTypes.TEXT, + allowNull: false, + field: 'endpoint_path' + }, + ipAddress: { + type: DataTypes.STRING(45), + allowNull: true, + field: 'ip_address' + }, + status: { + type: DataTypes.STRING(20), + allowNull: false, + field: 'status' + }, + statusCode: { + type: DataTypes.INTEGER, + allowNull: true, + field: 'status_code' + }, + statusMessage: { + type: DataTypes.TEXT, + allowNull: true, + field: 'status_message' + }, + requestId: { + type: DataTypes.STRING(255), + allowNull: true, + field: 'request_id' + } + }, { + tableName: 'Events', + timestamps: true, + underscored: true, + indexes: [ + { + fields: ['timestamp'] + }, + { + fields: ['endpoint_type'] + }, + { + fields: ['actor_id'] + }, + { + fields: ['resource_type'] + }, + { + fields: ['status'] + }, + { + fields: ['method'] + }, + { + fields: ['event_type'] + }, + { + fields: ['created_at'] + } + ] + }) + return Event +} diff --git a/src/data/providers/database-provider.js b/src/data/providers/database-provider.js index 941edff7..134b4bf7 100644 --- a/src/data/providers/database-provider.js +++ b/src/data/providers/database-provider.js @@ -251,8 +251,8 @@ class DatabaseProvider { // SQLite migration async runMigrationSQLite (dbName) { - const migrationSqlPath = path.resolve(__dirname, '../migrations/sqlite/db_migration_sqlite_v1.0.5.sql') - const migrationVersion = '1.0.5' + const migrationSqlPath = path.resolve(__dirname, '../migrations/sqlite/db_migration_sqlite_v1.0.6.sql') + const migrationVersion = '1.0.6' if (!fs.existsSync(migrationSqlPath)) { logger.error(`Migration file not found: ${migrationSqlPath}`) @@ -324,8 +324,8 @@ class DatabaseProvider { // MySQL migration async runMigrationMySQL (db) { - const migrationSqlPath = path.resolve(__dirname, '../migrations/mysql/db_migration_mysql_v1.0.5.sql') - const migrationVersion = '1.0.5' + const migrationSqlPath = path.resolve(__dirname, '../migrations/mysql/db_migration_mysql_v1.0.6.sql') + const migrationVersion = '1.0.6' if (!fs.existsSync(migrationSqlPath)) { logger.error(`Migration file not found: ${migrationSqlPath}`) @@ -385,8 +385,8 @@ class DatabaseProvider { // PostgreSQL migration async runMigrationPostgres (db) { - const migrationSqlPath = path.resolve(__dirname, '../migrations/postgres/db_migration_pg_v1.0.5.sql') - const migrationVersion = '1.0.5' + const migrationSqlPath = path.resolve(__dirname, '../migrations/postgres/db_migration_pg_v1.0.6.sql') + const migrationVersion = '1.0.6' if (!fs.existsSync(migrationSqlPath)) { logger.error(`Migration file not found: ${migrationSqlPath}`) diff --git a/src/jobs/event-cleanup-job.js b/src/jobs/event-cleanup-job.js new file mode 100644 index 00000000..7b9f0907 --- /dev/null +++ b/src/jobs/event-cleanup-job.js @@ -0,0 +1,74 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 
2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const EventManager = require('../data/managers/event-manager') +const EventService = require('../services/event-service') +const Config = require('../config') +const logger = require('../logger') + +async function run () { + try { + await cleanupOldEvents() + } catch (error) { + logger.error('Error during event cleanup:', error) + } finally { + // Schedule next run with current interval (may have changed via env var) + const currentInterval = process.env.EVENT_CLEANUP_INTERVAL || Config.get('settings.eventCleanupInterval', 86400) + setTimeout(run, currentInterval * 1000) + } +} + +async function cleanupOldEvents () { + try { + // Read retention days from config + const retentionDays = process.env.EVENT_RETENTION_DAYS || Config.get('settings.eventRetentionDays', 30) + + logger.debug(`Starting cleanup of events older than ${retentionDays} days`) + const count = await EventManager.deleteEventsOlderThanDays(retentionDays, { fakeTransaction: true }) + logger.info(`Cleaned up ${count} events older than ${retentionDays} days`) + + // Create audit trail for automated cleanup (non-blocking) + // This allows admins to distinguish between manual deletions and automated cleanup + if (count > 0) { + setImmediate(async () => { + try { + await EventService.createEvent({ + timestamp: Date.now(), + eventType: 'HTTP', + endpointType: 'user', + actorId: 'SYSTEM_CLEANUP', + method: 'DELETE', + resourceType: 'event', + resourceId: null, + endpointPath: '/api/v3/events', + ipAddress: null, + status: 'SUCCESS', + statusCode: 200, + statusMessage: `Automated cleanup: Deleted ${count} events older than ${retentionDays} days`, + requestId: null + }, { fakeTransaction: true }).catch(err => { + logger.error('Failed to create cleanup job audit record (non-blocking):', err) + }) + } catch (error) { + logger.error('Error creating cleanup job audit record (non-blocking):', error) + } + }) + } + } catch (error) { + logger.error('Error during event cleanup:', error) + } +} + +module.exports = { + run +} diff --git a/src/middlewares/event-audit-middleware.js b/src/middlewares/event-audit-middleware.js new file mode 100644 index 00000000..24b3e75a --- /dev/null +++ b/src/middlewares/event-audit-middleware.js @@ -0,0 +1,72 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const EventService = require('../services/event-service') +const config = require('../config') +const logger = require('../logger') + +/** + * Event audit middleware for HTTP requests + * Tracks all non-GET API operations (POST, PATCH, DELETE, PUT) + * CRITICAL: This middleware is fully async and non-blocking. + * Event logging failures will never affect request processing. 
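+ *
+ * The example below is an illustrative wiring sketch only; the controller already
+ * registers this middleware in src/server.js via app.use(eventAuditMiddleware).
+ * @example
+ * const express = require('express')
+ * const eventAuditMiddleware = require('./middlewares/event-audit-middleware')
+ * const app = express()
+ * // Register after authentication middleware but before the route handlers, so that
+ * // res.end is wrapped and the final response status can be captured for auditing.
+ * app.use(eventAuditMiddleware)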
+ */ +function eventAuditMiddleware (req, res, next) { + // Only track non-GET methods + if (req.method === 'GET') { + return next() + } + + // Don't audit DELETE on events endpoint (to avoid recursion and noise) + // The DELETE endpoint controller will explicitly create an event AFTER successful deletion + if (req.method === 'DELETE' && req.path === '/api/v3/events') { + return next() + } + + // Check if auditing is enabled (reads from YAML or env var) + const auditEnabled = process.env.EVENT_AUDIT_ENABLED || config.get('settings.eventAuditEnabled', true) + if (!auditEnabled) { + return next() + } + + // Capture request start time + const startTime = Date.now() + + // Store original end function + const originalEnd = res.end + + // Wrap response.end to capture status + res.end = function (...args) { + // Call original end first + originalEnd.apply(this, args) + + // Defer event logging to next tick - NEVER AWAIT + setImmediate(async () => { + try { + // Fire and forget - never await + EventService.createHttpEvent(req, res, startTime).catch(err => { + // Silent error handling - never throw + logger.error('Event logging failed (non-blocking):', err) + }) + } catch (error) { + // Catch any synchronous errors + logger.error('Event logging setup failed (non-blocking):', error) + // Don't throw - request already completed + } + }) + } + + next() +} + +module.exports = eventAuditMiddleware diff --git a/src/routes/event.js b/src/routes/event.js new file mode 100644 index 00000000..d9588674 --- /dev/null +++ b/src/routes/event.js @@ -0,0 +1,88 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 
2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const constants = require('../helpers/constants') +const EventController = require('../controllers/event-controller') +const ResponseDecorator = require('../decorators/response-decorator') +const logger = require('../logger') +const Errors = require('../helpers/errors') +const keycloak = require('../config/keycloak.js').initKeycloak() + +module.exports = [ + { + method: 'get', + path: '/api/v3/events', + middleware: async (req, res) => { + logger.apiReq(req) + + const successCode = constants.HTTP_CODE_SUCCESS + const errorCodes = [ + { + code: constants.HTTP_CODE_UNAUTHORIZED, + errors: [Errors.AuthenticationError] + }, + { + code: constants.HTTP_CODE_BAD_REQUEST, + errors: [Errors.ValidationError] + } + ] + + // Protected with Keycloak (SRE role only) + await keycloak.protect(['SRE'])(req, res, async () => { + const listEventsEndpoint = ResponseDecorator.handleErrors(EventController.listEventsEndpoint, successCode, errorCodes) + const responseObject = await listEventsEndpoint(req) + const user = req.kauth.grant.access_token.content.preferred_username + res + .status(responseObject.code) + .send(responseObject.body) + + logger.apiRes({ req: req, user: user, res: res, responseObject: responseObject }) + }) + } + }, + { + method: 'delete', + path: '/api/v3/events', + middleware: async (req, res) => { + logger.apiReq(req) + + const successCode = constants.HTTP_CODE_SUCCESS + const errorCodes = [ + { + code: constants.HTTP_CODE_UNAUTHORIZED, + errors: [Errors.AuthenticationError] + }, + { + code: constants.HTTP_CODE_FORBIDDEN, + errors: [Errors.AuthorizationError] + }, + { + code: constants.HTTP_CODE_BAD_REQUEST, + errors: [Errors.ValidationError] + } + ] + + // Protected with Keycloak (SRE role only) + await keycloak.protect(['SRE'])(req, res, async () => { + const deleteOldEventsEndpoint = ResponseDecorator.handleErrors(EventController.deleteOldEventsEndpoint, successCode, errorCodes) + const responseObject = await deleteOldEventsEndpoint(req) + const user = req.kauth.grant.access_token.content.preferred_username + res + .status(responseObject.code) + .send(responseObject.body) + + logger.apiRes({ req: req, user: user, res: res, responseObject: responseObject }) + }) + } + } +] diff --git a/src/schemas/event.js b/src/schemas/event.js new file mode 100644 index 00000000..5803045b --- /dev/null +++ b/src/schemas/event.js @@ -0,0 +1,64 @@ +const eventListQuery = { + id: '/eventListQuery', + type: 'object', + properties: { + startTime: { + anyOf: [ + { type: 'number' }, + { type: 'string' } + ] + }, + endTime: { + anyOf: [ + { type: 'number' }, + { type: 'string' } + ] + }, + endpointType: { type: 'string', minLength: 1 }, + resourceType: { type: 'string', minLength: 1 }, + status: { type: 'string', minLength: 1 }, + method: { + anyOf: [ + { type: 'string', minLength: 1 }, + { + type: 'array', + items: { type: 'string', minLength: 1 } + } + ] + }, + actorId: { type: 'string', minLength: 1 }, + eventType: { type: 'string', minLength: 1 }, + limit: { + anyOf: [ + { type: 'number', minimum: 1 }, + { type: 'string', pattern: '^\\d+$' } + ] + }, + offset: { + anyOf: [ + { type: 'number', minimum: 0 }, + { type: 'string', pattern: '^\\d+$' } + ] + } + }, + additionalProperties: false +} + +const eventDeleteRequest = { + id: '/eventDeleteRequest', + type: 'object', + properties: { + days: { + type: 'number', + 
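+      // days = 0 deletes all events; 1-365 deletes events older than that many days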
minimum: 0, + maximum: 365 + } + }, + required: ['days'], + additionalProperties: false +} + +module.exports = { + mainSchemas: [eventListQuery, eventDeleteRequest], + innerSchemas: [] +} diff --git a/src/server.js b/src/server.js index 5cae2fff..f5b1d3f8 100755 --- a/src/server.js +++ b/src/server.js @@ -85,6 +85,11 @@ initialize().then(() => { next() }) + // Event audit middleware - tracks non-GET operations + // Must be after authentication middleware but before route handlers + const eventAuditMiddleware = require('./middlewares/event-audit-middleware') + app.use(eventAuditMiddleware) + global.appRoot = path.resolve(__dirname) const registerRoute = (route) => { diff --git a/src/services/event-service.js b/src/services/event-service.js new file mode 100644 index 00000000..2581bfd0 --- /dev/null +++ b/src/services/event-service.js @@ -0,0 +1,646 @@ +/* + * ******************************************************************************* + * * Copyright (c) 2023 Datasance Teknoloji A.S. + * * + * * This program and the accompanying materials are made available under the + * * terms of the Eclipse Public License v. 2.0 + * * http://www.eclipse.org/legal/epl-2.0 + * * + * * SPDX-License-Identifier: EPL-2.0 + * ******************************************************************************* + * + */ + +const EventManager = require('../data/managers/event-manager') +const config = require('../config') +const logger = require('../logger') +const Errors = require('../helpers/errors') +const Validator = require('../schemas') +const TransactionDecorator = require('../decorators/transaction-decorator') + +/** + * Extract resource type from URL path + * @param {string} path - URL path + * @returns {string|null} Resource type or null + */ +function extractResourceType (path) { + if (!path) return null + + // Resource type mapping based on URL patterns + const resourcePatterns = [ + { pattern: /^\/api\/v3\/iofog/, type: 'agent' }, + { pattern: /^\/api\/v3\/microservices/, type: 'microservice' }, + { pattern: /^\/api\/v3\/applications/, type: 'application' }, + { pattern: /^\/api\/v3\/agent/, type: 'agent' }, + { pattern: /^\/api\/v3\/config/, type: 'config' }, + { pattern: /^\/api\/v3\/secrets/, type: 'secret' }, + { pattern: /^\/api\/v3\/services/, type: 'service' }, + { pattern: /^\/api\/v3\/certificates/, type: 'certificate' }, + { pattern: /^\/api\/v3\/tunnels/, type: 'tunnel' }, + { pattern: /^\/api\/v3\/routes/, type: 'routing' }, + { pattern: /^\/api\/v3\/router/, type: 'router' }, + { pattern: /^\/api\/v3\/registries/, type: 'registry' }, + { pattern: /^\/api\/v3\/volumeMounts/, type: 'volumeMount' }, + { pattern: /^\/api\/v3\/configMaps/, type: 'configMap' }, + { pattern: /^\/api\/v3\/edgeResources/, type: 'edgeResource' }, + { pattern: /^\/api\/v3\/diagnostics/, type: 'diagnostics' }, + { pattern: /^\/api\/v3\/flows/, type: 'application' }, + { pattern: /^\/api\/v3\/applicationTemplates/, type: 'applicationTemplate' }, + { pattern: /^\/api\/v3\/catalog/, type: 'catalog' }, + { pattern: /^\/api\/v3\/controller/, type: 'controller' }, + { pattern: /^\/api\/v3\/users/, type: 'user' }, + { pattern: /^\/api\/v3\/capabilities/, type: 'capabilities' }, + { pattern: /^\/api\/v3\/events/, type: 'event' } + ] + + for (const { pattern, type } of resourcePatterns) { + if (pattern.test(path)) { + return type + } + } + + return null +} + +/** + * Extract resource ID from URL path, params, or body + * @param {object} req - Express request object + * @returns {string|null} Resource ID or null + */ +function 
extractResourceId (req) { + if (!req) return null + + // Try path parameters first (most common) + if (req.params) { + // Common parameter names + const paramNames = ['uuid', 'id', 'name', 'key', 'appName', 'microserviceUuid'] + for (const paramName of paramNames) { + if (req.params[paramName]) { + return req.params[paramName] + } + } + // Check for versioned resources (name:version) + if (req.params.name && req.params.version) { + return `${req.params.name}:${req.params.version}` + } + } + + // Try query parameters + if (req.query) { + if (req.query.uuid) return req.query.uuid + if (req.query.id) return req.query.id + if (req.query.name) return req.query.name + } + + // Try request body (for POST/PUT/PATCH) + if (req.body) { + if (req.body.uuid) return req.body.uuid + if (req.body.id) return req.body.id + if (req.body.name) return req.body.name + } + + return null +} + +/** + * Extract username from Keycloak JWT token + * @param {string} token - Bearer token string (with or without "Bearer " prefix) + * @returns {string|null} Username or null + */ +function extractUsernameFromToken (token) { + if (!token) return null + + try { + // Remove "Bearer " prefix if present + const cleanToken = token.replace(/^Bearer\s+/i, '') + const tokenParts = cleanToken.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + return payload.preferred_username || null + } + } catch (error) { + logger.debug('Failed to extract username from token:', error) + } + + return null +} + +/** + * Extract IPv4 address from request, converting IPv6-mapped IPv4 addresses + * @param {object} req - Express request object + * @returns {string|null} IPv4 address or null + */ +function extractIPv4Address (req) { + if (!req) return null + + let ipAddress = null + + // Try req.ip first (Express sets this) + if (req.ip) { + ipAddress = req.ip + } else if (req.connection && req.connection.remoteAddress) { + ipAddress = req.connection.remoteAddress + } else if (req.socket && req.socket.remoteAddress) { + ipAddress = req.socket.remoteAddress + } + + if (!ipAddress) { + return null + } + + // Convert IPv6-mapped IPv4 address (::ffff:127.0.0.1) to IPv4 (127.0.0.1) + if (ipAddress.startsWith('::ffff:')) { + return ipAddress.substring(7) // Remove '::ffff:' prefix + } + + // Filter out pure IPv6 addresses (like ::1) + if (ipAddress.includes(':')) { + return null + } + + // Return IPv4 address as-is + return ipAddress +} + +/** + * Sanitize URL path by removing sensitive query parameters + * Prevents storing tokens and other sensitive data in audit logs + * @param {string} urlPath - Full URL path with query string + * @returns {string} Sanitized path without sensitive query parameters + */ +function sanitizeEndpointPath (urlPath) { + if (!urlPath) return urlPath + + try { + // List of sensitive query parameters to remove + const sensitiveParams = ['token', 'access_token', 'refresh_token', 'api_key', 'apikey', 'password', 'secret'] + + // Split URL into path and query string + const [path, queryString] = urlPath.split('?') + + // If no query string, return path as-is + if (!queryString) { + return path + } + + // Parse query parameters + const params = new URLSearchParams(queryString) + const sanitizedParams = new URLSearchParams() + + // Copy only non-sensitive parameters + for (const [key, value] of params.entries()) { + if (!sensitiveParams.includes(key.toLowerCase())) { + sanitizedParams.append(key, value) + } + } + + // Reconstruct URL + const sanitizedQuery = 
sanitizedParams.toString() + return sanitizedQuery ? `${path}?${sanitizedQuery}` : path + } catch (error) { + // If parsing fails, return path without query string as fallback + logger.debug('Failed to sanitize endpoint path, removing query string:', error) + const [path] = urlPath.split('?') + return path + } +} + +/** + * Extract actor ID from request (user or agent) + * @param {object} req - Express request object + * @returns {string|null} Actor ID (username or fog UUID) or null + */ +function extractActorId (req) { + if (!req) return null + + // Check if it's an agent endpoint + if (req.path && req.path.startsWith('/api/v3/agent/')) { + // Extract fog UUID from JWT token + try { + const authHeader = req.headers.authorization + if (authHeader) { + const [scheme, token] = authHeader.split(' ') + if (scheme.toLowerCase() === 'bearer' && token) { + const tokenParts = token.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + return payload.sub || null + } + } + } + } catch (error) { + logger.debug('Failed to extract fog UUID from token:', error) + } + return null + } + + // User endpoint - try Keycloak middleware first (for HTTP requests) + try { + if (req.kauth && req.kauth.grant && req.kauth.grant.access_token && + req.kauth.grant.access_token.content && req.kauth.grant.access_token.content.preferred_username) { + return req.kauth.grant.access_token.content.preferred_username + } + } catch (error) { + logger.debug('Failed to extract username from Keycloak middleware:', error) + } + + // Fallback: extract from token directly (for WebSocket connections) + try { + const authHeader = req.headers.authorization + if (authHeader) { + const username = extractUsernameFromToken(authHeader) + if (username) { + return username + } + } + } catch (error) { + logger.debug('Failed to extract username from token:', error) + } + + return null +} + +/** + * Determine status from HTTP status code + * @param {number} statusCode - HTTP status code + * @returns {string} 'SUCCESS' or 'FAILED' + */ +function determineStatus (statusCode) { + if (!statusCode) return 'FAILED' + return (statusCode >= 200 && statusCode < 300) ? 
'SUCCESS' : 'FAILED' +} + +/** + * Determine status from WebSocket close code + * @param {number} closeCode - WebSocket close code + * @returns {string} 'SUCCESS' or 'FAILED' + */ +function determineWsStatus (closeCode) { + // Normal closure (1000) and going away (1001) are considered success + // Other codes indicate errors + if (closeCode === 1000 || closeCode === 1001) { + return 'SUCCESS' + } + return 'FAILED' +} + +/** + * Create an event record + * @param {object} eventData - Event data object + * @param {object} transaction - Database transaction + * @returns {Promise} Created event + */ +async function createEvent (eventData, transaction) { + const eventRecord = { + timestamp: eventData.timestamp || Date.now(), + eventType: eventData.eventType, + endpointType: eventData.endpointType, + actorId: eventData.actorId || null, + method: eventData.method || null, + resourceType: eventData.resourceType || null, + resourceId: eventData.resourceId || null, + endpointPath: eventData.endpointPath, + ipAddress: eventData.ipAddress || null, + status: eventData.status, + statusCode: eventData.statusCode || null, + statusMessage: eventData.statusMessage || null, + requestId: eventData.requestId || null + } + + return EventManager.create(eventRecord, transaction) +} + +/** + * Create event from HTTP request/response + * @param {object} req - Express request object + * @param {object} res - Express response object + * @param {number} startTime - Request start timestamp + * @returns {Promise} + */ +async function createHttpEvent (req, res, startTime) { + // Check if auditing is enabled + const auditEnabled = process.env.EVENT_AUDIT_ENABLED || config.get('settings.eventAuditEnabled', true) + if (!auditEnabled) { + return + } + + // Only track non-GET methods + if (req.method === 'GET') { + return + } + + // Don't audit DELETE on events endpoint (handled explicitly in controller) + if (req.method === 'DELETE' && req.path === '/api/v3/events') { + return + } + + const captureIp = process.env.EVENT_CAPTURE_IP_ADDRESS || config.get('settings.eventCaptureIpAddress', true) + const endpointType = req.path.startsWith('/api/v3/agent/') ? 'agent' : 'user' + const actorId = extractActorId(req) + const resourceType = extractResourceType(req.path) + const resourceId = extractResourceId(req) + const status = determineStatus(res.statusCode) + + const eventData = { + timestamp: startTime, + eventType: 'HTTP', + endpointType: endpointType, + actorId: actorId, + method: req.method, + resourceType: resourceType, + resourceId: resourceId, + endpointPath: req.path, + ipAddress: captureIp ? extractIPv4Address(req) : null, + status: status, + statusCode: res.statusCode, + statusMessage: status === 'SUCCESS' ? 
'Success' : `HTTP ${res.statusCode}`, + requestId: req.id || null + } + + // Use fake transaction for non-blocking event creation + await createEvent(eventData, { fakeTransaction: true }).catch(err => { + logger.error('Event logging failed (non-blocking):', err) + }) +} + +/** + * Create WebSocket connection event + * @param {object} connectionData - Connection data + * @returns {Promise} + */ +async function createWsConnectEvent (connectionData) { + // Check if auditing is enabled + const auditEnabled = process.env.EVENT_AUDIT_ENABLED || config.get('settings.eventAuditEnabled', true) + if (!auditEnabled) { + return + } + + const captureIp = process.env.EVENT_CAPTURE_IP_ADDRESS || config.get('settings.eventCaptureIpAddress', true) + const endpointType = connectionData.endpointType || 'user' + // Sanitize path to remove sensitive query parameters (e.g., token) + const sanitizedPath = sanitizeEndpointPath(connectionData.path) + const resourceType = extractResourceType(sanitizedPath) + + const eventData = { + timestamp: connectionData.timestamp || Date.now(), + eventType: 'WS_CONNECT', + endpointType: endpointType, + actorId: connectionData.actorId || null, + method: 'WS', + resourceType: resourceType, + resourceId: connectionData.resourceId || null, + endpointPath: sanitizedPath, + ipAddress: captureIp ? (connectionData.ipAddress || null) : null, + status: 'SUCCESS', + statusCode: null, + statusMessage: 'WebSocket connection established', + requestId: null + } + + // Use fake transaction for non-blocking event creation + await createEvent(eventData, { fakeTransaction: true }).catch(err => { + logger.error('WebSocket connect event logging failed (non-blocking):', err) + }) +} + +/** + * Create WebSocket disconnection event + * @param {object} connectionData - Connection data + * @returns {Promise} + */ +async function createWsDisconnectEvent (connectionData) { + // Check if auditing is enabled + const auditEnabled = process.env.EVENT_AUDIT_ENABLED || config.get('settings.eventAuditEnabled', true) + if (!auditEnabled) { + return + } + + const captureIp = process.env.EVENT_CAPTURE_IP_ADDRESS || config.get('settings.eventCaptureIpAddress', true) + const endpointType = connectionData.endpointType || 'user' + // Sanitize path to remove sensitive query parameters (e.g., token) + const sanitizedPath = sanitizeEndpointPath(connectionData.path) + const resourceType = extractResourceType(sanitizedPath) + const status = determineWsStatus(connectionData.closeCode) + + const eventData = { + timestamp: connectionData.timestamp || Date.now(), + eventType: 'WS_DISCONNECT', + endpointType: endpointType, + actorId: connectionData.actorId || null, + method: 'WS', + resourceType: resourceType, + resourceId: connectionData.resourceId || null, + endpointPath: sanitizedPath, + ipAddress: captureIp ? (connectionData.ipAddress || null) : null, + status: status, + statusCode: connectionData.closeCode || null, + statusMessage: status === 'SUCCESS' ? 
'WebSocket connection closed normally' : `WebSocket closed with code ${connectionData.closeCode}`, + requestId: null + } + + // Use fake transaction for non-blocking event creation + await createEvent(eventData, { fakeTransaction: true }).catch(err => { + logger.error('WebSocket disconnect event logging failed (non-blocking):', err) + }) +} + +/** + * Parse time from query parameter (Unix timestamp or ISO 8601) + * @param {string|number} timeValue - Time value + * @returns {number|null} Unix timestamp in milliseconds or null + */ +function parseTime (timeValue) { + if (timeValue === undefined || timeValue === null || timeValue === '') return null + + if (typeof timeValue === 'number') { + return timeValue < 10000000000 ? timeValue * 1000 : timeValue + } + + if (typeof timeValue === 'string' && /^\d+$/.test(timeValue)) { + const timestamp = parseInt(timeValue) + return timestamp < 10000000000 ? timestamp * 1000 : timestamp + } + + try { + const date = new Date(timeValue) + if (!isNaN(date.getTime())) { + return date.getTime() + } + } catch (error) { + // Ignore parsing errors + } + + return null +} + +/** + * Normalize event payloads before returning to clients + * Ensures timestamp is always a number + * @param {object} event - Sequelize event instance + * @returns {object} normalized plain object + */ +function normalizeEventForResponse (event) { + const json = typeof event.toJSON === 'function' ? event.toJSON() : { ...event } + + if (json.timestamp !== undefined && json.timestamp !== null) { + const numericTimestamp = Number(json.timestamp) + if (!Number.isNaN(numericTimestamp)) { + json.timestamp = numericTimestamp + } + } + + return json +} + +/** + * List events with filters and pagination + * @param {object} params - Parameters containing raw query object + * @param {object} context - Optional context (req, user, etc.) + * @returns {Promise} Events list with pagination info + */ +async function listEvents (params = {}, context = {}, transaction) { + const query = params.query || {} + await Validator.validate(query, Validator.schemas.eventListQuery) + + const filters = {} + + if (query.startTime !== undefined) { + const startTime = parseTime(query.startTime) + if (startTime === null) { + throw new Errors.ValidationError('Invalid startTime format. Use Unix timestamp (seconds or milliseconds) or ISO 8601 format (e.g., 2023-10-01T12:00:00Z)') + } + filters.startTime = startTime + } + + if (query.endTime !== undefined) { + const endTime = parseTime(query.endTime) + if (endTime === null) { + throw new Errors.ValidationError('Invalid endTime format. Use Unix timestamp (seconds or milliseconds) or ISO 8601 format (e.g., 2023-10-01T12:00:00Z)') + } + filters.endTime = endTime + } + + if (filters.startTime && filters.endTime && filters.startTime > filters.endTime) { + throw new Errors.ValidationError('startTime must be before or equal to endTime') + } + + if (query.endpointType) { + filters.endpointType = query.endpointType + } + + if (query.resourceType) { + filters.resourceType = query.resourceType + } + + if (query.status) { + filters.status = query.status + } + + if (query.method) { + filters.method = Array.isArray(query.method) ? 
query.method : [query.method] + } + + if (query.actorId) { + filters.actorId = query.actorId + } + + if (query.eventType) { + filters.eventType = query.eventType + } + + let limit = 200 + if (query.limit !== undefined && query.limit !== null && query.limit !== '') { + const parsedLimit = parseInt(query.limit) + if (!isNaN(parsedLimit) && parsedLimit > 0) { + limit = Math.min(parsedLimit, 1000) + } + } + + let offset = 0 + if (query.offset !== undefined && query.offset !== null && query.offset !== '') { + const parsedOffset = parseInt(query.offset) + if (!isNaN(parsedOffset) && parsedOffset >= 0) { + offset = parsedOffset + } + } + + filters.limit = limit + filters.offset = offset + + const result = await EventManager.findAllWithFilters(filters, transaction) + + return { + events: result.events.map(normalizeEventForResponse), + total: result.total, + limit: result.limit, + offset: result.offset + } +} + +/** + * Delete events based on retention configuration + * @param {object} params - Parameters containing days + * @param {object} context - Additional context (req, etc.) + * @returns {Promise} Deletion summary + */ +async function deleteEvents (params = {}, context = {}, transaction) { + const body = params.body || {} + await Validator.validate(body, Validator.schemas.eventDeleteRequest) + + const { days } = body + const request = context.req || {} + + const cutoffTimestamp = days > 0 ? Date.now() - (days * 24 * 60 * 60 * 1000) : null + const deletedCount = await EventManager.deleteEventsOlderThanDays(days, transaction) + + setImmediate(async () => { + try { + const captureIp = process.env.EVENT_CAPTURE_IP_ADDRESS || config.get('settings.eventCaptureIpAddress', true) + const endpointType = request.path && request.path.startsWith('/api/v3/agent/') ? 'agent' : 'user' + const actorId = extractActorId(request) + + await createEvent({ + timestamp: Date.now(), + eventType: 'HTTP', + endpointType, + actorId, + method: 'DELETE', + resourceType: 'event', + resourceId: null, + endpointPath: '/api/v3/events', + ipAddress: captureIp ? extractIPv4Address(request) : null, + status: 'SUCCESS', + statusCode: 200, + statusMessage: days === 0 ? `Deleted all ${deletedCount} events` : `Deleted ${deletedCount} events older than ${days} days`, + requestId: request.id || null + }, { fakeTransaction: true }).catch(err => { + logger.error('Failed to create DELETE events audit record (non-blocking):', err) + }) + } catch (error) { + logger.error('Error creating DELETE events audit record (non-blocking):', error) + } + }) + + return { + deletedCount, + deletedBefore: cutoffTimestamp ? 
new Date(cutoffTimestamp).toISOString() : null, + deletedAt: new Date().toISOString(), + deletedAll: days === 0 + } +} + +module.exports = { + extractResourceType, + extractResourceId, + extractActorId, + extractUsernameFromToken, + extractIPv4Address, + determineStatus, + determineWsStatus, + createEvent: TransactionDecorator.generateTransaction(createEvent), + createHttpEvent, + createWsConnectEvent, + createWsDisconnectEvent, + listEvents: TransactionDecorator.generateTransaction(listEvents), + deleteEvents: TransactionDecorator.generateTransaction(deleteEvents) +} diff --git a/src/services/iofog-service.js b/src/services/iofog-service.js index d305ed0d..b6ef2dc1 100644 --- a/src/services/iofog-service.js +++ b/src/services/iofog-service.js @@ -896,7 +896,7 @@ async function generateProvisioningKeyEndPoint (fogData, isCLI, transaction) { const newProvision = { iofogUuid: fogData.uuid, provisionKey: AppHelper.generateUUID(), - expirationTime: new Date().getTime() + (10 * 60 * 1000) + expirationTime: new Date().getTime() + (20 * 60 * 1000) } const fog = await FogManager.findOne(queryFogData, transaction) diff --git a/src/websocket/server.js b/src/websocket/server.js index fc694bb6..3ef2e88d 100644 --- a/src/websocket/server.js +++ b/src/websocket/server.js @@ -24,6 +24,8 @@ const MESSAGE_TYPES = { ACTIVATION: 5 } +const EventService = require('../services/event-service') + class WebSocketServer { constructor () { this.wss = null @@ -299,6 +301,8 @@ class WebSocketServer { // If token is found in query params, format it as Bearer token if (token) { token = `Bearer ${token}` + // Store in headers for event creation code + req.headers.authorization = token } } @@ -438,6 +442,38 @@ class WebSocketServer { microserviceUuid: msgMicroserviceUuid })) await this.setupMessageForwarding(execId, transaction) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + const authHeader = req.headers.authorization + let actorId = null + if (authHeader) { + const [scheme, token] = authHeader.split(' ') + if (scheme.toLowerCase() === 'bearer' && token) { + try { + const tokenParts = token.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + actorId = payload.sub || null + } + } catch (err) { + // Ignore token parsing errors + } + } + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'agent', + actorId: actorId, + path: req.url, + resourceId: msgMicroserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) } else { this.attachPendingKeepAliveHandler(ws) try { @@ -480,6 +516,39 @@ class WebSocketServer { microserviceUuid: msgMicroserviceUuid })) await this.setupMessageForwarding(execId, transaction) + + // Record WebSocket connection event for agent (non-blocking) + // This covers the case when agent connects but no user is waiting (cross-replica or normal) + setImmediate(async () => { + try { + const authHeader = req.headers.authorization + let actorId = null + if (authHeader) { + const [scheme, token] = authHeader.split(' ') + if (scheme.toLowerCase() === 'bearer' && token) { + try { + const tokenParts = token.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + actorId = payload.sub || null + } + } catch (err) { + // Ignore token parsing errors + } + } + } + await 
EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'agent', + actorId: actorId, + path: req.url, + resourceId: msgMicroserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) } catch (error) { logger.warn('[WS-SESSION] Failed to enable queue bridge for pending agent, will use direct relay when user connects:', { execId, @@ -487,6 +556,38 @@ class WebSocketServer { error: error.message }) agentOnlySession.queueBridgeEnabled = false + + // Record WebSocket connection event even if queue bridge failed (non-blocking) + setImmediate(async () => { + try { + const authHeader = req.headers.authorization + let actorId = null + if (authHeader) { + const [scheme, token] = authHeader.split(' ') + if (scheme.toLowerCase() === 'bearer' && token) { + try { + const tokenParts = token.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + actorId = payload.sub || null + } + } catch (err) { + // Ignore token parsing errors + } + } + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'agent', + actorId: actorId, + path: req.url, + resourceId: msgMicroserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) } } } @@ -503,7 +604,40 @@ class WebSocketServer { })) // Handle connection close - ws.on('close', async () => { + ws.on('close', async (code, reason) => { + // Record WebSocket disconnection event (non-blocking) + setImmediate(async () => { + try { + const authHeader = req.headers.authorization + let actorId = null + if (authHeader) { + const [scheme, token] = authHeader.split(' ') + if (scheme.toLowerCase() === 'bearer' && token) { + try { + const tokenParts = token.split('.') + if (tokenParts.length === 3) { + const payload = JSON.parse(Buffer.from(tokenParts[1], 'base64').toString()) + actorId = payload.sub || null + } + } catch (err) { + // Ignore token parsing errors + } + } + } + await EventService.createWsDisconnectEvent({ + timestamp: Date.now(), + endpointType: 'agent', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null, + closeCode: code + }) + } catch (err) { + logger.error('Failed to create WS_DISCONNECT event (non-blocking):', err) + } + }) + for (const [execId, session] of this.sessionManager.sessions) { if (session.agent === ws) { // In cross-replica scenarios, send CLOSE message to user via queue @@ -631,6 +765,27 @@ class WebSocketServer { agentState: pendingAgent.readyState }) await this.setupMessageForwarding(availableExecId, transaction) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + // Extract actorId from token (req.kauth not available for WebSocket connections) + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) return } } else { @@ -652,6 +807,27 @@ class WebSocketServer { 
microserviceUuid, userState: ws.readyState }) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + // Extract actorId from token (req.kauth not available for WebSocket connections) + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) return } } @@ -691,6 +867,26 @@ class WebSocketServer { })) await this.setupMessageForwarding(availableExecId, transaction) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) return // Exit early, session activated successfully } } else { @@ -713,6 +909,26 @@ class WebSocketServer { microserviceUuid, userState: ws.readyState }) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) return } } @@ -783,6 +999,26 @@ class WebSocketServer { })) await this.setupMessageForwarding(availableExecId, transaction) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event (non-blocking):', err) + } + }) clearInterval(retryTimer) // Stop retry timer return // Exit early, session activated successfully } @@ -806,6 +1042,26 @@ class WebSocketServer { microserviceUuid, userState: ws.readyState }) + + // Record WebSocket connection event (non-blocking) + setImmediate(async () => { + try { + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsConnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null + }) + } catch (err) { + logger.error('Failed to create WS_CONNECT event 
(non-blocking):', err) + } + }) clearInterval(retryTimer) // Stop retry timer return } @@ -881,7 +1137,29 @@ class WebSocketServer { } }, PENDING_USER_TIMEOUT) - ws.on('close', () => { + ws.on('close', (code, reason) => { + // Record WebSocket disconnection event (non-blocking) + setImmediate(async () => { + try { + // Extract actorId from token (req.kauth not available for WebSocket connections) + let actorId = null + if (req.headers && req.headers.authorization) { + actorId = EventService.extractUsernameFromToken(req.headers.authorization) + } + await EventService.createWsDisconnectEvent({ + timestamp: Date.now(), + endpointType: 'user', + actorId: actorId, + path: req.url, + resourceId: microserviceUuid, + ipAddress: EventService.extractIPv4Address(req) || null, + closeCode: code + }) + } catch (err) { + logger.error('Failed to create WS_DISCONNECT event (non-blocking):', err) + } + }) + for (const [execId, session] of this.sessionManager.sessions) { if (session.user === ws) { this.cleanupSession(execId, transaction)