Skip to content

Commit

Permalink
Add skywalking metrics analysis provider
Browse files Browse the repository at this point in the history
Signed-off-by: kezhenxu94 <kezhenxu94@apache.org>
  • Loading branch information
kezhenxu94 committed Dec 2, 2022
1 parent 89b0487 commit c3a75f5
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 8 deletions.
1 change: 1 addition & 0 deletions artifacts/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ spec:
- newrelic
- graphite
- dynatrace
- skywalking
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions charts/flagger/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ spec:
- newrelic
- graphite
- dynatrace
- skywalking
address:
description: API address of this provider
type: string
Expand Down
56 changes: 48 additions & 8 deletions docs/gitbook/usage/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ spec:
destination_workload:{{ target }},
!response_code:404
}.as_count()
/
/
sum:istio.mesh.request.count{
reporter:destination,
destination_workload_namespace:{{ namespace }},
Expand Down Expand Up @@ -381,11 +381,11 @@ spec:
secretRef:
name: newrelic
query: |
SELECT
filter(sum(nginx_ingress_controller_requests), WHERE status >= '500') /
SELECT
filter(sum(nginx_ingress_controller_requests), WHERE status >= '500') /
sum(nginx_ingress_controller_requests) * 100
FROM Metric
WHERE metricName = 'nginx_ingress_controller_requests'
FROM Metric
WHERE metricName = 'nginx_ingress_controller_requests'
AND ingress = '{{ ingress }}' AND namespace = '{{ namespace }}'
```

Expand Down Expand Up @@ -481,7 +481,7 @@ spec:
## Google Cloud Monitoring (Stackdriver)

Enable Workload Identity on your cluster, create a service account key that has read access to the
Cloud Monitoring API and then create an IAM policy binding between the GCP service account and the Flagger
Cloud Monitoring API and then create an IAM policy binding between the GCP service account and the Flagger
service account on Kubernetes. You can take a look at this [guide](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity)

Annotate the flagger service account
Expand All @@ -500,7 +500,7 @@ your [service account json](https://cloud.google.com/docs/authentication/product
kubectl create secret generic gcloud-sa --from-literal=project=<project-id>
```

Then reference the secret in the metric template.
Then reference the secret in the metric template.
Note: The particular MQL query used here works if [Istio is installed on GKE](https://cloud.google.com/istio/docs/istio-on-gke/installing).
```yaml
apiVersion: flagger.app/v1beta1
Expand All @@ -511,7 +511,7 @@ metadata:
spec:
provider:
type: stackdriver
secretRef:
secretRef:
name: gcloud-sa
query: |
fetch k8s_container
Expand Down Expand Up @@ -611,3 +611,43 @@ Reference the template in the canary analysis:
max: 1000
interval: 1m
```

## Apache SkyWalking

You can create custom metric checks using the Apache SkyWalking provider.

SkyWalking metric template example:

```yaml
apiVersion: flagger.app/v1beta1
kind: MetricTemplate
metadata:
name: apdex
namespace: istio-system
spec:
provider:
type: skywalking
address: http://skywalking-oap.istio-system.cluster.local:12800
query: >- #
query queryData($duration: Duration!) {
service_apdex: readMetricsValues(
condition: { name: "service_apdex", entity: { scope: Service, serviceName: "{{ target }}", normal: true } },
duration: $duration) {
label values { values { value } }
}
}
```

Reference the template in the canary analysis:

```yaml
analysis:
metrics:
- name: apdex
templateRef:
name: apdex
namespace: istio-system
thresholdRange:
max: 9900
interval: 1m
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/machinebox/graphql v0.2.2 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/machinebox/graphql v0.2.2 h1:dWKpJligYKhYKO5A2gvNhkJdQMNZeChZYyBbrZkBZfo=
github.com/machinebox/graphql v0.2.2/go.mod h1:F+kbVMHuwrQ5tYgU9JXlnskM8nOaFxCAEolaQybkjWA=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
Expand Down
1 change: 1 addition & 0 deletions kustomize/base/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ spec:
- newrelic
- graphite
- dynatrace
- skywalking
address:
description: API address of this provider
type: string
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (factory Factory) Provider(
return NewInfluxdbProvider(provider, credentials)
case "dynatrace":
return NewDynatraceProvider(metricInterval, provider, credentials)
case "skywalking":
return NewSkyWalkingProvider(metricInterval, provider, credentials)
default:
return NewPrometheusProvider(provider, credentials)
}
Expand Down
136 changes: 136 additions & 0 deletions pkg/metrics/providers/skywalking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package providers

import (
"context"
"fmt"
"io"
"net/http"
"time"

flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
"github.com/machinebox/graphql"
)

const (
healthCheckEndpoint = "/internal/l7check"
queryEndpoint = "/graphql"
)

type skywalkingResponse = map[string]struct {
Label string `json:"label"`
Values struct {
Values []struct {
Value *float64 `json:"value"`
} `json:"values"`
} `json:"values"`
}

type duration struct {
Start string `json:"start"`
End string `json:"end"`
Step string `json:"step"`
}

// SkyWalkingProvider executes SkyWalking render URL API queries.
type SkyWalkingProvider struct {
address string
timeout time.Duration
client graphql.Client
interval time.Duration
}

// NewSkyWalkingProvider takes a provider spec and credentials map,
// validates the address, extracts the credentials map's username
// and password values if provided, and returns a skywalking client
// ready to execute queries against the skywalking render URL API.
func NewSkyWalkingProvider(metricInterval string, provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (*SkyWalkingProvider, error) {
md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %w", err)
}

sw := SkyWalkingProvider{
timeout: 5 * time.Second,
address: provider.Address,
client: *graphql.NewClient(provider.Address + queryEndpoint),
interval: time.Duration(md.Nanoseconds()),
}

return &sw, nil
}

// RunQuery executes the skywalking render URL API query and returns the
// the first result as float64.
func (p *SkyWalkingProvider) RunQuery(query string) (float64, error) {
req := graphql.NewRequest(query)
req.Var("duration", duration{
Start: time.Now().Add(-p.interval).Format("2006-01-02 1504"),
End: time.Now().Format("2006-01-02 1504"),
Step: "MINUTE",
})
res := skywalkingResponse{}

ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

err := p.client.Run(ctx, req, &res)
if err != nil {
return 0, fmt.Errorf("request failed: %w", err)
}

for _, r := range res {
for _, v := range r.Values.Values {
if v.Value != nil {
return *v.Value, nil
}
}
}

return 0, ErrNoValuesFound
}

// IsOnline runs a simple skywalking render URL API query and returns
// an error if the API is unreachable.
func (p *SkyWalkingProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.address+healthCheckEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %w", err)
}

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, fmt.Errorf("request failed: %w", err)
}

defer r.Body.Close()

b, err := io.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %w", err)
}

// healthCheckEndpoint is added recently, for older versions of skywalking GET method is not allowed.
if r.StatusCode != http.StatusOK && r.StatusCode != http.StatusMethodNotAllowed {
return false, fmt.Errorf("error response: %s", string(b))
}

return true, nil
}
112 changes: 112 additions & 0 deletions pkg/metrics/providers/skywalking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package providers

import (
"errors"
"net/http"
"net/http/httptest"
"testing"
"time"

flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewSkyWalkingProvider(t *testing.T) {
sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{
Address: "http://skywalking-oap.istio-system.svc.cluster.local:12800",
}, nil)
require.NoError(t, err)

assert.Equal(t, 1*time.Minute, sw.interval)
}

func TestSkywalkingProvider_RunQuery(t *testing.T) {
t.Run("ok", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := `{"data":{"service_apdex0":{"values":{"values":[{"value":10000},{"value":10010}]}}}}`
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewSkyWalkingProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
nil,
)
require.NoError(t, err)

f, err := dp.RunQuery(`{ "query": "query queryData($duration: Duration!) { service_apdex: readMetricsValues( condition: { name: \"service_apdex\", entity: { scope: Service, serviceName: \"agent::songs\", normal: true } }, duration: $duration) { label values { values { value } } } }" }`)
require.NoError(t, err)

assert.Equal(t, 10000.0, f)
})

t.Run("no values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := `{"series": [{"pointlist": []}]}`
w.Write([]byte(json))
}))
defer ts.Close()

sw, err := NewSkyWalkingProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
nil,
)
require.NoError(t, err)
_, err = sw.RunQuery("")
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}

func TestSkywalkingProvider_IsOnline(t *testing.T) {
t.Run("ok if method not allowed", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusMethodNotAllowed)
}))
defer ts.Close()

sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{
Address: ts.URL,
}, nil)
require.NoError(t, err)

ok, err := sw.IsOnline()
require.NoError(t, err)

assert.True(t, ok)
})

t.Run("ok if 200", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := `healthy`
w.Write([]byte(response))
}))
defer ts.Close()

sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{
Address: ts.URL,
}, nil)
require.NoError(t, err)

ok, err := sw.IsOnline()
require.NoError(t, err)

assert.Equal(t, true, ok)
})
}

0 comments on commit c3a75f5

Please sign in to comment.