Skip to content

Commit

Permalink
extra/metric/statsd examples
Browse files Browse the repository at this point in the history
  • Loading branch information
cfchou committed Aug 24, 2017
1 parent af0ecf0 commit 8a65337
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 29 deletions.
159 changes: 157 additions & 2 deletions extra/metric/example_statsd_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,161 @@
package metric

import "testing"
import (
"context"
"errors"
"github.com/cactus/go-statsd-client/statsd"
log2 "github.com/cfchou/go-gentle/extra/log"
"gopkg.in/cfchou/go-gentle.v3/gentle"
"gopkg.in/inconshreveable/log15.v2"
"log"
"net"
"sync"
"time"
)

func TestNewStatsdMetric(t *testing.T) {
func ExampleNewStatsdMetric() {
subPath := "example.test1"
client, _ := statsd.NewClient("127.0.0.1:8125", "extra")
opts := gentle.NewRateLimitedStreamOpts("", "test1",
gentle.NewTokenBucketRateLimit(10*time.Millisecond, 1))
opts.Metric = NewStatsdMetric(subPath, client)
opts.Log = log2.NewLog15Adapter(log15.New())

var upstream gentle.SimpleStream = func(ctx context.Context) (gentle.Message, error) {
return gentle.SimpleMessage(""), nil
}
stream := gentle.NewRateLimitedStream(opts, upstream)

// listen like "nc 8125 -l -u"
conn, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatalf("ListenPacket err: %s\n", err)
}
defer conn.Close()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bs := make([]byte, 256)
// read 5 times should be enough
for i := 0; i < 5; i++ {
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := conn.ReadFrom(bs)
if err != nil {
log.Printf("ReadFrom err: %s\n", err)
continue
}
// timer:
// extra.rate.test1.ok:0.020875|ms
// counter:
// extra.rate.test1.ok:1|c
log.Println(string(bs[:n]))
}
}()

stream.Get(context.Background())
wg.Wait()
// Output:
}

func ExampleNewStatsdRetryMetric() {
subPath := "example.test2.result"
retrySubPath := "example.test2.retry"
client, _ := statsd.NewClient("127.0.0.1:8125", "extra")
factory := gentle.NewConstantBackOffFactory(
gentle.NewConstantBackOffFactoryOpts(10*time.Millisecond, time.Second))
opts := gentle.NewRetryStreamOpts("", "test2", factory)
opts.RetryMetric = NewStatsdRetryMetric(subPath, retrySubPath, client)
opts.Log = log2.NewLog15Adapter(log15.New())

count := 5
var upstream gentle.SimpleStream = func(ctx context.Context) (gentle.Message, error) {
if count == 0 {
return gentle.SimpleMessage(""), nil
}
count--
return nil, errors.New("fake err")
}
stream := gentle.NewRetryStream(opts, upstream)

// listen like "nc 8125 -l -u"
conn, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatalf("ListenPacket err: %s\n", err)
}
defer conn.Close()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bs := make([]byte, 256)
// read 5 times should be enough
for i := 0; i < 5; i++ {
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := conn.ReadFrom(bs)
if err != nil {
log.Printf("ReadFrom err: %s\n", err)
continue
}
// timer:
// extra.example.test2.result.ok:56.73204|ms
// counter:
// extra.example.test2.result.ok:1|c
// retry-counter:
// extra.example.test2.retry.ok:5|c
log.Println(string(bs[:n]))
}
}()

stream.Get(context.Background())
wg.Wait()
// Output:
}

func ExampleNewStatsdCbMetric() {
gentle.CircuitReset()
subPath := "example.test3.result"
cbSubPath := "example.test3.cb"
client, _ := statsd.NewClient("127.0.0.1:8125", "extra")
opts := gentle.NewCircuitStreamOpts("", "test3", "test_circuit")
opts.CbMetric = NewStatsdCbMetric(subPath, cbSubPath, client)
opts.Log = log2.NewLog15Adapter(log15.New())

var upstream gentle.SimpleStream = func(ctx context.Context) (gentle.Message, error) {
return nil, gentle.ErrCbOpen
}
stream := gentle.NewCircuitStream(opts, upstream)

// listen like "nc 8125 -l -u"
conn, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatalf("ListenPacket err: %s\n", err)
}
defer conn.Close()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bs := make([]byte, 256)
// read 5 times should be enough
for i := 0; i < 5; i++ {
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := conn.ReadFrom(bs)
if err != nil {
log.Printf("ReadFrom err: %s\n", err)
continue
}
// timer:
// extra.example.test3.result.err:1.358839|ms
// counter:
// extra.example.test3.result.err:1|c
// cb-error-counter:
// extra.example.test3.cb.open:1|c
log.Println(string(bs[:n]))
}
}()

