Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Enable metrics publishing for mt-gateway #1645

Merged
merged 9 commits into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions cmd/mt-gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"fmt"
"github.com/grafana/metrictank/stats"
"net/http"
"net/http/httputil"
"strconv"
"strings"
"time"

"github.com/grafana/metrictank/cmd/mt-gateway/ingest"
"github.com/grafana/metrictank/publish"
Expand Down Expand Up @@ -78,25 +80,35 @@ func (api Api) Mux() *http.ServeMux {

//Add logging and default orgId middleware to the http handler
func withMiddleware(svc string, base http.Handler) http.Handler {
return defaultOrgIdMiddleware(loggingMiddleware(svc, base))
return defaultOrgIdMiddleware(statsMiddleware(loggingMiddleware(svc, base)))
}

//http.ResponseWriter that saves the status code
type statusRecorder struct {
http.ResponseWriter
status int
}
//add request metrics to the given handler
func statsMiddleware(base http.Handler) http.Handler {
stats := requestStats{
responseCounts: make(map[string]map[int]*stats.CounterRate32),
latencyHistograms: make(map[string]*stats.LatencyHistogram15s32),
sizeMeters: make(map[string]*stats.Meter32),
}

//delegate to the main response writer, but save the code
func (rec *statusRecorder) WriteHeader(code int) {
rec.status = code
rec.ResponseWriter.WriteHeader(code)
return http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
start := time.Now()
recorder := responseRecorder{w, -1, 0}
base.ServeHTTP(&recorder, request)
path := pathSlug(request.URL.Path)
stats.PathLatency(path, time.Since(start))
stats.PathStatusCount(path, recorder.status)
// only record the request size if the request succeeded.
if recorder.status < 300 {
stats.PathSize(path, recorder.size)
}
})
}

//add request logging to the given handler
func loggingMiddleware(svc string, base http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
recorder := statusRecorder{w, -1}
recorder := responseRecorder{w, -1, 0}
base.ServeHTTP(&recorder, request)
log.WithField("service", svc).
WithField("method", request.Method).
Expand Down
20 changes: 20 additions & 0 deletions cmd/mt-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"net/url"
"os"
"runtime"
"strings"
"time"

"github.com/grafana/globalconf"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/stats"
log "github.com/sirupsen/logrus"
)

Expand All @@ -23,6 +26,14 @@ var (
importerURL = flag.String("importer-url", "", "mt-whisper-importer-writer address")
defaultOrgId = flag.Int("default-org-id", -1, "default org ID to send to downstream services if none is provided")
brokers = flag.String("kafka-tcp-addr", "localhost:9092", "kafka tcp address(es) for metrics, in csv host[:port] format")

// stats
statsEnabled = flag.Bool("stats-enabled", false, "enable sending graphite messages for instrumentation")
statsPrefix = flag.String("stats-prefix", "mt-gateway.stats.default.$hostname", "stats prefix (will add trailing dot automatically if needed)")
statsAddr = flag.String("stats-addr", "localhost:2003", "graphite address")
statsInterval = flag.Int("stats-interval", 10, "interval in seconds to send statistics")
statsBufferSize = flag.Int("stats-buffer-size", 20000, "how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable.")
statsTimeout = flag.Duration("stats-timeout", time.Second*10, "timeout after which a write is considered not successful")
)

type Urls struct {
Expand Down Expand Up @@ -71,6 +82,15 @@ func main() {
return
}

if *statsEnabled {
stats.NewMemoryReporter()
hostname, _ := os.Hostname()
prefix := strings.Replace(*statsPrefix, "$hostname", strings.Replace(hostname, ".", "_", -1), -1)
stats.NewGraphite(prefix, *statsAddr, *statsInterval, *statsBufferSize, *statsTimeout)
} else {
stats.NewDevnull()
}

urls := Urls{}
urls.kafkaBrokers = *brokers

Expand Down
87 changes: 87 additions & 0 deletions cmd/mt-gateway/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"fmt"
"net/http"
"path"
"strings"
"sync"
"time"

"github.com/grafana/metrictank/stats"
)

// responseRecorder is an http.ResponseWriter that saves the status code and
// the number of body bytes written through it.
//
// Field order matters: callers build it with a positional literal. A
// non-positive status means no status has been observed yet.
type responseRecorder struct {
	http.ResponseWriter
	status int // last status code seen; non-positive until one is observed
	size   int // running total of body bytes accepted by the wrapped writer
}

// WriteHeader delegates to the wrapped response writer, but saves the code.
func (rec *responseRecorder) WriteHeader(code int) {
	rec.status = code
	rec.ResponseWriter.WriteHeader(code)
}

// Write delegates to the wrapped response writer, but records the number of
// bytes written.
//
// A handler may call Write without ever calling WriteHeader; net/http then
// sends an implicit 200. That implicit status is recorded here so the
// recorder does not keep reporting its "unset" sentinel for a successful
// response.
func (rec *responseRecorder) Write(data []byte) (int, error) {
	if rec.status <= 0 {
		rec.status = http.StatusOK
	}
	size, err := rec.ResponseWriter.Write(data)
	// Count only what the wrapped writer reports as written, even on error.
	rec.size += size
	return size, err
}

