diff --git a/src/lib/queues/createDeleteClickHousePlayerDataQueue.ts b/src/lib/queues/createDeleteClickHousePlayerDataQueue.ts index f18557fa..e1d8a106 100644 --- a/src/lib/queues/createDeleteClickHousePlayerDataQueue.ts +++ b/src/lib/queues/createDeleteClickHousePlayerDataQueue.ts @@ -13,7 +13,6 @@ export function createDeleteClickHousePlayerDataQueue() { const queries: string[] = aliasIds.length > 0 ? [ 'DELETE FROM event_props WHERE event_id IN (SELECT id FROM events WHERE player_alias_id IN ({aliasIds:Array(UInt32)}))', 'DELETE FROM events WHERE player_alias_id IN ({aliasIds:Array(UInt32)})', - 'DELETE FROM socket_events WHERE player_alias_id IN ({aliasIds:Array(UInt32)})', 'DELETE FROM player_game_stat_snapshots WHERE player_alias_id IN ({aliasIds:Array(UInt32)})', 'DELETE FROM player_sessions WHERE player_id IN ({playerIds:Array(String)})' ] : [] diff --git a/src/lib/queues/game-metrics/flush-socket-events-queue-handler.ts b/src/lib/queues/game-metrics/flush-socket-events-queue-handler.ts deleted file mode 100644 index 7b363792..00000000 --- a/src/lib/queues/game-metrics/flush-socket-events-queue-handler.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { FlushMetricsQueueHandler } from './flush-metrics-queue-handler' -import { getInsertableSocketEventData, SocketEventData } from '../../../socket/socketEvent' -import { ClickHouseSocketEvent } from '../../../socket/socketEvent' - -type SerialisedClickHouseSocketEvent = ClickHouseSocketEvent & { id: string } - -export class FlushSocketEventsQueueHandler extends FlushMetricsQueueHandler { - constructor() { - super('socket-events', async (clickhouse, values) => { - await clickhouse.insert({ - table: 'socket_events', - values: values.filter((socketEvent) => !socketEvent.dev_build), - format: 'JSONEachRow' - }) - }, { - logsInTests: false - }) - } - - protected serialiseItem(socketEvent: SocketEventData) { - return { - id: socketEvent.id, - ...getInsertableSocketEventData(socketEvent) - } - } -} diff --git a/src/migrations/clickhouse/004CreatePlayerGameStatSnapshotsTable.ts b/src/migrations/clickhouse/003CreatePlayerGameStatSnapshotsTable.ts similarity index 100% rename from src/migrations/clickhouse/004CreatePlayerGameStatSnapshotsTable.ts rename to src/migrations/clickhouse/003CreatePlayerGameStatSnapshotsTable.ts diff --git a/src/migrations/clickhouse/003CreateSocketEventsTable.ts b/src/migrations/clickhouse/003CreateSocketEventsTable.ts deleted file mode 100644 index 8df934f8..00000000 --- a/src/migrations/clickhouse/003CreateSocketEventsTable.ts +++ /dev/null @@ -1,14 +0,0 @@ -export const CreateSocketEventsTable = `CREATE TABLE IF NOT EXISTS ${process.env.CLICKHOUSE_DB}.socket_events ( - id UUID DEFAULT generateUUIDv4(), - event_type String, - req_or_res Enum('req' = 0, 'res' = 1), - code Nullable(String), - game_id UInt32, - player_alias_id Nullable(UInt32), - dev_build Boolean, - created_at DateTime64(3), - PRIMARY KEY (id), - INDEX game_id_idx (game_id) TYPE minmax GRANULARITY 64, - INDEX player_alias_id_idx (player_alias_id) TYPE minmax GRANULARITY 64 -) ENGINE = MergeTree() -ORDER BY (id, created_at, game_id);` diff --git a/src/migrations/clickhouse/005MigrateEventsTimestampsToDate64.ts b/src/migrations/clickhouse/004MigrateEventsTimestampsToDate64.ts similarity index 100% rename from src/migrations/clickhouse/005MigrateEventsTimestampsToDate64.ts rename to src/migrations/clickhouse/004MigrateEventsTimestampsToDate64.ts diff --git a/src/migrations/clickhouse/006CreatePlayerSessionsTable.ts b/src/migrations/clickhouse/005CreatePlayerSessionsTable.ts similarity index 100% rename from src/migrations/clickhouse/006CreatePlayerSessionsTable.ts rename to src/migrations/clickhouse/005CreatePlayerSessionsTable.ts diff --git a/src/migrations/clickhouse/007AddEventPropsEventIdIndex.ts b/src/migrations/clickhouse/006AddEventPropsEventIdIndex.ts similarity index 100% rename from src/migrations/clickhouse/007AddEventPropsEventIdIndex.ts rename to src/migrations/clickhouse/006AddEventPropsEventIdIndex.ts diff --git a/src/migrations/clickhouse/index.ts b/src/migrations/clickhouse/index.ts index 5f089d25..c7892a51 100644 --- a/src/migrations/clickhouse/index.ts +++ b/src/migrations/clickhouse/index.ts @@ -1,13 +1,12 @@ import { ClickHouseClient } from '@clickhouse/client' import { CreateEventsTable } from './001CreateEventsTable' import { CreateEventPropsTable } from './002CreateEventPropsTable' -import { CreateSocketEventsTable } from './003CreateSocketEventsTable' import { CreateMigrationsTable } from './000CreateMigrationsTable' import { formatDateForClickHouse } from '../../lib/clickhouse/formatDateTime' -import { CreatePlayerGameStatSnapshotsTable } from './004CreatePlayerGameStatSnapshotsTable' -import { MigrateEventsTimestampsToDate64 } from './005MigrateEventsTimestampsToDate64' -import { CreatePlayerSessionsTable } from './006CreatePlayerSessionsTable' -import { AddEventPropsEventIdIndex } from './007AddEventPropsEventIdIndex' +import { CreatePlayerGameStatSnapshotsTable } from './003CreatePlayerGameStatSnapshotsTable' +import { MigrateEventsTimestampsToDate64 } from './004MigrateEventsTimestampsToDate64' +import { CreatePlayerSessionsTable } from './005CreatePlayerSessionsTable' +import { AddEventPropsEventIdIndex } from './006AddEventPropsEventIdIndex' type ClickHouseMigration = { name: string @@ -23,10 +22,6 @@ const migrations: ClickHouseMigration[] = [ name: 'CreateEventPropsTable', sql: CreateEventPropsTable }, - { - name: 'CreateSocketEventsTable', - sql: CreateSocketEventsTable - }, { name: 'CreatePlayerGameStatSnapshotsTable', sql: CreatePlayerGameStatSnapshotsTable diff --git a/src/socket/index.ts b/src/socket/index.ts index 76a98bb0..2349feca 100644 --- a/src/socket/index.ts +++ b/src/socket/index.ts @@ -6,13 +6,10 @@ import SocketConnection from './socketConnection' import SocketRouter from './router/socketRouter' import { sendMessage } from './messages/socketMessage' import { logConnection, logConnectionClosed } from './messages/socketLogger' -import { SocketEventData } from './socketEvent' import Redis from 'ioredis' import { createRedisConnection } from '../config/redis.config' import SocketTicket from './socketTicket' import { getSocketTracer } from './socketTracer' -import { FlushSocketEventsQueueHandler } from '../lib/queues/game-metrics/flush-socket-events-queue-handler' -import { v4 } from 'uuid' import { enableSocketTracing } from './enableSocketTracing' type CloseConnectionOptions = { @@ -26,7 +23,6 @@ export default class Socket { private readonly wss: WebSocketServer private connections: Map = new Map() private router: SocketRouter - private queueHandler: FlushSocketEventsQueueHandler redis: Redis constructor(server: Server, private readonly em: EntityManager) { @@ -42,8 +38,6 @@ export default class Socket { this.router = new SocketRouter(this) - this.queueHandler = new FlushSocketEventsQueueHandler() - const interval = this.heartbeat() this.wss.on('close', () => { clearInterval(interval) @@ -88,15 +82,6 @@ export default class Socket { const connection = new SocketConnection(this, ws, ticket, req.socket.remoteAddress!) this.connections.set(ws, connection) - await this.trackEvent({ - eventType: 'open', - reqOrRes: 'req', - code: null, - gameId: connection.gameId, - playerAliasId: null, - devBuild: ticket.devBuild - }) - await sendMessage(connection, 'v1.connected', {}) } else { await this.closeConnection(ws) @@ -163,15 +148,6 @@ export default class Socket { logConnectionClosed(connection, preclosed, code, options.reason) - await this.trackEvent({ - eventType: 'close', - reqOrRes: preclosed ? 'req' : 'res', - code: preclosed ? null : code.toString(), - gameId: connection.gameId, - playerAliasId: connection.playerAliasId, - devBuild: connection.isDevBuild() - }) - this.connections.delete(ws) } @@ -182,13 +158,4 @@ export default class Socket { findConnections(filter: (conn: SocketConnection) => boolean): SocketConnection[] { return Array.from(this.connections.values()).filter(filter) } - - async trackEvent(data: Omit): Promise { - if (process.env.DISABLE_SOCKET_EVENTS === '1') { - return - } - - /* v8 ignore next - tests mock this implementation */ - await this.queueHandler.add({ id: v4(), ...data }) - } } diff --git a/src/socket/router/socketRouter.ts b/src/socket/router/socketRouter.ts index 8e4e18d9..32374dbe 100644 --- a/src/socket/router/socketRouter.ts +++ b/src/socket/router/socketRouter.ts @@ -74,15 +74,6 @@ export default class SocketRouter { } async routeMessage(conn: SocketConnection, message: SocketMessage): Promise { - await this.wss.trackEvent({ - eventType: message.req, - reqOrRes: 'req', - code: null, - gameId: conn.gameId, - playerAliasId: conn.playerAliasId, - devBuild: conn.isDevBuild() - }) - for (const route of routes) { for await (const listener of route) { if (listener.req === message.req) { diff --git a/src/socket/socketConnection.ts b/src/socket/socketConnection.ts index 905fbcde..208e8314 100644 --- a/src/socket/socketConnection.ts +++ b/src/socket/socketConnection.ts @@ -102,15 +102,6 @@ export default class SocketConnection { logResponse(this, res, message) - await this.wss.trackEvent({ - eventType: res, - reqOrRes: 'res', - code: 'errorCode' in data ? (data.errorCode as SocketErrorCode) : null, - gameId: this.gameId, - playerAliasId: this.playerAliasId, - devBuild: devBuild - }) - this.ws.send(message) } } finally { diff --git a/src/socket/socketEvent.ts b/src/socket/socketEvent.ts deleted file mode 100644 index 565526df..00000000 --- a/src/socket/socketEvent.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { formatDateForClickHouse } from '../lib/clickhouse/formatDateTime' -import { SocketMessageRequest, SocketMessageResponse } from './messages/socketMessage' - -export type ClickHouseSocketEvent = { - event_type: string - req_or_res: 'req' | 'res' - code: string | null - game_id: number - player_alias_id: number | null - dev_build: boolean - created_at: string -} - -export type SocketEvent = { - id: string - eventType: SocketMessageRequest | SocketMessageResponse | 'open' | 'close' | 'unknown' - reqOrRes: 'req' | 'res' - code: string | null - gameId: number - playerAliasId: number | null - devBuild: boolean - createdAt: Date -} - -export type SocketEventData = Omit - -export function getInsertableSocketEventData(event: SocketEventData): ClickHouseSocketEvent { - return { - event_type: event.eventType, - req_or_res: event.reqOrRes, - code: event.code, - game_id: event.gameId, - player_alias_id: event.playerAliasId, - dev_build: event.devBuild, - created_at: formatDateForClickHouse(new Date()) - } -} diff --git a/src/tasks/cleanupOnlinePlayers.ts b/src/tasks/cleanupOnlinePlayers.ts index 9ee4994d..3c09f24c 100644 --- a/src/tasks/cleanupOnlinePlayers.ts +++ b/src/tasks/cleanupOnlinePlayers.ts @@ -1,144 +1,53 @@ -import { EntityManager, NotFoundError } from '@mikro-orm/mysql' +import { EntityManager } from '@mikro-orm/mysql' import { getMikroORM } from '../config/mikro-orm.config' import createClickHouseClient from '../lib/clickhouse/createClient' -import PlayerSession, { ClickHousePlayerSession } from '../entities/player-session' +import { ClickHousePlayerSession } from '../entities/player-session' import { ClickHouseClient } from '@clickhouse/client' -import PlayerAlias from '../entities/player-alias' -import { ClickHouseSocketEvent } from '../socket/socketEvent' -import { v4 } from 'uuid' -import { addMinutes, differenceInMinutes, subDays } from 'date-fns' +import { subDays } from 'date-fns' import PlayerPresence from '../entities/player-presence' import { formatDateForClickHouse } from '../lib/clickhouse/formatDateTime' type CleanupStats = { - unfinishedSessions: number - hadClosedEvent: number - hadNoEvents: number - hadSessionSinceOriginal: number - newSessionDurationMinutes: number[] - newSessionDurationLessThanOneMinute: number + sessionsDeleted: number presenceUpdated: number presenceDeleted: number } -let cleanupStats: Record - -function getOrCreateGameCleanupStats(gameId: number): CleanupStats { - if (!cleanupStats[gameId]) { - cleanupStats[gameId] = { - unfinishedSessions: 0, - hadClosedEvent: 0, - hadNoEvents: 0, - hadSessionSinceOriginal: 0, - newSessionDurationMinutes: [], - newSessionDurationLessThanOneMinute: 0, - presenceUpdated: 0, - presenceDeleted: 0 - } - } - return cleanupStats[gameId] -} - -async function cleanupSession(em: EntityManager, clickhouse: ClickHouseClient, session: ClickHousePlayerSession) { - const gameStats = getOrCreateGameCleanupStats(session.game_id) - gameStats.unfinishedSessions++ - - const aliases = await em.createQueryBuilder(PlayerAlias) - .select('id') - .where({ player: session.player_id }) - .execute('all', false) as { id: number }[] - - const sessionSinceOriginal = await clickhouse.query({ - query: ` - SELECT started_at from player_sessions - WHERE player_id = '${session.player_id}' AND started_at > '${session.started_at}' - ORDER BY started_at ASC - LIMIT 1 - `, - format: 'JSONEachRow' - }) - .then((res) => res.json>()) - .then((res) => res[0]) - - if (sessionSinceOriginal) { - gameStats.hadSessionSinceOriginal++ - } +let cleanupStats: CleanupStats - const socketEvents = await clickhouse.query({ - query: ` - SELECT event_type, created_at FROM socket_events - WHERE - player_alias_id IN {aliasIds:Array(UInt32)} - AND created_at <= dateSub(DAY, 1, now()) - AND created_at >= '${session.started_at}' - ${sessionSinceOriginal ? `AND created_at <= '${sessionSinceOriginal.started_at}'` : ''} - ORDER BY created_at DESC - LIMIT 5 - `, - format: 'JSONEachRow', - query_params: { - aliasIds: aliases.map(({ id }) => id) - } - }).then((res) => res.json>()) - - const hydratedSession = await new PlayerSession().hydrate(em, session) - const prevSessionId = hydratedSession.id - - const closeEvent = socketEvents.find(({ event_type }) => event_type === 'close') - if (closeEvent) { - hydratedSession.endedAt = new Date(closeEvent.created_at) - gameStats.hadClosedEvent++ - } else { - if (socketEvents.length > 0) { - hydratedSession.endedAt = new Date(socketEvents[0].created_at) - } else { - hydratedSession.endedAt = new Date(addMinutes(hydratedSession.startedAt, 1)) - gameStats.hadNoEvents++ - } - } - - let duration = differenceInMinutes(hydratedSession.endedAt, hydratedSession.startedAt) - if (duration < 1) { - gameStats.newSessionDurationLessThanOneMinute++ - hydratedSession.endedAt = addMinutes(hydratedSession.startedAt, 1) - duration = 1 - } - - hydratedSession.id = v4() - await clickhouse.exec({ query: `DELETE FROM player_sessions WHERE id = '${prevSessionId}'` }) - await clickhouse.insert({ - table: 'player_sessions', - values: hydratedSession.toInsertable(), - format: 'JSON' +async function deleteSession(clickhouse: ClickHouseClient, sessionId: string) { + await clickhouse.exec({ + query: 'DELETE FROM player_sessions WHERE id = {sessionId:String}', + query_params: { sessionId } }) - - gameStats.newSessionDurationMinutes.push(duration) + cleanupStats.sessionsDeleted++ } async function cleanupPresence(em: EntityManager, clickhouse: ClickHouseClient, presence: PlayerPresence) { - const gameStats = getOrCreateGameCleanupStats(presence.player.game.id) - const latestSessionForPresence = await clickhouse.query({ query: ` SELECT ended_at from player_sessions - WHERE player_id = '${presence.player.id}' AND ended_at >= '${formatDateForClickHouse(presence.updatedAt)}' + WHERE player_id = {playerId:String} AND ended_at >= {endedAt:String} LIMIT 1 `, + query_params: { + playerId: presence.player.id, + endedAt: formatDateForClickHouse(presence.updatedAt) + }, format: 'JSONEachRow' }) .then((res) => res.json>()) .then((res) => res[0]) if (latestSessionForPresence?.ended_at) { - gameStats.presenceUpdated++ + cleanupStats.presenceUpdated++ presence.online = false await em.flush() } } async function removeDisconnectedPresence(em: EntityManager, presence: PlayerPresence) { - const gameStats = getOrCreateGameCleanupStats(presence.playerAlias.player.game.id) - gameStats.presenceDeleted++ + cleanupStats.presenceDeleted++ await em.removeAndFlush(presence) } @@ -147,7 +56,11 @@ export default async function cleanupOnlinePlayers() { const em = orm.em.fork() as EntityManager const clickhouse = createClickHouseClient() - cleanupStats = {} + cleanupStats = { + sessionsDeleted: 0, + presenceUpdated: 0, + presenceDeleted: 0 + } const sessions = await clickhouse.query({ query: ` @@ -162,22 +75,7 @@ export default async function cleanupOnlinePlayers() { console.info(`Found ${sessions.length} unfinished sessions`) for (const session of sessions) { - try { - await cleanupSession(em, clickhouse, session) - } catch (err) { - if (err instanceof NotFoundError) { - // player was deleted, just delete their sessions - await clickhouse.exec({ - query: 'DELETE FROM player_sessions WHERE player_id = {playerId:String}', - query_params: { - playerId: session.player_id - } - }) - /* v8 ignore next 3 */ - } else { - throw err - } - } + await deleteSession(clickhouse, session.id) } // todo, find out how this is happening diff --git a/tests/setupTest.ts b/tests/setupTest.ts index b62acb22..6092c9b4 100644 --- a/tests/setupTest.ts +++ b/tests/setupTest.ts @@ -24,7 +24,6 @@ async function truncateTables() { beforeAll(async () => { vi.mock('nodemailer') vi.mock('bullmq') - vi.stubEnv('DISABLE_SOCKET_EVENTS', '1') const app = await init() global.app = app.callback() diff --git a/tests/socket/socketEvents.test.ts b/tests/socket/socketEvents.test.ts deleted file mode 100644 index 18a01d4b..00000000 --- a/tests/socket/socketEvents.test.ts +++ /dev/null @@ -1,194 +0,0 @@ -import createAPIKeyAndToken from '../utils/createAPIKeyAndToken' -import { ClickHouseSocketEvent } from '../../src/socket/socketEvent' -import createSocketIdentifyMessage from '../utils/createSocketIdentifyMessage' -import { APIKeyScope } from '../../src/entities/api-key' -import { createSocketTicket } from '../../src/services/api/socket-ticket-api.service' -import createTestSocket from '../utils/createTestSocket' -import Socket from '../../src/socket' -import { FlushSocketEventsQueueHandler } from '../../src/lib/queues/game-metrics/flush-socket-events-queue-handler' -import { v4 } from 'uuid' - -describe('Socket events', () => { - beforeAll(() => { - vi.stubEnv('DISABLE_SOCKET_EVENTS', '0') - - // essentially the same functionality but with instant flushing - vi.spyOn(Socket.prototype, 'trackEvent').mockImplementation(async (data) => { - const handler = new FlushSocketEventsQueueHandler() - await handler.add({ id: v4(), ...data }) - await handler.handle() - }) - }) - - afterAll(() => { - vi.stubEnv('DISABLE_SOCKET_EVENTS', '1') - vi.restoreAllMocks() - }) - - it('should track open, connected and close events', async () => { - const [apiKey] = await createAPIKeyAndToken([]) - const ticket = await createSocketTicket(redis, apiKey, false) - - await createTestSocket(`/?ticket=${ticket}`, async () => {}) - - let events: ClickHouseSocketEvent[] = [] - await vi.waitUntil(async () => { - events = await clickhouse.query({ - query: `SELECT * FROM socket_events WHERE game_id = ${apiKey.game.id} ORDER BY created_at`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return events.length === 3 - }) - - expect(events[0].event_type).toBe('open') - expect(events[0].req_or_res).toBe('req') - expect(events[0].code).toBeNull() - expect(events[0].game_id).toBe(apiKey.game.id) - expect(events[0].player_alias_id).toBeNull() - expect(events[0].dev_build).toBe(false) - - expect(events[1].event_type).toBe('v1.connected') - expect(events[1].req_or_res).toBe('res') - expect(events[1].code).toBeNull() - expect(events[1].game_id).toBe(apiKey.game.id) - expect(events[1].player_alias_id).toBeNull() - expect(events[1].dev_build).toBe(false) - - expect(events[2].event_type).toBe('close') - expect(events[2].req_or_res).toBe('req') - expect(events[2].code).toBeNull() - expect(events[2].game_id).toBe(apiKey.game.id) - expect(events[2].player_alias_id).toBeNull() - expect(events[2].dev_build).toBe(false) - }) - - it('should track requests and responses', async () => { - const { identifyMessage, ticket, player } = await createSocketIdentifyMessage([APIKeyScope.READ_PLAYERS]) - - await createTestSocket(`/?ticket=${ticket}`, async (client) => { - await client.identify(identifyMessage) - }) - - await vi.waitFor(async () => { - const events = await clickhouse.query({ - query: `SELECT * FROM socket_events WHERE game_id = ${player.game.id} ORDER BY created_at`, - format: 'JSONEachRow' - }).then((res) => res.json()) - - expect(events).toEqual(expect.arrayContaining([ - expect.objectContaining({ - event_type: 'open', - req_or_res: 'req', - code: null, - game_id: player.game.id, - player_alias_id: null, - dev_build: false - }), - expect.objectContaining({ - event_type: 'v1.connected', - req_or_res: 'res', - code: null, - game_id: player.game.id, - player_alias_id: null, - dev_build: false - }), - expect.objectContaining({ - event_type: 'v1.players.identify', - req_or_res: 'req', - code: null, - game_id: player.game.id, - player_alias_id: null, - dev_build: false - }), - expect.objectContaining({ - event_type: 'close', - req_or_res: 'req', - code: null, - game_id: player.game.id, - player_alias_id: player.aliases[0].id, - dev_build: false - }) - ])) - }) - }) - - it('should track errors', async () => { - const { identifyMessage, ticket, player } = await createSocketIdentifyMessage([APIKeyScope.READ_PLAYERS]) - - await createTestSocket(`/?ticket=${ticket}`, async (client) => { - client.sendJson({ - ...identifyMessage, - data: { - ...identifyMessage.data, - socketToken: 'invalid' - } - }) - await client.expectJsonToStrictEqual({ - res: 'v1.error', - data: { - req: 'v1.players.identify', - message: 'Invalid socket token', - errorCode: 'INVALID_SOCKET_TOKEN' - } - }) - }) - - let events: ClickHouseSocketEvent[] = [] - await vi.waitUntil(async () => { - events = await clickhouse.query({ - query: `SELECT * FROM socket_events WHERE game_id = ${player.game.id} ORDER BY created_at`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return events.length === 5 - }) - - expect(events[0].event_type).toBe('open') - expect(events[0].req_or_res).toBe('req') - expect(events[0].code).toBeNull() - expect(events[0].game_id).toBe(player.game.id) - expect(events[0].player_alias_id).toBeNull() - expect(events[0].dev_build).toBe(false) - - expect(events[1].event_type).toBe('v1.connected') - expect(events[1].req_or_res).toBe('res') - expect(events[1].code).toBeNull() - expect(events[1].game_id).toBe(player.game.id) - expect(events[1].player_alias_id).toBeNull() - expect(events[1].dev_build).toBe(false) - - expect(events[2].event_type).toBe('v1.players.identify') - expect(events[2].req_or_res).toBe('req') - expect(events[2].code).toBeNull() - expect(events[2].game_id).toBe(player.game.id) - expect(events[2].player_alias_id).toBeNull() - expect(events[2].dev_build).toBe(false) - - expect(events[3].event_type).toBe('v1.error') - expect(events[3].req_or_res).toBe('res') - expect(events[3].code).toBe('INVALID_SOCKET_TOKEN') - expect(events[3].game_id).toBe(player.game.id) - expect(events[3].player_alias_id).toBeNull() - expect(events[3].dev_build).toBe(false) - - expect(events[4].event_type).toBe('close') - expect(events[4].req_or_res).toBe('req') - expect(events[4].code).toBeNull() - expect(events[4].game_id).toBe(player.game.id) - expect(events[4].player_alias_id).toBeNull() - expect(events[4].dev_build).toBe(false) - }) - - it('should not track dev build events', async () => { - const [apiKey] = await createAPIKeyAndToken([]) - const ticket = await createSocketTicket(redis, apiKey, true) - - await createTestSocket(`/?ticket=${ticket}`, async () => {}) - - const events = await clickhouse.query({ - query: `SELECT * FROM socket_events WHERE game_id = ${apiKey.game.id} ORDER BY created_at`, - format: 'JSONEachRow' - }).then((res) => res.json()) - - expect(events).toHaveLength(0) - }) -}) diff --git a/tests/tasks/cleanupOnlinePlayers.test.ts b/tests/tasks/cleanupOnlinePlayers.test.ts index a55f1a94..652bed96 100644 --- a/tests/tasks/cleanupOnlinePlayers.test.ts +++ b/tests/tasks/cleanupOnlinePlayers.test.ts @@ -1,4 +1,4 @@ -import { addHours, addMinutes, isToday, subDays, subMinutes } from 'date-fns' +import { addMinutes, isToday, subDays, subMinutes } from 'date-fns' import cleanupOnlinePlayers from '../../src/tasks/cleanupOnlinePlayers' import PlayerFactory from '../fixtures/PlayerFactory' import createOrganisationAndGame from '../utils/createOrganisationAndGame' @@ -19,7 +19,7 @@ describe('cleanupOnlinePlayers', () => { vi.useRealTimers() }) - it('should remove an unfinished session if a close event is found', async () => { + it('should delete unfinished sessions older than 1 day', async () => { const [, game] = await createOrganisationAndGame() const player = await new PlayerFactory([game]).one() await em.persistAndFlush(player) @@ -36,268 +36,15 @@ describe('cleanupOnlinePlayers', () => { return sessions.length === 1 }) - // create a socket closed event - const closedAt = addMinutes(session.startedAt, 5) - await clickhouse.insert({ - table: 'socket_events', - values: { - event_type: 'close', - req_or_res: 'req', - code: null, - game_id: game.id, - player_alias_id: player.aliases[0].id, - dev_build: false, - created_at: formatDateForClickHouse(closedAt) - }, - format: 'JSON' - }) - - await cleanupOnlinePlayers() - - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}'`, - format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession - }) - - assert(finishedSession?.ended_at) - expect(new Date(finishedSession.ended_at).getTime()).toBe(closedAt.getTime()) - }) - - it('should use the latest event if a close event is not found to use as the end time', async () => { - const [, game] = await createOrganisationAndGame() - const player = await new PlayerFactory([game]).one() - await em.persistAndFlush(player) - - const session = new PlayerSession() - session.construct(player) - await player.insertSession(clickhouse, session) - - await vi.waitUntil(async () => { - const sessions = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' and ended_at IS NULL`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return sessions.length === 1 - }) - - // create a random event to use as their last event - const createdAt = addMinutes(session.startedAt, 5) - await clickhouse.insert({ - table: 'socket_events', - values: { - event_type: 'v1.channels.message', - req_or_res: 'req', - code: null, - game_id: game.id, - player_alias_id: player.aliases[0].id, - dev_build: false, - created_at: formatDateForClickHouse(createdAt) - }, - format: 'JSON' - }) - - await cleanupOnlinePlayers() - - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}'`, - format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession - }) - - assert(finishedSession?.ended_at) - expect(new Date(finishedSession.ended_at).getTime()).toBe(createdAt.getTime()) - }) - - it('should add a minute to the started_at if there are no events for the session', async () => { - const [, game] = await createOrganisationAndGame() - const player = await new PlayerFactory([game]).one() - await em.persistAndFlush(player) - - const session = new PlayerSession() - session.construct(player) - await player.insertSession(clickhouse, session) - - await vi.waitUntil(async () => { - const sessions = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' and ended_at IS NULL`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return sessions.length === 1 - }) - - await cleanupOnlinePlayers() - - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}'`, - format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession - }) - - assert(finishedSession?.ended_at) - expect(new Date(finishedSession.ended_at).getTime()).toBe((addMinutes(session.startedAt, 1)).getTime()) - }) - - it('should not use events from any newer sessions', async () => { - const [, game] = await createOrganisationAndGame() - const player = await new PlayerFactory([game]).one() - await em.persistAndFlush(player) - - const session = new PlayerSession() - session.construct(player) - await player.insertSession(clickhouse, session) - - vi.setSystemTime(addHours(new Date(), 1)) - - const sessionSinceOriginal = new PlayerSession() - sessionSinceOriginal.construct(player) - sessionSinceOriginal.endSession() - await player.insertSession(clickhouse, sessionSinceOriginal) - - await vi.waitUntil(async () => { - const sessions = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' and ended_at IS NULL`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return sessions.length === 1 - }) - - // socket event for the latest session, not the unfinished one - const createdAt = addMinutes(sessionSinceOriginal.startedAt, 5) - await clickhouse.insert({ - table: 'socket_events', - values: { - event_type: 'v1.channels.message', - req_or_res: 'req', - code: null, - game_id: game.id, - player_alias_id: player.aliases[0].id, - dev_build: false, - created_at: formatDateForClickHouse(createdAt) - }, - format: 'JSON' - }) - await cleanupOnlinePlayers() - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' ORDER BY started_at ASC`, - format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession - }) - - // the only event was part of the latest session, not the original - assert(finishedSession?.ended_at) - expect(new Date(finishedSession.ended_at).getTime()).toBe(addMinutes(session.startedAt, 1).getTime()) - }) - - it('should not use events that were created before the session', async () => { - const [, game] = await createOrganisationAndGame() - const player = await new PlayerFactory([game]).one() - await em.persistAndFlush(player) - - const session = new PlayerSession() - session.construct(player) - await player.insertSession(clickhouse, session) - await vi.waitUntil(async () => { const sessions = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' and ended_at IS NULL`, - format: 'JSONEachRow' - }).then((res) => res.json()) - return sessions.length === 1 - }) - - // random event created before the session started - const createdAt = subMinutes(session.startedAt, 5) - await clickhouse.insert({ - table: 'socket_events', - values: { - event_type: 'v1.channels.message', - req_or_res: 'req', - code: null, - game_id: game.id, - player_alias_id: player.aliases[0].id, - dev_build: false, - created_at: formatDateForClickHouse(createdAt) - }, - format: 'JSON' - }) - - await cleanupOnlinePlayers() - - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}'`, format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession - }) - - assert(finishedSession?.ended_at) - // no events during the session, so it should be set to 1 minute after startedAt - expect(new Date(finishedSession.ended_at).getTime()).toBe((addMinutes(session.startedAt, 1)).getTime()) - }) - - it('should not allow the session length to be less than a minute', async () => { - const [, game] = await createOrganisationAndGame() - const player = await new PlayerFactory([game]).one() - await em.persistAndFlush(player) - - const session = new PlayerSession() - session.construct(player) - await player.insertSession(clickhouse, session) - - await vi.waitUntil(async () => { - const sessions = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}' and ended_at IS NULL`, - format: 'JSONEachRow' }).then((res) => res.json()) - return sessions.length === 1 - }) - - const createdAt = session.startedAt - await clickhouse.insert({ - table: 'socket_events', - values: { - event_type: 'v1.channels.message', - req_or_res: 'req', - code: null, - game_id: game.id, - player_alias_id: player.aliases[0].id, - dev_build: false, - created_at: formatDateForClickHouse(createdAt) - }, - format: 'JSON' - }) - - await cleanupOnlinePlayers() - - let finishedSession: ClickHousePlayerSession | undefined - await vi.waitUntil(async () => { - finishedSession = await clickhouse.query({ - query: `SELECT * FROM player_sessions WHERE player_id = '${player.id}'`, - format: 'JSONEachRow' - }).then((res) => res.json()).then((res) => res[0]) - return !!finishedSession + return sessions.length === 0 }) - - assert(finishedSession?.ended_at) - // if session length is less than a minute, it should be set to 1 minute after startedAt - expect(new Date(finishedSession.ended_at).getTime()).toBe((addMinutes(session.startedAt, 1)).getTime()) }) it('should set presence to offline if the latest session ended after the presence was updated', async () => { @@ -418,6 +165,7 @@ describe('cleanupOnlinePlayers', () => { }) it('should delete sessions for a deleted player', async () => { + // Use the time from beforeEach (2 days ago) to ensure session is old enough const deletedPlayerId = crypto.randomUUID() await clickhouse.insert({ @@ -425,7 +173,7 @@ describe('cleanupOnlinePlayers', () => { values: { player_id: deletedPlayerId, player_alias_id: 123, - started_at: formatDateForClickHouse(new Date()), + started_at: formatDateForClickHouse(subDays(new Date(), 2)), ended_at: null, dev_build: true },