Skip to content

Commit

Permalink
Retry for posting metrics (#61)
Browse files Browse the repository at this point in the history
* use shogo82148/go-retry for posting metrics

* impl doRetry myself, add tests

go-retry.Do() does not retry when net.Error.Temporary() returns false.
We want to retry always.
  • Loading branch information
fujiwara authored May 14, 2024
1 parent 4eac513 commit cd34bd3
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 32 deletions.
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"net/http"
"os"
"sync"
"time"

Expand All @@ -29,6 +30,10 @@ func newClient(apiKey string, backupStream string) *Client {
streamName: backupStream,
}
}
if os.Getenv("EMULATE_FAILURE") != "" {
// force fail for POST requests
c.mackerel.HTTPClient.Transport = &postFailureTransport{}
}
return c
}

Expand Down
2 changes: 2 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ package maprobe

// Aliases exposing unexported identifiers for white-box tests
// in the external maprobe_test package.
var (
	ParseMetricLine = parseMetricLine
	DoRetry         = doRetry
	NewClient       = newClient
)
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/hashicorp/logutils v1.0.0
github.com/mackerelio/mackerel-client-go v0.21.1
github.com/pkg/errors v0.9.1
github.com/shogo82148/go-retry v1.2.0
github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e
go.opentelemetry.io/otel v1.17.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.40.0
Expand Down Expand Up @@ -42,6 +43,6 @@ require (
golang.org/x/tools/cmd/cover v0.1.0-deprecated // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/grpc v1.57.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:Om
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shirou/gopsutil/v3 v3.22.4/go.mod h1:D01hZJ4pVHPpCTZ3m3T2+wDF2YAGfd+H4ifUguaQzHM=
github.com/shogo82148/go-retry v1.2.0 h1:A/LFdbZKJ+tsT1gF4OrzM4P10FGK7VUExpb07/U03aE=
github.com/shogo82148/go-retry v1.2.0/go.mod h1:wttfgfwCMQvNqv4kOpqIvDDJeSmwU+AEIpUyG+5Ca6M=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
80 changes: 49 additions & 31 deletions maprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package maprobe
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"net/url"
"os"
"sync"
"time"

mackerel "github.com/mackerelio/mackerel-client-go"
"github.com/shogo82148/go-retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata"
otelresource "go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -24,12 +25,16 @@ var (
sem = make(chan struct{}, MaxConcurrency)
clientSem = make(chan struct{}, MaxClientConcurrency)
ProbeInterval = 60 * time.Second
mackerelRetryInterval = 10 * time.Second
otelReryInterval = 10 * time.Second
metricTimeMargin = -3 * time.Minute
MackerelAPIKey string
)

// retryPolicy governs how doRetry re-invokes a failing operation:
// at most MaxCount attempts, with delays between MinDelay and MaxDelay
// (backoff shape follows shogo82148/go-retry's Policy semantics).
var retryPolicy = retry.Policy{
	MinDelay: 1 * time.Second,
	MaxDelay: 10 * time.Second,
	MaxCount: 5,
}

func lock() {
sem <- struct{}{}
log.Printf("[trace] locked. concurrency: %d", len(sem))
Expand All @@ -51,10 +56,6 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
}
log.Println("[debug]", conf.String())
client := newClient(MackerelAPIKey, conf.Backup.FirehoseStreamName)
if os.Getenv("EMULATE_FAILURE") != "" {
// force fail for POST requests
client.mackerel.HTTPClient.Transport = &postFailureTransport{}
}

