Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
refactor ReadAll to Read and fix concurrency issue. (#1056)
Browse files Browse the repository at this point in the history
* refactor ReadAll to Read and fix concurrency issue.

* use sync.Map instead of RWMutex
  • Loading branch information
rghetia authored and Bogdan Drutu committed Mar 12, 2019
1 parent 2eaaf3a commit d1aebdc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
36 changes: 28 additions & 8 deletions metric/gauge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestGauge(t *testing.T) {
f.GetEntry(metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}).Add(1)
f.GetEntry(metricdata.NewLabelValue("k1v1"), metricdata.LabelValue{}).Add(1)
f.GetEntry(metricdata.NewLabelValue("k1v2"), metricdata.NewLabelValue("k2v2")).Add(1)
m := r.ReadAll()
m := r.Read()
want := []*metricdata.Metric{
{
Descriptor: metricdata.Descriptor{
Expand Down Expand Up @@ -79,17 +79,17 @@ func TestFloat64Entry_Add(t *testing.T) {
r := NewRegistry()
g := r.AddFloat64Gauge("g", "", metricdata.UnitDimensionless)
g.GetEntry().Add(0)
ms := r.ReadAll()
ms := r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), 0.0; got != want {
t.Errorf("value = %v, want %v", got, want)
}
g.GetEntry().Add(1)
ms = r.ReadAll()
ms = r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), 1.0; got != want {
t.Errorf("value = %v, want %v", got, want)
}
g.GetEntry().Add(-1)
ms = r.ReadAll()
ms = r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), 0.0; got != want {
t.Errorf("value = %v, want %v", got, want)
}
Expand All @@ -99,7 +99,7 @@ func TestFloat64Gauge_Add_NegativeTotals(t *testing.T) {
r := NewRegistry()
g := r.AddFloat64Gauge("g", "", metricdata.UnitDimensionless)
g.GetEntry().Add(-1.0)
ms := r.ReadAll()
ms := r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(float64), float64(0); got != want {
t.Errorf("value = %v, want %v", got, want)
}
Expand All @@ -109,12 +109,12 @@ func TestInt64GaugeEntry_Add(t *testing.T) {
r := NewRegistry()
g := r.AddInt64Gauge("g", "", metricdata.UnitDimensionless)
g.GetEntry().Add(0)
ms := r.ReadAll()
ms := r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(0); got != want {
t.Errorf("value = %v, want %v", got, want)
}
g.GetEntry().Add(1)
ms = r.ReadAll()
ms = r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(1); got != want {
t.Errorf("value = %v, want %v", got, want)
}
Expand All @@ -124,7 +124,7 @@ func TestInt64Gauge_Add_NegativeTotals(t *testing.T) {
r := NewRegistry()
g := r.AddInt64Gauge("g", "", metricdata.UnitDimensionless)
g.GetEntry().Add(-1)
ms := r.ReadAll()
ms := r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(0); got != want {
t.Errorf("value = %v, want %v", got, want)
}
Expand Down Expand Up @@ -156,6 +156,26 @@ func TestMapKey(t *testing.T) {
}
}

func TestRaceCondition(t *testing.T) {
r := NewRegistry()

// start reader before adding Gauge metric.
var ms = []*metricdata.Metric{}
for i := 0; i < 5; i++ {
go func(k int) {
for j := 0; j < 5; j++ {
g := r.AddInt64Gauge(fmt.Sprintf("g%d%d", k, j), "", metricdata.UnitDimensionless)
g.GetEntry().Add(1)
}
}(i)
}
time.Sleep(1 * time.Second)
ms = r.Read()
if got, want := ms[0].TimeSeries[0].Points[0].Value.(int64), int64(1); got != want {
t.Errorf("value = %v, want %v", got, want)
}
}

func ignoreTimes(_, _ time.Time) bool {
return true
}
Expand Down
27 changes: 15 additions & 12 deletions metric/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@
package metric

import (
"go.opencensus.io/metric/metricdata"
"log"
"sync"
"time"

"go.opencensus.io/metric/metricdata"
)

// Registry creates and manages a set of gauges.
// External synchronization is required if you want to add gauges to the same
// registry from multiple goroutines.
type Registry struct {
gauges map[string]*gauge
gauges sync.Map
}

// NewRegistry initializes a new Registry.
func NewRegistry() *Registry {
return &Registry{
gauges: make(map[string]*gauge),
}
return &Registry{}
}

// AddFloat64Gauge creates and adds a new float64-valued gauge to this registry.
Expand All @@ -53,8 +53,9 @@ func (r *Registry) AddInt64Gauge(name, description string, unit metricdata.Unit,
}

func (r *Registry) initGauge(g *gauge, labelKeys []string, name string, description string, unit metricdata.Unit) *gauge {
existing, ok := r.gauges[name]
val, ok := r.gauges.Load(name)
if ok {
existing := val.(*gauge)
if existing.isFloat != g.isFloat {
log.Panicf("Gauge with name %s already exists with a different type", name)
}
Expand All @@ -67,15 +68,17 @@ func (r *Registry) initGauge(g *gauge, labelKeys []string, name string, descript
Unit: unit,
LabelKeys: labelKeys,
}
r.gauges[name] = g
r.gauges.Store(name, g)
return g
}

// ReadAll reads all gauges in this registry and returns their values as metrics.
func (r *Registry) ReadAll() []*metricdata.Metric {
ms := make([]*metricdata.Metric, 0, len(r.gauges))
for _, g := range r.gauges {
// Read reads all gauges in this registry and returns their values as metrics.
func (r *Registry) Read() []*metricdata.Metric {
ms := []*metricdata.Metric{}
r.gauges.Range(func(k, v interface{}) bool {
g := v.(*gauge)
ms = append(ms, g.read())
}
return true
})
return ms
}

0 comments on commit d1aebdc

Please sign in to comment.