diff --git a/src/entities/player.ts b/src/entities/player.ts index 26592efb..cb8e6448 100644 --- a/src/entities/player.ts +++ b/src/entities/player.ts @@ -160,7 +160,7 @@ export default class Player { return ( conn.hasScope(APIKeyScope.READ_PLAYER_PRESENCE) && !!conn.playerAliasId && - this.game.id === conn.game.id + this.game.id === conn.gameId ) }) await sendMessages(conns, 'v1.players.presence.updated', { diff --git a/src/index.ts b/src/index.ts index f73448fd..80cb0cfb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,7 @@ import Socket from './socket' import httpTracingMiddleware from './middleware/http-tracing-middleware' import { secondsToMilliseconds } from 'date-fns' import compress from 'koa-compress' +import { EntityManager } from '@mikro-orm/mysql' const isTest = process.env.NODE_ENV === 'test' @@ -49,7 +50,7 @@ export default async function init(): Promise { keepAliveTimeout: secondsToMilliseconds(60) }, app.callback()) - app.context.wss = new Socket(server, app.context.em) + app.context.wss = new Socket(server, (app.context.em as EntityManager).fork()) if (!isTest) { server.listen(80, () => console.info('Listening on port 80')) diff --git a/src/services/game.service.ts b/src/services/game.service.ts index c83d6beb..23578527 100644 --- a/src/services/game.service.ts +++ b/src/services/game.service.ts @@ -19,7 +19,7 @@ import updateAllowedKeys from '../lib/entities/updateAllowedKeys' async function sendLiveConfigUpdatedMessage(req: Request, game: Game) { const socket: Socket = req.ctx.wss const conns = socket.findConnections((conn) => { - return conn.game.id === game.id && conn.hasScope(APIKeyScope.READ_GAME_CONFIG) + return conn.gameId === game.id && conn.hasScope(APIKeyScope.READ_GAME_CONFIG) }) await sendMessages(conns, 'v1.live-config.updated', { config: game.getLiveConfig() diff --git a/src/socket/index.ts b/src/socket/index.ts index 3a7d5b2f..76a98bb0 100644 --- a/src/socket/index.ts +++ b/src/socket/index.ts @@ -32,15 +32,12 @@ export default class Socket { constructor(server: Server, private readonly em: EntityManager) { this.wss = new WebSocketServer({ server }) this.wss.on('connection', async (ws, req) => { - await getSocketTracer().startActiveSpan('socket.event_handler', async (span) => { - await this.handleConnection(ws, req) - - ws.on('message', (data) => this.handleMessage(ws, data)) - ws.on('pong', () => this.handlePong(ws)) - ws.on('close', () => this.closeConnection(ws, { preclosed: true })) - ws.on('error', captureException) - span.end() - }) + await this.handleConnection(ws, req) + + ws.on('message', (data) => this.handleMessage(ws, data)) + ws.on('pong', () => this.handlePong(ws)) + ws.on('close', () => this.closeConnection(ws, { preclosed: true })) + ws.on('error', captureException) }) this.router = new SocketRouter(this) @@ -80,32 +77,34 @@ export default class Socket { async handleConnection(ws: WebSocket, req: IncomingMessage): Promise { withIsolationScope(async () => { await getSocketTracer().startActiveSpan('socket.open', async (span) => { - logConnection(req) - - await RequestContext.create(this.em, async () => { - const url = new URL(req.url!, 'http://localhost') - const ticket = new SocketTicket(url.searchParams.get('ticket') ?? '') - - if (await ticket.validate(this.redis)) { - const connection = new SocketConnection(this, ws, ticket, req.socket.remoteAddress!) - this.connections.set(ws, connection) - - await this.trackEvent({ - eventType: 'open', - reqOrRes: 'req', - code: null, - gameId: connection.game.id, - playerAliasId: null, - devBuild: ticket.devBuild - }) - - await sendMessage(connection, 'v1.connected', {}) - } else { - await this.closeConnection(ws) - } - }) - - span.end() + try { + logConnection(req) + + await RequestContext.create(this.em, async () => { + const url = new URL(req.url!, 'http://localhost') + const ticket = new SocketTicket(url.searchParams.get('ticket') ?? '') + + if (await ticket.validate(this.redis)) { + const connection = new SocketConnection(this, ws, ticket, req.socket.remoteAddress!) + this.connections.set(ws, connection) + + await this.trackEvent({ + eventType: 'open', + reqOrRes: 'req', + code: null, + gameId: connection.gameId, + playerAliasId: null, + devBuild: ticket.devBuild + }) + + await sendMessage(connection, 'v1.connected', {}) + } else { + await this.closeConnection(ws) + } + }) + } finally { + span.end() + } }) }) } @@ -113,17 +112,19 @@ export default class Socket { async handleMessage(ws: WebSocket, data: RawData): Promise { withIsolationScope(async () => { await getSocketTracer().startActiveSpan('socket.message', async (span) => { - await RequestContext.create(this.em, async () => { - const connection = this.connections.get(ws) - if (connection) { - await this.router.handleMessage(connection, data) - /* v8 ignore next 3 */ - } else { - await this.closeConnection(ws) - } - }) - - span.end() + try { + await RequestContext.create(this.em, async () => { + const connection = this.connections.get(ws) + if (connection) { + await this.router.handleMessage(connection, data) + /* v8 ignore next 3 */ + } else { + await this.closeConnection(ws) + } + }) + } finally { + span.end() + } }) }) } @@ -166,7 +167,7 @@ export default class Socket { eventType: 'close', reqOrRes: preclosed ? 'req' : 'res', code: preclosed ? null : code.toString(), - gameId: connection.game.id, + gameId: connection.gameId, playerAliasId: connection.playerAliasId, devBuild: connection.isDevBuild() }) diff --git a/src/socket/listeners/gameChannelListeners.ts b/src/socket/listeners/gameChannelListeners.ts index 506494a4..f308e443 100644 --- a/src/socket/listeners/gameChannelListeners.ts +++ b/src/socket/listeners/gameChannelListeners.ts @@ -5,6 +5,7 @@ import { RequestContext } from '@mikro-orm/mysql' import GameChannel from '../../entities/game-channel' import { sendMessages } from '../messages/socketMessage' import { APIKeyScope } from '../../entities/api-key' +import { getResultCacheOptions } from '../../lib/perf/getResultCacheOptions' const gameChannelListeners = [ createListener( @@ -21,9 +22,10 @@ const gameChannelListeners = [ .getRepository(GameChannel) .findOne({ id: data.channel.id, - game: conn.game + game: conn.gameId }, { - populate: ['members:ref'] + populate: ['members:ref'], + ...getResultCacheOptions(`channel-listener-members-${data.channel.id}`, 1000) }) if (!channel) { diff --git a/src/socket/listeners/playerListeners.ts b/src/socket/listeners/playerListeners.ts index fd0f99ae..ea4bb45a 100644 --- a/src/socket/listeners/playerListeners.ts +++ b/src/socket/listeners/playerListeners.ts @@ -28,7 +28,7 @@ const playerListeners = [ .findOneOrFail({ id: data.playerAliasId, player: { - game: conn.game + game: conn.gameId } }, { populate: ['player.auth'] diff --git a/src/socket/messages/socketMessage.ts b/src/socket/messages/socketMessage.ts index 2ad88c4b..623f1f97 100644 --- a/src/socket/messages/socketMessage.ts +++ b/src/socket/messages/socketMessage.ts @@ -33,7 +33,8 @@ export async function sendMessages(conns: SocketConnection[], await getSocketTracer().startActiveSpan('socket.send_many_messages', async (span) => { try { const message = JSON.stringify({ res: type, data }) - await Promise.all(conns.map((conn) => conn.sendMessage(type, data, message))) + // pass empty object as data since we already have the serialised message + await Promise.all(conns.map((conn) => conn.sendMessage(type, {} as T, message))) } finally { span.end() } diff --git a/src/socket/router/socketRouter.ts b/src/socket/router/socketRouter.ts index a4bc934c..8e4e18d9 100644 --- a/src/socket/router/socketRouter.ts +++ b/src/socket/router/socketRouter.ts @@ -78,7 +78,7 @@ export default class SocketRouter { eventType: message.req, reqOrRes: 'req', code: null, - gameId: conn.game.id, + gameId: conn.gameId, playerAliasId: conn.playerAliasId, devBuild: conn.isDevBuild() }) diff --git a/src/socket/socketConnection.ts b/src/socket/socketConnection.ts index 548a31f1..905fbcde 100644 --- a/src/socket/socketConnection.ts +++ b/src/socket/socketConnection.ts @@ -1,7 +1,6 @@ import { WebSocket } from 'ws' import PlayerAlias from '../entities/player-alias' -import Game from '../entities/game' -import APIKey, { APIKeyScope } from '../entities/api-key' +import { APIKeyScope } from '../entities/api-key' import { RequestContext, EntityManager } from '@mikro-orm/mysql' import { v4 } from 'uuid' import Redis from 'ioredis' @@ -13,12 +12,15 @@ import { SocketErrorCode } from './messages/socketError' import SocketTicket from './socketTicket' import { setTraceAttributes } from '@hyperdx/node-opentelemetry' import { getSocketTracer } from './socketTracer' +import { getResultCacheOptions } from '../lib/perf/getResultCacheOptions' export default class SocketConnection { alive: boolean = true playerAliasId!: number - game: Game - private apiKey: APIKey + readonly gameId: number + private readonly apiKeyId: number + private readonly apiKeyScopes: APIKeyScope[] + private readonly devBuild: boolean rateLimitKey: string = `requests.socket:${v4()}` rateLimitWarnings: number = 0 @@ -26,11 +28,13 @@ export default class SocketConnection { constructor( private readonly wss: Socket, private readonly ws: WebSocket, - private readonly ticket: SocketTicket, + ticket: SocketTicket, private readonly remoteAddress: string ) { - this.game = this.ticket.apiKey.game - this.apiKey = this.ticket.apiKey + this.gameId = ticket.apiKey.game.id + this.apiKeyId = ticket.apiKey.id + this.apiKeyScopes = ticket.apiKey.scopes + this.devBuild = ticket.devBuild } async getPlayerAlias(): Promise { @@ -39,15 +43,18 @@ export default class SocketConnection { if (!em) { throw new Error('Missing request context for entity manager') } - return em.repo(PlayerAlias).findOne(this.playerAliasId) + return em.repo(PlayerAlias).findOne( + this.playerAliasId, + getResultCacheOptions(`socket-connection-alias-${this.playerAliasId}`, 1000) + ) } getAPIKeyId(): number { - return this.ticket.apiKey.id + return this.apiKeyId } hasScope(scope: APIKeyScope): boolean { - return this.apiKey.scopes.includes(APIKeyScope.FULL_ACCESS) || this.apiKey.scopes.includes(scope) + return this.apiKeyScopes.includes(APIKeyScope.FULL_ACCESS) || this.apiKeyScopes.includes(scope) } hasScopes(scopes: APIKeyScope[]): boolean { @@ -78,35 +85,37 @@ export default class SocketConnection { } isDevBuild(): boolean { - return this.ticket.devBuild + return this.devBuild } async sendMessage(res: SocketMessageResponse, data: T, serialisedMessage?: string): Promise { await getSocketTracer().startActiveSpan('socket.send_message', async (span) => { - if (this.ws.readyState === this.ws.OPEN) { - const devBuild = this.isDevBuild() - const message = serialisedMessage ?? JSON.stringify({ res, data }) - - setTraceAttributes({ - 'socket.message_receiver.alias_id': this.playerAliasId, - 'socket.message_receiver.dev_build': devBuild - }) - - logResponse(this, res, message) - - await this.wss.trackEvent({ - eventType: res, - reqOrRes: 'res', - code: 'errorCode' in data ? (data.errorCode as SocketErrorCode) : null, - gameId: this.game.id, - playerAliasId: this.playerAliasId, - devBuild: devBuild - }) - - this.ws.send(message) + try { + if (this.ws.readyState === this.ws.OPEN) { + const devBuild = this.isDevBuild() + const message = serialisedMessage ?? JSON.stringify({ res, data }) + + setTraceAttributes({ + 'socket.message_receiver.alias_id': this.playerAliasId, + 'socket.message_receiver.dev_build': devBuild + }) + + logResponse(this, res, message) + + await this.wss.trackEvent({ + eventType: res, + reqOrRes: 'res', + code: 'errorCode' in data ? (data.errorCode as SocketErrorCode) : null, + gameId: this.gameId, + playerAliasId: this.playerAliasId, + devBuild: devBuild + }) + + this.ws.send(message) + } + } finally { + span.end() } - - span.end() }) }