Skip to content

Commit

Permalink
TT-5741 PrometheusPump write optimizations (#452)
Browse files Browse the repository at this point in the history
* prometheus write optms

* refactoring prometheus code + adding tests

* linting comment

* removing unused var

* linting test errors

* solving base metrics init + adding more code comment

* fixing TestPromtheusCreateBasicMetrics test
  • Loading branch information
tbuchaillot committed Jul 6, 2022
1 parent 793ad91 commit 2622d9b
Show file tree
Hide file tree
Showing 2 changed files with 495 additions and 76 deletions.
273 changes: 203 additions & 70 deletions pumps/prometheus.go
Expand Up @@ -5,8 +5,9 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/TykTechnologies/logrus"
"github.com/TykTechnologies/tyk-pump/analytics"

"github.com/mitchellh/mapstructure"
Expand All @@ -23,7 +24,7 @@ type PrometheusPump struct {
OauthStatusMetrics *prometheus.CounterVec
TotalLatencyMetrics *prometheus.HistogramVec

customMetrics []*PrometheusMetric
allMetrics []*PrometheusMetric

CommonPumpConfig
}
Expand All @@ -35,6 +36,9 @@ type PrometheusConf struct {
Addr string `json:"listen_address" mapstructure:"listen_address"`
// The path to the Prometheus collection. For example `/metrics`.
Path string `json:"path" mapstructure:"path"`
// This will enable an experimental feature that will aggregate the histogram metrics request time values before exposing them to prometheus.
// Enabling this will reduce the CPU usage of your prometheus pump but you will loose histogram precision. Experimental.
AggregateObservations bool `json:"aggregate_observations" mapstructure:"aggregate_observations"`
// Custom Prometheus metrics.
CustomMetrics []PrometheusMetric `json:"custom_metrics" mapstructure:"custom_metrics"`
}
Expand All @@ -58,61 +62,76 @@ type PrometheusMetric struct {
enabled bool
counterVec *prometheus.CounterVec
histogramVec *prometheus.HistogramVec

counterMap map[string]uint64

histogramMap map[string]histogramCounter
aggregatedObservations bool
}

//histogramCounter is a helper struct to mantain the totalRequestTime and hits in memory
type histogramCounter struct {
totalRequestTime uint64
hits uint64
}

const COUNTER_TYPE = "counter"
const HISTOGRAM_TYPE = "histogram"

var prometheusPrefix = "prometheus-pump"
var prometheusDefaultENV = PUMPS_ENV_PREFIX + "_PROMETHEUS"

var buckets = []float64{1, 2, 5, 7, 10, 15, 20, 25, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300, 400, 500, 1000, 2000, 5000, 10000, 30000, 60000}

func (p *PrometheusPump) New() Pump {
newPump := PrometheusPump{}
newPump.TotalStatusMetrics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tyk_http_status",
Help: "HTTP status codes per API",
},
[]string{"code", "api"},
)
newPump.PathStatusMetrics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tyk_http_status_per_path",
Help: "HTTP status codes per API path and method",
},
[]string{"code", "api", "path", "method"},
)
newPump.KeyStatusMetrics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tyk_http_status_per_key",
Help: "HTTP status codes per API key",
},
[]string{"code", "key"},
)
newPump.OauthStatusMetrics = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tyk_http_status_per_oauth_client",
Help: "HTTP status codes per oAuth client id",
},
[]string{"code", "client_id"},
)
newPump.TotalLatencyMetrics = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "tyk_latency",
Help: "Latency added by Tyk, Total Latency, and upstream latency per API",
Buckets: buckets,
},
[]string{"type", "api"},
)

prometheus.MustRegister(newPump.TotalStatusMetrics)
prometheus.MustRegister(newPump.PathStatusMetrics)
prometheus.MustRegister(newPump.KeyStatusMetrics)
prometheus.MustRegister(newPump.OauthStatusMetrics)
prometheus.MustRegister(newPump.TotalLatencyMetrics)

newPump.CreateBasicMetrics()

return &newPump
}

