Skip to content
This repository was archived by the owner on Apr 17, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions libs/gateway/src/websocket/Cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import {
WebsocketConnection,
WebsocketConnectionStatus,
WebsocketConnectionOptions,
WebsocketConnectionDestroyOptions
WebsocketConnectionDestroyOptions,
WebsocketConnectionConnectOptions
} from './WebsocketConnection';
import { stripIndent } from 'common-tags';
import { Rest, MemoryMutex, RedisMutex } from '@cordis/rest';
Expand Down Expand Up @@ -219,8 +220,9 @@ export class Cluster extends EventEmitter {

/**
* Spawns all of the shards (if not yet spawned) and connects each one to the gateway
* @param options Array of connection options for each shard
*/
public async connect() {
public async connect(options?: (WebsocketConnectionConnectOptions | undefined)[]) {
if (!this.shards.length) {
const {
url,
Expand All @@ -245,7 +247,7 @@ export class Cluster extends EventEmitter {
}
}

return Promise.all(this.shards.map(shard => shard.connect()));
return Promise.all(this.shards.map((shard, i) => shard.connect(options?.[i])));
}

/**
Expand Down
62 changes: 33 additions & 29 deletions libs/gateway/src/websocket/WebsocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export interface WebsocketConnectionOptions {
intents?: Intents | IntentKeys | IntentKeys[] | bigint;
}

export interface WebsocketConnectionConnectOptions {
sessionId?: string;
sequence?: number;
}

export interface WebsocketConnectionDestroyOptions {
/**
* whether the shard should try to reconnect or not
Expand Down Expand Up @@ -304,8 +309,8 @@ export class WebsocketConnection {
* b) The timeout that waits for guilds is hit & the rest of the pending guilds are deemed unavailable.
* It should be noted that in either case when this function resolves this shard's status becomes 6 (ready)
*/
public connect(): Promise<void> {
return new Promise((resolve, reject) => {
public connect(options?: WebsocketConnectionConnectOptions) {
return new Promise<void>((resolve, reject) => {
switch (this.status) {
case WebsocketConnectionStatus.connecting:
case WebsocketConnectionStatus.disconnecting:
Expand All @@ -321,7 +326,14 @@ export class WebsocketConnection {
this._connectResolve = this._wrapResolve(resolve);
this._connectReject = this._wrapReject(reject);

if (this.status !== WebsocketConnectionStatus.reconnecting) this.status = WebsocketConnectionStatus.connecting;
if (options?.sequence && options.sessionId) {
this.status = WebsocketConnectionStatus.reconnecting;
this._sequence = options.sequence;
this._sessionId = options.sessionId;
} else if (this.status !== WebsocketConnectionStatus.reconnecting) {
this.status = WebsocketConnectionStatus.connecting;
}

this.debug(stripIndent`
[CONNECT]
Gateway : ${this._url}
Expand Down Expand Up @@ -659,41 +671,33 @@ export class WebsocketConnection {
this._clearTimeout('hello');
this.debug('Clearing HELLO timeout');

let reconnecting = this.status === WebsocketConnectionStatus.reconnecting;
const reconnecting = this.status === WebsocketConnectionStatus.reconnecting;
this.debug(`Setting heartbeat interval of ${packet.d.heartbeat_interval}ms`);

this._registerInterval('heartbeat', () => this._heartbeat(false), packet.d.heartbeat_interval);

const necessary = this._sequence != null && this._sessionId != null;

if (reconnecting) {
const necessary = this._sequence != null && this._sessionId != null;

this.debug(stripIndent`
Trying to resume with
Trying to resume
data is present: ${necessary}
session : ${this._sessionId}
sequence : ${this._sequence}
`);
}

if (reconnecting && necessary) {
await this.send({
op: GatewayOPCodes.Resume,
d: {
token: this.cluster.auth,
// eslint-disable-next-line @typescript-eslint/naming-convention
session_id: this._sessionId!,
seq: this._sequence!
}
});
} else {
// In case the necessary state wasn't present for reconnecting make sure to properly clean up
if (reconnecting && !necessary) {
await this.destroy({ reason: 'Tried to resume but was missing essential state', reconnect: true, fatal: true });
if (necessary) {
await this.send({
op: GatewayOPCodes.Resume,
d: {
token: this.cluster.auth,
session_id: this._sessionId!,
seq: this._sequence!
}
});
} else {
return this.destroy({ reason: 'Tried to resume but was missing essential state', reconnect: true, fatal: true });
}

this.status = WebsocketConnectionStatus.open;
reconnecting = false;
await this._identify();
}

if (!reconnecting) {
Expand Down Expand Up @@ -721,8 +725,6 @@ export class WebsocketConnection {
};

private _handleDispatch(payload: GatewayDispatchPayload) {
if (this._sequence == null || payload.s > this._sequence) this._sequence = payload.s;

switch (payload.t) {
case GatewayDispatchEvents.Ready: {
this._clearTimeout('discordReady');
Expand Down Expand Up @@ -765,14 +767,16 @@ export class WebsocketConnection {

case GatewayDispatchEvents.Resumed: {
this._clearTimeout('reconnecting');
this.debug(`Resumed Session ${this._sessionId}; Replayed ${payload.s - this._sequence} events`);
const replayed = this._sequence ? payload.s - this._sequence : 'an unknown amount of';
this.debug(`Resumed Session ${this._sessionId}; Replaying ${replayed} events`);
this.status = WebsocketConnectionStatus.ready;
break;
}

default: break;
}

if (this._sequence == null || payload.s > this._sequence) this._sequence = payload.s;
this.cluster.emit('dispatch', payload, this.id);
}

Expand Down