Skip to content

Commit

Permalink
use shogo82148/go-retry for posting metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiwara committed May 14, 2024
1 parent 4eac513 commit da3e18c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 24 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/hashicorp/logutils v1.0.0
github.com/mackerelio/mackerel-client-go v0.21.1
github.com/pkg/errors v0.9.1
github.com/shogo82148/go-retry v1.2.0
github.com/tatsushid/go-fastping v0.0.0-20160109021039-d7bb493dee3e
go.opentelemetry.io/otel v1.17.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.40.0
Expand Down Expand Up @@ -42,6 +43,6 @@ require (
golang.org/x/tools/cmd/cover v0.1.0-deprecated // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/grpc v1.57.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:Om
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shirou/gopsutil/v3 v3.22.4/go.mod h1:D01hZJ4pVHPpCTZ3m3T2+wDF2YAGfd+H4ifUguaQzHM=
github.com/shogo82148/go-retry v1.2.0 h1:A/LFdbZKJ+tsT1gF4OrzM4P10FGK7VUExpb07/U03aE=
github.com/shogo82148/go-retry v1.2.0/go.mod h1:wttfgfwCMQvNqv4kOpqIvDDJeSmwU+AEIpUyG+5Ca6M=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
54 changes: 31 additions & 23 deletions maprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

mackerel "github.com/mackerelio/mackerel-client-go"
"github.com/shogo82148/go-retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata"
otelresource "go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -24,12 +25,16 @@ var (
sem = make(chan struct{}, MaxConcurrency)
clientSem = make(chan struct{}, MaxClientConcurrency)
ProbeInterval = 60 * time.Second
mackerelRetryInterval = 10 * time.Second
otelReryInterval = 10 * time.Second
metricTimeMargin = -3 * time.Minute
MackerelAPIKey string
)

// retryPolicy is the shared backoff policy for posting metrics (used via
// retryPolicy.Do by the Mackerel host/service metric workers and the OTel
// exporter worker). MaxCount limits attempts to 5, with per-attempt delays
// bounded between 1s and 10s; presumably shogo82148/go-retry grows the delay
// exponentially with jitter between those bounds — confirm against the
// library's documentation.
var retryPolicy = retry.Policy{
	MinDelay: 1 * time.Second,
	MaxDelay: 10 * time.Second,
	MaxCount: 5,
}

func lock() {
sem <- struct{}{}
log.Printf("[trace] locked. concurrency: %d", len(sem))
Expand Down Expand Up @@ -63,22 +68,22 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
if conf.PostProbedMetrics {
if conf.Destination.Mackerel.Enabled {
wg.Add(2)
go postHostMetricWorker(wg, client, chs)
go postServiceMetricWorker(wg, client, chs)
go postHostMetricWorker(ctx, wg, client, chs)
go postServiceMetricWorker(ctx, wg, client, chs)
}
if conf.Destination.Otel.Enabled {
wg.Add(1)
go postOtelMetricWorker(wg, chs, conf.Destination.Otel)
go postOtelMetricWorker(ctx, wg, chs, conf.Destination.Otel)
}
} else {
if conf.Destination.Mackerel.Enabled {
wg.Add(2)
go dumpHostMetricWorker(wg, chs)
go dumpServiceMetricWorker(wg, chs)
go dumpHostMetricWorker(ctx, wg, chs)
go dumpServiceMetricWorker(ctx, wg, chs)
}
if conf.Destination.Otel.Enabled {
wg.Add(1)
go dumpOtelMetricWorker(wg, chs)
go dumpOtelMetricWorker(ctx, wg, chs)
}
}
}
Expand All @@ -88,12 +93,12 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
if conf.Destination.Mackerel.Enabled {
// aggregates are posted to Mackerel only
wg.Add(1)
go postServiceMetricWorker(wg, client, chs)
go postServiceMetricWorker(ctx, wg, client, chs)
}
// TODO: aggregates are not posted to OTel yet
} else {
wg.Add(1)
go dumpServiceMetricWorker(wg, chs)
go dumpServiceMetricWorker(ctx, wg, chs)
}
}

Expand Down Expand Up @@ -134,7 +139,7 @@ func Run(ctx context.Context, wg *sync.WaitGroup, configPath string, once bool)
}
}

