From c3a75f5ff2d2db9f8c8f274765dfa872dfdb4cd1 Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Fri, 2 Dec 2022 16:09:25 +0800 Subject: [PATCH] Add skywalking metrics analysis provider Signed-off-by: kezhenxu94 --- artifacts/flagger/crd.yaml | 1 + charts/flagger/crds/crd.yaml | 1 + docs/gitbook/usage/metrics.md | 56 ++++++++-- go.mod | 1 + go.sum | 2 + kustomize/base/flagger/crd.yaml | 1 + pkg/metrics/providers/factory.go | 2 + pkg/metrics/providers/skywalking.go | 136 +++++++++++++++++++++++ pkg/metrics/providers/skywalking_test.go | 112 +++++++++++++++++++ 9 files changed, 304 insertions(+), 8 deletions(-) create mode 100644 pkg/metrics/providers/skywalking.go create mode 100644 pkg/metrics/providers/skywalking_test.go diff --git a/artifacts/flagger/crd.yaml b/artifacts/flagger/crd.yaml index ed2ce257d..aa145ff41 100644 --- a/artifacts/flagger/crd.yaml +++ b/artifacts/flagger/crd.yaml @@ -1160,6 +1160,7 @@ spec: - newrelic - graphite - dynatrace + - skywalking address: description: API address of this provider type: string diff --git a/charts/flagger/crds/crd.yaml b/charts/flagger/crds/crd.yaml index ed2ce257d..aa145ff41 100644 --- a/charts/flagger/crds/crd.yaml +++ b/charts/flagger/crds/crd.yaml @@ -1160,6 +1160,7 @@ spec: - newrelic - graphite - dynatrace + - skywalking address: description: API address of this provider type: string diff --git a/docs/gitbook/usage/metrics.md b/docs/gitbook/usage/metrics.md index 854a8e90a..c74ba273f 100644 --- a/docs/gitbook/usage/metrics.md +++ b/docs/gitbook/usage/metrics.md @@ -246,7 +246,7 @@ spec: destination_workload:{{ target }}, !response_code:404 }.as_count() - / + / sum:istio.mesh.request.count{ reporter:destination, destination_workload_namespace:{{ namespace }}, @@ -381,11 +381,11 @@ spec: secretRef: name: newrelic query: | - SELECT - filter(sum(nginx_ingress_controller_requests), WHERE status >= '500') / + SELECT + filter(sum(nginx_ingress_controller_requests), WHERE status >= '500') / sum(nginx_ingress_controller_requests) * 100 - FROM Metric - WHERE metricName = 'nginx_ingress_controller_requests' + FROM Metric + WHERE metricName = 'nginx_ingress_controller_requests' AND ingress = '{{ ingress }}' AND namespace = '{{ namespace }}' ``` @@ -481,7 +481,7 @@ spec: ## Google Cloud Monitoring (Stackdriver) Enable Workload Identity on your cluster, create a service account key that has read access to the -Cloud Monitoring API and then create an IAM policy binding between the GCP service account and the Flagger +Cloud Monitoring API and then create an IAM policy binding between the GCP service account and the Flagger service account on Kubernetes. You can take a look at this [guide](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity) Annotate the flagger service account @@ -500,7 +500,7 @@ your [service account json](https://cloud.google.com/docs/authentication/product kubectl create secret generic gcloud-sa --from-literal=project= ``` -Then reference the secret in the metric template. +Then reference the secret in the metric template. Note: The particular MQL query used here works if [Istio is installed on GKE](https://cloud.google.com/istio/docs/istio-on-gke/installing). ```yaml apiVersion: flagger.app/v1beta1 @@ -511,7 +511,7 @@ metadata: spec: provider: type: stackdriver - secretRef: + secretRef: name: gcloud-sa query: | fetch k8s_container @@ -611,3 +611,43 @@ Reference the template in the canary analysis: max: 1000 interval: 1m ``` + +## Apache SkyWalking + +You can create custom metric checks using the Apache SkyWalking provider. + +SkyWalking metric template example: + +```yaml +apiVersion: flagger.app/v1beta1 +kind: MetricTemplate +metadata: + name: apdex + namespace: istio-system +spec: + provider: + type: skywalking + address: http://skywalking-oap.istio-system.cluster.local:12800 + query: >- # + query queryData($duration: Duration!) { + service_apdex: readMetricsValues( + condition: { name: "service_apdex", entity: { scope: Service, serviceName: "{{ target }}", normal: true } }, + duration: $duration) { + label values { values { value } } + } + } +``` + +Reference the template in the canary analysis: + +```yaml + analysis: + metrics: + - name: apdex + templateRef: + name: apdex + namespace: istio-system + thresholdRange: + max: 9900 + interval: 1m +``` diff --git a/go.mod b/go.mod index 511a0a79b..061bc1d9b 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/machinebox/graphql v0.2.2 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index a6dfd746d..307bba29f 100644 --- a/go.sum +++ b/go.sum @@ -290,6 +290,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/machinebox/graphql v0.2.2 h1:dWKpJligYKhYKO5A2gvNhkJdQMNZeChZYyBbrZkBZfo= +github.com/machinebox/graphql v0.2.2/go.mod h1:F+kbVMHuwrQ5tYgU9JXlnskM8nOaFxCAEolaQybkjWA= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= diff --git a/kustomize/base/flagger/crd.yaml b/kustomize/base/flagger/crd.yaml index ed2ce257d..aa145ff41 100644 --- a/kustomize/base/flagger/crd.yaml +++ b/kustomize/base/flagger/crd.yaml @@ -1160,6 +1160,7 @@ spec: - newrelic - graphite - dynatrace + - skywalking address: description: API address of this provider type: string diff --git a/pkg/metrics/providers/factory.go b/pkg/metrics/providers/factory.go index db5bd9f5b..7bf77ae9e 100644 --- a/pkg/metrics/providers/factory.go +++ b/pkg/metrics/providers/factory.go @@ -44,6 +44,8 @@ func (factory Factory) Provider( return NewInfluxdbProvider(provider, credentials) case "dynatrace": return NewDynatraceProvider(metricInterval, provider, credentials) + case "skywalking": + return NewSkyWalkingProvider(metricInterval, provider, credentials) default: return NewPrometheusProvider(provider, credentials) } diff --git a/pkg/metrics/providers/skywalking.go b/pkg/metrics/providers/skywalking.go new file mode 100644 index 000000000..933559d1b --- /dev/null +++ b/pkg/metrics/providers/skywalking.go @@ -0,0 +1,136 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/machinebox/graphql" +) + +const ( + healthCheckEndpoint = "/internal/l7check" + queryEndpoint = "/graphql" +) + +type skywalkingResponse = map[string]struct { + Label string `json:"label"` + Values struct { + Values []struct { + Value *float64 `json:"value"` + } `json:"values"` + } `json:"values"` +} + +type duration struct { + Start string `json:"start"` + End string `json:"end"` + Step string `json:"step"` +} + +// SkyWalkingProvider executes SkyWalking render URL API queries. +type SkyWalkingProvider struct { + address string + timeout time.Duration + client graphql.Client + interval time.Duration +} + +// NewSkyWalkingProvider takes a provider spec and credentials map, +// validates the address, extracts the credentials map's username +// and password values if provided, and returns a skywalking client +// ready to execute queries against the skywalking render URL API. +func NewSkyWalkingProvider(metricInterval string, provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (*SkyWalkingProvider, error) { + md, err := time.ParseDuration(metricInterval) + if err != nil { + return nil, fmt.Errorf("error parsing metric interval: %w", err) + } + + sw := SkyWalkingProvider{ + timeout: 5 * time.Second, + address: provider.Address, + client: *graphql.NewClient(provider.Address + queryEndpoint), + interval: time.Duration(md.Nanoseconds()), + } + + return &sw, nil +} + +// RunQuery executes the skywalking render URL API query and returns the +// the first result as float64. +func (p *SkyWalkingProvider) RunQuery(query string) (float64, error) { + req := graphql.NewRequest(query) + req.Var("duration", duration{ + Start: time.Now().Add(-p.interval).Format("2006-01-02 1504"), + End: time.Now().Format("2006-01-02 1504"), + Step: "MINUTE", + }) + res := skywalkingResponse{} + + ctx, cancel := context.WithTimeout(context.Background(), p.timeout) + defer cancel() + + err := p.client.Run(ctx, req, &res) + if err != nil { + return 0, fmt.Errorf("request failed: %w", err) + } + + for _, r := range res { + for _, v := range r.Values.Values { + if v.Value != nil { + return *v.Value, nil + } + } + } + + return 0, ErrNoValuesFound +} + +// IsOnline runs a simple skywalking render URL API query and returns +// an error if the API is unreachable. +func (p *SkyWalkingProvider) IsOnline() (bool, error) { + req, err := http.NewRequest("GET", p.address+healthCheckEndpoint, nil) + if err != nil { + return false, fmt.Errorf("error http.NewRequest: %w", err) + } + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return false, fmt.Errorf("request failed: %w", err) + } + + defer r.Body.Close() + + b, err := io.ReadAll(r.Body) + if err != nil { + return false, fmt.Errorf("error reading body: %w", err) + } + + // healthCheckEndpoint is added recently, for older versions of skywalking GET method is not allowed. + if r.StatusCode != http.StatusOK && r.StatusCode != http.StatusMethodNotAllowed { + return false, fmt.Errorf("error response: %s", string(b)) + } + + return true, nil +} diff --git a/pkg/metrics/providers/skywalking_test.go b/pkg/metrics/providers/skywalking_test.go new file mode 100644 index 000000000..266e8cf63 --- /dev/null +++ b/pkg/metrics/providers/skywalking_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package providers + +import ( + "errors" + "net/http" + "net/http/httptest" + "testing" + "time" + + flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewSkyWalkingProvider(t *testing.T) { + sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{ + Address: "http://skywalking-oap.istio-system.svc.cluster.local:12800", + }, nil) + require.NoError(t, err) + + assert.Equal(t, 1*time.Minute, sw.interval) +} + +func TestSkywalkingProvider_RunQuery(t *testing.T) { + t.Run("ok", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := `{"data":{"service_apdex0":{"values":{"values":[{"value":10000},{"value":10010}]}}}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + dp, err := NewSkyWalkingProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + nil, + ) + require.NoError(t, err) + + f, err := dp.RunQuery(`{ "query": "query queryData($duration: Duration!) { service_apdex: readMetricsValues( condition: { name: \"service_apdex\", entity: { scope: Service, serviceName: \"agent::songs\", normal: true } }, duration: $duration) { label values { values { value } } } }" }`) + require.NoError(t, err) + + assert.Equal(t, 10000.0, f) + }) + + t.Run("no values", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := `{"series": [{"pointlist": []}]}` + w.Write([]byte(json)) + })) + defer ts.Close() + + sw, err := NewSkyWalkingProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + nil, + ) + require.NoError(t, err) + _, err = sw.RunQuery("") + require.True(t, errors.Is(err, ErrNoValuesFound)) + }) +} + +func TestSkywalkingProvider_IsOnline(t *testing.T) { + t.Run("ok if method not allowed", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusMethodNotAllowed) + })) + defer ts.Close() + + sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{ + Address: ts.URL, + }, nil) + require.NoError(t, err) + + ok, err := sw.IsOnline() + require.NoError(t, err) + + assert.True(t, ok) + }) + + t.Run("ok if 200", func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := `healthy` + w.Write([]byte(response)) + })) + defer ts.Close() + + sw, err := NewSkyWalkingProvider("1m", flaggerv1.MetricTemplateProvider{ + Address: ts.URL, + }, nil) + require.NoError(t, err) + + ok, err := sw.IsOnline() + require.NoError(t, err) + + assert.Equal(t, true, ok) + }) +}