-
Notifications
You must be signed in to change notification settings - Fork 2
/
manager.go
391 lines (312 loc) · 12.3 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
/*
* Copyright 2020, 2021, 2022 Hewlett Packard Enterprise Development LP
* Other additional copyright holders may be indicated within.
*
* The entirety of this work is licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package telemetry
import (
"fmt"
"time"
"github.com/senseyeio/duration"
ec "github.com/NearNodeFlash/nnf-ec/pkg/ec"
sf "github.com/NearNodeFlash/nnf-ec/pkg/rfsf/pkg/models"
)
// TelemetryManager is the package-wide Telemetry Manager instance. The package-level
// *Get functions below report the metrics registered with it.
var TelemetryManager manager

// manager tracks the registered metrics and the timer that wakes it to generate reports.
//
// NOTE(review): metrics is modified by the run goroutine and by RegisterMetric, and read by
// the *Get functions, with no locking here — confirm callers serialize access.
type manager struct {
	// metrics holds every registered metric.
	metrics []metric

	// nextReportTime is the next time at which the manager will wake and perform a
	// measurement of the necessary metrics.
	nextReportTime time.Time

	// timer is used to wake the manager when necessary.
	timer *time.Timer
}
type (
	// MetricDefinition contains the definition, metadata, or characteristics for a metric.
	// It contains links to the metric properties to which the definition applies.
	MetricDefinition = sf.MetricDefinitionV110MetricDefinition

	// MetricReportDefinition specifies the metric reports that are generated.
	MetricReportDefinition = sf.MetricReportDefinitionV133MetricReportDefinition

	// MetricReport contains the readings and results of a Metric Report Definition.
	MetricReport = sf.MetricReportV140MetricReport

	// MetricReportValue defines the metric data reported from the metric.
	MetricReportValue = sf.MetricReportV140MetricValue

	// MetricReportGenerator is called with a metric report definition and returns the
	// metric values to record for it, or an error if they could not be produced.
	MetricReportGenerator func(*MetricReportDefinition) ([]MetricReportValue, error)
)
// DefaultReportDuration is how long the manager waits between wake-ups when no metric is
// scheduled sooner, and the report interval used when a metric has no usable schedule.
const DefaultReportDuration time.Duration = 2 * time.Minute
// Initialize starts the Telemetry Manager. The manager is responsible for periodically
// gathering metrics that have been registered through calls to RegisterMetric() and have a
// Periodic metric definition type.
//
// The first wake-up is scheduled DefaultReportDuration from now. That time is also stored in
// nextReportTime so RegisterMetric can tell whether a newly registered metric needs an earlier
// wake-up. Before this fix nextReportTime stayed at the zero time, and the comparison in
// RegisterMetric could never succeed. Initialize always returns nil.
func (m *manager) Initialize() error {
	m.nextReportTime = time.Now().Add(DefaultReportDuration)
	m.timer = time.NewTimer(DefaultReportDuration)
	go m.run()
	return nil
}
// RegisterMetric allows users of the Telemetry Manager to create a new Metric that will be
// managed and reported by the Telemetry Manager. Registering a metric whose Id matches an
// already registered metric replaces that metric.
//
// The manager keeps the supplied definition and reportDefinition pointers and fills in the
// standard Redfish values on them (Id, Name, @odata.id, @odata.type, wildcards and metric
// properties). generator is called whenever new values are needed for the metric's report.
//
// An error is returned when the definition has no Id or no Name. In that case any existing
// metric with the same Id is left registered.
func (m *manager) RegisterMetric(definition *MetricDefinition, reportDefinition *MetricReportDefinition, generator MetricReportGenerator) error {
	id := definition.Id
	if len(id) == 0 {
		return fmt.Errorf("metric id is missing. specify a unique id in the metric definition")
	}

	name := definition.Name
	if len(name) == 0 {
		return fmt.Errorf("metric %s has no name. Provide a descriptive name for this metric", id)
	}

	// Drop any previous registration of this id. This happens only after validation, so a
	// rejected re-registration does not silently remove the existing metric.
	for idx := range m.metrics {
		if m.metrics[idx].id == id {
			m.metrics = append(m.metrics[:idx], m.metrics[idx+1:]...)
			break
		}
	}

	now := time.Now()

	// The report definition's wildcards mirror the metric definition's wildcards.
	wildcards := make([]sf.MetricReportDefinitionV133Wildcard, len(definition.Wildcards))
	for idx, wc := range definition.Wildcards {
		wildcards[idx] = sf.MetricReportDefinitionV133Wildcard{
			Name:   wc.Name,
			Values: wc.Values,
		}
	}

	// Initialize the standard Redfish values on the supplied data structures.
	definition.OdataId = fmt.Sprintf("/redfish/v1/TelemetryService/MetricDefinitions/%s", id)
	// Single '#' prefix, and a schema version that matches the MetricDefinition v1_1_0 model.
	definition.OdataType = "#MetricDefinition.v1_1_0.MetricDefinition"

	reportDefinition.Id = id
	reportDefinition.Name = fmt.Sprintf("%s Report Definition", name)
	reportDefinition.OdataId = fmt.Sprintf("/redfish/v1/TelemetryService/MetricReportDefinitions/%s", id)
	reportDefinition.OdataType = "#MetricReportDefinition.v1_3_3.MetricReportDefinition"
	reportDefinition.MetricReport = sf.OdataV4IdRef{OdataId: fmt.Sprintf("/redfish/v1/TelemetryService/MetricReports/%s", id)}
	reportDefinition.Wildcards = wildcards
	reportDefinition.MetricProperties = definition.MetricProperties

	// A negative AppendLimit would make the capacity below invalid and panic in make().
	capacity := reportDefinition.AppendLimit
	if capacity < 0 {
		capacity = 0
	}

	entry := metric{
		id:               id,
		definition:       definition,
		reportDefinition: reportDefinition,
		generator:        generator,
		report: sf.MetricReportV140MetricReport{
			Id:                     id,
			Name:                   fmt.Sprintf("%s Report", name),
			OdataId:                fmt.Sprintf("/redfish/v1/TelemetryService/MetricReports/%s", id),
			OdataType:              "#MetricReport.v1_4_0.MetricReport",
			MetricReportDefinition: sf.OdataV4IdRef{OdataId: reportDefinition.OdataId},
			MetricValues:           make([]sf.MetricReportV140MetricValue, 0, capacity),
			Timestamp:              &now,
		},
	}

	// Compute the first report time for this metric.
	entry.nextReportTime = entry.computeNextReportTime(now)

	m.metrics = append(m.metrics, entry)

	// If this metric is due before the manager's currently scheduled wake-up, pull the timer
	// in. Record the new wake-up time so a later registration is compared against it. The
	// timer is nil until Initialize has been called; skip the adjustment in that case.
	if m.timer != nil && !entry.nextReportTime.IsZero() && entry.nextReportTime.Before(m.nextReportTime) {
		m.nextReportTime = entry.nextReportTime
		m.timer.Reset(entry.nextReportTime.Sub(now))
	}

	return nil
}
// run captures metrics that have been registered with the Telemetry Manager through calls to
// RegisterMetric().
//
// Each time the manager's timer fires, run handles every metric whose next report time has
// been reached. Metrics with a zero next report time are never scheduled. For each due
// metric, run:
//   - reschedules it;
//   - if its report definition is enabled, calls its generator and records the values;
//   - on a generator error, marks the report definition's health Critical.
//
// The timer is then re-armed for the earliest upcoming report time. If nothing is due sooner,
// it is re-armed for DefaultReportDuration after the wake-up. run never returns.
func (m *manager) run() {
	for {
		currentTime := <-m.timer.C

		// Wake again after the default duration unless some metric is due sooner.
		nextReportTime := currentTime.Add(DefaultReportDuration)

		for idx := range m.metrics {
			entry := &m.metrics[idx]
			if entry.nextReportTime.IsZero() {
				continue // not scheduled
			}

			// Report metrics that are due, including one due exactly now. A strict After()
			// would leave such a metric pending and re-arm the timer with a zero duration.
			if !currentTime.Before(entry.nextReportTime) {
				entry.lastReportTime = currentTime
				entry.nextReportTime = entry.computeNextReportTime(currentTime)

				if entry.reportDefinition.MetricReportDefinitionEnabled {
					// Generate the metric report and handle the results.
					values, err := entry.generator(entry.reportDefinition)
					if err != nil {
						entry.reportDefinition.Status.Health = sf.CRITICAL_RH
					} else {
						entry.record(values)
					}
				}
			}

			// A zero time means "not scheduled". It must not become the wake-up time, or the
			// timer would be re-armed with a negative duration.
			if !entry.nextReportTime.IsZero() && entry.nextReportTime.Before(nextReportTime) {
				nextReportTime = entry.nextReportTime
			}
		}

		// The timer has fired and its channel was drained above, so Reset is safe here.
		m.nextReportTime = nextReportTime
		m.timer.Reset(nextReportTime.Sub(currentTime))
	}
}
// metric is the Telemetry Manager's bookkeeping for one registered metric: its Redfish
// definitions, the report accumulated so far, the generator that produces new values, and
// its reporting schedule.
type metric struct {
	// id is the unique metric id. RegisterMetric also uses it as the Id of the report
	// definition and the report.
	id string

	definition       *MetricDefinition
	reportDefinition *MetricReportDefinition
	report           MetricReport

	// generator produces new values for the report.
	generator MetricReportGenerator

	// lastReportTime is when the run loop last processed this metric as due.
	lastReportTime time.Time
	// nextReportTime is when the metric is next due. The zero time means it is not scheduled.
	nextReportTime time.Time
}
// findMetric returns a pointer to the metric registered with TelemetryManager under id, or
// nil if there is none.
//
// The pointer refers into TelemetryManager.metrics. It may become stale once that slice is
// modified, for example when a metric is re-registered.
func findMetric(id string) *metric {
	metrics := TelemetryManager.metrics
	for i := range metrics {
		if candidate := &metrics[i]; candidate.id == id {
			return candidate
		}
	}
	return nil
}
// computeNextReportTime returns when this metric should next be reported, given the
// reference time current. The result depends on the report definition type:
//   - Periodic: current shifted by the Schedule's RecurrenceInterval (an ISO 8601 duration).
//     DefaultReportDuration is used instead when the interval cannot be parsed, or when it
//     does not move the time forward (e.g. "PT0S"). Such an interval would otherwise schedule
//     the next report for the current instant, again and again.
//   - OnRequest: the zero time, meaning the metric is not scheduled.
//   - Anything else: current + DefaultReportDuration.
func (metric *metric) computeNextReportTime(current time.Time) time.Time {
	switch metric.reportDefinition.MetricReportDefinitionType {
	case sf.PERIODIC_MRDV133MRDT:
		d, err := duration.ParseISO8601(metric.reportDefinition.Schedule.RecurrenceInterval)
		if err == nil {
			if next := d.Shift(current); next.After(current) {
				return next
			}
		}
	case sf.ON_REQUEST_MRDV133MRDT:
		return time.Time{} // WARNING: time.Unix(0, 0) does not resolve to IsZero()
	}

	return current.Add(DefaultReportDuration)
}
// record stores freshly generated values in the metric's report, according to the report
// definition's ReportUpdates policy:
//   - Overwrite: the report's values are replaced by vals.
//   - AppendStopsWhenFull: vals are appended until the report holds AppendLimit values.
//     Values that do not fit are discarded.
//   - AppendWrapsWhenFull: vals are appended, and the oldest values are discarded so the
//     report never holds more than AppendLimit values. A batch that alone reaches the limit
//     replaces the report, truncated to its first AppendLimit values.
//   - NewReport or any other policy: nothing is recorded.
//
// For OnChange report definitions (except under Overwrite), vals is first filtered by
// recordUnchanged. Only values that differ from the last recorded value of the same metric
// property are kept. A negative AppendLimit is treated as zero.
func (metric *metric) record(vals []MetricReportValue) {
	definition := metric.reportDefinition
	report := &metric.report

	// Overwrite replaces the entire report. It is handled first so that on-change
	// filtering never applies to it.
	if definition.ReportUpdates == sf.OVERWRITE_MRDV133RUE {
		report.MetricValues = vals
		return
	}

	// For on-change reports, only record values that changed since they were last recorded.
	if definition.MetricReportDefinitionType == sf.ON_CHANGE_MRDV133MRDT {
		vals = metric.recordUnchanged(vals)
	}

	limit := int(definition.AppendLimit)
	if limit < 0 {
		limit = 0
	}
	remaining := limit - len(report.MetricValues)

	switch definition.ReportUpdates {
	case sf.APPEND_STOPS_WHEN_FULL_MRDV133RUE:
		if remaining <= 0 {
			return // Full - discard the new values
		}
		if len(vals) > remaining {
			vals = vals[:remaining] // keep only the values that still fit
		}
		metric.append(vals)

	case sf.APPEND_WRAPS_WHEN_FULL_MRDV133RUE:
		switch {
		case len(vals) == 0:
			// Nothing new to record.
		case len(vals) >= limit:
			// The new batch alone fills the report: it replaces every existing value.
			report.MetricValues = vals[:limit]
		case len(vals) <= remaining:
			metric.append(vals)
		default:
			// Discard just enough of the oldest values to make room for the new batch.
			// Since len(vals) < limit here, 0 < drop <= len(report.MetricValues).
			// A fresh slice is built so earlier copies of the report are not mutated.
			drop := len(vals) - remaining
			wrapped := make([]MetricReportValue, 0, limit)
			wrapped = append(wrapped, report.MetricValues[drop:]...)
			report.MetricValues = append(wrapped, vals...)
		}

	case sf.NEW_REPORT_MRDV133RUE:
		// TODO: generating a new report for every update is not supported; values are dropped.
	}
}
// recordUnchanged filters newVals down to the values that changed since they were last
// recorded in the metric's report. Despite its name, it does not modify the report; the
// caller records the returned values.
//
// A new value counts as changed when either:
//   - the report holds no earlier value for the same MetricProperty, or
//   - the most recent value for that property (the last matching entry in
//     report.MetricValues) has a different MetricValue.
func (metric *metric) recordUnchanged(newVals []MetricReportValue) []MetricReportValue {
	current := metric.report.MetricValues
	changedVals := make([]MetricReportValue, 0, len(newVals))

	for _, newVal := range newVals {
		changed := true

		// Walk from newest to oldest and stop at the first match, so only the latest value
		// recorded for the property is compared (older equal values must not mask a change).
		for idx := len(current) - 1; idx >= 0; idx-- {
			if current[idx].MetricProperty == newVal.MetricProperty {
				changed = current[idx].MetricValue != newVal.MetricValue
				break
			}
		}

		if changed {
			changedVals = append(changedVals, newVal)
		}
	}

	return changedVals
}
// append adds vals to the end of the metric report's values. It does not check AppendLimit;
// callers are expected to enforce it.
func (metric *metric) append(vals []MetricReportValue) {
	values := &metric.report.MetricValues
	*values = append(*values, vals...)
}
// Get fills in the Telemetry Service's links to its metric definition, metric report
// definition, and metric report collections. It always returns nil.
func Get(model *sf.TelemetryServiceV121TelemetryService) error {
	ref := func(path string) sf.OdataV4IdRef {
		return sf.OdataV4IdRef{OdataId: path}
	}

	model.MetricDefinitions = ref("/redfish/v1/TelemetryService/MetricDefinitions")
	model.MetricReportDefinitions = ref("/redfish/v1/TelemetryService/MetricReportDefinitions")
	model.MetricReports = ref("/redfish/v1/TelemetryService/MetricReports")

	return nil
}
// MetricDefinitionsGet fills model with one @odata.id reference per metric definition
// registered with the Telemetry Manager, in the order the metrics are stored. It always
// returns nil.
func MetricDefinitionsGet(model *sf.MetricDefinitionCollectionMetricDefinitionCollection) error {
	metrics := TelemetryManager.metrics

	members := make([]sf.OdataV4IdRef, 0, len(metrics))
	for i := range metrics {
		members = append(members, sf.OdataV4IdRef{OdataId: metrics[i].definition.OdataId})
	}

	model.MembersodataCount = int64(len(metrics))
	model.Members = members
	return nil
}
// MetricDefinitionIdGet copies the metric definition registered under id into model. It
// returns a not-found error if no such metric is registered.
func MetricDefinitionIdGet(model *sf.MetricDefinitionV110MetricDefinition, id string) error {
	if found := findMetric(id); found != nil {
		*model = *found.definition
		return nil
	}
	return ec.NewErrNotFound()
}
// MetricReportDefinitionsGet fills model with one @odata.id reference per metric report
// definition registered with the Telemetry Manager, in the order the metrics are stored.
// It always returns nil.
func MetricReportDefinitionsGet(model *sf.MetricReportDefinitionCollectionMetricReportDefinitionCollection) error {
	metrics := TelemetryManager.metrics

	members := make([]sf.OdataV4IdRef, 0, len(metrics))
	for i := range metrics {
		members = append(members, sf.OdataV4IdRef{OdataId: metrics[i].reportDefinition.OdataId})
	}

	model.MembersodataCount = int64(len(metrics))
	model.Members = members
	return nil
}
// MetricReportDefinitionIdGet copies the metric report definition registered under id into
// model. It returns a not-found error if no such metric is registered.
func MetricReportDefinitionIdGet(model *sf.MetricReportDefinitionV133MetricReportDefinition, id string) error {
	if found := findMetric(id); found != nil {
		*model = *found.reportDefinition
		return nil
	}
	return ec.NewErrNotFound()
}
// MetricReportsGet fills model with one @odata.id reference per metric report held by the
// Telemetry Manager, in the order the metrics are stored. It always returns nil.
func MetricReportsGet(model *sf.MetricReportCollectionMetricReportCollection) error {
	metrics := TelemetryManager.metrics

	members := make([]sf.OdataV4IdRef, 0, len(metrics))
	for i := range metrics {
		members = append(members, sf.OdataV4IdRef{OdataId: metrics[i].report.OdataId})
	}

	model.MembersodataCount = int64(len(metrics))
	model.Members = members
	return nil
}
// MetricReportIdGet copies the metric report registered under id into model.
//
// For OnRequest report definitions, the metric's generator is called first and its values
// are recorded, so the returned report includes a fresh reading.
//
// Errors:
//   - not-found if no metric is registered under id;
//   - internal-server-error if the generator fails.
func MetricReportIdGet(model *sf.MetricReportV140MetricReport, id string) error {
	found := findMetric(id)
	if found == nil {
		return ec.NewErrNotFound()
	}

	onRequest := found.reportDefinition.MetricReportDefinitionType == sf.ON_REQUEST_MRDV133MRDT
	if onRequest {
		values, err := found.generator(found.reportDefinition)
		if err != nil {
			return ec.NewErrInternalServerError().WithError(err).WithCause("Failed to generate metric report")
		}
		found.record(values)
	}

	*model = found.report
	return nil
}