// requestStats holds the per-path request metrics used by the Path* methods.
// Entries are created on first use and then kept in the maps below.
type requestStats struct {
// guards all three maps; the Path* methods hold it only while looking up or
// inserting an entry, not while updating the metric itself
sync.Mutex
// response counters, keyed by path slug and then by HTTP status code
responseCounts map[string]map[int]*stats.CounterRate32
// request latency histograms, keyed by path slug
latencyHistograms map[string]*stats.LatencyHistogram15s32
// size meters, keyed by path slug
sizeMeters map[string]*stats.Meter32
}

// PathStatusCount increments the counter for responses with the given status
// code on the given path slug. The per-path map and the counter itself are
// created the first time a (path, status) pair is seen.
func (r *requestStats) PathStatusCount(path string, status int) {
	r.Lock()
	byStatus, ok := r.responseCounts[path]
	if !ok {
		byStatus = make(map[int]*stats.CounterRate32)
		r.responseCounts[path] = byStatus
	}
	counter, ok := byStatus[status]
	if !ok {
		// Format the metric key only when a counter is actually created, so
		// the common case (counter already exists) does no string formatting.
		counter = stats.NewCounterRate32(fmt.Sprintf("api.request.%s.status.%d", path, status))
		byStatus[status] = counter
	}
	r.Unlock()
	// The lock only guards the maps; the increment happens after releasing it.
	counter.Inc()
}

// PathLatency records dur in the latency histogram for the given path slug,
// creating and remembering the histogram the first time the path is seen.
func (r *requestStats) PathLatency(path string, dur time.Duration) {
	var hist *stats.LatencyHistogram15s32

	r.Lock()
	if known, found := r.latencyHistograms[path]; found {
		hist = known
	} else {
		hist = stats.NewLatencyHistogram15s32(fmt.Sprintf("api.request.%s", path))
		r.latencyHistograms[path] = hist
	}
	r.Unlock()

	// The lock only guards the map; the observation is recorded after release.
	hist.Value(dur)
}

// PathSize records size in the size meter for the given path slug, creating
// and remembering the meter the first time the path is seen.
func (r *requestStats) PathSize(path string, size int) {
	var meter *stats.Meter32

	r.Lock()
	if known, found := r.sizeMeters[path]; found {
		meter = known
	} else {
		meter = stats.NewMeter32(fmt.Sprintf("api.request.%s.size", path), false)
		r.sizeMeters[path] = meter
	}
	r.Unlock()

	// The lock only guards the map; the value is recorded after release.
	meter.Value(size)
}

// slugReplacer turns "/" and "." into "_" in a single pass, so a slug never
// adds extra dot-separated segments to the metric keys it is embedded in.
var slugReplacer = strings.NewReplacer("/", "_", ".", "_")

// pathSlug converts the request path to a metrics-safe slug.
//
// The path is cleaned (duplicate slashes, "." and ".." elements and any
// trailing slash are removed), the leading "/" is dropped, and every
// remaining "/" or "." becomes "_". The root path and the empty path both
// map to "root".
func pathSlug(p string) string {
	slug := strings.TrimPrefix(path.Clean(p), "/")
	// path.Clean returns "." for an empty path; treat that like "/" so it
	// does not turn into a bare "_".
	if slug == "" || slug == "." {
		slug = "root"
	}
	return slugReplacer.Replace(slug)
}
12 changes: 12 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ Flags:
restrict publishing data belonging to org id; 0 means no restriction (may be given multiple times, once per topic specified in 'metrics-topic', as a comma-separated list)
-schemas-file string
path to carbon storage-schemas.conf file (default "/etc/metrictank/storage-schemas.conf")
-stats-addr string
graphite address (default "localhost:2003")
-stats-buffer-size int
how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable. (default 20000)
-stats-enabled
enable sending graphite messages for instrumentation
-stats-interval int
interval in seconds to send statistics (default 10)
-stats-prefix string
stats prefix (will add trailing dot automatically if needed) (default "mt-gateway.stats.default.$hostname")
-stats-timeout duration
timeout after which a write is considered not successful (default 10s)
-v2
enable optimized MetricPoint payload (default true)
-v2-clear-interval duration
Expand Down
23 changes: 23 additions & 0 deletions scripts/config/mt-gateway.ini
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,26 @@ v2-clear-interval = 1h0m0s

#encode org-id in messages
v2-org = true


###
### Stats
###

#enable sending graphite messages for instrumentation
stats-enabled = false

#stats prefix (will add trailing dot automatically if needed)
stats-prefix = mt-gateway.stats.default.$hostname

#graphite address
stats-addr = localhost:2003

#interval in seconds to send statistics
stats-interval = 10

#how many messages (holding all measurements from one interval) to buffer up in case graphite endpoint is unavailable
stats-buffer-size = 20000

#timeout after which a write is considered not successful
stats-timeout = 10s