Skip to content

Commit

Permalink
feat: expose more prometheus metrics (#670)
Browse files: browse the repository at this point in the history
  • Loading branch information
Sindweller committed Nov 19, 2021
1 parent 774077a commit 580e7d4
Show file tree
Hide file tree
Showing 33 changed files with 379 additions and 121 deletions.
48 changes: 39 additions & 9 deletions pkg/apisix/cluster.go
Expand Up @@ -77,7 +77,8 @@ type ClusterOptions struct {
BaseURL string
Timeout time.Duration
// SyncInterval is the interval to sync schema.
SyncInterval types.TimeDuration
SyncInterval types.TimeDuration
MetricsCollector metrics.Collector
}

type cluster struct {
Expand Down Expand Up @@ -129,7 +130,7 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
},
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
metricsCollector: metrics.NewPrometheusCollector(),
metricsCollector: o.MetricsCollector,
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
Expand Down Expand Up @@ -160,11 +161,13 @@ func (c *cluster) syncCache(ctx context.Context) {
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("success")
} else {
log.Errorw("failed to sync cache",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("failure")
}
}()

Expand Down Expand Up @@ -486,15 +489,19 @@ func (c *cluster) do(req *http.Request) (*http.Response, error) {
return c.cli.Do(req)
}

func (c *cluster) getResource(ctx context.Context, url string) (*getResponse, error) {
func (c *cluster) getResource(ctx context.Context, url, resource string) (*getResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "get")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand All @@ -515,15 +522,19 @@ func (c *cluster) getResource(ctx context.Context, url string) (*getResponse, er
return &res, nil
}

func (c *cluster) listResource(ctx context.Context, url string) (*listResponse, error) {
func (c *cluster) listResource(ctx context.Context, url, resource string) (*listResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "list")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
Expand All @@ -540,15 +551,18 @@ func (c *cluster) listResource(ctx context.Context, url string) (*listResponse,
return &list, nil
}

func (c *cluster) createResource(ctx context.Context, url string, body io.Reader) (*createResponse, error) {
func (c *cluster) createResource(ctx context.Context, url, resource string, body io.Reader) (*createResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "create")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

Expand All @@ -566,15 +580,19 @@ func (c *cluster) createResource(ctx context.Context, url string, body io.Reader
return &cr, nil
}

func (c *cluster) updateResource(ctx context.Context, url string, body io.Reader) (*updateResponse, error) {
func (c *cluster) updateResource(ctx context.Context, url, resource string, body io.Reader) (*updateResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "update")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
Expand All @@ -590,15 +608,19 @@ func (c *cluster) updateResource(ctx context.Context, url string, body io.Reader
return &ur, nil
}

func (c *cluster) deleteResource(ctx context.Context, url string) error {
func (c *cluster) deleteResource(ctx context.Context, url, resource string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
return err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "delete")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
Expand Down Expand Up @@ -648,15 +670,19 @@ func readBody(r io.ReadCloser, url string) string {
}

// getSchema returns the schema of APISIX object.
func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
func (c *cluster) getSchema(ctx context.Context, url, resource string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return "", err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getSchema")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand All @@ -672,15 +698,19 @@ func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
}

// getList returns a list of string.
func (c *cluster) getList(ctx context.Context, url string) ([]string, error) {
func (c *cluster) getList(ctx context.Context, url, resource string) ([]string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getList")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand Down
17 changes: 11 additions & 6 deletions pkg/apisix/cluster_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand All @@ -29,22 +30,25 @@ func TestAddCluster(t *testing.T) {
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
BaseURL: "http://service1:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

clusters := apisix.ListClusters()
assert.Len(t, clusters, 1)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
BaseURL: "http://service2:9080/apisix/admin",
Name: "service2",
BaseURL: "http://service2:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
AdminKey: "http://service3:9080/apisix/admin",
Name: "service2",
AdminKey: "http://service3:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Equal(t, ErrDuplicatedCluster, err)

Expand All @@ -57,7 +61,8 @@ func TestNonExistentCluster(t *testing.T) {
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
BaseURL: "http://service1:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

Expand Down
16 changes: 11 additions & 5 deletions pkg/apisix/consumer.go
Expand Up @@ -65,7 +65,8 @@ func (r *consumerClient) Get(ctx context.Context, name string) (*v1.Consumer, er

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + name
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "consumer")
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("consumer not found",
Expand Down Expand Up @@ -109,7 +110,8 @@ func (r *consumerClient) List(ctx context.Context) ([]*v1.Consumer, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
consumerItems, err := r.cluster.listResource(ctx, r.url)
consumerItems, err := r.cluster.listResource(ctx, r.url, "consumer")
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to list consumers: %s", err)
return nil, err
Expand Down Expand Up @@ -153,7 +155,8 @@ func (r *consumerClient) Create(ctx context.Context, obj *v1.Consumer) (*v1.Cons

url := r.url + "/" + obj.Username
log.Debugw("creating consumer", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
resp, err := r.cluster.createResource(ctx, url, "consumer", bytes.NewReader(data))
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to create consumer: %s", err)
return nil, err
Expand All @@ -180,9 +183,11 @@ func (r *consumerClient) Delete(ctx context.Context, obj *v1.Consumer) error {
return err
}
url := r.url + "/" + obj.Username
if err := r.cluster.deleteResource(ctx, url); err != nil {
if err := r.cluster.deleteResource(ctx, url, "consumer"); err != nil {
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
return err
}
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err := r.cluster.cache.DeleteConsumer(obj); err != nil {
log.Errorf("failed to reflect consumer delete to cache: %s", err)
if err != cache.ErrNotFound {
Expand All @@ -208,7 +213,8 @@ func (r *consumerClient) Update(ctx context.Context, obj *v1.Consumer) (*v1.Cons
}
url := r.url + "/" + obj.Username
log.Debugw("updating username", zap.ByteString("body", body), zap.String("url", url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
resp, err := r.cluster.updateResource(ctx, url, "consumer", bytes.NewReader(body))
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/apisix/consumer_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -151,10 +152,11 @@ func TestConsumerClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newConsumerClient(&cluster{
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
metricsCollector: metrics.NewPrometheusCollector(),
})

// Create
Expand Down
16 changes: 11 additions & 5 deletions pkg/apisix/global_rule.go
Expand Up @@ -67,7 +67,8 @@ func (r *globalRuleClient) Get(ctx context.Context, name string) (*v1.GlobalRule

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + rid
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "globalRule")
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("global_rule not found",
Expand Down Expand Up @@ -111,7 +112,8 @@ func (r *globalRuleClient) List(ctx context.Context) ([]*v1.GlobalRule, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
globalRuleItems, err := r.cluster.listResource(ctx, r.url)
globalRuleItems, err := r.cluster.listResource(ctx, r.url, "globalRule")
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to list global_rules: %s", err)
return nil, err
Expand Down Expand Up @@ -155,7 +157,8 @@ func (r *globalRuleClient) Create(ctx context.Context, obj *v1.GlobalRule) (*v1.

url := r.url + "/" + obj.ID
log.Debugw("creating global_rule", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
resp, err := r.cluster.createResource(ctx, url, "globalRule", bytes.NewReader(data))
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to create global_rule: %s", err)
return nil, err
Expand All @@ -182,9 +185,11 @@ func (r *globalRuleClient) Delete(ctx context.Context, obj *v1.GlobalRule) error
return err
}
url := r.url + "/" + obj.ID
if err := r.cluster.deleteResource(ctx, url); err != nil {
if err := r.cluster.deleteResource(ctx, url, "globalRule"); err != nil {
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
return err
}
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err := r.cluster.cache.DeleteGlobalRule(obj); err != nil {
log.Errorf("failed to reflect global_rule delete to cache: %s", err)
if err != cache.ErrNotFound {
Expand All @@ -210,7 +215,8 @@ func (r *globalRuleClient) Update(ctx context.Context, obj *v1.GlobalRule) (*v1.
}
url := r.url + "/" + obj.ID
log.Debugw("updating global_rule", zap.ByteString("body", body), zap.String("url", url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
resp, err := r.cluster.updateResource(ctx, url, "globalRule", bytes.NewReader(body))
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/apisix/global_rule_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -151,10 +152,11 @@ func TestGlobalRuleClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newGlobalRuleClient(&cluster{
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
metricsCollector: metrics.NewPrometheusCollector(),
})

// Create
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/plugin.go
Expand Up @@ -41,7 +41,7 @@ func (p *pluginClient) List(ctx context.Context) ([]string, error) {
zap.String("cluster", "default"),
zap.String("url", p.url),
)
pluginList, err := p.cluster.getList(ctx, p.url+"/list")
pluginList, err := p.cluster.getList(ctx, p.url+"/list", "plugin")
if err != nil {
log.Errorf("failed to list plugins' names: %s", err)
return nil, err
Expand Down

0 comments on commit 580e7d4

Please sign in to comment.