diff --git a/cmd/poller/plugin/aggregator/aggregator.go b/cmd/poller/plugin/aggregator/aggregator.go
index de9ba03cb..50cd201d7 100644
--- a/cmd/poller/plugin/aggregator/aggregator.go
+++ b/cmd/poller/plugin/aggregator/aggregator.go
@@ -29,7 +29,7 @@ type rule struct {
 	checkRegex    *regexp.Regexp
 	includeLabels []string
 	allLabels     bool
-	counts        map[string]map[string]int
+	counts        map[string]map[string]float64
 }
 
 func (a *Aggregator) Init() error {
@@ -128,7 +128,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 		matrices[i].UUID += ".Aggregator"
 		matrices[i].SetExportOptions(matrix.DefaultExportOptions())
 		matrices[i].SetExportable(true)
-		rule.counts = make(map[string]map[string]int)
+		rule.counts = make(map[string]map[string]float64)
 	}
 
 	// create instances and summarize metric values
@@ -137,6 +137,8 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 		objName, objKey string
 		objInstance     *matrix.Instance
 		objMetric       *matrix.Metric
+		opsMetric       *matrix.Metric
+		opsValue        float64
 		value           float64
 		ok              bool
 		err             error
@@ -182,7 +184,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 			a.Logger.Trace().Msgf("instance (%s= %s): formatted key [%s]", rule.label, objName, objKey)
 
 			if objInstance = matrices[i].GetInstance(objKey); objInstance == nil {
-				rule.counts[objKey] = make(map[string]int)
+				rule.counts[objKey] = make(map[string]float64)
 				if objInstance, err = matrices[i].NewInstance(objKey); err != nil {
 					return nil, err
 				}
@@ -209,11 +211,28 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 					continue
 				}
 
-				if err = objMetric.AddValueFloat64(objInstance, value); err != nil {
-					a.Logger.Error().Stack().Err(err).Msgf("add value [%s] [%s]:", key, objName)
+				// latency metric: weighted sum
+				if strings.Contains(key, "_latency") {
+					opsKey := objMetric.GetComment()
+					if opsMetric = data.GetMetric(opsKey); opsMetric == nil {
+						a.Logger.Warn().Msgf("metric [%s] not found in [%s] response", opsKey, rule.label)
+						continue
+					}
+					if opsValue, ok = opsMetric.GetValueFloat64(instance); !ok {
+						continue
+					}
+					if err = objMetric.AddValueFloat64(objInstance, opsValue*value); err != nil {
+						a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
+						continue
+					}
+					rule.counts[objKey][key] += opsValue
+				} else {
+					if err = objMetric.AddValueFloat64(objInstance, value); err != nil {
+						a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
+						continue
+					}
+					rule.counts[objKey][key]++
 				}
-
-				rule.counts[objKey][key]++
 			}
 		}
 	}
