Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Add support for specifying distributions through protobuf and modify …
Browse files Browse the repository at this point in the history
…probe config proto to provide an option to use distributions for latency.

ORIGINAL_AUTHOR=Manu Garg <manugarg@gmail.com>
PiperOrigin-RevId: 175631204
  • Loading branch information
manugarg committed Nov 14, 2017
1 parent 05c0e69 commit 079d723
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 8 deletions.
37 changes: 32 additions & 5 deletions metrics/dist.go
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"reflect"
"sort"
"strconv"
"strings"
"sync"

Expand All @@ -45,6 +46,27 @@ func NewDistribution(lowerBounds []float64) *Distribution {
}
}

// NewDistributionFromProto returns a new distribution based on the provided
// protobuf.
func NewDistributionFromProto(distProto *Distribution) (*Distribution, error) {
switch distProto.Buckets.(type) {
case *Distribution_ExplicitBuckets:
lbStringA := strings.Split(distProto.GetExplicitBuckets(), ",")
lowerBounds := make([]float64, len(lbStringA))
for i, tok := range lbStringA {
lb, err := strconv.ParseFloat(tok, 64)
if err != nil {
return nil, fmt.Errorf("invalid lower bound for bucket: %s. Err: %v", tok, err)
}
lowerBounds[i] = lb
}
return NewDistribution(lowerBounds), nil
case *Distribution_ExponentialBuckets:
return nil, errors.New("exponential buckets are not supported yet")
}
return nil, fmt.Errorf("unknown buckets type: %v", distProto.Buckets)
}

