Skip to content

Commit

Permalink
feat(ws): metrics (#9005)
Browse files Browse the repository at this point in the history
* feat(WebSocketManager): fetch status

* feat(WebSocketShard): heartbeat event

* chore: ci

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
didinele and kodiakhq[bot] committed Jan 6, 2023
1 parent e8b7504 commit 0ff67d8
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 20 deletions.
4 changes: 4 additions & 0 deletions packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ vi.mock('node:worker_threads', async () => {
case WorkerSendPayloadOp.ShardCanIdentify: {
break;
}

case WorkerSendPayloadOp.FetchStatus: {
break;
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/ws/src/strategies/sharding/IShardingStrategy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Collection } from '@discordjs/collection';
import type { Awaitable } from '@discordjs/util';
import type { GatewaySendPayload } from 'discord-api-types/v10';
import type { WebSocketShardDestroyOptions } from '../../ws/WebSocketShard';
import type { WebSocketShardDestroyOptions, WebSocketShardStatus } from '../../ws/WebSocketShard';

/**
* Strategies responsible for spawning, initializing connections, destroying shards, and relaying events
Expand All @@ -14,6 +15,10 @@ export interface IShardingStrategy {
* Destroys all the shards
*/
destroy(options?: Omit<WebSocketShardDestroyOptions, 'recover'>): Awaitable<void>;
/**
* Fetches the status of all the shards
*/
fetchStatus(): Awaitable<Collection<number, WebSocketShardStatus>>;
/**
* Sends a payload to a shard
*/
Expand Down
7 changes: 7 additions & 0 deletions packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,11 @@ export class SimpleShardingStrategy implements IShardingStrategy {
if (!shard) throw new Error(`Shard ${shardId} not found`);
return shard.send(payload);
}

/**
* {@inheritDoc IShardingStrategy.fetchStatus}
*/
public async fetchStatus() {
return this.shards.mapValues((shard) => shard.status);
}
}
45 changes: 40 additions & 5 deletions packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Collection } from '@discordjs/collection';
import type { GatewaySendPayload } from 'discord-api-types/v10';
import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js';
import type { SessionInfo, WebSocketManager } from '../../ws/WebSocketManager';
import type { WebSocketShardDestroyOptions, WebSocketShardEvents } from '../../ws/WebSocketShard';
import type { WebSocketShardDestroyOptions, WebSocketShardEvents, WebSocketShardStatus } from '../../ws/WebSocketShard';
import { managerToFetchingStrategyOptions, type FetchingStrategyOptions } from '../context/IContextFetchingStrategy.js';
import type { IShardingStrategy } from './IShardingStrategy.js';

Expand All @@ -19,9 +19,11 @@ export enum WorkerSendPayloadOp {
Send,
SessionInfoResponse,
ShardCanIdentify,
FetchStatus,
}

export type WorkerSendPayload =
| { nonce: number; op: WorkerSendPayloadOp.FetchStatus; shardId: number }
| { nonce: number; op: WorkerSendPayloadOp.SessionInfoResponse; session: SessionInfo | null }
| { nonce: number; op: WorkerSendPayloadOp.ShardCanIdentify }
| { op: WorkerSendPayloadOp.Connect; shardId: number }
Expand All @@ -35,11 +37,13 @@ export enum WorkerRecievePayloadOp {
RetrieveSessionInfo,
UpdateSessionInfo,
WaitForIdentify,
FetchStatusResponse,
}

export type WorkerRecievePayload =
// Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now
| { data: any; event: WebSocketShardEvents; op: WorkerRecievePayloadOp.Event; shardId: number }
| { nonce: number; op: WorkerRecievePayloadOp.FetchStatusResponse; status: WebSocketShardStatus }
| { nonce: number; op: WorkerRecievePayloadOp.RetrieveSessionInfo; shardId: number }
| { nonce: number; op: WorkerRecievePayloadOp.WaitForIdentify }
| { op: WorkerRecievePayloadOp.Connected; shardId: number }
Expand Down Expand Up @@ -72,6 +76,8 @@ export class WorkerShardingStrategy implements IShardingStrategy {

private readonly destroyPromises = new Collection<number, () => void>();

private readonly fetchStatusPromises = new Collection<number, (status: WebSocketShardStatus) => void>();

private readonly throttler: IdentifyThrottler;

public constructor(manager: WebSocketManager, options: WorkerShardingStrategyOptions) {
Expand Down Expand Up @@ -179,18 +185,41 @@ export class WorkerShardingStrategy implements IShardingStrategy {
worker.postMessage(payload);
}

/**
* {@inheritDoc IShardingStrategy.fetchStatus}
*/
public async fetchStatus() {
const statuses = new Collection<number, WebSocketShardStatus>();

for (const [shardId, worker] of this.#workerByShardId.entries()) {
const nonce = Math.random();
const payload = {
op: WorkerSendPayloadOp.FetchStatus,
shardId,
nonce,
} satisfies WorkerSendPayload;

// eslint-disable-next-line no-promise-executor-return
const promise = new Promise<WebSocketShardStatus>((resolve) => this.fetchStatusPromises.set(nonce, resolve));
worker.postMessage(payload);

const status = await promise;
statuses.set(shardId, status);
}

return statuses;
}

private async onMessage(worker: Worker, payload: WorkerRecievePayload) {
switch (payload.op) {
case WorkerRecievePayloadOp.Connected: {
const resolve = this.connectPromises.get(payload.shardId)!;
resolve();
this.connectPromises.get(payload.shardId)?.();
this.connectPromises.delete(payload.shardId);
break;
}

case WorkerRecievePayloadOp.Destroyed: {
const resolve = this.destroyPromises.get(payload.shardId)!;
resolve();
this.destroyPromises.get(payload.shardId)?.();
this.destroyPromises.delete(payload.shardId);
break;
}
Expand Down Expand Up @@ -225,6 +254,12 @@ export class WorkerShardingStrategy implements IShardingStrategy {
worker.postMessage(response);
break;
}

case WorkerRecievePayloadOp.FetchStatusResponse: {
this.fetchStatusPromises.get(payload.nonce)?.(payload.status);
this.fetchStatusPromises.delete(payload.nonce);
break;
}
}
}
}
16 changes: 16 additions & 0 deletions packages/ws/src/strategies/sharding/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,21 @@ parentPort!
case WorkerSendPayloadOp.ShardCanIdentify: {
break;
}

case WorkerSendPayloadOp.FetchStatus: {
const shard = shards.get(payload.shardId);
if (!shard) {
throw new Error(`Shard ${payload.shardId} does not exist`);
}

const response = {
op: WorkerRecievePayloadOp.FetchStatusResponse,
status: shard.status,
nonce: payload.nonce,
} satisfies WorkerRecievePayload;

parentPort!.postMessage(response);
break;
}
}
});
6 changes: 5 additions & 1 deletion packages/ws/src/ws/WebSocketManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ export class WebSocketManager extends AsyncEventEmitter<ManagerShardEventsMap> {
/**
* Strategy used to manage shards
*
* @defaultValue `SimpleManagerToShardStrategy`
* @defaultValue `SimpleShardingStrategy`
*/
private strategy: IShardingStrategy = new SimpleShardingStrategy(this);

Expand Down Expand Up @@ -300,4 +300,8 @@ export class WebSocketManager extends AsyncEventEmitter<ManagerShardEventsMap> {
public send(shardId: number, payload: GatewaySendPayload) {
return this.strategy.send(shardId, payload);
}

public fetchStatus() {
return this.strategy.fetchStatus();
}
}
39 changes: 26 additions & 13 deletions packages/ws/src/ws/WebSocketShard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export enum WebSocketShardEvents {
Closed = 'closed',
Debug = 'debug',
Dispatch = 'dispatch',
HeartbeatComplete = 'heartbeat',
Hello = 'hello',
Ready = 'ready',
Resumed = 'resumed',
Expand All @@ -54,10 +55,11 @@ export enum WebSocketShardDestroyRecovery {
export type WebSocketShardEventsMap = {
[WebSocketShardEvents.Closed]: [{ code: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string }];
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
[WebSocketShardEvents.Hello]: [];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
[WebSocketShardEvents.Resumed]: [];
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
[WebSocketShardEvents.HeartbeatComplete]: [payload: { ackAt: number; heartbeatAt: number; latency: number }];
};

export interface WebSocketShardDestroyOptions {
Expand Down Expand Up @@ -87,8 +89,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

private readonly textDecoder = new TextDecoder();

private status: WebSocketShardStatus = WebSocketShardStatus.Idle;

private replayedEvents = 0;

private isAck = true;
Expand All @@ -107,14 +107,20 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

public readonly strategy: IContextFetchingStrategy;

#status: WebSocketShardStatus = WebSocketShardStatus.Idle;

public get status(): WebSocketShardStatus {
return this.#status;
}

public constructor(strategy: IContextFetchingStrategy, id: number) {
super();
this.strategy = strategy;
this.id = id;
}

public async connect() {
if (this.status !== WebSocketShardStatus.Idle) {
if (this.#status !== WebSocketShardStatus.Idle) {
throw new Error("Tried to connect a shard that wasn't idle");
}

Expand Down Expand Up @@ -148,7 +154,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
connection.binaryType = 'arraybuffer';
this.connection = connection;

this.status = WebSocketShardStatus.Connecting;
this.#status = WebSocketShardStatus.Connecting;

this.sendRateLimitState = getInitialSendRateLimitState();

Expand All @@ -163,7 +169,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

public async destroy(options: WebSocketShardDestroyOptions = {}) {
if (this.status === WebSocketShardStatus.Idle) {
if (this.#status === WebSocketShardStatus.Idle) {
this.debug(['Tried to destroy a shard that was idle']);
return;
}
Expand Down Expand Up @@ -221,7 +227,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
}

this.status = WebSocketShardStatus.Idle;
this.#status = WebSocketShardStatus.Idle;

if (options.recover !== undefined) {
return this.connect();
Expand All @@ -248,7 +254,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
throw new Error("WebSocketShard wasn't connected");
}

if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
await once(this, WebSocketShardEvents.Ready);
}
Expand Down Expand Up @@ -320,12 +326,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
});

await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
this.status = WebSocketShardStatus.Ready;
this.#status = WebSocketShardStatus.Ready;
}

private async resume(session: SessionInfo) {
this.debug(['Resuming session']);
this.status = WebSocketShardStatus.Resuming;
this.#status = WebSocketShardStatus.Resuming;
this.replayedEvents = 0;
return this.send({
op: GatewayOpcodes.Resume,
Expand Down Expand Up @@ -420,7 +426,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

switch (payload.op) {
case GatewayOpcodes.Dispatch: {
if (this.status === WebSocketShardStatus.Resuming) {
if (this.#status === WebSocketShardStatus.Resuming) {
this.replayedEvents++;
}

Expand All @@ -442,7 +448,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

case GatewayDispatchEvents.Resumed: {
this.status = WebSocketShardStatus.Ready;
this.#status = WebSocketShardStatus.Ready;
this.debug([`Resumed and replayed ${this.replayedEvents} events`]);
this.emit(WebSocketShardEvents.Resumed);
break;
Expand Down Expand Up @@ -502,7 +508,14 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

case GatewayOpcodes.HeartbeatAck: {
this.isAck = true;
this.debug([`Got heartbeat ack after ${Date.now() - this.lastHeartbeatAt}ms`]);

const ackAt = Date.now();
this.emit(WebSocketShardEvents.HeartbeatComplete, {
ackAt,
heartbeatAt: this.lastHeartbeatAt,
latency: ackAt - this.lastHeartbeatAt,
});

break;
}
}
Expand Down

0 comments on commit 0ff67d8

Please sign in to comment.