fix concurrent recording by using a waitgroup
stefanotorresi committed Apr 7, 2020
1 parent ad72196 commit 06e18fa
Showing 3 changed files with 32 additions and 20 deletions.
6 changes: 2 additions & 4 deletions collector/alert/alert.go
@@ -31,15 +31,13 @@ type alertCollector struct {
 func (c *alertCollector) Collect(ch chan<- prometheus.Metric) {
     log.Debugln("Collecting Alert metrics")
 
-    var err error
-
-    err = collector.RecordConcurrently([]func(ch chan<- prometheus.Metric) error{
+    errs := collector.RecordConcurrently([]func(ch chan<- prometheus.Metric) error{
         c.recordHAConfigChecks,
         c.recordHAFailoverConfigChecks,
         c.recordHAFailoverActive,
     }, ch)
 
-    if err != nil {
+    for _, err := range errs {
         log.Warnf("Alert Collector scrape failed: %s", err)
     }
 }
36 changes: 25 additions & 11 deletions collector/default_collector.go
@@ -1,6 +1,8 @@
 package collector
 
 import (
+    "sync"
+
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
 )
@@ -56,22 +58,34 @@ func (c *DefaultCollector) makeMetric(name string, value float64, valueType prom
     return prometheus.MustNewConstMetric(desc, valueType, value, labelValues...)
 }
 
-// run multiple metric recording functions concurrently
-func RecordConcurrently(recorders []func(ch chan<- prometheus.Metric) error, ch chan<- prometheus.Metric) error {
-    errs := make(chan error, len(recorders))
+// RecordConcurrently runs multiple metric recording functions concurrently.
+func RecordConcurrently(recorders []func(ch chan<- prometheus.Metric) error, ch chan<- prometheus.Metric) []error {
+    results := make(chan error, len(recorders))
+    var errs []error
+    var wg sync.WaitGroup
 
+    // For each recorder we start a goroutine which sends its result on a channel.
+    // A WaitGroup is used to later wait for all of them.
     for _, recorder := range recorders {
-        go func(recorder func(ch chan<- prometheus.Metric) error) {
-            errs <- recorder(ch)
-        }(recorder)
+        wg.Add(1)
+        go func(recorder func(ch chan<- prometheus.Metric) error, wg *sync.WaitGroup) {
+            defer wg.Done()
+            results <- recorder(ch)
+        }(recorder, &wg)
     }
-    // we wait for all the metric recorders, and return as soon as one sends an error
-    for range recorders {
-        err := <-errs
+
+    // Once all the goroutines in the WaitGroup are done, close the channel the errors are sent on.
+    go func() {
+        wg.Wait()
+        close(results)
+    }()
+
+    // Range over the results channel and collect any errors in a slice; this blocks until the channel is closed.
+    for err := range results {
         if err != nil {
-            return err
+            errs = append(errs, err)
         }
     }
 
-    return nil
+    return errs
 }
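
For reference, here is the same WaitGroup fan-in pattern as a minimal, self-contained sketch outside the collector package; runAll and its task functions are illustrative names for this sketch, not part of the repository:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// runAll runs every task concurrently and returns all non-nil errors.
// Like RecordConcurrently above, it only returns once every task has finished.
func runAll(tasks []func() error) []error {
    results := make(chan error, len(tasks))
    var wg sync.WaitGroup

    for _, task := range tasks {
        wg.Add(1)
        go func(task func() error) {
            defer wg.Done()
            results <- task()
        }(task)
    }

    // Close the results channel once all goroutines are done, so the range below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    var errs []error
    for err := range results {
        if err != nil {
            errs = append(errs, err)
        }
    }
    return errs
}

func main() {
    errs := runAll([]func() error{
        func() error { return nil },
        func() error { return errors.New("task 2 failed") },
    })
    for _, err := range errs {
        fmt.Println(err)
    }
}

Closing the channel from a separate goroutine is what lets the final loop use range instead of counting received results, and it guarantees the function only returns after every task has completed.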
10 changes: 5 additions & 5 deletions collector/default_collector_test.go
@@ -33,8 +33,8 @@ func TestRecordConcurrently(t *testing.T) {
         return nil
     }
 
-    err := RecordConcurrently([]func(ch chan<- prometheus.Metric) error{recorder1, recorder2}, metrics)
-    assert.NoError(t, err)
+    errs := RecordConcurrently([]func(ch chan<- prometheus.Metric) error{recorder1, recorder2}, metrics)
+    assert.Len(t, errs, 0)
     assert.Equal(t, metric2, <-metrics)
     assert.Equal(t, metric1, <-metrics)
 }
@@ -52,8 +52,8 @@ func TestRecordConcurrentlyErrors(t *testing.T) {
         return nil
     }
 
-    err := RecordConcurrently([]func(ch chan<- prometheus.Metric) error{recorder1, recorder2}, metrics)
-    assert.Equal(t, expectedError, err)
-    time.Sleep(time.Millisecond * 50)
+    errs := RecordConcurrently([]func(ch chan<- prometheus.Metric) error{recorder1, recorder2}, metrics)
+    assert.Len(t, errs, 1)
+    assert.Equal(t, expectedError, errs[0])
     assert.Equal(t, metric2, <-metrics) // even if the first recorder returned an error, the second one should still run to completion
 }
