-
Notifications
You must be signed in to change notification settings - Fork 121
/
ClusterManager.ts
116 lines (103 loc) · 3.29 KB
/
ClusterManager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import type { Worker } from 'cluster';
import cluster from 'cluster';
import { cpus } from 'os';
import { getLoggerFor } from '../../logging/LogUtil';
import { InternalServerError } from '../../util/errors/InternalServerError';
/**
 * The clustering strategies a requested worker count can map to.
 * The numeric values are spelled out explicitly; they match the implicit enum numbering.
 */
enum ClusterMode {
  /** The worker count is derived from the number of available cores. */
  autoScale = 0,
  /** No clustering: everything runs in a single process. */
  singleThreaded = 1,
  /** An explicitly requested, fixed number of workers is forked. */
  fixed = 2,
}
/**
 * Maps a requested amount of workers onto the matching {@link ClusterMode}.
 *
 * - exactly `1` selects single threaded mode,
 * - `0` or any negative value selects autoscaling,
 * - every other value selects a fixed amount of workers.
 *
 * @param workers - Amount of workers
 * @returns ClusterMode enum value
 */
function toClusterMode(workers: number): ClusterMode {
  if (workers === 1) {
    return ClusterMode.singleThreaded;
  }
  return workers <= 0 ? ClusterMode.autoScale : ClusterMode.fixed;
}
/**
 * This class is responsible for deciding how many effective workers are needed.
 * It also contains the logic for spawning those workers and for respawning a worker whenever one exits.
 *
 * The workers values are interpreted as follows:
 * value | actual workers |
 * ------|--------------|
 * `-m` | `num_cores - m` workers _(autoscale)_ (`m < num_cores`) |
 * `-1` | `num_cores - 1` workers _(autoscale)_ |
 * `0` | `num_cores` workers _(autoscale)_ |
 * `1` | `single threaded mode` _(default)_ |
 * `n` | `n` workers |
 */
export class ClusterManager {
  private readonly logger = getLoggerFor(this);
  /** Effective amount of workers, i.e. after resolving autoscale values against the core count. */
  private readonly workers: number;
  /** Mode derived from the effective amount of workers. */
  private readonly clusterMode: ClusterMode;

  /**
   * @param workers - Requested amount of workers, interpreted as described in the class documentation.
   *
   * @throws InternalServerError
   * If `workers` is not greater than `-num_cores`, or if it is not an integer (NaN, ±Infinity, fractions).
   */
  public constructor(workers: number) {
    const cores = cpus().length;

    if (workers <= -cores) {
      throw new InternalServerError('Invalid workers value (should be in the interval ]-num_cores, +∞).');
    }
    // NaN, Infinity and fractional values slip through the range check above,
    // but would make the fork loop spawn zero, unbounded, or a mismatching amount of workers.
    if (!Number.isInteger(workers)) {
      throw new InternalServerError('Invalid workers value (should be an integer).');
    }

    this.workers = toClusterMode(workers) === ClusterMode.autoScale ? cores + workers : workers;
    // Derived from the effective amount on purpose:
    // an autoscale request that resolves to exactly 1 worker is treated as single threaded.
    this.clusterMode = toClusterMode(this.workers);
  }

  /**
   * Spawn all required workers.
   * Also registers cluster-wide listeners that log when workers come online
   * and that fork a replacement whenever a worker exits.
   */
  public spawnWorkers(): void {
    let counter = 0;
    this.logger.info(`Setting up ${this.workers} workers`);

    for (let i = 0; i < this.workers; i++) {
      this.forkWorker();
    }

    cluster.on('online', (worker: Worker): void => {
      this.logger.info(`Worker ${worker.process.pid} is listening`);
      // The counter only ever increases (replacement workers are counted as well),
      // so the summary below is logged at most once.
      counter += 1;
      if (counter === this.workers) {
        this.logger.info(`All ${this.workers} requested workers have been started.`);
      }
    });

    // NOTE(review): every exit triggers a replacement, whatever the exit code or signal;
    // confirm that this is also the desired behaviour for intentional shutdowns.
    cluster.on('exit', (worker: Worker, code: number, signal: string): void => {
      this.logger.warn(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);
      this.logger.warn('Starting a new worker');
      this.forkWorker();
    });
  }

  /**
   * Check whether the CSS server was booted in single threaded mode.
   * @returns True if single threaded.
   */
  public isSingleThreaded(): boolean {
    return this.clusterMode === ClusterMode.singleThreaded;
  }

  /**
   * Whether the calling process is the primary process.
   * @returns True if primary
   */
  public isPrimary(): boolean {
    // NOTE(review): Node.js documents `isMaster` as a deprecated alias of `isPrimary`;
    // kept as-is because the supported Node.js range is not visible from this file.
    return cluster.isMaster;
  }

  /**
   * Whether the calling process is a worker process.
   * @returns True if worker
   */
  public isWorker(): boolean {
    return cluster.isWorker;
  }

  /**
   * Forks a single worker and relays every message it sends to the logger at info level.
   */
  private forkWorker(): void {
    cluster.fork().on('message', (msg: string): void => {
      this.logger.info(msg);
    });
  }
}