diff --git a/cmd/craned/app/options/options.go b/cmd/craned/app/options/options.go index faa1ae5c0..c6a9027c7 100644 --- a/cmd/craned/app/options/options.go +++ b/cmd/craned/app/options/options.go @@ -103,6 +103,11 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.DurationVar(&o.DataSourcePromConfig.Timeout, "prometheus-timeout", 3*time.Minute, "prometheus timeout") flags.BoolVar(&o.DataSourcePromConfig.BRateLimit, "prometheus-bratelimit", false, "prometheus bratelimit") flags.IntVar(&o.DataSourcePromConfig.MaxPointsLimitPerTimeSeries, "prometheus-maxpoints", 11000, "prometheus max points limit per time series") + flags.BoolVar(&o.DataSourcePromConfig.FederatedClusterScope, "prometheus-federated-cluster-scope", false, "prometheus support federated clusters query") + flags.BoolVar(&o.DataSourcePromConfig.ThanosPartial, "prometheus-thanos-partial", false, "prometheus api to query thanos data source, hacking way, denote the thanos partial response query") + flags.BoolVar(&o.DataSourcePromConfig.ThanosDedup, "prometheus-thanos-dedup", false, "prometheus api to query thanos data source, hacking way, denote the thanos deduplicate query") + flags.StringVar(&o.DataSourcePromConfig.ClusterLabelName, "prometheus-cluster-label-name", "", "prometheus data query to distinguish the cluster label name for federated clusters datasource query") + flags.StringVar(&o.DataSourcePromConfig.ClusterLabelValue, "prometheus-cluster-label-value", "", "prometheus data query to distinguish the cluster label value for federated clusters datasource query") flags.StringVar(&o.DataSourceMockConfig.SeedFile, "seed-file", "", "mock provider seed file") flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address") flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout") diff --git a/go.mod b/go.mod index 7379e3690..834d00ee7 100644 --- a/go.mod +++ b/go.mod @@ -181,7 +181,6 @@ require ( golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/tools v0.1.8 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect - google.golang.org/protobuf v1.27.1 ) replace ( diff --git a/go.sum b/go.sum index 4afc15c88..535491850 100644 --- a/go.sum +++ b/go.sum @@ -310,10 +310,6 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ= github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0= -github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y= -github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk= -github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY= -github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk= github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed h1:aARCU+Hs1ZKTqJFJT/4/or/iGR6qYwMcG99CGmBFJpg= github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= diff --git a/pkg/providers/config.go b/pkg/providers/config.go index 9efd72900..405718eb0 100644 --- a/pkg/providers/config.go +++ b/pkg/providers/config.go @@ -16,6 +16,12 @@ type PromConfig struct { QueryConcurrency int BRateLimit bool MaxPointsLimitPerTimeSeries int + FederatedClusterScope bool + // for thanos query, it must when use thanos as query source https://thanos.io/tip/components/query.md/#partial-response + ThanosPartial bool + ThanosDedup bool + ClusterLabelName string + ClusterLabelValue string } // ClientAuth holds the HTTP client identity info. diff --git a/pkg/providers/grpc/grpc.go b/pkg/providers/grpc/grpc.go index 9de781405..a6114b470 100644 --- a/pkg/providers/grpc/grpc.go +++ b/pkg/providers/grpc/grpc.go @@ -13,6 +13,7 @@ import ( "github.com/gocrane/crane/pkg/metricquery" "github.com/gocrane/crane/pkg/providers" "github.com/gocrane/crane/pkg/providers/grpc/pb" + "github.com/gocrane/crane/pkg/querybuilder" ) var _ providers.Interface = &grpcClient{} @@ -30,7 +31,7 @@ type grpcClient struct { } func (g *grpcClient) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) { - m, err := grpcMetric(namer) + m, err := grpcMetric(namer, querybuilder.BuildQueryBehavior{}) if err != nil { return nil, err } @@ -84,8 +85,8 @@ func commonTimeSeriesList(tsList []*pb.TimeSeries) []*common.TimeSeries { return res } -func grpcMetric(namer metricnaming.MetricNamer) (*pb.Metric, error) { - q, err := namer.QueryBuilder().Builder(metricquery.GrpcMetricSource).BuildQuery() +func grpcMetric(namer metricnaming.MetricNamer, behavior querybuilder.BuildQueryBehavior) (*pb.Metric, error) { + q, err := namer.QueryBuilder().Builder(metricquery.GrpcMetricSource).BuildQuery(behavior) if err != nil { return nil, err } diff --git a/pkg/providers/metricserver/metricserver.go b/pkg/providers/metricserver/metricserver.go index 17d6509a4..9077873b4 100644 --- a/pkg/providers/metricserver/metricserver.go +++ b/pkg/providers/metricserver/metricserver.go @@ -1,6 +1,7 @@ package metricserver import ( + "github.com/gocrane/crane/pkg/querybuilder" cacheddiscovery "k8s.io/client-go/discovery/cached" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -27,7 +28,7 @@ type metricsServer struct { func (m *metricsServer) QueryLatestTimeSeries(metricNamer metricnaming.MetricNamer) ([]*common.TimeSeries, error) { msBuilder := metricNamer.QueryBuilder().Builder(metricquery.MetricServerMetricSource) - msQuery, err := msBuilder.BuildQuery() + msQuery, err := msBuilder.BuildQuery(querybuilder.BuildQueryBehavior{}) if err != nil { klog.Errorf("Failed to QueryLatestTimeSeries metricNamer %v, err: %v", metricNamer.BuildUniqueKey(), err) return nil, err diff --git a/pkg/providers/prom/prom.go b/pkg/providers/prom/prom.go index fd1603cfe..44b102629 100644 --- a/pkg/providers/prom/prom.go +++ b/pkg/providers/prom/prom.go @@ -4,20 +4,27 @@ import ( gocontext "context" "crypto/tls" "fmt" + "io/ioutil" "net" "net/http" "net/url" + "strings" "sync" "time" prometheus "github.com/prometheus/client_golang/api" "k8s.io/klog/v2" - "github.com/gocrane/crane/pkg/providers" + datasource "github.com/gocrane/crane/pkg/providers" ) +type ThanosConfig struct { + Partial bool + Dedup bool +} + // NewPrometheusClient returns a prometheus.Client -func NewPrometheusClient(config *providers.PromConfig) (prometheus.Client, error) { +func NewPrometheusClient(config *datasource.PromConfig) (prometheus.Client, error) { tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify} @@ -34,26 +41,29 @@ func NewPrometheusClient(config *providers.PromConfig) (prometheus.Client, error }, } if config.BRateLimit { - return newPrometheusRateLimitClient(PrometheusClientID, pc, &config.Auth, config.QueryConcurrency) + return newPrometheusRateLimitClient(PrometheusClientID, pc, &config.Auth, config.QueryConcurrency, &ThanosConfig{Dedup: config.ThanosDedup, Partial: config.ThanosPartial}) } - return newPrometheusAuthClient(PrometheusClientID, pc, &config.Auth) + return newPrometheusAuthClient(PrometheusClientID, pc, &config.Auth, &ThanosConfig{Dedup: config.ThanosDedup, Partial: config.ThanosPartial}) } // prometheusAuthClient wraps the prometheus api raw client with authentication info type prometheusAuthClient struct { id string - auth *providers.ClientAuth + auth *datasource.ClientAuth client prometheus.Client + // hack here, later maybe a datasource for thanos + thanos *ThanosConfig } -func newPrometheusAuthClient(id string, config prometheus.Config, auth *providers.ClientAuth) (prometheus.Client, error) { +func newPrometheusAuthClient(id string, config prometheus.Config, auth *datasource.ClientAuth, thanos *ThanosConfig) (prometheus.Client, error) { c, err := prometheus.NewClient(config) if err != nil { return nil, err } client := &prometheusAuthClient{ + thanos: thanos, id: id, client: c, auth: auth, @@ -69,14 +79,60 @@ func (pc *prometheusAuthClient) URL(ep string, args map[string]string) *url.URL // Do implements prometheus client interface, wrapped with an auth info func (pc *prometheusAuthClient) Do(ctx gocontext.Context, req *http.Request) (*http.Response, []byte, error) { - pc.auth.Apply(req) - return pc.client.Do(ctx, req) + newReq := req + // hacking for thanos here, intercept the request and modify param + if pc.thanos != nil && (pc.thanos.Dedup || pc.thanos.Partial) { + var q url.Values + var err error + var bodyData []byte + if req.Method == http.MethodPost { + bodyReader, err := req.GetBody() + if err != nil { + return nil, nil, err + } + bodyData, err = ioutil.ReadAll(bodyReader) + if err != nil { + return nil, nil, err + } + q, err = url.ParseQuery(string(bodyData)) + if err != nil { + return nil, nil, err + } + } else if req.Method == http.MethodGet { + q = req.URL.Query() + } + + if pc.thanos.Partial { + q.Set("partial_response", "true") + } + if pc.thanos.Dedup { + q.Set("dedup", "true") + } + + klog.V(6).InfoS("Hacking thanos", "originalQueryBody", string(bodyData), "newQuery", q.Encode()) + + if req.Method == http.MethodGet { + req.URL.RawQuery = q.Encode() + } else if req.Method == http.MethodPost { + newReq, err = http.NewRequest(req.Method, req.URL.String(), strings.NewReader(q.Encode())) + if err != nil { + return nil, nil, err + } + newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") + } + } + + pc.auth.Apply(newReq) + klog.V(6).InfoS("Query url", "url", newReq.URL, "newReq", newReq, "oldReq", req) + return pc.client.Do(ctx, newReq) } type prometheusRateLimitClient struct { id string - auth *providers.ClientAuth + auth *datasource.ClientAuth client prometheus.Client + // hack here, later maybe a datasource for thanos + thanos *ThanosConfig lock *sync.Mutex cond *sync.Cond @@ -84,7 +140,7 @@ type prometheusRateLimitClient struct { currentInFlight int } -func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *providers.ClientAuth, maxInFlight int) (prometheus.Client, error) { +func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *datasource.ClientAuth, maxInFlight int, thanos *ThanosConfig) (prometheus.Client, error) { c, err := prometheus.NewClient(config) if err != nil { return nil, err @@ -98,6 +154,7 @@ func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *pro maxInFlight: maxInFlight, lock: lock, cond: sync.NewCond(lock), + thanos: thanos, } return client, nil @@ -110,14 +167,58 @@ func (pc *prometheusRateLimitClient) URL(ep string, args map[string]string) *url // Do implements prometheus client interface, wrapped with an auth info func (pc *prometheusRateLimitClient) Do(ctx gocontext.Context, req *http.Request) (*http.Response, []byte, error) { - pc.auth.Apply(req) + newReq := req + // hacking for thanos here, intercept the request and modify param + if pc.thanos != nil && (pc.thanos.Dedup || pc.thanos.Partial) { + var q url.Values + var err error + var bodyData []byte + if req.Method == http.MethodPost { + bodyReader, err := req.GetBody() + if err != nil { + return nil, nil, err + } + bodyData, err = ioutil.ReadAll(bodyReader) + if err != nil { + return nil, nil, err + } + q, err = url.ParseQuery(string(bodyData)) + if err != nil { + return nil, nil, err + } + } else if req.Method == http.MethodGet { + q = req.URL.Query() + } + + if pc.thanos.Partial { + q.Set("partial_response", "true") + } + if pc.thanos.Dedup { + q.Set("dedup", "true") + } + + klog.V(6).InfoS("Hacking thanos", "originalQueryBody", string(bodyData), "newQuery", q.Encode()) + + if req.Method == http.MethodGet { + req.URL.RawQuery = q.Encode() + } else if req.Method == http.MethodPost { + newReq, err = http.NewRequest(req.Method, req.URL.String(), strings.NewReader(q.Encode())) + if err != nil { + return nil, nil, err + } + newReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") + } + } + + pc.auth.Apply(newReq) + klog.V(6).InfoS("Query url", "url", newReq.URL, "newReq", newReq, "oldReq", req) klog.V(4).InfoS("Prometheus rate limit", "ratelimit", pc.Runtime()) // block wait until at least one InFlight request finished if current inflighting requests reach the max limit, avoid many time consuming requests hit the prometheus. // we use inflight to record the number of inflighting requests, because prometheus query is time-consuming when the range is large // Caller will be blocked by lock if it reached maxFlight. so caller should set timeout for context and cancel it. pc.increaseInFightWait() defer pc.decreaseInFlightSignal() - return pc.client.Do(ctx, req) + return pc.client.Do(ctx, newReq) } func (mfc *prometheusRateLimitClient) increaseInFightWait() { diff --git a/pkg/providers/prom/prometheus.go b/pkg/providers/prom/prometheus.go index 7a34720d3..4176f8020 100644 --- a/pkg/providers/prom/prometheus.go +++ b/pkg/providers/prom/prometheus.go @@ -10,6 +10,7 @@ import ( "github.com/gocrane/crane/pkg/metricnaming" "github.com/gocrane/crane/pkg/metricquery" "github.com/gocrane/crane/pkg/providers" + "github.com/gocrane/crane/pkg/querybuilder" ) type prom struct { @@ -32,7 +33,7 @@ func NewProvider(config *providers.PromConfig) (providers.Interface, error) { func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) { promBuilder := namer.QueryBuilder().Builder(metricquery.PrometheusMetricSource) - promQuery, err := promBuilder.BuildQuery() + promQuery, err := promBuilder.BuildQuery(querybuilder.BuildQueryBehavior{FederatedClusterScope: p.config.FederatedClusterScope, ClusterLabelName: p.config.ClusterLabelName, ClusterLabelValue: p.config.ClusterLabelValue}) if err != nil { klog.Errorf("Failed to BuildQuery: %v", err) return nil, err @@ -50,7 +51,7 @@ func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Ti func (p *prom) QueryLatestTimeSeries(namer metricnaming.MetricNamer) ([]*common.TimeSeries, error) { promBuilder := namer.QueryBuilder().Builder(metricquery.PrometheusMetricSource) - promQuery, err := promBuilder.BuildQuery() + promQuery, err := promBuilder.BuildQuery(querybuilder.BuildQueryBehavior{FederatedClusterScope: p.config.FederatedClusterScope, ClusterLabelName: p.config.ClusterLabelName, ClusterLabelValue: p.config.ClusterLabelValue}) if err != nil { klog.Errorf("Failed to BuildQuery: %v", err) return nil, err diff --git a/pkg/querybuilder-providers/grpc/builder.go b/pkg/querybuilder-providers/grpc/builder.go index 5fca36157..444931342 100644 --- a/pkg/querybuilder-providers/grpc/builder.go +++ b/pkg/querybuilder-providers/grpc/builder.go @@ -17,7 +17,7 @@ func NewQueryBuilder(metric *metricquery.Metric) querybuilder.Builder { } } -func (b builder) BuildQuery() (*metricquery.Query, error) { +func (b builder) BuildQuery(behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { return gRPCQuery(&metricquery.GenericQuery{Metric: b.metric}), nil } diff --git a/pkg/querybuilder-providers/metricserver/builder.go b/pkg/querybuilder-providers/metricserver/builder.go index 43bf48e28..a4bce0318 100644 --- a/pkg/querybuilder-providers/metricserver/builder.go +++ b/pkg/querybuilder-providers/metricserver/builder.go @@ -17,7 +17,7 @@ func NewMetricServerQueryBuilder(metric *metricquery.Metric) querybuilder.Builde } } -func (b builder) BuildQuery() (*metricquery.Query, error) { +func (b builder) BuildQuery(behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { return metricServerQuery(&metricquery.GenericQuery{Metric: b.metric}), nil } diff --git a/pkg/querybuilder-providers/metricserver/builder_test.go b/pkg/querybuilder-providers/metricserver/builder_test.go index 01071ba57..4a20ae11c 100644 --- a/pkg/querybuilder-providers/metricserver/builder_test.go +++ b/pkg/querybuilder-providers/metricserver/builder_test.go @@ -7,6 +7,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/gocrane/crane/pkg/metricquery" + "github.com/gocrane/crane/pkg/querybuilder" ) func TestNewPromQueryBuilder(t *testing.T) { @@ -20,7 +21,7 @@ func TestNewPromQueryBuilder(t *testing.T) { }, } builder := NewMetricServerQueryBuilder(metric) - _, err := builder.BuildQuery() + _, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{}) if err != nil { t.Log(err) } @@ -232,7 +233,7 @@ func TestBuildQuery(t *testing.T) { for _, tc := range testCases { builder := NewMetricServerQueryBuilder(tc.metric) - query, err := builder.BuildQuery() + query, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{}) if !reflect.DeepEqual(err, tc.err) { t.Fatalf("tc %v failed, got error: %v, want error: %v", tc.desc, err, tc.err) } diff --git a/pkg/querybuilder-providers/prometheus/builder.go b/pkg/querybuilder-providers/prometheus/builder.go index 56fbb4e7b..c0a101cec 100644 --- a/pkg/querybuilder-providers/prometheus/builder.go +++ b/pkg/querybuilder-providers/prometheus/builder.go @@ -15,25 +15,25 @@ import ( // todo: later we change these templates to configurable like prometheus-adapter const ( // WorkloadCpuUsageExprTemplate is used to query workload cpu usage by promql, param is namespace,workload-name,duration str - WorkloadCpuUsageExprTemplate = `sum(irate(container_cpu_usage_seconds_total{container!="",image!="",container!="POD",namespace="%s",pod=~"^%s-.*$"}[%s]))` + WorkloadCpuUsageExprTemplate = `sum(irate(container_cpu_usage_seconds_total{container!="",image!="",container!="POD",namespace="%s",pod=~"^%s-.*$",%s}[%s]))` // WorkloadMemUsageExprTemplate is used to query workload mem usage by promql, param is namespace, workload-name - WorkloadMemUsageExprTemplate = `sum(container_memory_working_set_bytes{container!="",image!="",container!="POD",namespace="%s",pod=~"^%s-.*$"})` + WorkloadMemUsageExprTemplate = `sum(container_memory_working_set_bytes{container!="",image!="",container!="POD",namespace="%s",pod=~"^%s-.*$",%s})` // following is node exporter metric for node cpu/memory usage // NodeCpuUsageExprTemplate is used to query node cpu usage by promql, param is node name which prometheus scrape, duration str - NodeCpuUsageExprTemplate = `sum(count(node_cpu_seconds_total{mode="idle",instance=~"(%s)(:\\d+)?"}) by (mode, cpu)) - sum(irate(node_cpu_seconds_total{mode="idle",instance=~"(%s)(:\\d+)?"}[%s]))` + NodeCpuUsageExprTemplate = `sum(count(node_cpu_seconds_total{mode="idle",instance=~"(%s)(:\\d+)?",%s}) by (mode, cpu)) - sum(irate(node_cpu_seconds_total{mode="idle",instance=~"(%s)(:\\d+)?",%s}[%s]))` // NodeMemUsageExprTemplate is used to query node cpu memory by promql, param is node name, node name which prometheus scrape - NodeMemUsageExprTemplate = `sum(node_memory_MemTotal_bytes{instance=~"(%s)(:\\d+)?"} - node_memory_MemAvailable_bytes{instance=~"(%s)(:\\d+)?"})` + NodeMemUsageExprTemplate = `sum(node_memory_MemTotal_bytes{instance=~"(%s)(:\\d+)?",%s} - node_memory_MemAvailable_bytes{instance=~"(%s)(:\\d+)?",%s})` // PodCpuUsageExprTemplate is used to query pod cpu usage by promql, param is namespace,pod, duration str - PodCpuUsageExprTemplate = `sum(irate(container_cpu_usage_seconds_total{container!="POD",namespace="%s",pod="%s"}[%s]))` + PodCpuUsageExprTemplate = `sum(irate(container_cpu_usage_seconds_total{container!="POD",namespace="%s",pod="%s",%s}[%s]))` // PodMemUsageExprTemplate is used to query pod cpu usage by promql, param is namespace,pod - PodMemUsageExprTemplate = `sum(container_memory_working_set_bytes{container!="POD",namespace="%s",pod="%s"})` + PodMemUsageExprTemplate = `sum(container_memory_working_set_bytes{container!="POD",namespace="%s",pod="%s",%s})` // ContainerCpuUsageExprTemplate is used to query container cpu usage by promql, param is namespace,pod,container duration str - ContainerCpuUsageExprTemplate = `irate(container_cpu_usage_seconds_total{container!="POD",namespace="%s",pod=~"^%s.*$",container="%s"}[%s])` + ContainerCpuUsageExprTemplate = `irate(container_cpu_usage_seconds_total{container!="POD",namespace="%s",pod=~"^%s.*$",container="%s",%s}[%s])` // ContainerMemUsageExprTemplate is used to query container cpu usage by promql, param is namespace,pod,container - ContainerMemUsageExprTemplate = `container_memory_working_set_bytes{container!="POD",namespace="%s",pod=~"^%s.*$",container="%s"}` + ContainerMemUsageExprTemplate = `container_memory_working_set_bytes{container!="POD",namespace="%s",pod=~"^%s.*$",container="%s",%s}` ) var supportedResources = sets.NewString(v1.ResourceCPU.String(), v1.ResourceMemory.String()) @@ -50,16 +50,16 @@ func NewPromQueryBuilder(metric *metricquery.Metric) querybuilder.Builder { } } -func (b *builder) BuildQuery() (*metricquery.Query, error) { +func (b *builder) BuildQuery(behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { switch b.metric.Type { case metricquery.WorkloadMetricType: - return b.workloadQuery(b.metric) + return b.workloadQuery(b.metric, behavior) case metricquery.PodMetricType: - return b.podQuery(b.metric) + return b.podQuery(b.metric, behavior) case metricquery.ContainerMetricType: - return b.containerQuery(b.metric) + return b.containerQuery(b.metric, behavior) case metricquery.NodeMetricType: - return b.nodeQuery(b.metric) + return b.nodeQuery(b.metric, behavior) case metricquery.PromQLMetricType: return b.promQuery(b.metric) default: @@ -67,72 +67,92 @@ func (b *builder) BuildQuery() (*metricquery.Query, error) { } } -func (b *builder) workloadQuery(metric *metricquery.Metric) (*metricquery.Query, error) { +func BuildClusterQueryCondition(clusterLabelName, clusterLabelValue string) string { + return fmt.Sprintf(`%s="%s"`, clusterLabelName, clusterLabelValue) +} + +func (b *builder) workloadQuery(metric *metricquery.Metric, behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { if metric.Workload == nil { return nil, fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", metric.Type) } + clusterCond := "" + if behavior.FederatedClusterScope { + clusterCond = BuildClusterQueryCondition(behavior.ClusterLabelName, behavior.ClusterLabelValue) + } switch strings.ToLower(metric.MetricName) { case v1.ResourceCPU.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(WorkloadCpuUsageExprTemplate, metric.Workload.Namespace, metric.Workload.Name, "3m"), + Query: fmt.Sprintf(WorkloadCpuUsageExprTemplate, metric.Workload.Namespace, metric.Workload.Name, clusterCond, "3m"), }), nil case v1.ResourceMemory.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(WorkloadMemUsageExprTemplate, metric.Workload.Namespace, metric.Workload.Name), + Query: fmt.Sprintf(WorkloadMemUsageExprTemplate, metric.Workload.Namespace, metric.Workload.Name, clusterCond), }), nil default: return nil, fmt.Errorf("metric type %v do not support resource metric %v. only support %v now", metric.Type, metric.MetricName, supportedResources.List()) } } -func (b *builder) containerQuery(metric *metricquery.Metric) (*metricquery.Query, error) { +func (b *builder) containerQuery(metric *metricquery.Metric, behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { if metric.Container == nil { return nil, fmt.Errorf("metric type %v, but no ContainerNamerInfo provided", metric.Type) } + clusterCond := "" + if behavior.FederatedClusterScope { + clusterCond = BuildClusterQueryCondition(behavior.ClusterLabelName, behavior.ClusterLabelValue) + } switch strings.ToLower(metric.MetricName) { case v1.ResourceCPU.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(ContainerCpuUsageExprTemplate, metric.Container.Namespace, metric.Container.WorkloadName, metric.Container.Name, "3m"), + Query: fmt.Sprintf(ContainerCpuUsageExprTemplate, metric.Container.Namespace, metric.Container.WorkloadName, metric.Container.Name, clusterCond, "3m"), }), nil case v1.ResourceMemory.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(ContainerMemUsageExprTemplate, metric.Container.Namespace, metric.Container.WorkloadName, metric.Container.Name), + Query: fmt.Sprintf(ContainerMemUsageExprTemplate, metric.Container.Namespace, metric.Container.WorkloadName, metric.Container.Name, clusterCond), }), nil default: return nil, fmt.Errorf("metric type %v do not support resource metric %v. only support %v now", metric.Type, metric.MetricName, supportedResources.List()) } } -func (b *builder) podQuery(metric *metricquery.Metric) (*metricquery.Query, error) { +func (b *builder) podQuery(metric *metricquery.Metric, behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { if metric.Pod == nil { return nil, fmt.Errorf("metric type %v, but no PodNamerInfo provided", metric.Type) } + clusterCond := "" + if behavior.FederatedClusterScope { + clusterCond = BuildClusterQueryCondition(behavior.ClusterLabelName, behavior.ClusterLabelValue) + } switch strings.ToLower(metric.MetricName) { case v1.ResourceCPU.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(PodCpuUsageExprTemplate, metric.Pod.Namespace, metric.Pod.Name, "3m"), + Query: fmt.Sprintf(PodCpuUsageExprTemplate, metric.Pod.Namespace, metric.Pod.Name, clusterCond, "3m"), }), nil case v1.ResourceMemory.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(PodMemUsageExprTemplate, metric.Pod.Namespace, metric.Pod.Name), + Query: fmt.Sprintf(PodMemUsageExprTemplate, metric.Pod.Namespace, metric.Pod.Name, clusterCond), }), nil default: return nil, fmt.Errorf("metric type %v do not support resource metric %v. only support %v now", metric.Type, metric.MetricName, supportedResources.List()) } } -func (b *builder) nodeQuery(metric *metricquery.Metric) (*metricquery.Query, error) { +func (b *builder) nodeQuery(metric *metricquery.Metric, behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) { if metric.Node == nil { return nil, fmt.Errorf("metric type %v, but no NodeNamerInfo provided", metric.Type) } + clusterCond := "" + if behavior.FederatedClusterScope { + clusterCond = BuildClusterQueryCondition(behavior.ClusterLabelName, behavior.ClusterLabelValue) + } switch strings.ToLower(metric.MetricName) { case v1.ResourceCPU.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(NodeCpuUsageExprTemplate, metric.Node.Name, metric.Node.Name, "3m"), + Query: fmt.Sprintf(NodeCpuUsageExprTemplate, metric.Node.Name, metric.Node.Name, clusterCond, clusterCond, "3m"), }), nil case v1.ResourceMemory.String(): return promQuery(&metricquery.PrometheusQuery{ - Query: fmt.Sprintf(NodeMemUsageExprTemplate, metric.Node.Name, metric.Node.Name), + Query: fmt.Sprintf(NodeMemUsageExprTemplate, metric.Node.Name, metric.Node.Name, clusterCond, clusterCond), }), nil default: return nil, fmt.Errorf("metric type %v do not support resource metric %v. only support %v now", metric.Type, metric.MetricName, supportedResources.List()) diff --git a/pkg/querybuilder-providers/prometheus/builder_test.go b/pkg/querybuilder-providers/prometheus/builder_test.go index e44e8b700..b87c05559 100644 --- a/pkg/querybuilder-providers/prometheus/builder_test.go +++ b/pkg/querybuilder-providers/prometheus/builder_test.go @@ -8,6 +8,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/gocrane/crane/pkg/metricquery" + "github.com/gocrane/crane/pkg/querybuilder" ) func TestNewPromQueryBuilder(t *testing.T) { @@ -21,7 +22,7 @@ func TestNewPromQueryBuilder(t *testing.T) { }, } builder := NewPromQueryBuilder(metric) - _, err := builder.BuildQuery() + _, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{}) if err != nil { t.Log(err) } @@ -46,7 +47,7 @@ func TestBuildQuery(t *testing.T) { APIVersion: "v1", }, }, - want: fmt.Sprintf(WorkloadCpuUsageExprTemplate, "default", "test", "3m"), + want: fmt.Sprintf(WorkloadCpuUsageExprTemplate, "default", "test", "", "3m"), }, { desc: "tc2-workload-mem", @@ -60,7 +61,7 @@ func TestBuildQuery(t *testing.T) { APIVersion: "v1", }, }, - want: fmt.Sprintf(WorkloadMemUsageExprTemplate, "default", "test"), + want: fmt.Sprintf(WorkloadMemUsageExprTemplate, "default", "test", ""), }, { desc: "tc3-container-cpu", @@ -73,7 +74,7 @@ func TestBuildQuery(t *testing.T) { Name: "container", }, }, - want: fmt.Sprintf(ContainerCpuUsageExprTemplate, "default", "workload", "container", "3m"), + want: fmt.Sprintf(ContainerCpuUsageExprTemplate, "default", "workload", "container", "", "3m"), }, { desc: "tc4-container-mem", @@ -86,7 +87,7 @@ func TestBuildQuery(t *testing.T) { Name: "container", }, }, - want: fmt.Sprintf(ContainerMemUsageExprTemplate, "default", "workload", "container"), + want: fmt.Sprintf(ContainerMemUsageExprTemplate, "default", "workload", "container", ""), }, { desc: "tc5-node-cpu", @@ -97,7 +98,7 @@ func TestBuildQuery(t *testing.T) { Name: "test", }, }, - want: fmt.Sprintf(NodeCpuUsageExprTemplate, "test", "test", "3m"), + want: fmt.Sprintf(NodeCpuUsageExprTemplate, "test", "test", "", "", "3m"), }, { desc: "tc6-node-mem", @@ -108,7 +109,7 @@ func TestBuildQuery(t *testing.T) { Name: "test", }, }, - want: fmt.Sprintf(NodeMemUsageExprTemplate, "test", "test"), + want: fmt.Sprintf(NodeMemUsageExprTemplate, "test", "test", "", ""), }, { desc: "tc7-pod-cpu", @@ -120,7 +121,7 @@ func TestBuildQuery(t *testing.T) { Name: "test", }, }, - want: fmt.Sprintf(PodCpuUsageExprTemplate, "default", "test", "3m"), + want: fmt.Sprintf(PodCpuUsageExprTemplate, "default", "test", "", "3m"), }, { desc: "tc8-pod-mem", @@ -132,7 +133,7 @@ func TestBuildQuery(t *testing.T) { Name: "test", }, }, - want: fmt.Sprintf(PodMemUsageExprTemplate, "default", "test"), + want: fmt.Sprintf(PodMemUsageExprTemplate, "default", "test", ""), }, { desc: "tc9-prom", @@ -149,7 +150,7 @@ func TestBuildQuery(t *testing.T) { for _, tc := range testCases { builder := NewPromQueryBuilder(tc.metric) - query, err := builder.BuildQuery() + query, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{}) if !reflect.DeepEqual(err, tc.err) { t.Fatalf("tc %v failed, got error: %v, want error: %v", tc.desc, err, tc.err) } diff --git a/pkg/querybuilder/query.go b/pkg/querybuilder/query.go index 2bca48246..4ba862afd 100644 --- a/pkg/querybuilder/query.go +++ b/pkg/querybuilder/query.go @@ -6,9 +6,19 @@ import ( "github.com/gocrane/crane/pkg/metricquery" ) +type BuildQueryBehavior struct { + // FederatedClusterScope means this query data source supports multiple clusters data query. + // false means do not need use cluster as query param. + // true means the data source maybe has multiple clusters, so must require cluster param. it will inject cluster param to the query when build query + FederatedClusterScope bool + // used to distiguish clusters. such as clusterid=cls-xxx + ClusterLabelName string + ClusterLabelValue string +} + // Builder is an interface which is used to build query for different data sources according a context info about the query. type Builder interface { - BuildQuery() (*metricquery.Query, error) + BuildQuery(behavior BuildQueryBehavior) (*metricquery.Query, error) } // QueryBuilder is an Builder factory to make Builders