Skip to content

Commit

Permalink
feat(ws): support for custom worker messaging (#10241)
Browse files Browse the repository at this point in the history
  • Loading branch information
didinele committed May 3, 2024
1 parent 6cf094c commit 728164e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
8 changes: 8 additions & 0 deletions packages/ws/README.md
Expand Up @@ -132,6 +132,10 @@ const manager = new WebSocketManager({
new WorkerShardingStrategy(manager, {
shardsPerWorker: 2,
workerPath: './worker.js',
// Optionally, if you you have custom messaging, like for analytic collection, you can use this:
async unknownPayloadHandler(data: any) {
// handle data here :3
},
}),
});
```
Expand All @@ -140,6 +144,7 @@ And your `worker.ts` file:

```ts
import { WorkerBootstrapper, WebSocketShardEvents } from '@discordjs/ws';
import { parentPort } from 'node:worker_threads';

const bootstrapper = new WorkerBootstrapper();
void bootstrapper.bootstrap({
Expand All @@ -158,6 +163,9 @@ void bootstrapper.bootstrap({
});
},
});

// This will go to `unknownPayloadHandler` in the main thread, or be ignored if not provided
parentPort!.postMessage({ custom: 'data' });
```

## Links
Expand Down
17 changes: 16 additions & 1 deletion packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts
Expand Up @@ -66,6 +66,10 @@ export interface WorkerShardingStrategyOptions {
* Dictates how many shards should be spawned per worker thread.
*/
shardsPerWorker: number | 'all';
/**
* Handles a payload not recognized by the handler.
*/
unknownPayloadHandler?(payload: any): unknown;
/**
* Path to the worker file to use. The worker requires quite a bit of setup, it is recommended you leverage the {@link WorkerBootstrapper} class.
*/
Expand Down Expand Up @@ -225,7 +229,13 @@ export class WorkerShardingStrategy implements IShardingStrategy {
.on('messageerror', (err) => {
throw err;
})
.on('message', async (payload: WorkerReceivePayload) => this.onMessage(worker, payload));
.on('message', async (payload: any) => {
if ('op' in payload) {
await this.onMessage(worker, payload);
} else {
await this.options.unknownPayloadHandler?.(payload);
}
});

this.#workers.push(worker);
for (const shardId of workerData.shardIds) {
Expand Down Expand Up @@ -347,6 +357,11 @@ export class WorkerShardingStrategy implements IShardingStrategy {

break;
}

default: {
await this.options.unknownPayloadHandler?.(payload);
break;
}
}
}

Expand Down

0 comments on commit 728164e

Please sign in to comment.