-
Notifications
You must be signed in to change notification settings - Fork 2
/
distribute.ts
97 lines (92 loc) · 2.87 KB
/
distribute.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import { type Update, type UserFromGetMe } from "./deps.deno.ts";
import {
createThread,
type ModuleSpecifier,
type Thread,
} from "./platform.deno.ts";
/**
 * A fixed-size pool of threads that all run the module at `specifier`.
 *
 * Every update is posted to the thread at index `update_id % count`. A thread
 * reports that it is done with an update by posting that update's `update_id`
 * back, which settles the promise returned from `process`.
 */
class ThreadPool {
  /** The created threads, indexed by `update_id % count`. */
  public readonly threads: Thread<Update, number>[] = [];
  /** Completion callbacks of in-flight updates, keyed by `update_id`. */
  public readonly tasks = new Map<number, () => void>();
  /**
   * @param specifier Module specifier handed to `createThread` for each thread
   * @param me Bot information handed to `createThread` for each thread
   * @param count Number of threads to create, must be a positive integer
   * @throws RangeError if `count` is not a positive integer
   */
  constructor(
    specifier: ModuleSpecifier,
    me: UserFromGetMe,
    private readonly count = 4,
  ) {
    // A zero, negative, or fractional count would make `update_id % count`
    // yield an index that has no thread behind it, so fail early and clearly.
    if (!Number.isInteger(count) || count < 1) {
      throw new RangeError(
        `Worker count must be a positive integer, got ${count}`,
      );
    }
    for (let i = 0; i < count; i++) {
      const thread = createThread<Update, number, UserFromGetMe>(
        specifier,
        me,
      );
      thread.onMessage((update_id) => {
        const task = this.tasks.get(update_id);
        // Drop the entry before running the callback so the map never holds
        // a callback that has already been invoked.
        this.tasks.delete(update_id);
        task?.();
      });
      this.threads.push(thread);
    }
  }
  /**
   * Posts `update` to the thread selected by its `update_id` and waits until
   * that thread posts the same `update_id` back.
   *
   * @param update The update to process
   * @returns A promise that resolves once completion has been reported
   */
  async process(update: { update_id: number }) {
    const id = update.update_id;
    this.threads[id % this.count].postMessage(update);
    await new Promise<void>((resolve) => {
      const pending = this.tasks.get(id);
      // If the same `update_id` is already in flight, keep its callback and
      // chain ours after it. Overwriting it would leave the earlier caller
      // waiting forever.
      this.tasks.set(
        id,
        pending === undefined ? resolve : () => {
          pending();
          resolve();
        },
      );
    });
  }
}
/** Thread pools created so far, keyed by the module specifier they run. */
const workers = new Map<ModuleSpecifier, ThreadPool>();
/**
 * Looks up the thread pool for `specifier`, creating and caching it on the
 * first request.
 *
 * `me` and `count` are only used when a new pool is created. If a pool already
 * exists for `specifier`, it is returned as is and both values are ignored.
 *
 * @param specifier Module specifier identifying the pool
 * @param me Bot information forwarded to a newly created pool
 * @param count Number of threads for a newly created pool
 * @returns The cached or newly created pool
 */
function getWorker(
  specifier: ModuleSpecifier,
  me: UserFromGetMe,
  count?: number,
) {
  const cached = workers.get(specifier);
  if (cached !== undefined) {
    return cached;
  }
  const pool = new ThreadPool(specifier, me, count);
  workers.set(specifier, pool);
  return pool;
}
/**
 * Creates middleware that distributes updates across several workers.
 *
 * Use this function together with the `BotWorker` class. Put your actual bot
 * logic into a separate file that creates a `BotWorker` instance. Assuming
 * that file is called `worker.ts`, install the middleware in a central place:
 *
 * ```ts
 * const bot = new Bot("");
 *
 * // Deno:
 * bot.use(distribute(new URL("./worker.ts", import.meta.url)));
 * // Node:
 * bot.use(distribute(__dirname + "/worker"));
 * ```
 *
 * Under the hood, `distribute` creates several web workers (Deno) or worker
 * threads (Node) from `worker.ts` when the first update arrives. Each update
 * is sent to the worker at index `update_id` modulo the number of workers, and
 * the middleware completes once that worker reports the update as done.
 *
 * The number of workers can be set via `count` in the options object passed as
 * the second argument, i.e. `distribute(specifier, { count: 8 })`. By default,
 * 4 workers are created. Workers are cached per specifier, so `count` only
 * takes effect the first time workers are created for a given specifier.
 *
 * @param specifier Module specifier to a file which creates a `BotWorker`
 * @param options Further options to control the number of workers
 * @returns Middleware that forwards each update to a worker
 */
export function distribute<
  C extends { update: { update_id: number }; me: UserFromGetMe },
>(
  specifier: ModuleSpecifier,
  options?: {
    /** Number of workers to create */
    count?: number;
  },
) {
  // Read the option once up front; the middleware reuses this value.
  const workerCount = options?.count;
  return (ctx: C) => {
    const pool = getWorker(specifier, ctx.me, workerCount);
    return pool.process(ctx.update);
  };
}