func runAggregates(ctx context.Context, ag *AggregateDefinition, client *Client, chs *Channels, wg *sync.WaitGroup) {
func runAggregates(_ context.Context, ag *AggregateDefinition, client *Client, chs *Channels, wg *sync.WaitGroup) {
defer wg.Done()

service := ag.Service.String()
Expand Down Expand Up @@ -234,7 +239,7 @@ func runAggregates(ctx context.Context, ag *AggregateDefinition, client *Client,
}
}

func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
func postHostMetricWorker(ctx context.Context, wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Println("[info] starting postHostMetricWorker")
defer wg.Done()
ticker := time.NewTicker(10 * time.Second)
Expand All @@ -260,9 +265,10 @@ func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Printf("[debug] posting %d host metrics to Mackerel", len(mvs))
b, _ := json.Marshal(mvs)
log.Println("[debug]", string(b))
if err := client.PostHostMetricValues(mvs); err != nil {
if err := retryPolicy.Do(ctx, func() error {
return client.PostHostMetricValues(mvs)
}); err != nil {
log.Println("[error] failed to post host metrics to Mackerel", err)
time.Sleep(mackerelRetryInterval)
continue
}
log.Printf("[debug] post host metrics succeeded.")
Expand All @@ -271,7 +277,7 @@ func postHostMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
}
}

func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels) {
func postServiceMetricWorker(ctx context.Context, wg *sync.WaitGroup, client *Client, chs *Channels) {
log.Println("[info] starting postServiceMetricWorker")
defer wg.Done()
ticker := time.NewTicker(10 * time.Second)
Expand Down Expand Up @@ -307,9 +313,10 @@ func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels)
log.Printf("[debug] posting %d service metrics to Mackerel:%s", len(mvs), serviceName)
b, _ := json.Marshal(mvs)
log.Println("[debug]", string(b))
if err := client.PostServiceMetricValues(serviceName, mvs); err != nil {
if err := retryPolicy.Do(ctx, func() error {
return client.PostServiceMetricValues(serviceName, mvs)
}); err != nil {
log.Printf("[error] failed to post service metrics to Mackerel:%s %s", serviceName, err)
time.Sleep(mackerelRetryInterval)
continue
}
log.Printf("[debug] post service succeeded.")
Expand All @@ -320,7 +327,7 @@ func postServiceMetricWorker(wg *sync.WaitGroup, client *Client, chs *Channels)
}
}

func postOtelMetricWorker(wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
func postOtelMetricWorker(ctx context.Context, wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -361,9 +368,10 @@ func postOtelMetricWorker(wg *sync.WaitGroup, chs *Channels, oc *OtelConfig) {
{Metrics: mvs},
},
}
if err := exporter.Export(ctx, rms); err != nil {
if err := retryPolicy.Do(ctx, func() error {
return exporter.Export(ctx, rms)
}); err != nil {
log.Printf("[error] failed to export otel metrics: %v", err)
time.Sleep(otelReryInterval)
continue
}
log.Printf("[debug] post otel metrics succeeded.")
Expand Down Expand Up @@ -399,7 +407,7 @@ func newOtelExporter(ctx context.Context, oc *OtelConfig) (*otlpmetricgrpc.Expor
return exporter, endpointURL.String(), nil
}

func dumpHostMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpHostMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpHostMetricWorker")
for m := range chs.HostMetrics {
Expand All @@ -408,7 +416,7 @@ func dumpHostMetricWorker(wg *sync.WaitGroup, chs *Channels) {
}
}

func dumpServiceMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpServiceMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpServiceMetricWorker")
for m := range chs.ServiceMetrics {
Expand All @@ -417,7 +425,7 @@ func dumpServiceMetricWorker(wg *sync.WaitGroup, chs *Channels) {
}
}

func dumpOtelMetricWorker(wg *sync.WaitGroup, chs *Channels) {
func dumpOtelMetricWorker(_ context.Context, wg *sync.WaitGroup, chs *Channels) {
defer wg.Done()
log.Println("[info] starting dumpOtelMetricWorker")
for m := range chs.OtelMetrics {
Expand Down

0 comments on commit da3e18c

Please sign in to comment.