Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose more prometheus metrics #670

Merged
merged 23 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d21fb0c
add metric: check_cluster_health and sync_operation_total
Sindweller Aug 12, 2021
da1ece0
add prometheus metrics
Sindweller Sep 7, 2021
bbe13c9
add operation label to apisix_request_latencies and record all status…
Sindweller Sep 8, 2021
2a1c62c
fix test error of nil pointer
Sindweller Sep 9, 2021
e244660
expose more metrics
Sindweller Sep 10, 2021
d60022d
add metricCollector in pod_test.go
Sindweller Sep 10, 2021
faff77c
change time calculation method
Sindweller Sep 10, 2021
ed4fd31
resolve conflict
Sindweller Sep 15, 2021
2655a42
add prometheus metrics
Sindweller Sep 7, 2021
d6d3115
add operation label to apisix_request_latencies and record all status…
Sindweller Sep 8, 2021
282ec12
fix test error of nil pointer
Sindweller Sep 9, 2021
fa5dcfb
add metricCollector in pod_test.go
Sindweller Sep 10, 2021
c64e4eb
change time calculation method
Sindweller Sep 10, 2021
a366e7f
pass MetricsCollector to cluster
Sindweller Sep 30, 2021
fda277a
fix conflict
Sindweller Oct 11, 2021
4b608ce
Merge remote-tracking branch 'upstream/master' into other-metrics
Sindweller Oct 11, 2021
0369f30
fix cluster_test nil pointer err
Sindweller Oct 11, 2021
b2425e3
set metricsCollector in cluster as unexported field
Sindweller Oct 12, 2021
f5fbabb
Merge remote-tracking branch 'upstream/master' into other-metrics
Sindweller Oct 13, 2021
4461888
fix e2e test nil pointer err
Sindweller Nov 9, 2021
d825c18
merge conflicts
Sindweller Nov 9, 2021
11e39a3
resolve conflict
Sindweller Nov 17, 2021
60cd7b0
fix some err
Sindweller Nov 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type ClusterOptions struct {
BaseURL string
Timeout time.Duration
// SyncInterval is the interval to sync schema.
SyncInterval types.TimeDuration
SyncInterval types.TimeDuration
MetricsCollector metrics.Collector
tokers marked this conversation as resolved.
Show resolved Hide resolved
}

type cluster struct {
Expand Down Expand Up @@ -129,7 +130,7 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
},
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
metricsCollector: metrics.NewPrometheusCollector(),
metricsCollector: o.MetricsCollector,
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
Expand Down Expand Up @@ -160,11 +161,13 @@ func (c *cluster) syncCache(ctx context.Context) {
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("success")
} else {
log.Errorw("failed to sync cache",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("failure")
}
}()

Expand Down Expand Up @@ -486,15 +489,19 @@ func (c *cluster) do(req *http.Request) (*http.Response, error) {
return c.cli.Do(req)
}

