Skip to content

Commit

Permalink
add operation label to apisix_request_latencies and record all status…
Browse files Browse the repository at this point in the history
… codes, also fix TLS typo
  • Loading branch information
Sindweller committed Sep 8, 2021
1 parent da1ece0 commit bbe13c9
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 34 deletions.
31 changes: 17 additions & 14 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,15 @@ func (c *cluster) do(req *http.Request) (*http.Response, error) {
return c.cli.Do(req)
}

func (c *cluster) getResource(ctx context.Context, url string) (*getResponse, error) {
func (c *cluster) getResource(ctx context.Context, url, resource string) (*getResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "get")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -526,14 +527,14 @@ func (c *cluster) listResource(ctx context.Context, url, resource string) (*list
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "list")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return nil, err
}
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
return nil, err
}
Expand All @@ -554,7 +555,8 @@ func (c *cluster) createResource(ctx context.Context, url, resource string, body
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "create")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return nil, err
}
Expand All @@ -563,7 +565,6 @@ func (c *cluster) createResource(ctx context.Context, url, resource string, body

if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
return nil, err
}
Expand All @@ -583,15 +584,15 @@ func (c *cluster) updateResource(ctx context.Context, url, resource string, body
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "update")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return nil, err
}
defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
return nil, err
}
Expand All @@ -610,15 +611,15 @@ func (c *cluster) deleteResource(ctx context.Context, url, resource string) erro
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "delete")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return err
}
defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
message := readBody(resp.Body, url)
err = multierr.Append(err, fmt.Errorf("error message: %s", message))
if strings.Contains(message, "still using") {
Expand Down Expand Up @@ -664,14 +665,15 @@ func readBody(r io.ReadCloser, url string) string {
}

// getSchema returns the schema of APISIX object.
func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
func (c *cluster) getSchema(ctx context.Context, url, resource string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "getSchema")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return "", err
}
Expand All @@ -690,14 +692,15 @@ func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
}

