From 6c3fbbd6bc1e9f4b4c68075786f9a390eb353a9b Mon Sep 17 00:00:00 2001 From: didinele Date: Fri, 12 Mar 2021 11:44:09 +0200 Subject: [PATCH] feat(gateway): resuming when connecting for the first time --- libs/gateway/src/websocket/Cluster.ts | 8 ++- .../src/websocket/WebsocketConnection.ts | 62 ++++++++++--------- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/libs/gateway/src/websocket/Cluster.ts b/libs/gateway/src/websocket/Cluster.ts index ac9795a..f480cc9 100644 --- a/libs/gateway/src/websocket/Cluster.ts +++ b/libs/gateway/src/websocket/Cluster.ts @@ -3,7 +3,8 @@ import { WebsocketConnection, WebsocketConnectionStatus, WebsocketConnectionOptions, - WebsocketConnectionDestroyOptions + WebsocketConnectionDestroyOptions, + WebsocketConnectionConnectOptions } from './WebsocketConnection'; import { stripIndent } from 'common-tags'; import { RestManager, MemoryMutex, RedisMutex } from '@cordis/rest'; @@ -228,8 +229,9 @@ export class Cluster extends EventEmitter { /** * Spawns all of the shards (if not yet spawned) and connects each one to the gateway + * @param options Array of connection options for each shard */ - public async connect() { + public async connect(options?: (WebsocketConnectionConnectOptions | undefined)[]) { if (!this.shards.length) { const { url, @@ -254,7 +256,7 @@ export class Cluster extends EventEmitter { } } - return Promise.all(this.shards.map(shard => shard.connect())); + return Promise.all(this.shards.map((shard, i) => shard.connect(options?.[i]))); } /** diff --git a/libs/gateway/src/websocket/WebsocketConnection.ts b/libs/gateway/src/websocket/WebsocketConnection.ts index 8e2fcf8..a75ad26 100644 --- a/libs/gateway/src/websocket/WebsocketConnection.ts +++ b/libs/gateway/src/websocket/WebsocketConnection.ts @@ -96,6 +96,11 @@ export interface WebsocketConnectionOptions { intents?: Intents | IntentKeys | IntentKeys[] | bigint; } +export interface WebsocketConnectionConnectOptions { + sessionId?: string; + sequence?: number; +} + export interface WebsocketConnectionDestroyOptions { /** * whether the shard should try to reconnect or not @@ -304,8 +309,8 @@ export class WebsocketConnection { * b) The timeout that waits for guilds is hit & the rest of the pending guilds are deemed unavailable. * It should be noted that in either case when this function resolves this shard's status becomes 6 (ready) */ - public connect(): Promise { - return new Promise((resolve, reject) => { + public connect(options?: WebsocketConnectionConnectOptions) { + return new Promise((resolve, reject) => { switch (this.status) { case WebsocketConnectionStatus.connecting: case WebsocketConnectionStatus.disconnecting: @@ -321,7 +326,14 @@ export class WebsocketConnection { this._connectResolve = this._wrapResolve(resolve); this._connectReject = this._wrapReject(reject); - if (this.status !== WebsocketConnectionStatus.reconnecting) this.status = WebsocketConnectionStatus.connecting; + if (options?.sequence && options.sessionId) { + this.status = WebsocketConnectionStatus.reconnecting; + this._sequence = options.sequence; + this._sessionId = options.sessionId; + } else if (this.status !== WebsocketConnectionStatus.reconnecting) { + this.status = WebsocketConnectionStatus.connecting; + } + this.debug(stripIndent` [CONNECT] Gateway : ${this._url} @@ -659,41 +671,33 @@ export class WebsocketConnection { this._clearTimeout('hello'); this.debug('Clearing HELLO timeout'); - let reconnecting = this.status === WebsocketConnectionStatus.reconnecting; + const reconnecting = this.status === WebsocketConnectionStatus.reconnecting; this.debug(`Setting heartbeat interval of ${packet.d.heartbeat_interval}ms`); this._registerInterval('heartbeat', () => this._heartbeat(false), packet.d.heartbeat_interval); - const necessary = this._sequence != null && this._sessionId != null; - if (reconnecting) { + const necessary = this._sequence != null && this._sessionId != null; + this.debug(stripIndent` - Trying to resume with + Trying to resume data is present: ${necessary} session : ${this._sessionId} sequence : ${this._sequence} `); - } - if (reconnecting && necessary) { - await this.send({ - op: GatewayOPCodes.Resume, - d: { - token: this.cluster.auth, - // eslint-disable-next-line @typescript-eslint/naming-convention - session_id: this._sessionId!, - seq: this._sequence! - } - }); - } else { - // In case the necessary state wasn't present for reconnecting make sure to properly clean up - if (reconnecting && !necessary) { - await this.destroy({ reason: 'Tried to resume but was missing essential state', reconnect: true, fatal: true }); + if (necessary) { + await this.send({ + op: GatewayOPCodes.Resume, + d: { + token: this.cluster.auth, + session_id: this._sessionId!, + seq: this._sequence! + } + }); + } else { + return this.destroy({ reason: 'Tried to resume but was missing essential state', reconnect: true, fatal: true }); } - - this.status = WebsocketConnectionStatus.open; - reconnecting = false; - await this._identify(); } if (!reconnecting) { @@ -721,8 +725,6 @@ export class WebsocketConnection { }; private async _handleDispatch(payload: GatewayDispatchPayload): Promise { - if (this._sequence == null || payload.s > this._sequence) this._sequence = payload.s; - switch (payload.t) { case GatewayDispatchEvents.Ready: { this._clearTimeout('discordReady'); @@ -772,7 +774,8 @@ export class WebsocketConnection { case GatewayDispatchEvents.Resumed: { this._clearTimeout('reconnecting'); - this.debug(`Resumed Session ${this._sessionId}; Replayed ${payload.s - this._sequence} events`); + const replayed = this._sequence ? payload.s - this._sequence : 'an unknown amount of'; + this.debug(`Resumed Session ${this._sessionId}; Replaying ${replayed} events`); this.status = WebsocketConnectionStatus.ready; break; } @@ -780,6 +783,7 @@ export class WebsocketConnection { default: break; } + if (this._sequence == null || payload.s > this._sequence) this._sequence = payload.s; this.cluster.emit('dispatch', payload, this.id); }