func (c *cluster) getResource(ctx context.Context, url string) (*getResponse, error) {
func (c *cluster) getResource(ctx context.Context, url, resource string) (*getResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "get")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand All @@ -515,15 +522,19 @@ func (c *cluster) getResource(ctx context.Context, url string) (*getResponse, er
return &res, nil
}

func (c *cluster) listResource(ctx context.Context, url string) (*listResponse, error) {
func (c *cluster) listResource(ctx context.Context, url, resource string) (*listResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "list")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
Expand All @@ -540,15 +551,18 @@ func (c *cluster) listResource(ctx context.Context, url string) (*listResponse,
return &list, nil
}

func (c *cluster) createResource(ctx context.Context, url string, body io.Reader) (*createResponse, error) {
func (c *cluster) createResource(ctx context.Context, url, resource string, body io.Reader) (*createResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
Sindweller marked this conversation as resolved.
Show resolved Hide resolved
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "create")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

Expand All @@ -566,15 +580,19 @@ func (c *cluster) createResource(ctx context.Context, url string, body io.Reader
return &cr, nil
}

func (c *cluster) updateResource(ctx context.Context, url string, body io.Reader) (*updateResponse, error) {
func (c *cluster) updateResource(ctx context.Context, url, resource string, body io.Reader) (*updateResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "update")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
Expand All @@ -590,15 +608,19 @@ func (c *cluster) updateResource(ctx context.Context, url string, body io.Reader
return &ur, nil
}

func (c *cluster) deleteResource(ctx context.Context, url string) error {
func (c *cluster) deleteResource(ctx context.Context, url, resource string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
return err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "delete")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
Expand Down Expand Up @@ -648,15 +670,19 @@ func readBody(r io.ReadCloser, url string) string {
}

// getSchema returns the schema of APISIX object.
func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
func (c *cluster) getSchema(ctx context.Context, url, resource string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return "", err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getSchema")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand All @@ -672,15 +698,19 @@ func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
}

// getList returns a list of string.
func (c *cluster) getList(ctx context.Context, url string) ([]string, error) {
func (c *cluster) getList(ctx context.Context, url, resource string) ([]string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getList")
c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)

defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
Expand Down
17 changes: 11 additions & 6 deletions pkg/apisix/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand All @@ -29,22 +30,25 @@ func TestAddCluster(t *testing.T) {
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
BaseURL: "http://service1:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

clusters := apisix.ListClusters()
assert.Len(t, clusters, 1)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
BaseURL: "http://service2:9080/apisix/admin",
Name: "service2",
BaseURL: "http://service2:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
AdminKey: "http://service3:9080/apisix/admin",
Name: "service2",
AdminKey: "http://service3:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Equal(t, ErrDuplicatedCluster, err)

Expand All @@ -57,7 +61,8 @@ func TestNonExistentCluster(t *testing.T) {
assert.Nil(t, err)

err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
BaseURL: "http://service1:9080/apisix/admin",
MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)

Expand Down
16 changes: 11 additions & 5 deletions pkg/apisix/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (r *consumerClient) Get(ctx context.Context, name string) (*v1.Consumer, er

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + name
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "consumer")
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("consumer not found",
Expand Down Expand Up @@ -109,7 +110,8 @@ func (r *consumerClient) List(ctx context.Context) ([]*v1.Consumer, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
consumerItems, err := r.cluster.listResource(ctx, r.url)
consumerItems, err := r.cluster.listResource(ctx, r.url, "consumer")
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to list consumers: %s", err)
return nil, err
Expand Down Expand Up @@ -153,7 +155,8 @@ func (r *consumerClient) Create(ctx context.Context, obj *v1.Consumer) (*v1.Cons

url := r.url + "/" + obj.Username
log.Debugw("creating consumer", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
resp, err := r.cluster.createResource(ctx, url, "consumer", bytes.NewReader(data))
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to create consumer: %s", err)
return nil, err
Expand All @@ -180,9 +183,11 @@ func (r *consumerClient) Delete(ctx context.Context, obj *v1.Consumer) error {
return err
}
url := r.url + "/" + obj.Username
if err := r.cluster.deleteResource(ctx, url); err != nil {
if err := r.cluster.deleteResource(ctx, url, "consumer"); err != nil {
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
return err
}
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err := r.cluster.cache.DeleteConsumer(obj); err != nil {
log.Errorf("failed to reflect consumer delete to cache: %s", err)
if err != cache.ErrNotFound {
Expand All @@ -208,7 +213,8 @@ func (r *consumerClient) Update(ctx context.Context, obj *v1.Consumer) (*v1.Cons
}
url := r.url + "/" + obj.Username
log.Debugw("updating username", zap.ByteString("body", body), zap.String("url", url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
resp, err := r.cluster.updateResource(ctx, url, "consumer", bytes.NewReader(body))
r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/apisix/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -151,10 +152,11 @@ func TestConsumerClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newConsumerClient(&cluster{
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
metricsCollector: metrics.NewPrometheusCollector(),
})

// Create
Expand Down
16 changes: 11 additions & 5 deletions pkg/apisix/global_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (r *globalRuleClient) Get(ctx context.Context, name string) (*v1.GlobalRule

// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + rid
resp, err := r.cluster.getResource(ctx, url)
resp, err := r.cluster.getResource(ctx, url, "globalRule")
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("global_rule not found",
Expand Down Expand Up @@ -111,7 +112,8 @@ func (r *globalRuleClient) List(ctx context.Context) ([]*v1.GlobalRule, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
globalRuleItems, err := r.cluster.listResource(ctx, r.url)
globalRuleItems, err := r.cluster.listResource(ctx, r.url, "globalRule")
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to list global_rules: %s", err)
return nil, err
Expand Down Expand Up @@ -155,7 +157,8 @@ func (r *globalRuleClient) Create(ctx context.Context, obj *v1.GlobalRule) (*v1.

url := r.url + "/" + obj.ID
log.Debugw("creating global_rule", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
resp, err := r.cluster.createResource(ctx, url, "globalRule", bytes.NewReader(data))
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to create global_rule: %s", err)
return nil, err
Expand All @@ -182,9 +185,11 @@ func (r *globalRuleClient) Delete(ctx context.Context, obj *v1.GlobalRule) error
return err
}
url := r.url + "/" + obj.ID
if err := r.cluster.deleteResource(ctx, url); err != nil {
if err := r.cluster.deleteResource(ctx, url, "globalRule"); err != nil {
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
return err
}
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err := r.cluster.cache.DeleteGlobalRule(obj); err != nil {
log.Errorf("failed to reflect global_rule delete to cache: %s", err)
if err != cache.ErrNotFound {
Expand All @@ -210,7 +215,8 @@ func (r *globalRuleClient) Update(ctx context.Context, obj *v1.GlobalRule) (*v1.
}
url := r.url + "/" + obj.ID
log.Debugw("updating global_rule", zap.ByteString("body", body), zap.String("url", url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
resp, err := r.cluster.updateResource(ctx, url, "globalRule", bytes.NewReader(body))
r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/apisix/global_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -151,10 +152,11 @@ func TestGlobalRuleClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newGlobalRuleClient(&cluster{
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
baseURL: u.String(),
cli: http.DefaultClient,
cache: &dummyCache{},
cacheSynced: closedCh,
metricsCollector: metrics.NewPrometheusCollector(),
})

// Create
Expand Down
2 changes: 1 addition & 1 deletion pkg/apisix/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (p *pluginClient) List(ctx context.Context) ([]string, error) {
zap.String("cluster", "default"),
zap.String("url", p.url),
)
pluginList, err := p.cluster.getList(ctx, p.url+"/list")
pluginList, err := p.cluster.getList(ctx, p.url+"/list", "plugin")
if err != nil {
log.Errorf("failed to list plugins' names: %s", err)
return nil, err
Expand Down
Loading