// getList returns a list of string.
func (c *cluster) getList(ctx context.Context, url string) ([]string, error) {
func (c *cluster) getList(ctx context.Context, url, resource string) ([]string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start))
c.metricsCollector.RecordAPISIXLatency(time.Now().Sub(start), "getList")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (r *consumerClient) Get(ctx context.Context, name string) (*v1.Consumer, er

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + name
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "consumer")
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
if err == cache.ErrNotFound {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/global_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *globalRuleClient) Get(ctx context.Context, name string) (*v1.GlobalRule

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + rid
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "globalRule")
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
if err == cache.ErrNotFound {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (p *pluginClient) List(ctx context.Context) ([]string, error) {
zap.String("cluster", "default"),
zap.String("url", p.url),
)
pluginList, err := p.cluster.getList(ctx, p.url+"/list")
pluginList, err := p.cluster.getList(ctx, p.url+"/list", "plugin")
if err != nil {
log.Errorf("failed to list plugins' names: %s", err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *routeClient) Get(ctx context.Context, name string) (*v1.Route, error) {

// TODO Add mutex here to avoid dog-pile effection.
url := r.url + "/" + rid
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "route")
r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err != nil {
if err == cache.ErrNotFound {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (sc schemaClient) getSchema(ctx context.Context, name string) (*v1.Schema,
}

url := sc.url + "/" + name
content, err := sc.cluster.getSchema(ctx, url)
content, err := sc.cluster.getSchema(ctx, url, "schema")
if err != nil {
log.Errorw("failed to get schema from APISIX",
zap.String("name", name),
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *sslClient) Get(ctx context.Context, name string) (*v1.Ssl, error) {

// TODO Add mutex here to avoid dog-pile effection.
url := s.url + "/" + sid
resp, err := s.cluster.getResource(ctx, url)
resp, err := s.cluster.getResource(ctx, url, "ssl")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("ssl not found",
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/stream_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *streamRouteClient) Get(ctx context.Context, name string) (*v1.StreamRou

// TODO Add mutex here to avoid dog-pile effection.
url := r.url + "/" + rid
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "streamRoute")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("stream_route not found",
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (u *upstreamClient) Get(ctx context.Context, name string) (*v1.Upstream, er

// TODO Add mutex here to avoid dog-pile effection.
url := u.url + "/" + uid
resp, err := u.cluster.getResource(ctx, url)
resp, err := u.cluster.getResource(ctx, url, "upstream")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("upstream not found",
Expand Down
10 changes: 5 additions & 5 deletions pkg/ingress/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ func (c *apisixTlsController) syncSecretSSL(secretKey string, apisixTlsKey strin
func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
c.controller.metricsCollector.IncrSyncOperation("Tls", "success")
c.controller.metricsCollector.IncrSyncOperation("TLS", "success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
c.controller.metricsCollector.IncrSyncOperation("Tls", "failure")
c.controller.metricsCollector.IncrSyncOperation("TLS", "failure")
}

func (c *apisixTlsController) onAdd(obj interface{}) {
Expand All @@ -200,7 +200,7 @@ func (c *apisixTlsController) onAdd(obj interface{}) {
Object: key,
})

c.controller.metricsCollector.IncrEvents("Tls", "add")
c.controller.metricsCollector.IncrEvents("TLS", "add")
}

func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
Expand All @@ -226,7 +226,7 @@ func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
Object: key,
})

c.controller.metricsCollector.IncrEvents("Tls", "update")
c.controller.metricsCollector.IncrEvents("TLS", "update")
}

func (c *apisixTlsController) onDelete(obj interface{}) {
Expand Down Expand Up @@ -258,5 +258,5 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
Tombstone: tls,
})

c.controller.metricsCollector.IncrEvents("Tls", "delete")
c.controller.metricsCollector.IncrEvents("TLS", "delete")
}
13 changes: 7 additions & 6 deletions pkg/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Collector interface {
RecordAPISIXCode(int, string)
// RecordAPISIXLatency records the latency for a round trip from ingress apisix
// to apisix.
RecordAPISIXLatency(time.Duration)
RecordAPISIXLatency(time.Duration, string)
// IncrAPISIXRequest increases the number of requests to apisix.
IncrAPISIXRequest(string)
// IncrCheckClusterHealth increases the number of cluster health check operations
Expand All @@ -55,7 +55,7 @@ type Collector interface {
// collector contains necessary messages to collect Prometheus metrics.
type collector struct {
isLeader prometheus.Gauge
apisixLatency prometheus.Summary
apisixLatency *prometheus.SummaryVec
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
checkClusterHealth *prometheus.CounterVec
Expand Down Expand Up @@ -91,18 +91,19 @@ func NewPrometheusCollector() Collector {
prometheus.GaugeOpts{
Name: "apisix_bad_status_codes",
Namespace: _namespace,
Help: "Bad status codes of requests to APISIX",
Help: "Status codes of requests to APISIX",
ConstLabels: constLabels,
},
[]string{"resource", "status_code"},
),
apisixLatency: prometheus.NewSummary(
apisixLatency: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: _namespace,
Name: "apisix_request_latencies",
Help: "Request latencies with APISIX",
ConstLabels: constLabels,
},
[]string{"operation"},
),
apisixRequests: prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -196,8 +197,8 @@ func (c *collector) RecordAPISIXCode(code int, resource string) {

// RecordAPISIXLatency records the latency for a complete round trip
// from controller to APISIX.
func (c *collector) RecordAPISIXLatency(latency time.Duration) {
c.apisixLatency.Observe(float64(latency.Nanoseconds()))
func (c *collector) RecordAPISIXLatency(latency time.Duration, resource string) {
c.apisixLatency.WithLabelValues(resource).Observe(float64(latency.Nanoseconds()))
}

// IncrAPISIXRequest increases the number of requests for specific
Expand Down
4 changes: 3 additions & 1 deletion pkg/metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func apisixLatencyTestHandler(t *testing.T, metrics []*io_prometheus_client.Metr
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
assert.Equal(t, *m[0].Label[1].Value, "")
assert.Equal(t, *m[0].Label[2].Name, "operation")
assert.Equal(t, *m[0].Label[2].Value, "create")
}
}

Expand Down Expand Up @@ -203,7 +205,7 @@ func TestPrometheusCollector(t *testing.T) {
c.ResetLeader(true)
c.RecordAPISIXCode(404, "route")
c.RecordAPISIXCode(500, "upstream")
c.RecordAPISIXLatency(500 * time.Millisecond)
c.RecordAPISIXLatency(500*time.Millisecond, "create")
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("upstream")
Expand Down

0 comments on commit bbe13c9

Please sign in to comment.