chs := NewChannels(conf.Destination)
defer chs.Close()
Expand All @@ -63,22 +64,22 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
if conf.PostProbedMetrics {
if conf.Destination.Mackerel.Enabled {
wg.Add(2)
go postHostMetricWorker(wg, client, chs)
go postServiceMetricWorker(wg, client, chs)
go postHostMetricWorker(ctx, wg, client, chs)
go postServiceMetricWorker(ctx, wg, client, chs)
}
if conf.Destination.Otel.Enabled {
wg.Add(1)
go postOtelMetricWorker(wg, chs, conf.Destination.Otel)
go postOtelMetricWorker(ctx, wg, chs, conf.Destination.Otel)
}
} else {
if conf.Destination.Mackerel.Enabled {
wg.Add(2)
go dumpHostMetricWorker(wg, chs)
go dumpServiceMetricWorker(wg, chs)
go dumpHostMetricWorker(ctx, wg, chs)
go dumpServiceMetricWorker(ctx, wg, chs)
}
if conf.Destination.Otel.Enabled {
wg.Add(1)
go dumpOtelMetricWorker(wg, chs)
go dumpOtelMetricWorker(ctx, wg, chs)
}
}
}
Expand All @@ -88,12 +89,12 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
if conf.Destination.Mackerel.Enabled {
// aggregates are posted to Mackerel only
wg.Add(1)
go postServiceMetricWorker(wg, client, chs)
go postServiceMetricWorker(ctx, wg, client, chs)
}
// TODO: aggregates are not posted to OTel yet
} else {
wg.Add(1)
go dumpServiceMetricWorker(wg, chs)
go dumpServiceMetricWorker(ctx, wg, chs)
}
}

Expand Down Expand Up @@ -134,7 +135,7 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
}
}

func runAggregates(ctx context.Context, ag *AggregateDefinition, client *Client, chs *Channels, wg *sync.WaitGroup) {
func runAggregates(_ context.Context, ag *AggregateDefinition, client *Client, chs *Channels, wg *sync.WaitGroup) {
defer wg.Done()

service := ag.Service.String()
Expand Down Expand Up @@ -234,7 +235,7 @@ func runAggregates(ctx context.Context, ag *AggregateDefinition, client *Client,
}
}

func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
func postHostMetricWorker(ctx context.Context, wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Println("[info] starting postHostMetricWorker")
defer wg.Done()
ticker := time.NewTicker(10 * time.Second)
Expand All @@ -260,9 +261,10 @@ func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Printf("[debug] posting %d host metrics to Mackerel", len(mvs))
b, _ := json.Marshal(mvs)
log.Println("[debug]", string(b))
if err := client.PostHostMetricValues(mvs); err != nil {
log.Println("[error] failed to post host metrics to Mackerel", err)
time.Sleep(mackerelRetryInterval)
if err := doRetry(ctx, func() error {
return client.PostHostMetricValues(mvs)
}); err != nil {
log.Printf("[error] failed to post host metrics to Mackerel %s", err)
continue
}
log.Printf("[debug] post host metrics succeeded.")
Expand All @@ -271,7 +273,7 @@ func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
}
}

func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
func postServiceMetricWorker(ctx context.Context, wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Println("[info] starting postServiceMetricWorker")
defer wg.Done()
ticker := time.NewTicker(10 * time.Second)
Expand Down Expand Up @@ -307,9 +309,10 @@ func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels)
log.Printf("[debug] posting %d service metrics to Mackerel:%s", len(mvs), serviceName)
b, _ := json.Marshal(mvs)
log.Println("[debug]", string(b))
if err := client.PostServiceMetricValues(serviceName, mvs); err != nil {
if err := doRetry(ctx, func() error {
return client.PostServiceMetricValues(serviceName, mvs)
}); err != nil {
log.Printf("[error] failed to post service metrics to Mackerel:%s %s", serviceName, err)
time.Sleep(mackerelRetryInterval)
continue
}
log.Printf("[debug] post service succeeded.")
Expand All @@ -320,10 +323,8 @@ func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels)
}
}

