diff --git a/push/manager.go b/push/manager.go index c8a4d27..09d09df 100644 --- a/push/manager.go +++ b/push/manager.go @@ -111,6 +111,7 @@ func Setup(conf *config.Config, logger *logrus.Logger) error { if cli.Ping().Err() != nil { return errors.New("can not connect to admin redis") } + setupMetrics() _manager, err = NewManger(cli, logger) return err } diff --git a/push/metrics.go b/push/metrics.go new file mode 100644 index 0000000..c44cfd1 --- /dev/null +++ b/push/metrics.go @@ -0,0 +1,53 @@ +package push + +import "github.com/prometheus/client_golang/prometheus" + +type PerformanceMetrics struct { + ConsumeLatencies *prometheus.SummaryVec + PushLatencies *prometheus.SummaryVec + PushHTTPCodes *prometheus.CounterVec +} + +var metrics *PerformanceMetrics + +func setupMetrics() { + metrics = &PerformanceMetrics{} + consumeLatencies := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "infra", + Subsystem: "lmstfy_pusher", + Name: "consume_latency_milliseconds", + Help: "latencies", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.001}, + }, + []string{"pool", "namespace", "queue"}, + ) + + pushLatencies := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "infra", + Subsystem: "lmstfy_pusher", + Name: "push_latency_milliseconds", + Help: "push http latencies", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.001}, + }, + []string{"pool", "namespace", "queue"}, + ) + + httpCodes := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "infra", + Subsystem: "lmstfy_pusher", + Name: "push_http_codes", + Help: "push response code", + }, + []string{"pool", "namespace", "queue", "code"}, + ) + + prometheus.MustRegister(consumeLatencies) + prometheus.MustRegister(pushLatencies) + prometheus.MustRegister(httpCodes) + metrics.ConsumeLatencies = consumeLatencies + metrics.PushLatencies = pushLatencies + metrics.PushHTTPCodes = httpCodes +} diff --git a/push/pusher.go b/push/pusher.go index 8da89f0..f77249b 100644 --- a/push/pusher.go +++ b/push/pusher.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "sync" "time" @@ -75,6 +76,7 @@ func (p *Pusher) pollQueue() { engine := engine.GetEngine(p.Pool) for { + now := time.Now() job, err := engine.ConsumeByPush(p.Namespace, p.Queue, p.Timeout, 3) if err != nil { p.logger.WithFields(logrus.Fields{ @@ -86,6 +88,10 @@ func (p *Pusher) pollQueue() { } select { case p.jobCh <- job: + metrics.ConsumeLatencies.WithLabelValues( + p.Pool, + p.Namespace, + p.Queue).Observe(time.Since(now).Seconds() * 1000) /* do nothing */ case <-p.stopCh: p.logger.WithFields(logrus.Fields{ @@ -151,6 +157,19 @@ func (p *Pusher) startWorker(num int) { } func (p *Pusher) sendJobToUser(job engine.Job) error { + var statusCode int + defer func(t time.Time) { + metrics.PushLatencies.WithLabelValues( + p.Pool, + p.Namespace, + p.Queue).Observe(time.Since(t).Seconds() * 1000) + metrics.PushHTTPCodes.WithLabelValues( + p.Pool, + p.Namespace, + p.Queue, + strconv.Itoa(statusCode), + ).Inc() + }(time.Now()) jobBytes, _ := job.MarshalText() req, err := http.NewRequest(http.MethodPost, p.Endpoint, bytes.NewReader(jobBytes)) if err != nil { @@ -163,7 +182,8 @@ func (p *Pusher) sendJobToUser(job engine.Job) error { } ioutil.ReadAll(resp.Body) resp.Body.Close() - if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { + statusCode = resp.StatusCode + if statusCode >= http.StatusOK && statusCode < http.StatusMultipleChoices { e := engine.GetEngine(p.Pool) return e.Delete(p.Namespace, p.Queue, job.ID()) }