-
Notifications
You must be signed in to change notification settings - Fork 139
/
queues.ts
116 lines (96 loc) · 2.26 KB
/
queues.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import { makeRequester, Requester } from './http_req';
import { BaseMetric } from './metrics';
import { IOptions } from './options';
import { hasTdigest, TDigestStatGroups } from './tdshared';
// Delay, in milliseconds, between scheduling a flush and sending the
// accumulated queue stats (passed to setTimeout as the timeout value).
const FLUSH_INTERVAL = 15 * 1000; // 15 seconds
/**
 * Identifies one aggregation bucket of queue statistics.
 *
 * A key is serialized with JSON.stringify to form the bucket's map key, so
 * once that string is parsed back `time` is an ISO-8601 string rather than
 * a Date instance.
 */
interface IQueueKey {
// Name of the queue, copied from QueueMetric.queue.
queue: string;
// Start time of the metric, rounded down to the beginning of its minute.
time: Date;
}
/**
 * Metric describing work done for a single queue.
 *
 * Extends BaseMetric (declared in ./metrics, not visible here) and records
 * which queue the measurement belongs to.
 */
export class QueueMetric extends BaseMetric {
  /**
   * @param queue Name of the queue; stored on the instance as `queue`.
   */
  constructor(public queue: string) {
    super();
    // The measurement starts when the metric object is created.
    this.startTime = new Date();
  }
}
/**
 * Collects queue timing metrics in memory and reports them in bulk.
 *
 * Every metric handed to `notify` is folded into a `TDigestStatGroups`
 * bucket identified by the queue name and the minute in which the metric
 * started. The first metric recorded while no flush is pending schedules a
 * single timer; when it fires after `FLUSH_INTERVAL` milliseconds, all
 * buckets are POSTed to the project's `queues-stats` endpoint and the
 * in-memory state is reset.
 */
export class QueuesStats {
  /** Notifier options; `performanceStats` and `environment` are read later. */
  _opt: IOptions;
  /** Endpoint URL, including the project key as the `key` query parameter. */
  _url: string;
  /** Function that sends the HTTP request, obtained from `makeRequester`. */
  _requester: Requester;
  /** Pending buckets, keyed by the JSON-serialized `IQueueKey`. */
  _m: { [key: string]: TDigestStatGroups } = {};
  /** Handle of the scheduled flush, or `null` when no flush is pending. */
  _timer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param opt Notifier options. `host`, `projectId` and `projectKey` are
   *   used to build the endpoint URL; the whole object is also passed to
   *   `makeRequester`.
   */
  constructor(opt: IOptions) {
    this._opt = opt;
    this._url = `${opt.host}/api/v5/projects/${opt.projectId}/queues-stats?key=${opt.projectKey}`;
    this._requester = makeRequester(opt);
  }

  /**
   * Records one queue metric and makes sure a flush is scheduled.
   *
   * Does nothing when `hasTdigest` is falsy or when the `performanceStats`
   * option is not enabled.
   *
   * @param q Metric to record; its duration, start time, queue name and
   *   groups are read.
   */
  notify(q: QueueMetric): void {
    if (!hasTdigest || !this._opt.performanceStats) {
      return;
    }

    // An exact zero is replaced by a tiny positive value before being added
    // to the digest. NOTE(review): presumably the stats backend or digest
    // cannot handle a 0 duration; confirm the reason.
    const duration = q._duration();
    const ms = duration === 0 ? 0.00001 : duration;

    // Round the start time down to the beginning of its minute so that all
    // metrics of a queue within the same minute share one bucket.
    const minute = 60 * 1000;
    const bucketStart = new Date(
      Math.floor(q.startTime.getTime() / minute) * minute
    );

    // Property order matters: the serialized key is both the bucket
    // identity and the source of the fields reported by `_flush`.
    const key: IQueueKey = {
      queue: q.queue,
      time: bucketStart,
    };
    const keyStr = JSON.stringify(key);

    let stat = this._m[keyStr];
    if (!stat) {
      stat = new TDigestStatGroups();
      this._m[keyStr] = stat;
    }
    stat.addGroups(ms, q._groups);

    // At most one flush is pending at any time.
    if (this._timer) {
      return;
    }
    this._timer = setTimeout(() => {
      this._flush();
    }, FLUSH_INTERVAL);
  }

  /**
   * Sends all accumulated buckets in one POST request and resets the state.
   *
   * The buckets and the timer handle are cleared before the request is
   * issued, so metrics recorded while the request is in flight start a new
   * batch. A failed request is only logged; its stats are not retried.
   */
  _flush(): void {
    const queues = Object.keys(this._m).map((keyStr) => {
      // After the JSON round trip `time` is an ISO string, not a Date.
      const key: { queue: string; time: string } = JSON.parse(keyStr);
      return {
        ...key,
        ...this._m[keyStr].toJSON(),
      };
    });

    this._m = {};
    this._timer = null;

    const outJSON = JSON.stringify({
      environment: this._opt.environment,
      queues,
    });
    const req = {
      method: 'POST',
      url: this._url,
      body: outJSON,
    };

    // Fire-and-forget: the response is ignored, failures are logged.
    this._requester(req).catch((err: unknown) => {
      if (console.error) {
        console.error('can not report queues breakdowns', err);
      }
    });
  }
}