Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/entities/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export default class Player {
return (
conn.hasScope(APIKeyScope.READ_PLAYER_PRESENCE) &&
!!conn.playerAliasId &&
this.game.id === conn.game.id
this.game.id === conn.gameId
)
})
await sendMessages(conns, 'v1.players.presence.updated', {
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Socket from './socket'
import httpTracingMiddleware from './middleware/http-tracing-middleware'
import { secondsToMilliseconds } from 'date-fns'
import compress from 'koa-compress'
import { EntityManager } from '@mikro-orm/mysql'

const isTest = process.env.NODE_ENV === 'test'

Expand Down Expand Up @@ -49,7 +50,7 @@ export default async function init(): Promise<Koa> {
keepAliveTimeout: secondsToMilliseconds(60)
}, app.callback())

app.context.wss = new Socket(server, app.context.em)
app.context.wss = new Socket(server, (app.context.em as EntityManager).fork())

if (!isTest) {
server.listen(80, () => console.info('Listening on port 80'))
Expand Down
2 changes: 1 addition & 1 deletion src/services/game.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import updateAllowedKeys from '../lib/entities/updateAllowedKeys'
async function sendLiveConfigUpdatedMessage(req: Request, game: Game) {
const socket: Socket = req.ctx.wss
const conns = socket.findConnections((conn) => {
return conn.game.id === game.id && conn.hasScope(APIKeyScope.READ_GAME_CONFIG)
return conn.gameId === game.id && conn.hasScope(APIKeyScope.READ_GAME_CONFIG)
})
await sendMessages(conns, 'v1.live-config.updated', {
config: game.getLiveConfig()
Expand Down
95 changes: 48 additions & 47 deletions src/socket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ export default class Socket {
constructor(server: Server, private readonly em: EntityManager) {
this.wss = new WebSocketServer({ server })
this.wss.on('connection', async (ws, req) => {
await getSocketTracer().startActiveSpan('socket.event_handler', async (span) => {
await this.handleConnection(ws, req)

ws.on('message', (data) => this.handleMessage(ws, data))
ws.on('pong', () => this.handlePong(ws))
ws.on('close', () => this.closeConnection(ws, { preclosed: true }))
ws.on('error', captureException)
span.end()
})
await this.handleConnection(ws, req)

ws.on('message', (data) => this.handleMessage(ws, data))
ws.on('pong', () => this.handlePong(ws))
ws.on('close', () => this.closeConnection(ws, { preclosed: true }))
ws.on('error', captureException)
})

this.router = new SocketRouter(this)
Expand Down Expand Up @@ -80,50 +77,54 @@ export default class Socket {
async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
withIsolationScope(async () => {
await getSocketTracer().startActiveSpan('socket.open', async (span) => {
logConnection(req)

await RequestContext.create(this.em, async () => {
const url = new URL(req.url!, 'http://localhost')
const ticket = new SocketTicket(url.searchParams.get('ticket') ?? '')

if (await ticket.validate(this.redis)) {
const connection = new SocketConnection(this, ws, ticket, req.socket.remoteAddress!)
this.connections.set(ws, connection)

await this.trackEvent({
eventType: 'open',
reqOrRes: 'req',
code: null,
gameId: connection.game.id,
playerAliasId: null,
devBuild: ticket.devBuild
})

await sendMessage(connection, 'v1.connected', {})
} else {
await this.closeConnection(ws)
}
})

span.end()
try {
logConnection(req)

await RequestContext.create(this.em, async () => {
const url = new URL(req.url!, 'http://localhost')
const ticket = new SocketTicket(url.searchParams.get('ticket') ?? '')

if (await ticket.validate(this.redis)) {
const connection = new SocketConnection(this, ws, ticket, req.socket.remoteAddress!)
this.connections.set(ws, connection)

await this.trackEvent({
eventType: 'open',
reqOrRes: 'req',
code: null,
gameId: connection.gameId,
playerAliasId: null,
devBuild: ticket.devBuild
})

await sendMessage(connection, 'v1.connected', {})
} else {
await this.closeConnection(ws)
}
})
} finally {
span.end()
}
})
})
}

async handleMessage(ws: WebSocket, data: RawData): Promise<void> {
withIsolationScope(async () => {
await getSocketTracer().startActiveSpan('socket.message', async (span) => {
await RequestContext.create(this.em, async () => {
const connection = this.connections.get(ws)
if (connection) {
await this.router.handleMessage(connection, data)
/* v8 ignore next 3 */
} else {
await this.closeConnection(ws)
}
})

span.end()
try {
await RequestContext.create(this.em, async () => {
const connection = this.connections.get(ws)
if (connection) {
await this.router.handleMessage(connection, data)
/* v8 ignore next 3 */
} else {
await this.closeConnection(ws)
}
})
} finally {
span.end()
}
})
})
}
Expand Down Expand Up @@ -166,7 +167,7 @@ export default class Socket {
eventType: 'close',
reqOrRes: preclosed ? 'req' : 'res',
code: preclosed ? null : code.toString(),
gameId: connection.game.id,
gameId: connection.gameId,
playerAliasId: connection.playerAliasId,
devBuild: connection.isDevBuild()
})
Expand Down
6 changes: 4 additions & 2 deletions src/socket/listeners/gameChannelListeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { RequestContext } from '@mikro-orm/mysql'
import GameChannel from '../../entities/game-channel'
import { sendMessages } from '../messages/socketMessage'
import { APIKeyScope } from '../../entities/api-key'
import { getResultCacheOptions } from '../../lib/perf/getResultCacheOptions'

const gameChannelListeners = [
createListener(
Expand All @@ -21,9 +22,10 @@ const gameChannelListeners = [
.getRepository(GameChannel)
.findOne({
id: data.channel.id,
game: conn.game
game: conn.gameId
}, {
populate: ['members:ref']
populate: ['members:ref'],
...getResultCacheOptions(`channel-listener-members-${data.channel.id}`, 1000)
})

if (!channel) {
Expand Down
2 changes: 1 addition & 1 deletion src/socket/listeners/playerListeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const playerListeners = [
.findOneOrFail({
id: data.playerAliasId,
player: {
game: conn.game
game: conn.gameId
}
}, {
populate: ['player.auth']
Expand Down
3 changes: 2 additions & 1 deletion src/socket/messages/socketMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ export async function sendMessages<T extends object>(conns: SocketConnection[],
await getSocketTracer().startActiveSpan('socket.send_many_messages', async (span) => {
try {
const message = JSON.stringify({ res: type, data })
await Promise.all(conns.map((conn) => conn.sendMessage(type, data, message)))
// pass empty object as data since we already have the serialised message
await Promise.all(conns.map((conn) => conn.sendMessage(type, {} as T, message)))
} finally {
span.end()
}
Expand Down
2 changes: 1 addition & 1 deletion src/socket/router/socketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export default class SocketRouter {
eventType: message.req,
reqOrRes: 'req',
code: null,
gameId: conn.game.id,
gameId: conn.gameId,
playerAliasId: conn.playerAliasId,
devBuild: conn.isDevBuild()
})
Expand Down
77 changes: 43 additions & 34 deletions src/socket/socketConnection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { WebSocket } from 'ws'
import PlayerAlias from '../entities/player-alias'
import Game from '../entities/game'
import APIKey, { APIKeyScope } from '../entities/api-key'
import { APIKeyScope } from '../entities/api-key'
import { RequestContext, EntityManager } from '@mikro-orm/mysql'
import { v4 } from 'uuid'
import Redis from 'ioredis'
Expand All @@ -13,24 +12,29 @@ import { SocketErrorCode } from './messages/socketError'
import SocketTicket from './socketTicket'
import { setTraceAttributes } from '@hyperdx/node-opentelemetry'
import { getSocketTracer } from './socketTracer'
import { getResultCacheOptions } from '../lib/perf/getResultCacheOptions'

export default class SocketConnection {
alive: boolean = true
playerAliasId!: number
game: Game
private apiKey: APIKey
readonly gameId: number
private readonly apiKeyId: number
private readonly apiKeyScopes: APIKeyScope[]
private readonly devBuild: boolean

rateLimitKey: string = `requests.socket:${v4()}`
rateLimitWarnings: number = 0

constructor(
private readonly wss: Socket,
private readonly ws: WebSocket,
private readonly ticket: SocketTicket,
ticket: SocketTicket,
private readonly remoteAddress: string
) {
this.game = this.ticket.apiKey.game
this.apiKey = this.ticket.apiKey
this.gameId = ticket.apiKey.game.id
this.apiKeyId = ticket.apiKey.id
this.apiKeyScopes = ticket.apiKey.scopes
this.devBuild = ticket.devBuild
}

async getPlayerAlias(): Promise<PlayerAlias | null> {
Expand All @@ -39,15 +43,18 @@ export default class SocketConnection {
if (!em) {
throw new Error('Missing request context for entity manager')
}
return em.repo(PlayerAlias).findOne(this.playerAliasId)
return em.repo(PlayerAlias).findOne(
this.playerAliasId,
getResultCacheOptions(`socket-connection-alias-${this.playerAliasId}`, 1000)
)
}

getAPIKeyId(): number {
return this.ticket.apiKey.id
return this.apiKeyId
}

hasScope(scope: APIKeyScope): boolean {
return this.apiKey.scopes.includes(APIKeyScope.FULL_ACCESS) || this.apiKey.scopes.includes(scope)
return this.apiKeyScopes.includes(APIKeyScope.FULL_ACCESS) || this.apiKeyScopes.includes(scope)
}

hasScopes(scopes: APIKeyScope[]): boolean {
Expand Down Expand Up @@ -78,35 +85,37 @@ export default class SocketConnection {
}

isDevBuild(): boolean {
return this.ticket.devBuild
return this.devBuild
}

async sendMessage<T extends object>(res: SocketMessageResponse, data: T, serialisedMessage?: string): Promise<void> {
await getSocketTracer().startActiveSpan('socket.send_message', async (span) => {
if (this.ws.readyState === this.ws.OPEN) {
const devBuild = this.isDevBuild()
const message = serialisedMessage ?? JSON.stringify({ res, data })

setTraceAttributes({
'socket.message_receiver.alias_id': this.playerAliasId,
'socket.message_receiver.dev_build': devBuild
})

logResponse(this, res, message)

await this.wss.trackEvent({
eventType: res,
reqOrRes: 'res',
code: 'errorCode' in data ? (data.errorCode as SocketErrorCode) : null,
gameId: this.game.id,
playerAliasId: this.playerAliasId,
devBuild: devBuild
})

this.ws.send(message)
try {
if (this.ws.readyState === this.ws.OPEN) {
const devBuild = this.isDevBuild()
const message = serialisedMessage ?? JSON.stringify({ res, data })

setTraceAttributes({
'socket.message_receiver.alias_id': this.playerAliasId,
'socket.message_receiver.dev_build': devBuild
})

logResponse(this, res, message)

await this.wss.trackEvent({
eventType: res,
reqOrRes: 'res',
code: 'errorCode' in data ? (data.errorCode as SocketErrorCode) : null,
gameId: this.gameId,
playerAliasId: this.playerAliasId,
devBuild: devBuild
})

this.ws.send(message)
}
} finally {
span.end()
}

span.end()
})
}

Expand Down
Loading