/
cluster.ts
96 lines (83 loc) · 2.53 KB
/
cluster.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import * as childProcess from 'child_process';
import * as path from 'path';
import * as process from 'process';
import { RpcProvider } from 'worker-rpc';
import { NormalizedMessage } from './NormalizedMessage';
import { Message } from './Message';
import { RunPayload, RunResult, RUN } from './RpcTypes';
// fork workers...
const division = parseInt(process.env.WORK_DIVISION || '', 10);
const workers: childProcess.ChildProcess[] = [];
for (let num = 0; num < division; num++) {
workers.push(
childProcess.fork(path.resolve(__dirname, './service.js'), [], {
execArgv: ['--max-old-space-size=' + process.env.MEMORY_LIMIT],
env: { ...process.env, WORK_NUMBER: num.toString() },
stdio: ['inherit', 'inherit', 'inherit', 'ipc']
})
);
}
// communication with parent process
const parentRpc = new RpcProvider(message => {
try {
process.send!(message);
} catch (e) {
// channel closed...
process.exit();
}
});
process.on('message', message => parentRpc.dispatch(message));
// communication with worker processes
const workerRpcs = workers.map(worker => {
const rpc = new RpcProvider(message => {
try {
worker.send(message);
} catch (e) {
// channel closed - something went wrong - close cluster...
process.exit();
}
});
worker.on('message', message => rpc.dispatch(message));
return rpc;
});
parentRpc.registerRpcHandler<RunPayload, RunResult>(RUN, async message => {
const workerResults = await Promise.all(
workerRpcs.map(workerRpc =>
workerRpc.rpc<RunPayload, RunResult>(RUN, message)
)
);
function workerFinished(
workerResult: (Message | undefined)[]
): workerResult is Message[] {
return workerResult.every(result => typeof result !== 'undefined');
}
if (!workerFinished(workerResults)) {
return undefined;
}
const merged: Message = workerResults.reduce(
(innerMerged: Message, innerResult: Message) => ({
diagnostics: innerMerged.diagnostics.concat(
innerResult.diagnostics.map(NormalizedMessage.createFromJSON)
),
lints: innerMerged.lints.concat(
innerResult.lints.map(NormalizedMessage.createFromJSON)
)
}),
{ diagnostics: [], lints: [] }
);
merged.diagnostics = NormalizedMessage.deduplicate(merged.diagnostics);
merged.lints = NormalizedMessage.deduplicate(merged.lints);
return merged;
});
process.on('SIGINT', () => {
process.exit();
});
process.on('exit', () => {
workers.forEach(worker => {
try {
worker.kill();
} catch (e) {
// do nothing...
}
});
});