//CreateBasicMetrics stores all the predefined pump metrics in allMetrics slice
func (p *PrometheusPump) CreateBasicMetrics() {

//counter metrics
totalStatusMetric := &PrometheusMetric{
Name: "tyk_http_status",
Help: "HTTP status codes per API",
MetricType: COUNTER_TYPE,
Labels: []string{"code", "api"},
}
pathStatusMetrics := &PrometheusMetric{
Name: "tyk_http_status_per_path",
Help: "HTTP status codes per API path and method",
MetricType: COUNTER_TYPE,
Labels: []string{"code", "api", "path", "method"},
}
keyStatusMetrics := &PrometheusMetric{
Name: "tyk_http_status_per_key",
Help: "HTTP status codes per API key",
MetricType: COUNTER_TYPE,
Labels: []string{"code", "key"},
}
oauthStatusMetrics := &PrometheusMetric{
Name: "tyk_http_status_per_oauth_client",
Help: "HTTP status codes per oAuth client id",
MetricType: COUNTER_TYPE,
Labels: []string{"code", "client_id"},
}

//histogram metrics
totalLatencyMetrics := &PrometheusMetric{
Name: "tyk_latency",
Help: "Latency added by Tyk, Total Latency, and upstream latency per API",
MetricType: HISTOGRAM_TYPE,
Buckets: buckets,
Labels: []string{"type", "api"},
}

p.allMetrics = append(p.allMetrics, totalStatusMetric, pathStatusMetrics, keyStatusMetrics, oauthStatusMetrics, totalLatencyMetrics)
}

func (p *PrometheusPump) GetName() string {
return "Prometheus Pump"
}
Expand Down Expand Up @@ -140,16 +159,30 @@ func (p *PrometheusPump) Init(conf interface{}) error {
return errors.New("Prometheus listen_addr not set")
}

//first we init the base metrics
for _, metric := range p.allMetrics {
metric.aggregatedObservations = p.conf.AggregateObservations
errInit := metric.InitVec()
if errInit != nil {
p.log.Error(errInit)
}
}

//then we check the custom ones
if len(p.conf.CustomMetrics) > 0 {
customMetrics := []*PrometheusMetric{}
for _, metric := range p.conf.CustomMetrics {
newMetric := &metric
newMetric.aggregatedObservations = p.conf.AggregateObservations
errInit := newMetric.InitVec()
if errInit != nil {
p.log.Error(errInit)
} else {
p.customMetrics = append(p.customMetrics, newMetric)
customMetrics = append(customMetrics, newMetric)
}
}

p.allMetrics = append(p.allMetrics, customMetrics...)
}

p.log.Info("Starting prometheus listener on:", p.conf.Addr)
Expand All @@ -175,38 +208,54 @@ func (p *PrometheusPump) WriteData(ctx context.Context, data []interface{}) erro
default:
}
record := item.(analytics.AnalyticsRecord)
code := strconv.Itoa(record.ResponseCode)

p.TotalStatusMetrics.WithLabelValues(code, record.APIID).Inc()
p.PathStatusMetrics.WithLabelValues(code, record.APIID, record.Path, record.Method).Inc()
p.KeyStatusMetrics.WithLabelValues(code, record.APIKey).Inc()
if record.OauthID != "" {
p.OauthStatusMetrics.WithLabelValues(code, record.OauthID).Inc()
}
p.TotalLatencyMetrics.WithLabelValues("total", record.APIID).Observe(float64(record.RequestTime))

