Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Don't record measurements with no subscription (#600)
Browse files Browse the repository at this point in the history
Suggested by Ramon, immediately dropping the measurements from
the measures with no subscription rather than doing it at a later time
significantly improves the performance on the critical path.

Before:
BenchmarkRecord0-8            	500000000	         3.09 ns/op
BenchmarkRecord1-8            	 5000000	       366 ns/op
BenchmarkRecord8-8            	 3000000	       412 ns/op
BenchmarkRecord8_Parallel-8   	 2000000	       804 ns/op
BenchmarkRecord8_8Tags-8      	 3000000	       415 ns/op

After:
BenchmarkRecord0-8            	1000000000	         2.58 ns/op
BenchmarkRecord1-8            	30000000	        36.9 ns/op
BenchmarkRecord8-8            	20000000	        89.4 ns/op
BenchmarkRecord8_Parallel-8   	30000000	        44.8 ns/op
BenchmarkRecord8_8Tags-8      	20000000	        90.1 ns/op
  • Loading branch information
rakyll committed Mar 18, 2018
1 parent 92b618f commit 983446b
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 10 deletions.
7 changes: 4 additions & 3 deletions stats/internal/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"go.opencensus.io/tag"
)

type Recorder func(*tag.Map, interface{})

// DefaultRecorder will be called for each Record call.
var DefaultRecorder Recorder = nil
var DefaultRecorder func(*tag.Map, interface{})

// SubscriptionReporter reports when a view subscribed with a measure.
var SubscriptionReporter func(measure string)
22 changes: 19 additions & 3 deletions stats/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"

"go.opencensus.io/stats/internal"
)
Expand All @@ -37,14 +38,27 @@ type Measure interface {
Name() string
Description() string
Unit() string

subscribe()
subscribed() bool
}

type measure struct {
subs int32 // access atomically

name string
description string
unit string
}

func (m *measure) subscribe() {
atomic.StoreInt32(&m.subs, 1)
}

func (m *measure) subscribed() bool {
return atomic.LoadInt32(&m.subs) == 1
}

// Name returns the name of the measure.
func (m *measure) Name() string {
return m.name
Expand All @@ -61,10 +75,12 @@ func (m *measure) Unit() string {
}

var (
mu sync.RWMutex
measures = make(map[string]Measure)
errDuplicate = errors.New("duplicate measure name")
mu sync.RWMutex
measures = make(map[string]Measure)
)

var (
errDuplicate = errors.New("duplicate measure name")
errMeasureNameTooLong = fmt.Errorf("measure name cannot be longer than %v", internal.MaxNameLength)
)

Expand Down
15 changes: 13 additions & 2 deletions stats/measure_float64.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@ type Float64Measure struct {
measure
}

func (f *Float64Measure) subscribe() {
f.measure.subscribe()
}

func (f *Float64Measure) subscribed() bool {
return f.measure.subscribed()
}

// M creates a new float64 measurement.
// Use Record to record measurements.
func (m *Float64Measure) M(v float64) Measurement {
return Measurement{m: m, v: v}
func (f *Float64Measure) M(v float64) Measurement {
if !f.subscribed() {
return Measurement{}
}
return Measurement{m: f, v: v}
}

// Float64 creates a new measure of type Float64Measure. It returns
Expand Down
15 changes: 13 additions & 2 deletions stats/measure_int64.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@ type Int64Measure struct {
measure
}

func (i *Int64Measure) subscribe() {
i.measure.subscribe()
}

func (i *Int64Measure) subscribed() bool {
return i.measure.subscribed()
}

// M creates a new int64 measurement.
// Use Record to record measurements.
func (m *Int64Measure) M(v int64) Measurement {
return Measurement{m: m, v: float64(v)}
func (i *Int64Measure) M(v int64) Measurement {
if !i.subscribed() {
return Measurement{}
}
return Measurement{m: i, v: float64(v)}
}

// Int64 creates a new measure of type Int64Measure. It returns an
Expand Down
18 changes: 18 additions & 0 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,30 @@ import (
"go.opencensus.io/tag"
)

func init() {
internal.SubscriptionReporter = func(measure string) {
mu.Lock()
measures[measure].subscribe()
mu.Unlock()
}
}

// Record records one or multiple measurements with the same tags at once.
// If there are any tags in the context, measurements will be tagged with them.
func Record(ctx context.Context, ms ...Measurement) {
if len(ms) == 0 {
return
}
var record bool
for _, m := range ms {
if (m != Measurement{}) {
record = true
break
}
}
if !record {
return
}
if internal.DefaultRecorder != nil {
internal.DefaultRecorder(tag.FromContext(ctx), ms)
}
Expand Down
5 changes: 5 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/internal"
"go.opencensus.io/tag"
)

Expand Down Expand Up @@ -57,6 +58,7 @@ func (cmd *subscribeToViewReq) handleCommand(w *worker) {
errstr = append(errstr, fmt.Sprintf("%s: %v", view.Name, err))
continue
}
internal.SubscriptionReporter(view.Measure.Name())
vi.subscribe()
}
if len(errstr) > 0 {
Expand Down Expand Up @@ -135,6 +137,9 @@ type recordReq struct {

func (cmd *recordReq) handleCommand(w *worker) {
for _, m := range cmd.ms {
if (m == stats.Measurement{}) { // not subscribed
continue
}
ref := w.getMeasureRef(m.Measure().Name())
for v := range ref.views {
v.addSample(cmd.tm, m.Value())
Expand Down

0 comments on commit 983446b

Please sign in to comment.