Skip to content

Commit

Permalink
feat: weighted avg support in aggregator plugin (#1672)
Browse files Browse the repository at this point in the history
* feat: add weighted avg support in aggregator plugin

* feat: dashboard changes

* feat: aggregator changes

* feat: add tests and address review comments
  • Loading branch information
rahulguptajss committed Jan 30, 2023
1 parent 88bc5a2 commit 5448a77
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 22 deletions.
41 changes: 30 additions & 11 deletions cmd/poller/plugin/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type rule struct {
checkRegex *regexp.Regexp
includeLabels []string
allLabels bool
counts map[string]map[string]int
counts map[string]map[string]float64
}

func (a *Aggregator) Init() error {
Expand Down Expand Up @@ -128,7 +128,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
matrices[i].UUID += ".Aggregator"
matrices[i].SetExportOptions(matrix.DefaultExportOptions())
matrices[i].SetExportable(true)
rule.counts = make(map[string]map[string]int)
rule.counts = make(map[string]map[string]float64)
}

// create instances and summarize metric values
Expand All @@ -137,6 +137,8 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
objName, objKey string
objInstance *matrix.Instance
objMetric *matrix.Metric
opsMetric *matrix.Metric
opsValue float64
value float64
ok bool
err error
Expand Down Expand Up @@ -182,7 +184,7 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
a.Logger.Trace().Msgf("instance (%s= %s): formatted key [%s]", rule.label, objName, objKey)

if objInstance = matrices[i].GetInstance(objKey); objInstance == nil {
rule.counts[objKey] = make(map[string]int)
rule.counts[objKey] = make(map[string]float64)
if objInstance, err = matrices[i].NewInstance(objKey); err != nil {
return nil, err
}
Expand All @@ -209,11 +211,28 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
continue
}

if err = objMetric.AddValueFloat64(objInstance, value); err != nil {
a.Logger.Error().Stack().Err(err).Msgf("add value [%s] [%s]:", key, objName)
// latency metric: weighted sum
if strings.Contains(key, "_latency") {
opsKey := objMetric.GetComment()
if opsMetric = data.GetMetric(opsKey); opsMetric == nil {
a.Logger.Warn().Msgf("metric [%s] not found in [%s] response", opsKey, rule.label)
continue
}
if opsValue, ok = opsMetric.GetValueFloat64(instance); !ok {
continue
}
if err = objMetric.AddValueFloat64(objInstance, opsValue*value); err != nil {
a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
continue
}
rule.counts[objKey][key] += opsValue
} else {
if err = objMetric.AddValueFloat64(objInstance, value); err != nil {
a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
continue
}
rule.counts[objKey][key]++
}

rule.counts[objKey][key]++
}
}
}
Expand All @@ -224,8 +243,8 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
for mk, metric := range m.GetMetrics() {

var (
value float64
count int
v float64
count float64
ok, avg bool
err error
)
Expand All @@ -247,15 +266,15 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {

for key, instance := range m.GetInstances() {

if value, ok = metric.GetValueFloat64(instance); !ok {
if v, ok = metric.GetValueFloat64(instance); !ok {
continue
}

if count, ok = a.rules[i].counts[key][mk]; !ok {
continue
}

if err = metric.SetValueFloat64(instance, value/float64(count)); err != nil {
if err = metric.SetValueFloat64(instance, v/count); err != nil {
a.Logger.Error().Stack().Err(err).Msgf("set value [%s] [%s]:", mn, key)
}
}
Expand Down
104 changes: 104 additions & 0 deletions cmd/poller/plugin/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

// p is the Aggregator plugin under test. It is declared at package level so
// that the test functions in this file can share it; each test assigns
// p.Params and calls p.Init before running the plugin.
// NOTE(review): p is not constructed here — presumably TestInitPlugin sets it
// up first; confirm before running a test in isolation.
var p *Aggregator

// m is the input matrix that tests populate with artificial metrics and
// instances and then pass to p.Run.
var m *matrix.Matrix

func TestInitPlugin(t *testing.T) {
Expand Down Expand Up @@ -325,3 +326,106 @@ func TestComplexRuleRegex(t *testing.T) {
t.Errorf("instance [%s] was added, however should not match regex", key)
}
}

// TestRuleSimpleLatencyAggregation runs the Aggregator with a single rule on
// the "node" label against two instances that both belong to "nodeA".
//
// The input carries a latency metric ("read_latency", whose comment names
// "total_read_ops" as its ops counter) and the ops metric itself:
//
//	InstanceA: read_latency=20, total_read_ops=4
//	InstanceB: read_latency=30, total_read_ops=6
//
// The test expects the aggregated instance to report
// read_latency = (20*4 + 30*6) / (4 + 6) = 26 and total_read_ops = 4 + 6 = 10.
func TestRuleSimpleLatencyAggregation(t *testing.T) {

	// configure the plugin with one rule: aggregate on label "node"
	cfg := node.NewS("Aggregator")
	cfg.NewChildS("", "node")
	p.Params = cfg

	if err := p.Init(); err != nil {
		t.Fatal(err)
	}

	// build the artificial input matrix (stored in the shared package variable)
	m = matrix.New("", "", "")

	latency, err := m.NewMetricUint8("read_latency")
	if err != nil {
		t.Fatal(err)
	}
	// the comment tells the aggregator which metric holds the matching ops
	latency.SetComment("total_read_ops")
	latency.SetProperty("average")

	ops, err := m.NewMetricUint8("total_read_ops")
	if err != nil {
		t.Fatal(err)
	}
	ops.SetProperty("rate")

	// both source instances carry the same node label, so they collapse
	// into a single aggregated instance
	first, err := m.NewInstance("InstanceA")
	if err != nil {
		t.Fatal(err)
	}
	first.SetLabel("node", "nodeA")

	second, err := m.NewInstance("InstanceB")
	if err != nil {
		t.Fatal(err)
	}
	second.SetLabel("node", "nodeA")

	if err := latency.SetValueUint8(first, 20); err != nil {
		t.Fatal(err)
	}
	if err := ops.SetValueUint8(first, 4); err != nil {
		t.Fatal(err)
	}
	if err := latency.SetValueUint8(second, 30); err != nil {
		t.Fatal(err)
	}
	if err := ops.SetValueUint8(second, 6); err != nil {
		t.Fatal(err)
	}

	// run the plugin
	results, err := p.Run(m)
	if err != nil {
		t.Fatal(err)
	}
	if len(results) != 1 {
		t.Fatalf("Plugin output has %d matrices, 1 was expected\n", len(results))
	}
	n := results[0]

	// check the shape of the aggregated matrix
	if count := len(n.GetInstances()); count != 1 {
		t.Fatalf("Number of instances is %d, 1 was expected\n", count)
	}

	aggInstance := n.GetInstance("nodeA")
	if aggInstance == nil {
		t.Fatal("Instance [nodeA] missing")
	}

	aggLatency := n.GetMetric("read_latency")
	if aggLatency == nil {
		t.Fatal("Metric [read_latency] missing")
	}

	aggOps := n.GetMetric("total_read_ops")
	if aggOps == nil {
		t.Fatal("Metric [total_read_ops] missing")
	}

	// check the aggregated values
	switch got, ok := aggLatency.GetValueUint8(aggInstance); {
	case !ok:
		t.Error("Value [read_latency] missing")
	case got != 26:
		t.Errorf("Value [read_latency] = (%d) incorrect", got)
	default:
		t.Logf("Value [read_latency] = (%d) correct!", got)
	}

	switch got, ok := aggOps.GetValueUint8(aggInstance); {
	case !ok:
		t.Error("Value [total_read_ops] missing")
	case got != 10:
		t.Errorf("Value [total_read_ops] = (%d) incorrect", got)
	default:
		t.Logf("Value [total_read_ops] = (%d) correct!", got)
	}
}
18 changes: 7 additions & 11 deletions grafana/dashboards/cmode/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"gnetId": null,
"graphTooltip": 1,
"id": null,
"iteration": 1663832272520,
"iteration": 1674825410278,
"links": [
{
"asDropdown": true,
Expand Down Expand Up @@ -1659,14 +1659,10 @@
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "µs"
"unit": "ms"
},
"overrides": []
},
Expand Down Expand Up @@ -1695,21 +1691,21 @@
"targets": [
{
"exemplar": false,
"expr": "avg(volume_read_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
"expr": "avg(node_volume_read_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
"interval": "",
"legendFormat": "Read",
"refId": "A"
},
{
"exemplar": false,
"expr": "avg(volume_write_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
"expr": "avg(node_volume_write_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
"interval": "",
"legendFormat": "Write",
"refId": "B"
},
{
"exemplar": false,
"expr": "avg(volume_other_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"})",
"expr": "avg(node_volume_other_latency{datacenter=~\"$Datacenter\",cluster=~\"$Cluster\",node=~\"$Node\"}) / 1000",
"interval": "",
"legendFormat": "Other",
"refId": "C"
Expand Down Expand Up @@ -4901,5 +4897,5 @@
"timezone": "",
"title": "ONTAP: Node",
"uid": "",
"version": 4
}
"version": 5
}

0 comments on commit 5448a77

Please sign in to comment.