for _, customMetric := range p.customMetrics {
if customMetric.enabled {
p.log.Debug("Processing metric:", customMetric.Name)

switch customMetric.MetricType {
case "counter":
if customMetric.counterVec != nil {
values := customMetric.GetLabelsValues(record)
customMetric.counterVec.WithLabelValues(values...).Inc()
//we loop through all the metrics avaialble.
for _, metric := range p.allMetrics {
if metric.enabled {
p.log.Debug("Processing metric:", metric.Name)
//we get the values for that metric required labels
values := metric.GetLabelsValues(record)

switch metric.MetricType {
case COUNTER_TYPE:
if metric.counterVec != nil {
//if the metric is a counter, we increment the counter memory map
err := metric.Inc(values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
case "histogram":
if customMetric.histogramVec != nil {
values := customMetric.GetLabelsValues(record)
customMetric.histogramVec.WithLabelValues(values...).Observe(float64(record.RequestTime))
case HISTOGRAM_TYPE:
if metric.histogramVec != nil {
//if the metric is an histogram, we Observe the request time with the given values
err := metric.Observe(record.RequestTime, values...)
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": metric.MetricType,
"metric_name": metric.Name,
}).Error("error incrementing prometheus metric value:", err)
}
}
default:
p.log.Debug("trying to process an invalid prometheus metric type:", metric.MetricType)
}
} else {
p.log.Info("DISABLED")
}
}
}

//after looping through all the analytics records, we expose the metrics to prometheus endpoint
for _, customMetric := range p.allMetrics {
err := customMetric.Expose()
if err != nil {
p.log.WithFields(logrus.Fields{
"metric_type": customMetric.MetricType,
"metric_name": customMetric.Name,
}).Error("error writing prometheus metric:", err)
}
}

p.log.Info("Purged ", len(data), " records...")

return nil
Expand All @@ -223,6 +272,7 @@ func (pm *PrometheusMetric) InitVec() error {
},
pm.Labels,
)
pm.counterMap = make(map[string]uint64)
prometheus.MustRegister(pm.counterVec)
} else if pm.MetricType == "histogram" {
bkts := pm.Buckets
Expand All @@ -237,6 +287,7 @@ func (pm *PrometheusMetric) InitVec() error {
},
pm.Labels,
)
pm.histogramMap = make(map[string]histogramCounter)
prometheus.MustRegister(pm.histogramVec)
} else {
return errors.New("invalid metric type:" + pm.MetricType)
Expand All @@ -253,13 +304,17 @@ func (pm *PrometheusMetric) GetLabelsValues(decoded analytics.AnalyticsRecord) [
"host": decoded.Host,
"method": decoded.Method,
"path": decoded.Path,
"code": decoded.ResponseCode,
"response_code": decoded.ResponseCode,
"api_key": decoded.APIKey,
"key": decoded.APIKey,
"time_stamp": decoded.TimeStamp,
"api_version": decoded.APIVersion,
"api_name": decoded.APIName,
"api": decoded.APIID,
"api_id": decoded.APIID,
"org_id": decoded.OrgID,
"client_id": decoded.OauthID,
"oauth_id": decoded.OauthID,
"request_time": decoded.RequestTime,
"ip_address": decoded.IPAddress,
Expand All @@ -272,3 +327,81 @@ func (pm *PrometheusMetric) GetLabelsValues(decoded analytics.AnalyticsRecord) [
}
return values
}

//Inc is going to fill counterMap and histogramMap with the data from record.
func (pm *PrometheusMetric) Inc(values ...string) error {
switch pm.MetricType {
case COUNTER_TYPE:
// "response_code", "api_name", "method"
// key = map[500--apitest-GET] = 4

//map[]

pm.counterMap[strings.Join(values, "--")] += 1
default:
return errors.New("invalid metric type:" + pm.MetricType)
}

return nil
}

//Observe will fill hitogramMap with the sum of totalRequest and hits per label value if aggregate_observations is true. If aggregate_observations is set to false (default) it will execute prometheus Observe directly.
func (pm *PrometheusMetric) Observe(requestTime int64, values ...string) error {
switch pm.MetricType {
case HISTOGRAM_TYPE:
labelValues := []string{"total"}
labelValues = append(labelValues, values...)
if pm.aggregatedObservations {
key := strings.Join(labelValues, "--")

if currentValue, ok := pm.histogramMap[key]; ok {
currentValue.hits += 1
currentValue.totalRequestTime += uint64(requestTime)
pm.histogramMap[key] = currentValue
} else {
pm.histogramMap[key] = histogramCounter{
hits: 1,
totalRequestTime: uint64(requestTime),
}
}
} else {
pm.histogramVec.WithLabelValues(labelValues...).Observe(float64(requestTime))
}

default:
return errors.New("invalid metric type:" + pm.MetricType)
}
return nil
}

//Expose executes prometheus library functions using the counter/histogram vector from the PrometheusMetric struct.
//If the PrometheusMetric is COUNTER_TYPE, it will execute prometheus client Add function to add the counters from counterMap to the labels value metric
//If the PrometheusMetric is HISTOGRAM_TYPE and aggregate_observations config is true, it will calculate the average value of the metrics in the histogramMap and execute prometheus Observe.
//If aggregate_observations is false, it won't do anything since it means that we already exposed the metric.
func (pm *PrometheusMetric) Expose() error {
switch pm.MetricType {
case COUNTER_TYPE:
for key, value := range pm.counterMap {

labelsValue := strings.Split(key, "--")
pm.counterVec.WithLabelValues(labelsValue...).Add(float64(value))
}
pm.counterMap = make(map[string]uint64)
case HISTOGRAM_TYPE:
if pm.aggregatedObservations {
for key, value := range pm.histogramMap {
labelsValue := strings.Split(key, "--")
pm.histogramVec.WithLabelValues(labelsValue...).Observe(value.getAverageRequestTime())
}
pm.histogramMap = make(map[string]histogramCounter)
}
default:
return errors.New("invalid metric type:" + pm.MetricType)
}
return nil
}

//getAverageRequestTime returns the average request time of an histogramCounter dividing the sum of all the RequestTimes by the hits.
func (c histogramCounter) getAverageRequestTime() float64 {
return float64(c.totalRequestTime / c.hits)
}

0 comments on commit 2622d9b

Please sign in to comment.