-
Notifications
You must be signed in to change notification settings - Fork 554
/
local_event_manager.ts
107 lines (91 loc) · 3.25 KB
/
local_event_manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os from 'node:os';
import log from '@apify/log';
import { betterClearInterval, betterSetInterval } from '@apify/utilities';
import { getMemoryInfo } from '@crawlee/utils';
import { EventManager, EventType } from './event_manager';
import type { SystemInfo } from '../autoscaling';
export class LocalEventManager extends EventManager {
private previousTicks = { idle: 0, total: 0 };
/**
* Initializes the EventManager and sets up periodic `systemInfo` and `persistState` events.
* This is automatically called at the beginning of `crawler.run()`.
*/
override async init() {
if (this.initialized) {
return;
}
await super.init();
const systemInfoIntervalMillis = this.config.get('systemInfoIntervalMillis')!;
this.emitSystemInfoEvent = this.emitSystemInfoEvent.bind(this);
this.intervals.systemInfo = betterSetInterval(this.emitSystemInfoEvent.bind(this), systemInfoIntervalMillis);
}
/**
* @inheritDoc
*/
override async close() {
if (!this.initialized) {
return;
}
await super.close();
betterClearInterval(this.intervals.systemInfo!);
}
/**
* @internal
*/
async emitSystemInfoEvent(intervalCallback: () => unknown) {
const info = await this.createSystemInfo({
maxUsedCpuRatio: this.config.get('maxUsedCpuRatio'),
});
this.events.emit(EventType.SYSTEM_INFO, info);
intervalCallback();
}
private getCurrentCpuTicks() {
const cpus = os.cpus();
return cpus.reduce((acc, cpu) => {
const cpuTimes = Object.values(cpu.times);
return {
idle: acc.idle + cpu.times.idle,
total: acc.total + cpuTimes.reduce((sum, num) => sum + num),
};
}, { idle: 0, total: 0 });
}
/**
* Creates a SystemInfo object based on local metrics.
*/
private async createSystemInfo(options: { maxUsedCpuRatio: number }) {
return {
createdAt: new Date(),
...this.createCpuInfo(options),
...await this.createMemoryInfo(),
} as SystemInfo;
}
private createCpuInfo(options: { maxUsedCpuRatio: number }) {
const ticks = this.getCurrentCpuTicks();
const idleTicksDelta = ticks.idle - this.previousTicks!.idle;
const totalTicksDelta = ticks.total - this.previousTicks!.total;
const usedCpuRatio = totalTicksDelta ? 1 - (idleTicksDelta / totalTicksDelta) : 0;
Object.assign(this.previousTicks, ticks);
return {
cpuCurrentUsage: usedCpuRatio * 100,
isCpuOverloaded: usedCpuRatio > options.maxUsedCpuRatio,
};
}
private async createMemoryInfo() {
try {
const memInfo = await this._getMemoryInfo();
const { mainProcessBytes, childProcessesBytes } = memInfo;
return {
memCurrentBytes: mainProcessBytes + childProcessesBytes,
};
} catch (err) {
log.exception(err as Error, 'Memory snapshot failed.');
return {};
}
}
/**
* Helper method for easier mocking.
*/
private async _getMemoryInfo() {
return getMemoryInfo();
}
}