Skip to content

Commit

Permalink
fix: aggregating latency metrics returns NaN when base counter is 0 (#…
Browse files Browse the repository at this point in the history
…1781)

* fix: aggregating latency metrics returns NaN when base counter is 0

* fix: address review comments
  • Loading branch information
rahulguptajss committed Feb 27, 2023
1 parent 36d31ef commit 7488348
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 13 deletions.
34 changes: 21 additions & 13 deletions cmd/poller/plugin/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,20 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
// latency metric: weighted sum
if strings.Contains(key, "_latency") {
opsKey := objMetric.GetComment()
if opsMetric = data.GetMetric(opsKey); opsMetric == nil {
a.Logger.Warn().Msgf("metric [%s] not found in [%s] response", opsKey, rule.label)
continue
}
if opsValue, ok = opsMetric.GetValueFloat64(instance); !ok {
continue
}
if err = objMetric.AddValueFloat64(objInstance, opsValue*value); err != nil {
a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
continue
if opsKey != "" {
if opsMetric = data.GetMetric(opsKey); opsMetric == nil {
a.Logger.Warn().Msgf("metric [%s] not found in [%s] response", opsKey, rule.label)
continue
}
if opsValue, ok = opsMetric.GetValueFloat64(instance); !ok {
continue
}
if err = objMetric.AddValueFloat64(objInstance, opsValue*value); err != nil {
a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
continue
}
rule.counts[objKey][key] += opsValue
}
rule.counts[objKey][key] += opsValue
} else {
if err = objMetric.AddValueFloat64(objInstance, value); err != nil {
a.Logger.Error().Err(err).Msgf("add value [%s] [%s]:", key, objName)
Expand Down Expand Up @@ -274,8 +276,14 @@ func (a *Aggregator) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {
continue
}

if err = metric.SetValueFloat64(instance, v/count); err != nil {
a.Logger.Error().Stack().Err(err).Msgf("set value [%s] [%s]:", mn, key)
// if no ops happened
if count == 0 {
err = metric.SetValueFloat64(instance, 0)
} else {
err = metric.SetValueFloat64(instance, v/count)
}
if err != nil {
a.Logger.Error().Err(err).Str("mn", mn).Str("key", key).Msg("set value")
}
}
}
Expand Down
103 changes: 103 additions & 0 deletions cmd/poller/plugin/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,106 @@ func TestRuleSimpleLatencyAggregation(t *testing.T) {
t.Logf("Value [total_read_ops] = (%d) correct!", value)
}
}

func TestRuleSimpleLatencyZeroAggregation(t *testing.T) {

params := node.NewS("Aggregator")
params.NewChildS("", "node")

p.Params = params

if err := p.Init(); err != nil {
t.Fatal(err)
}

// create artificial data
m = matrix.New("", "", "")
var n *matrix.Matrix

metricA, err := m.NewMetricUint8("read_latency")
if err != nil {
t.Fatal(err)
}
metricA.SetComment("total_read_ops")
metricA.SetProperty("average")

metricB, err := m.NewMetricUint8("total_read_ops")
if err != nil {
t.Fatal(err)
}
metricB.SetProperty("rate")

instanceA, err := m.NewInstance("InstanceA")
if err != nil {
t.Fatal(err)
}
instanceA.SetLabel("node", "nodeA")

instanceB, err := m.NewInstance("InstanceB")
if err != nil {
t.Fatal(err)
}
instanceB.SetLabel("node", "nodeA")

if err = metricA.SetValueUint8(instanceA, 20); err != nil {
t.Fatal(err)
}

if err = metricB.SetValueUint8(instanceA, 0); err != nil {
t.Fatal(err)
}

if err = metricA.SetValueUint8(instanceB, 21); err != nil {
t.Fatal(err)
}

if err = metricB.SetValueUint8(instanceB, 0); err != nil {
t.Fatal(err)
}

// run the plugin
results, err := p.Run(m)
if err != nil {
t.Fatal(err)
}

if len(results) == 1 {
n = results[0]
} else {
t.Fatalf("Plugin output has %d matrices, 1 was expected\n", len(results))
}

// check aggregated values

if len(n.GetInstances()) != 1 {
t.Fatalf("Number of instances is %d, 1 was expected\n", len(n.GetInstances()))
}

if instanceA = n.GetInstance("nodeA"); instanceA == nil {
t.Fatal("Instance [nodeA] missing")
}

if metricA = n.GetMetric("read_latency"); metricA == nil {
t.Fatal("Metric [read_latency] missing")
}

if metricB = n.GetMetric("total_read_ops"); metricB == nil {
t.Fatal("Metric [total_read_ops] missing")
}

if value, ok := metricA.GetValueUint8(instanceA); !ok {
t.Error("Value [read_latency] missing")
} else if value != 0 {
t.Errorf("Value [read_latency] = (%d) incorrect", value)
} else {
t.Logf("Value [read_latency] = (%d) correct!", value)
}

if value, ok := metricB.GetValueUint8(instanceA); !ok {
t.Error("Value [total_read_ops] missing")
} else if value != 0 {
t.Errorf("Value [total_read_ops] = (%d) incorrect", value)
} else {
t.Logf("Value [total_read_ops] = (%d) correct!", value)
}
}

0 comments on commit 7488348

Please sign in to comment.