func (d *Distribution) bucketIndex(sample float64) int {
return sort.Search(len(d.lowerBounds), func(i int) bool { return sample < d.lowerBounds[i] }) - 1
}
Expand Down Expand Up @@ -97,19 +119,19 @@ func (d *Distribution) Add(val Value) error {
// For example for a distribution with lower bounds 0.5, 2.0, 7.5 and
// bucket counts 34, 54, 121, 12, string representation will look like the
// following:
// dist:sum:899|count:221|lb:-Inf,0.500,2.000,7.5000|bc:34,54,121,12
// dist:sum:899|count:221|lb:-Inf,0.5,2,7.5|bc:34,54,121,12
func (d *Distribution) String() string {
d.mu.RLock()
defer d.mu.RUnlock()

var tokens []string

tokens = append(tokens, fmt.Sprintf("sum:%.03f", d.sum))
tokens = append(tokens, fmt.Sprintf("sum:%s", strconv.FormatFloat(d.sum, 'f', -1, 64)))
tokens = append(tokens, fmt.Sprintf("count:%d", d.count))

tok := "lb:"
for _, lb := range d.lowerBounds {
tok = fmt.Sprintf("%s%.03f,", tok, lb)
tok = fmt.Sprintf("%s%s,", tok, strconv.FormatFloat(lb, 'f', -1, 64))
}
tok = tok[:len(tok)-1] // Remove last ","
tokens = append(tokens, tok)
Expand All @@ -133,7 +155,7 @@ func (d *Distribution) StackdriverTypedValue() *monitoring.TypedValue {
BucketCounts: googleapi.Int64s(append([]int64{}, d.bucketCounts...)),
BucketOptions: &monitoring.BucketOptions{
ExplicitBuckets: &monitoring.Explicit{
Bounds: append([]float64{}, d.lowerBounds...),
Bounds: append([]float64{}, d.lowerBounds[1:]...),
},
},
Count: d.count,
Expand All @@ -146,11 +168,16 @@ func (d *Distribution) StackdriverTypedValue() *monitoring.TypedValue {
func (d *Distribution) clone() Value {
d.mu.RLock()
defer d.mu.RUnlock()
newD := NewDistribution(d.lowerBounds)
newD := NewDistribution(d.lowerBounds[1:])
newD.sum = d.sum
newD.count = d.count
for i := range d.bucketCounts {
newD.bucketCounts[i] = d.bucketCounts[i]
}
return newD
}

// Clone returns a copy of the receiver distribution.
func (d *Distribution) Clone() *Distribution {
return d.clone().(*Distribution)
}
26 changes: 26 additions & 0 deletions metrics/dist.proto
@@ -0,0 +1,26 @@
syntax = "proto2";

package cloudprober.metrics;

message Distribution {
oneof buckets {
// Comma-separated list of lower bounds, where each lower bound is a float
// value. Example: 0.5,1,2,4,8.
string explicit_buckets = 1;
// Exponential buckets are not supported yet.
// TODO: Implement support for exponential buckets.
ExponentialBuckets exponential_buckets = 2;
}
}

// ExponentialBucket defines a set of num_buckets+2 buckets:
// bucket[0] covers (−Inf, 0)
// bucket[1] covers [0, scale_factor)
// bucket[i] covers [scale_factor*base^(i−2), scale_factor*base^(i−1)) for i > 1 and i <= num_buckets
// bucket[num_buckets+1] covers [scale_factor*base^(num_buckets−1), +Inf)
// base must be at least 1.01.
message ExponentialBuckets {
optional float scale_factor = 1 [default = 1.0];
optional float base = 2 [default = 2];
optional uint32 num_buckets = 3 [default = 20];
}
31 changes: 30 additions & 1 deletion metrics/dist_test.go
Expand Up @@ -15,7 +15,11 @@
package metrics

import (
"math"
"reflect"
"testing"

"github.com/golang/protobuf/proto"
)

func verifyBucketCount(t *testing.T, d *Distribution, indices []int, counts []int64) {
Expand All @@ -27,6 +31,31 @@ func verifyBucketCount(t *testing.T, d *Distribution, indices []int, counts []in
}
}

func protoToDist(t *testing.T, testDistProtoText string) *Distribution {
testDistProto := &Distribution{}
if err := proto.UnmarshalText(testDistProtoText, testDistProto); err != nil {
t.Errorf("Failed parsing distribution proto text: %s. Err: %v", testDistProtoText, err)
return nil
}
d, err := NewDistributionFromProto(testDistProto)
if err != nil {
t.Errorf("Error while creating distrubtion from the protobuf: %s. Err: %v", testDistProtoText, err)
return nil
}
return d
}

func TestNewDistributionFromProto(t *testing.T) {
testDistProtoText := `
explicit_buckets: "1,2,4,8,16,32"
`
expectedLowerBounds := []float64{math.Inf(-1), 1, 2, 4, 8, 16, 32}
d := protoToDist(t, testDistProtoText)
if !reflect.DeepEqual(d.lowerBounds, expectedLowerBounds) {
t.Errorf("Unexpected lower bounds from proto. d.lowerBounds=%v, want=%v.", d.lowerBounds, expectedLowerBounds)
}
}

func TestAddSample(t *testing.T) {
lb := []float64{1, 5, 10, 15, 20, 30, 40, 50}
d := NewDistribution(lb)
Expand Down Expand Up @@ -73,7 +102,7 @@ func TestString(t *testing.T) {
}

s := d.String()
want := "dist:sum:21.500|count:3|lb:-Inf,1.000,5.000,15.000,30.000,45.000|bc:1,1,0,1,0,0"
want := "dist:sum:21.5|count:3|lb:-Inf,1,5,15,30,45|bc:1,1,0,1,0,0"
if s != want {
t.Errorf("String is not in expected format. d.String()=%s, want: %s", s, want)
}
Expand Down
4 changes: 4 additions & 0 deletions probes/config.proto
@@ -1,5 +1,6 @@
syntax = "proto2";

import "github.com/google/cloudprober/metrics/dist.proto";
import "github.com/google/cloudprober/probes/http/config.proto";
import "github.com/google/cloudprober/probes/dns/config.proto";
import "github.com/google/cloudprober/probes/external/config.proto";
Expand Down Expand Up @@ -35,6 +36,9 @@ message ProbeDef {
// Targets for the probe
required targets.TargetsDef targets = 6;

// Latency distribution. If specified, latency is stored as a distribution.
optional metrics.Distribution latency_distribution = 7;

oneof probe {
ping.ProbeConf ping_probe = 20;
http.ProbeConf http_probe = 21;
Expand Down
2 changes: 2 additions & 0 deletions probes/options/options.go
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/google/cloudprober/logger"
"github.com/google/cloudprober/metrics"
"github.com/google/cloudprober/targets"
)

Expand All @@ -30,4 +31,5 @@ type Options struct {
Interval, Timeout time.Duration
Logger *logger.Logger
ProbeConf interface{} // Probe-type specific config
LatencyDist *metrics.Distribution
}
10 changes: 8 additions & 2 deletions probes/probes.go
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

/*
Probes package provides an interface to initialize probes using prober config.
Package probes provides an interface to initialize probes using prober config.
*/
package probes

Expand Down Expand Up @@ -105,10 +105,16 @@ func Init(probeProtobufs []*ProbeDef, globalTargetsOpts *targets.GlobalTargetsOp
if opts.Logger, err = newLogger(p.GetName()); err != nil {
glog.Exitf("Error in initializing logger for the probe %s. Err: %v", p.GetName(), err)
}

if opts.Targets, err = targets.New(p.GetTargets(), globalTargetsOpts, globalTargetsLogger, opts.Logger); err != nil {
glog.Exit(err)
}
if latencyDist := p.GetLatencyDistribution(); latencyDist != nil {
if d, err := metrics.NewDistributionFromProto(latencyDist); err != nil {
glog.Exitf("Error creating distribution from the specification: %v. Err: %v", latencyDist, err)
} else {
opts.LatencyDist = d
}
}
probes[p.GetName()] = initProbe(p, opts)
}
return probes
Expand Down

0 comments on commit 079d723

Please sign in to comment.