forked from openconfig/gnmi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
latency.go
352 lines (318 loc) · 9.44 KB
/
latency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
/*
Copyright 2021 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package latency supports exporting latency stats (avg/max/min) of a set
// of time windows as metadata.
package latency
import (
"fmt"
"sync"
"time"
"github.com/openconfig/gnmi/metadata"
)
const (
// ElemLatency is the container of all latency metadata about a target.
ElemLatency = "latency"
// ElemWindow contains the latency metadata (avg, max, min) of a particular
// window size.
ElemWindow = "window"
// ElemAvg is the average latency during a time window.
ElemAvg = "avg"
// ElemMax is the maximum latency during a time window.
ElemMax = "max"
// ElemMin is the minimum latency during a time window.
ElemMin = "min"
// metaName is the fixed middle part of every latency metadata name: a name
// is the stat type, followed by metaName, followed by the compact
// window-size string.
metaName = "LatencyWindow"
)
// now returns the current time. Compute and UpdateReset read the clock
// through this variable instead of calling time.Now directly (presumably so
// that tests can substitute a fake clock; confirm against the tests).
var now = time.Now
// StatType is the type of latency statistics supported for a time window.
// Its String method yields the matching path element (ElemAvg, ElemMax or
// ElemMin).
type StatType int
const (
// Avg is the average latency of a time window.
Avg = StatType(iota)
// Max is the maximum latency of a time window.
Max
// Min is the minimum latency of a time window.
Min
)
// CompactDurationString formats the time window d like fmt.Sprint does, but
// without the zero-valued trailing components: "1h0m0s" becomes "1h" and
// "2m0s" becomes "2m". Any other string is returned unchanged.
func CompactDurationString(d time.Duration) string {
	text := fmt.Sprint(d)
	// Each tail starts with the unit letter that has to stay, followed by the
	// zero-valued components to drop. The longer tail is tried first.
	for _, tail := range [...]string{"h0m0s", "m0s"} {
		if cut := len(text) - len(tail); cut > 0 && text[cut:] == tail {
			return text[:cut+1]
		}
	}
	return text
}
// String returns the path element that names the StatType (ElemAvg, ElemMax
// or ElemMin), or "unknown" for any other value.
func (st StatType) String() string {
	names := [...]string{Avg: ElemAvg, Max: ElemMax, Min: ElemMin}
	if st < 0 || int(st) >= len(names) {
		return "unknown"
	}
	return names[st]
}
// stat identifies one latency statistic: a statistic type over a time window
// of a given size. The metadata name and the metadata path of the statistic
// are derived from it.
type stat struct {
window time.Duration // Size of the time window the statistic covers.
typ StatType // Type of the latency stats (Avg, Max or Min).
}
// metaName returns the metadata name of the stat s: the string form of its
// type, the package-level metaName constant and the compact window size,
// joined without separators.
func (s stat) metaName() string {
	return s.typ.String() + metaName + CompactDurationString(s.window)
}
// metaPath returns the metadata path of the stat s. The path starts at
// metadata.Root, goes through the latency and window containers, then the
// compact window size, and ends with the stat type.
func (s stat) metaPath() []string {
	size := CompactDurationString(s.window)
	kind := s.typ.String()
	return []string{metadata.Root, ElemLatency, ElemWindow, size, kind}
}
// Path returns the metadata path under which the latency statistic of type
// typ for the window w is exported.
func Path(w time.Duration, typ StatType) []string {
	s := stat{window: w, typ: typ}
	return s.metaPath()
}
// MetadataName returns the metadata name under which the latency statistic
// of type typ for the window w is registered.
func MetadataName(w time.Duration, typ StatType) string {
	s := stat{window: w, typ: typ}
	return s.metaName()
}
// slot holds the latency stats accumulated over one interval. UpdateReset
// builds one from the batch collected by Compute since the previous call and
// adds it to every window.
type slot struct {
total time.Duration // cumulative latency of this time slot, already divided by the Latency's scale factor
max time.Duration // maximum latency of this time slot
min time.Duration // minimum latency of this time slot
count int64 // number of updates
start time.Time // the start time of the time slot
end time.Time // the end time of the time slot
}
// window keeps the slots that currently fall inside one sliding time window,
// together with running totals over those slots, and exports the avg/max/min
// stats derived from them.
type window struct {
// stats maps a metadata name to the function that computes the stat and
// stores it under that name in a Metadata.
stats map[string]func(string, *metadata.Metadata)
size time.Duration // window size
total time.Duration // cumulative latency of this time window
sf int64 // scaling factor of total; the exported average is multiplied by it
count int64 // number of updates
slots []*slot // time slots of latencies
covered bool // have received latencies covering a full window; stays true once set
}
// newWindow creates an empty window of the given size. sf is the scaling
// factor the average is multiplied by when it is exported. The stats table
// gets one setter per stat type, keyed by the metadata name of that stat.
func newWindow(size time.Duration, sf int64) *window {
	w := &window{size: size, sf: sf}
	w.stats = map[string]func(string, *metadata.Metadata){
		stat{window: size, typ: Avg}.metaName(): w.setAvg,
		stat{window: size, typ: Max}.metaName(): w.setMax,
		stat{window: size, typ: Min}.metaName(): w.setMin,
	}
	return w
}
// add appends the slot ls to the window and folds its total and count into
// the window's running totals. A nil slot or a slot without updates is
// ignored.
func (w *window) add(ls *slot) {
	if ls != nil && ls.count != 0 {
		w.slots = append(w.slots, ls)
		w.total += ls.total
		w.count += ls.count
	}
}
// setAvg stores the average latency of the window in m under name. The
// average is the running total divided by the update count, multiplied by
// the window's scaling factor. Nothing is stored when the window has no
// updates or when the unscaled average is zero.
func (w *window) setAvg(name string, m *metadata.Metadata) {
	if w.count == 0 {
		return
	}
	perUpdate := (w.total / time.Duration(w.count)).Nanoseconds()
	if perUpdate == 0 {
		return
	}
	m.SetInt(name, perUpdate*w.sf)
}
// setMax stores, in nanoseconds, the largest per-slot maximum of the window
// in m under name. The search starts from zero, so nothing is stored when no
// slot has a positive maximum.
func (w *window) setMax(name string, m *metadata.Metadata) {
	var highest time.Duration
	for i := range w.slots {
		if cand := w.slots[i].max; cand > highest {
			highest = cand
		}
	}
	if highest != 0 {
		m.SetInt(name, highest.Nanoseconds())
	}
}
// setMin stores, in nanoseconds, the smallest per-slot minimum of the window
// in m under name. Nothing is stored when the window has no slots or when
// the minimum is zero.
func (w *window) setMin(name string, m *metadata.Metadata) {
	if len(w.slots) == 0 {
		return
	}
	lowest := w.slots[0].min
	for i := 1; i < len(w.slots); i++ {
		if cand := w.slots[i].min; cand < lowest {
			lowest = cand
		}
	}
	if lowest == 0 {
		return
	}
	m.SetInt(name, lowest.Nanoseconds())
}
// slide moves the window forward so that it ends at ts. Every slot whose end
// time is not after ts minus the window size has its count and total
// subtracted from the running totals, and as many slots as were found
// expired are dropped from the front of the slot list.
func (w *window) slide(ts time.Time) {
	expiry := ts.Add(-w.size)
	dropped := 0
	for i := range w.slots {
		sl := w.slots[i]
		if sl.end.After(expiry) {
			continue // still inside the window
		}
		w.total -= sl.total
		w.count -= sl.count
		dropped++
	}
	w.slots = w.slots[dropped:]
}
// isCovered reports whether the window has received latencies spanning at
// least one full window size. The window becomes covered the first time ts
// is at least one window size past the start of its oldest slot. Once
// covered, it stays covered.
func (w *window) isCovered(ts time.Time) bool {
	if !w.covered && len(w.slots) > 0 && ts.Sub(w.slots[0].start) >= w.size {
		w.covered = true
	}
	return w.covered
}
// updateMeta refreshes the window's stats in m as of time ts. It does
// nothing until the window is covered. After that it first slides the window
// to ts and then runs every registered stat setter with its metadata name.
func (w *window) updateMeta(m *metadata.Metadata, ts time.Time) {
	if w.isCovered(ts) {
		w.slide(ts)
		for metaKey, set := range w.stats {
			set(metaKey, m)
		}
	}
}
// Latency supports calculating and exporting latency stats for a specified
// set of time windows.
//
// Compute accumulates the latency of each update into the fields below.
// UpdateReset moves the accumulated batch into every window, clears the
// accumulators and refreshes the exported metadata. Both methods hold mu
// while doing so.
type Latency struct {
mu sync.Mutex // held by Compute and UpdateReset while they access the fields below
start time.Time // start time of the current batch of cumulated latency stats
scaleFactor time.Duration // scaling factor of totalDiff: each latency is divided by it before being added
totalDiff time.Duration // cumulative difference in timestamps from device
count int64 // number of updates in latency count
min time.Duration // minimum latency; zero is also treated as "not set yet" by Compute
max time.Duration // maximum latency
windows []*window // one window per size passed to New
}
// Options contains the options for creating a Latency.
type Options struct {
// AvgPrecision is the precision of the avg stats. If unspecified (zero),
// the precision is nanoseconds. The exported latency stats are always in
// nanoseconds no matter what precision is set here. Setting a precision
// coarser than a nanosecond avoids overflowing the int64 that accumulates
// the time durations needed to calculate averages. The precision of the
// Max and Min stats is not affected by this setting.
AvgPrecision time.Duration
}
// New returns a Latency object supporting latency stats for the time windows
// specified in windowSizes. A nil opts, or a zero opts.AvgPrecision, selects
// nanosecond precision for the averages.
func New(windowSizes []time.Duration, opts *Options) *Latency {
	// The scale factor is the precision expressed as a number of nanoseconds.
	sf := time.Nanosecond / time.Nanosecond
	if opts != nil && opts.AvgPrecision.Nanoseconds() != 0 {
		sf = opts.AvgPrecision / time.Nanosecond
	}
	lat := &Latency{scaleFactor: sf}
	for _, size := range windowSizes {
		lat.windows = append(lat.windows, newWindow(size, sf.Nanoseconds()))
	}
	return lat
}
// Compute calculates the time difference between now and ts (the timestamp
// of an update) and updates the latency stats saved in Latency.
//
// The latency is divided by the scale factor and added to the running total.
// The update count is incremented, and the maximum and minimum of the
// current batch are adjusted. The first call of a batch whose start time is
// still unset also records the current time as the batch start.
//
// Compute is safe to call on a Latency that was not created by New: a zero
// scale factor is treated as nanosecond precision instead of causing an
// integer divide by zero.
func (l *Latency) Compute(ts time.Time) {
	l.mu.Lock()
	defer l.mu.Unlock()

	nowTime := now()
	lat := nowTime.Sub(ts)

	// New always sets a non-zero scale factor; only a zero-value Latency gets
	// here with scaleFactor == 0.
	sf := l.scaleFactor
	if sf == 0 {
		sf = time.Nanosecond
	}
	l.totalDiff += lat / sf
	l.count++

	if lat > l.max {
		l.max = lat
	}
	// A zero l.min doubles as the "not set yet" marker (UpdateReset resets it
	// to zero), so the first latency of a batch always becomes the minimum.
	if lat < l.min || l.min == 0 {
		l.min = lat
	}
	if l.start.IsZero() {
		l.start = nowTime
	}
}
// UpdateReset uses the latencies saved during the last interval to update
// the latency stats of all the supported time windows, and then updates the
// corresponding stats in Metadata m.
//
// When at least one update was recorded since the previous call, the batch
// is packed into a slot that is added to every window, and the accumulators
// are cleared. Whether or not there was a batch, every window then refreshes
// its metadata as of the current time, which also becomes the start of the
// next batch.
//
// UpdateReset is expected to be called periodically at a fixed interval
// (e.g. 2s), and the time windows should be multiples of this interval.
func (l *Latency) UpdateReset(m *metadata.Metadata) {
	l.mu.Lock()
	defer l.mu.Unlock()

	ts := now()
	if l.count != 0 {
		batch := &slot{
			start: l.start,
			end:   ts,
			total: l.totalDiff,
			count: l.count,
			min:   l.min,
			max:   l.max,
		}
		for _, w := range l.windows {
			w.add(batch)
		}
		l.totalDiff, l.count, l.min, l.max = 0, 0, 0, 0
	}

	for _, w := range l.windows {
		w.updateMeta(m, ts)
	}
	l.start = ts
}
// RegisterMetadata registers the latency stats metadata (avg, max and min)
// for each of the time windows specified in windowSizes. Each stat is
// registered as an int value under its metadata name with its metadata path.
// RegisterMetadata is not thread-safe and should be called before any
// metadata.Metadata is instantiated.
func RegisterMetadata(windowSizes []time.Duration) {
	types := [...]StatType{Avg, Max, Min}
	for _, size := range windowSizes {
		for _, typ := range types {
			entry := stat{window: size, typ: typ}
			name, path := entry.metaName(), entry.metaPath()
			metadata.RegisterIntValue(name, &metadata.IntValue{Path: path})
		}
	}
}
// ParseWindows parses the time durations of latency windows in tds (each in
// the syntax accepted by time.ParseDuration) and verifies that they are
// multiples of the metadata update period metaUpdatePeriod.
//
// The parsed durations are returned in the order given. An error is returned
// for the first entry that cannot be parsed or that is not a multiple of
// metaUpdatePeriod. No window can be validated against a non-positive
// metaUpdatePeriod, so that case is reported as an error rather than causing
// a division-by-zero panic. An empty tds yields (nil, nil) whatever the
// period is.
func ParseWindows(tds []string, metaUpdatePeriod time.Duration) ([]time.Duration, error) {
	var durs []time.Duration
	for _, td := range tds {
		dur, err := time.ParseDuration(td)
		if err != nil {
			return nil, fmt.Errorf("parsing %s: %w", td, err)
		}
		// Guard the modulo below: a zero period would panic with an integer
		// divide by zero.
		if metaUpdatePeriod <= 0 {
			return nil, fmt.Errorf("metadata update period %v must be positive to validate latency stats window %s", metaUpdatePeriod, td)
		}
		if dur.Nanoseconds()%metaUpdatePeriod.Nanoseconds() != 0 {
			return nil, fmt.Errorf("latency stats window %s is not a multiple of metadata update period %v", td, metaUpdatePeriod)
		}
		durs = append(durs, dur)
	}
	return durs, nil
}