@@ -224,8 +243,8 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 		for mk, metric := range m.GetMetrics() {
 
 			var (
-				value   float64
-				count   int
+				v       float64
+				count   float64
 				ok, avg bool
 				err     error
 			)
@@ -247,7 +266,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 
 			for key, instance := range m.GetInstances() {
 
-				if value, ok = metric.GetValueFloat64(instance); !ok {
+				if v, ok = metric.GetValueFloat64(instance); !ok {
 					continue
 				}
 
@@ -255,7 +274,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
 					continue
 				}
 
-				if err = metric.SetValueFloat64(instance, value/float64(count)); err != nil {
+				if err = metric.SetValueFloat64(instance, v/count); err != nil {
 					a.Logger.Error().Stack().Err(err).Msgf("set value [%s] [%s]:", mn, key)
 				}
 			}
diff --git a/cmd/poller/plugin/aggregator/aggregator_test.go b/cmd/poller/plugin/aggregator/aggregator_test.go
index 10efd3e03..9f5ff6ec7 100644
--- a/cmd/poller/plugin/aggregator/aggregator_test.go
+++ b/cmd/poller/plugin/aggregator/aggregator_test.go
@@ -12,6 +12,7 @@ import (
 )
 
 var p *Aggregator
+
 var m *matrix.Matrix
 
 func TestInitPlugin(t *testing.T) {
@@ -325,3 +326,106 @@ func TestComplexRuleRegex(t *testing.T) {
 		t.Errorf("instance [%s] was added, however should not match regex", key)
 	}
 }
+
+func TestRuleSimpleLatencyAggregation(t *testing.T) {
+
+	params := node.NewS("Aggregator")
+	params.NewChildS("", "node")
+
+	p.Params = params
+
+	if err := p.Init(); err != nil {
+		t.Fatal(err)
+	}
+
+	// create artificial data
+	m = matrix.New("", "", "")
+	var n *matrix.Matrix
+
+	metricA, err := m.NewMetricUint8("read_latency")
+	if err != nil {
+		t.Fatal(err)
+	}
+	metricA.SetComment("total_read_ops")
+	metricA.SetProperty("average")
+
+	metricB, err := m.NewMetricUint8("total_read_ops")
+	if err != nil {
+		t.Fatal(err)
+	}
+	metricB.SetProperty("rate")
+
+	instanceA, err := m.NewInstance("InstanceA")
+	if err != nil {
+		t.Fatal(err)
+	}
+	instanceA.SetLabel("node", "nodeA")
+
+	instanceB, err := m.NewInstance("InstanceB")
+	if err != nil {
+		t.Fatal(err)
+	}
+	instanceB.SetLabel("node", "nodeA")
+
+	if err = metricA.SetValueUint8(instanceA, 20); err != nil {
+		t.Fatal(err)
+	}
+
+	if err = metricB.SetValueUint8(instanceA, 4); err != nil {
+		t.Fatal(err)
+	}
+
+	if err = metricA.SetValueUint8(instanceB, 30); err != nil {
+		t.Fatal(err)
+	}
+
+	if err = metricB.SetValueUint8(instanceB, 6); err != nil {
+		t.Fatal(err)
+	}
+
+	// run the plugin
+	results, err := p.Run(m)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(results) == 1 {
+		n = results[0]
+	} else {
+		t.Fatalf("Plugin output has %d matrices, 1 was expected\n", len(results))
+	}
+
+	// check aggregated values
+
+	if len(n.GetInstances()) != 1 {
+		t.Fatalf("Number of instances is %d, 1 was expected\n", len(n.GetInstances()))
+	}
+
+	if instanceA = n.GetInstance("nodeA"); instanceA == nil {
+		t.Fatal("Instance [nodeA] missing")
+	}
+
+	if metricA = n.GetMetric("read_latency"); metricA == nil {
+		t.Fatal("Metric [read_latency] missing")
+	}
+
+	if metricB = n.GetMetric("total_read_ops"); metricB == nil {
+		t.Fatal("Metric [total_read_ops] missing")
+	}
+
+	if value, ok := metricA.GetValueUint8(instanceA); !ok {
+		t.Error("Value [read_latency] missing")
+	} else if value != 26 {
+		t.Errorf("Value [read_latency] = (%d) incorrect", value)
+	} else {
+		t.Logf("Value [read_latency] = (%d) correct!", value)
+	}
+
+	if value, ok := metricB.GetValueUint8(instanceA); !ok {
+		t.Error("Value [total_read_ops] missing")
+	} else if value != 10 {
+		t.Errorf("Value [total_read_ops] = (%d) incorrect", value)
+	} else {
+		t.Logf("Value [total_read_ops] = (%d) correct!", value)
+	}
+}
diff --git a/grafana/dashboards/cmode/node.json b/grafana/dashboards/cmode/node.json
index fa871f3b2..be24606c9 100644
--- a/grafana/dashboards/cmode/node.json
+++ b/grafana/dashboards/cmode/node.json
@@ -71,7 +71,7 @@
   "gnetId": null,
   "graphTooltip": 1,
   "id": null,
-  "iteration": 1663832272520,
+  "iteration": 1674825410278,
   "links": [
     {
       "asDropdown": true,
@@ -1659,14 +1659,10 @@
              {
                "color": "green",
                "value": null
-              },
-              {
-                "color": "red",
-                "value": 80
              }
            ]
          },
-          "unit": "µs"
+          "unit": "ms"
        },
        "overrides": []
      },
@@ -1695,21 +1691,21 @@
      "targets": [
        {
          "exemplar": false,
-          "expr": "avg(volume_read_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
+          "expr": "avg(node_volume_read_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
          "interval": "",
          "legendFormat": "Read",
          "refId": "A"
        },
        {
          "exemplar": false,
-          "expr": "avg(volume_write_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
+          "expr": "avg(node_volume_write_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
          "interval": "",
          "legendFormat": "Write",
          "refId": "B"
        },
        {
          "exemplar": false,
-          "expr": "avg(volume_other_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
+          "expr": "avg(node_volume_other_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
          "interval": "",
          "legendFormat": "Other",
          "refId": "C"
@@ -4901,5 +4897,5 @@
   "timezone": "",
   "title": "ONTAP: Node",
   "uid": "",
-  "version": 4
-}
+  "version": 5
+}
\ No newline at end of file