stream.Get(context.Background())
wg.Wait()
// Output:
}
34 changes: 11 additions & 23 deletions extra/metric/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package metric

import (
"github.com/cactus/go-statsd-client/statsd"
"github.com/cfchou/go-gentle/gentle"
"gopkg.in/cfchou/go-gentle.v3/gentle"
"time"
)

Expand Down Expand Up @@ -30,12 +30,12 @@ type StatsdMetric struct {
statter statsd.SubStatter
}

func NewStatsdMetric(subPath string, client *statsd.Client) *StatsdMetric {
func NewStatsdMetric(subPath string, statter statsd.Statter) *StatsdMetric {
return &StatsdMetric{
Rate: 1,
SubOk: "ok",
SubErr: "err",
statter: client.NewSubStatter(subPath),
statter: statter.NewSubStatter(subPath),
}
}

Expand All @@ -57,8 +57,8 @@ func (m *StatsdMetric) ObserveErr(timespan time.Duration) {
// stats.timers.foo.ok
// stats.timers.foo.err
//
// stats.foo.retry.ok
// stats.foo.retry.err
// stats.foo_retry.ok
// stats.foo_retry.err
type StatsdRetryMetric struct {
*StatsdMetric
SubOk string
Expand All @@ -67,20 +67,14 @@ type StatsdRetryMetric struct {
retryStatter statsd.SubStatter
}

func NewStatsdRetryMetric(subPath string, retrySubPath string, client *statsd.Client) *StatsdRetryMetric {
m := NewStatsdMetric(subPath, client)
var retryStatter statsd.SubStatter
if retrySubPath == "" {
retryStatter = m.statter.NewSubStatter("retry")
} else {
retryStatter = client.NewSubStatter(retrySubPath)
}
func NewStatsdRetryMetric(subPath string, retrySubPath string, statter statsd.Statter) *StatsdRetryMetric {
m := NewStatsdMetric(subPath, statter)
return &StatsdRetryMetric{
StatsdMetric: m,
SubOk: m.SubOk,
SubErr: m.SubErr,
Rate: m.Rate,
retryStatter: retryStatter,
retryStatter: statter.NewSubStatter(retrySubPath),
}
}

Expand Down Expand Up @@ -113,21 +107,15 @@ type StatsdCbMetric struct {
cbErrStatter statsd.SubStatter
}

func NewStatsdCbMetric(subPath string, cbErrSubPath string, client *statsd.Client) *StatsdCbMetric {
m := NewStatsdMetric(subPath, client)
var cbErrStatter statsd.SubStatter
if cbErrSubPath == "" {
cbErrStatter = m.statter.NewSubStatter("cbErr")
} else {
cbErrStatter = client.NewSubStatter(cbErrSubPath)
}
func NewStatsdCbMetric(subPath string, cbErrSubPath string, statter statsd.Statter) *StatsdCbMetric {
m := NewStatsdMetric(subPath, statter)
return &StatsdCbMetric{
StatsdMetric: m,
SubTimeout: "timeout",
SubMaxConcurrency: "maxConcurrency",
SubOpen: "open",
Rate: m.Rate,
cbErrStatter: cbErrStatter,
cbErrStatter: statter.NewSubStatter(cbErrSubPath),
}
}

Expand Down
4 changes: 0 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import:
version: ^0.8.0
subpackages:
- prometheus
- package: gopkg.in/cfchou/go-gentle.v2
version: ^2.1.0
subpackages:
- gentle
- package: golang.org/x/net
subpackages:
- context
Expand Down

0 comments on commit 8a65337

Please sign in to comment.