func postOtelMetricWorker(wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
func postOtelMetricWorker(ctx context.Context, wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exporter, endpointURL, err := newOtelExporter(ctx, oc)
if err != nil {
log.Printf("[error] failed to create OpenTelemetry meter exporter: %v", err)
Expand Down Expand Up @@ -361,9 +362,10 @@ func postOtelMetricWorker(wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
{Metrics: mvs},
},
}
if err := exporter.Export(ctx, rms); err != nil {
if err := doRetry(ctx, func() error {
return exporter.Export(ctx, rms)
}); err != nil {
log.Printf("[error] failed to export otel metrics: %v", err)
time.Sleep(otelReryInterval)
continue
}
log.Printf("[debug] post otel metrics succeeded.")
Expand Down Expand Up @@ -399,7 +401,7 @@ func newOtelExporter(ctx context.Context, oc *OtelConfig) (*otlpmetricgrpc.Expor
return exporter, endpointURL.String(), nil
}

func dumpHostMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpHostMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpHostMetricWorker")
for m := range chs.HostMetrics {
Expand All @@ -408,7 +410,7 @@ func dumpHostMetricWorker(wg *sync.WaitGroup, chs *Channels) {
}
}

func dumpServiceMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpServiceMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpServiceMetricWorker")
for m := range chs.ServiceMetrics {
Expand All @@ -417,7 +419,7 @@ func dumpServiceMetricWorker(wg *sync.WaitGroup, chs *Channels) {
}
}

func dumpOtelMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpOtelMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpOtelMetricWorker")
for m := range chs.OtelMetrics {
Expand All @@ -428,3 +430,19 @@ func dumpOtelMetricWorker(wg *sync.WaitGroup, chs *Channels) {
type templateParam struct {
Host *mackerel.Host
}

// doRetry invokes f repeatedly under retryPolicy until it succeeds,
// the policy's attempt budget is exhausted, or ctx is canceled.
// Unlike retry.Policy.Do, it retries on every error, not only on
// errors reported as temporary.
//
// It returns nil on the first success. On context cancellation the
// returned error wraps ctx's error (so errors.Is(err, context.Canceled)
// works) and includes the last failure from f, which would otherwise
// be lost. If all attempts fail, the last error from f is wrapped.
func doRetry(ctx context.Context, f func() error) error {
	r := retryPolicy.Start(ctx)
	var err error
	for r.Continue() {
		if err = f(); err == nil {
			return nil
		}
		log.Printf("[warn] retrying: %s", err)
	}
	if rerr := r.Err(); rerr != nil {
		if err != nil {
			// Keep both the cancellation cause and the last call failure.
			return fmt.Errorf("retry aborted: %w (last error: %s)", rerr, err)
		}
		return rerr
	}
	if err == nil {
		// Defensive: the loop body never ran (e.g. MaxCount == 0);
		// avoid fmt.Errorf wrapping a nil error.
		return fmt.Errorf("retry failed without any attempt")
	}
	return fmt.Errorf("retry failed: %w", err)
}
30 changes: 30 additions & 0 deletions maprobe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package maprobe_test

import (
"context"
"testing"
"time"

"github.com/fujiwara/maprobe"
)

// TestDoRetry verifies that doRetry keeps re-invoking a failing operation.
// EMULATE_FAILURE makes the client's transport fail every POST request,
// so each attempt returns an error and doRetry must retry until the
// policy gives up.
// NOTE(review): this test deliberately waits through the retry backoff,
// so it takes on the order of 10+ seconds of wall-clock time.
func TestDoRetry(t *testing.T) {
	t.Setenv("EMULATE_FAILURE", "true")
	client := maprobe.NewClient("dummy", "")
	tries := 0
	start := time.Now()
	err := maprobe.DoRetry(context.Background(), func() error {
		tries++
		return client.PostHostMetricValues(nil)
	})
	elapsed := time.Since(start)
	// The emulated transport never succeeds, so doRetry must exhaust
	// its attempts and return a non-nil error.
	if err == nil {
		t.Errorf("error expected")
	}
	// More than one invocation proves a retry actually happened.
	if tries <= 1 {
		t.Errorf("retry expected")
	}
	// The policy sleeps between attempts; finishing in under 10s would
	// mean the backoff delays were not applied.
	if elapsed < 10*time.Second {
		t.Errorf("retry delay expected")
	}
}

0 comments on commit cd34bd3

Please sign in to comment.