Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for federated cluster scope datasource, such as one prometheu… #402

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/craned/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.DurationVar(&o.DataSourcePromConfig.Timeout, "prometheus-timeout", 3*time.Minute, "prometheus timeout")
flags.BoolVar(&o.DataSourcePromConfig.BRateLimit, "prometheus-bratelimit", false, "prometheus bratelimit")
flags.IntVar(&o.DataSourcePromConfig.MaxPointsLimitPerTimeSeries, "prometheus-maxpoints", 11000, "prometheus max points limit per time series")
flags.BoolVar(&o.DataSourcePromConfig.FederatedClusterScope, "prometheus-federated-cluster-scope", false, "prometheus support federated clusters query")
flags.BoolVar(&o.DataSourcePromConfig.ThanosPartial, "prometheus-thanos-partial", false, "prometheus api to query thanos data source, hacking way, denote the thanos partial response query")
flags.BoolVar(&o.DataSourcePromConfig.ThanosDedup, "prometheus-thanos-dedup", false, "prometheus api to query thanos data source, hacking way, denote the thanos deduplicate query")
flags.StringVar(&o.DataSourcePromConfig.ClusterLabelName, "prometheus-cluster-label-name", "", "prometheus data query to distinguish the cluster label name for federated clusters datasource query")
flags.StringVar(&o.DataSourcePromConfig.ClusterLabelValue, "prometheus-cluster-label-value", "", "prometheus data query to distinguish the cluster label value for federated clusters datasource query")
flags.StringVar(&o.DataSourceMockConfig.SeedFile, "seed-file", "", "mock provider seed file")
flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address")
flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout")
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.27.1
)

replace (
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,6 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed h1:aARCU+Hs1ZKTqJFJT/4/or/iGR6qYwMcG99CGmBFJpg=
github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
6 changes: 6 additions & 0 deletions pkg/providers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type PromConfig struct {
QueryConcurrency int
BRateLimit bool
MaxPointsLimitPerTimeSeries int
FederatedClusterScope bool
// For Thanos queries: these options are required when Thanos is used as the query source, see https://thanos.io/tip/components/query.md/#partial-response
ThanosPartial bool
ThanosDedup bool
ClusterLabelName string
ClusterLabelValue string
}

// ClientAuth holds the HTTP client identity info.
Expand Down
7 changes: 4 additions & 3 deletions pkg/providers/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/providers/grpc/pb"
"github.com/gocrane/crane/pkg/querybuilder"
)

var _ providers.Interface = &grpcClient{}
Expand All @@ -30,7 +31,7 @@ type grpcClient struct {
}

func (g *grpcClient) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) {
m, err := grpcMetric(namer)
m, err := grpcMetric(namer, querybuilder.BuildQueryBehavior{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -84,8 +85,8 @@ func commonTimeSeriesList(tsList []*pb.TimeSeries) []*common.TimeSeries {
return res
}

func grpcMetric(namer metricnaming.MetricNamer) (*pb.Metric, error) {
q, err := namer.QueryBuilder().Builder(metricquery.GrpcMetricSource).BuildQuery()
func grpcMetric(namer metricnaming.MetricNamer, behavior querybuilder.BuildQueryBehavior) (*pb.Metric, error) {
q, err := namer.QueryBuilder().Builder(metricquery.GrpcMetricSource).BuildQuery(behavior)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/metricserver/metricserver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metricserver

import (
"github.com/gocrane/crane/pkg/querybuilder"
cacheddiscovery "k8s.io/client-go/discovery/cached"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -27,7 +28,7 @@ type metricsServer struct {

func (m *metricsServer) QueryLatestTimeSeries(metricNamer metricnaming.MetricNamer) ([]*common.TimeSeries, error) {
msBuilder := metricNamer.QueryBuilder().Builder(metricquery.MetricServerMetricSource)
msQuery, err := msBuilder.BuildQuery()
msQuery, err := msBuilder.BuildQuery(querybuilder.BuildQueryBehavior{})
if err != nil {
klog.Errorf("Failed to QueryLatestTimeSeries metricNamer %v, err: %v", metricNamer.BuildUniqueKey(), err)
return nil, err
Expand Down
125 changes: 113 additions & 12 deletions pkg/providers/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@ import (
gocontext "context"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"

prometheus "github.com/prometheus/client_golang/api"
"k8s.io/klog/v2"

"github.com/gocrane/crane/pkg/providers"
datasource "github.com/gocrane/crane/pkg/providers"
)

// ThanosConfig carries the thanos-specific switches that the prometheus
// clients in this file consult in Do before forwarding a request.
type ThanosConfig struct {
// Partial, when true, makes Do set the "partial_response=true" query parameter.
Partial bool
// Dedup, when true, makes Do set the "dedup=true" query parameter.
Dedup bool
}

// NewPrometheusClient returns a prometheus.Client
func NewPrometheusClient(config *providers.PromConfig) (prometheus.Client, error) {
func NewPrometheusClient(config *datasource.PromConfig) (prometheus.Client, error) {

tlsConfig := &tls.Config{InsecureSkipVerify: config.InsecureSkipVerify}

Expand All @@ -34,26 +41,29 @@ func NewPrometheusClient(config *providers.PromConfig) (prometheus.Client, error
},
}
if config.BRateLimit {
return newPrometheusRateLimitClient(PrometheusClientID, pc, &config.Auth, config.QueryConcurrency)
return newPrometheusRateLimitClient(PrometheusClientID, pc, &config.Auth, config.QueryConcurrency, &ThanosConfig{Dedup: config.ThanosDedup, Partial: config.ThanosPartial})
}
return newPrometheusAuthClient(PrometheusClientID, pc, &config.Auth)
return newPrometheusAuthClient(PrometheusClientID, pc, &config.Auth, &ThanosConfig{Dedup: config.ThanosDedup, Partial: config.ThanosPartial})

}

// prometheusAuthClient wraps the prometheus api raw client with authentication info
type prometheusAuthClient struct {
id string
auth *providers.ClientAuth
auth *datasource.ClientAuth
client prometheus.Client
// hack here, later maybe a datasource for thanos
thanos *ThanosConfig
}

func newPrometheusAuthClient(id string, config prometheus.Config, auth *providers.ClientAuth) (prometheus.Client, error) {
func newPrometheusAuthClient(id string, config prometheus.Config, auth *datasource.ClientAuth, thanos *ThanosConfig) (prometheus.Client, error) {
c, err := prometheus.NewClient(config)
if err != nil {
return nil, err
}

client := &prometheusAuthClient{
thanos: thanos,
id: id,
client: c,
auth: auth,
Expand All @@ -69,22 +79,68 @@ func (pc *prometheusAuthClient) URL(ep string, args map[string]string) *url.URL

// Do implements prometheus client interface, wrapped with an auth info.
//
// When pc.thanos enables Partial and/or Dedup, the request is intercepted and
// "partial_response=true" / "dedup=true" are added: to the URL query for GET
// requests, and to the form-encoded body for POST requests (a new request is
// built for POST because the body has to be replaced). Requests with any other
// method, and POST requests whose body cannot be replayed (GetBody is nil), are
// forwarded unmodified instead of panicking.
//
// It returns whatever the wrapped client returns, or an error if the original
// POST body cannot be read or parsed, or the replacement request cannot be built.
func (pc *prometheusAuthClient) Do(ctx gocontext.Context, req *http.Request) (*http.Response, []byte, error) {
	newReq := req
	// hacking for thanos here, intercept the request and modify param
	if pc.thanos != nil && (pc.thanos.Dedup || pc.thanos.Partial) {
		var (
			q        url.Values
			bodyData []byte
		)
		switch {
		case req.Method == http.MethodGet:
			q = req.URL.Query()
		case req.Method == http.MethodPost && req.GetBody != nil:
			bodyReader, err := req.GetBody()
			if err != nil {
				return nil, nil, err
			}
			bodyData, err = ioutil.ReadAll(bodyReader)
			// The reader is a read-only copy of the body; a close failure cannot lose data.
			_ = bodyReader.Close()
			if err != nil {
				return nil, nil, err
			}
			if q, err = url.ParseQuery(string(bodyData)); err != nil {
				return nil, nil, err
			}
		}

		// q is nil for requests that are not rewritten; Set on a nil url.Values would panic.
		if q != nil {
			if pc.thanos.Partial {
				q.Set("partial_response", "true")
			}
			if pc.thanos.Dedup {
				q.Set("dedup", "true")
			}
			encoded := q.Encode()

			klog.V(6).InfoS("Hacking thanos", "originalQueryBody", string(bodyData), "newQuery", encoded)

			if req.Method == http.MethodGet {
				req.URL.RawQuery = encoded
			} else {
				postReq, err := http.NewRequest(req.Method, req.URL.String(), strings.NewReader(encoded))
				if err != nil {
					return nil, nil, err
				}
				postReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
				newReq = postReq
			}
		}
	}

	pc.auth.Apply(newReq)
	// NOTE(review): this logs the whole request after auth has been applied; confirm
	// that Apply does not add credentials that should stay out of debug logs.
	klog.V(6).InfoS("Query url", "url", newReq.URL, "newReq", newReq, "oldReq", req)
	return pc.client.Do(ctx, newReq)
}

type prometheusRateLimitClient struct {
id string
auth *providers.ClientAuth
auth *datasource.ClientAuth
client prometheus.Client
// hack here, later maybe a datasource for thanos
thanos *ThanosConfig

lock *sync.Mutex
cond *sync.Cond
maxInFlight int
currentInFlight int
}

func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *providers.ClientAuth, maxInFlight int) (prometheus.Client, error) {
func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *datasource.ClientAuth, maxInFlight int, thanos *ThanosConfig) (prometheus.Client, error) {
c, err := prometheus.NewClient(config)
if err != nil {
return nil, err
Expand All @@ -98,6 +154,7 @@ func newPrometheusRateLimitClient(id string, config prometheus.Config, auth *pro
maxInFlight: maxInFlight,
lock: lock,
cond: sync.NewCond(lock),
thanos: thanos,
}

return client, nil
Expand All @@ -110,14 +167,58 @@ func (pc *prometheusRateLimitClient) URL(ep string, args map[string]string) *url

// Do implements prometheus client interface, wrapped with an auth info and an
// in-flight request limit.
//
// When pc.thanos enables Partial and/or Dedup, the request is intercepted and
// "partial_response=true" / "dedup=true" are added: to the URL query for GET
// requests, and to the form-encoded body for POST requests (a new request is
// built for POST because the body has to be replaced). Requests with any other
// method, and POST requests whose body cannot be replayed (GetBody is nil), are
// forwarded unmodified instead of panicking.
//
// Auth is applied exactly once, to the request that is actually sent. The call
// then waits for a free in-flight slot before delegating to the wrapped client.
func (pc *prometheusRateLimitClient) Do(ctx gocontext.Context, req *http.Request) (*http.Response, []byte, error) {
	newReq := req
	// hacking for thanos here, intercept the request and modify param
	if pc.thanos != nil && (pc.thanos.Dedup || pc.thanos.Partial) {
		var (
			q        url.Values
			bodyData []byte
		)
		switch {
		case req.Method == http.MethodGet:
			q = req.URL.Query()
		case req.Method == http.MethodPost && req.GetBody != nil:
			bodyReader, err := req.GetBody()
			if err != nil {
				return nil, nil, err
			}
			bodyData, err = ioutil.ReadAll(bodyReader)
			// The reader is a read-only copy of the body; a close failure cannot lose data.
			_ = bodyReader.Close()
			if err != nil {
				return nil, nil, err
			}
			if q, err = url.ParseQuery(string(bodyData)); err != nil {
				return nil, nil, err
			}
		}

		// q is nil for requests that are not rewritten; Set on a nil url.Values would panic.
		if q != nil {
			if pc.thanos.Partial {
				q.Set("partial_response", "true")
			}
			if pc.thanos.Dedup {
				q.Set("dedup", "true")
			}
			encoded := q.Encode()

			klog.V(6).InfoS("Hacking thanos", "originalQueryBody", string(bodyData), "newQuery", encoded)

			if req.Method == http.MethodGet {
				req.URL.RawQuery = encoded
			} else {
				postReq, err := http.NewRequest(req.Method, req.URL.String(), strings.NewReader(encoded))
				if err != nil {
					return nil, nil, err
				}
				postReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
				newReq = postReq
			}
		}
	}

	pc.auth.Apply(newReq)
	// NOTE(review): this logs the whole request after auth has been applied; confirm
	// that Apply does not add credentials that should stay out of debug logs.
	klog.V(6).InfoS("Query url", "url", newReq.URL, "newReq", newReq, "oldReq", req)
	klog.V(4).InfoS("Prometheus rate limit", "ratelimit", pc.Runtime())
	// block wait until at least one InFlight request finished if current inflighting requests reach the max limit, avoid many time consuming requests hit the prometheus.
	// we use inflight to record the number of inflighting requests, because prometheus query is time-consuming when the range is large
	// Caller will be blocked by lock if it reached maxFlight. so caller should set timeout for context and cancel it.
	pc.increaseInFightWait()
	defer pc.decreaseInFlightSignal()
	return pc.client.Do(ctx, newReq)
}

func (mfc *prometheusRateLimitClient) increaseInFightWait() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/providers/prom/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/querybuilder"
)

type prom struct {
Expand All @@ -32,7 +33,7 @@ func NewProvider(config *providers.PromConfig) (providers.Interface, error) {

func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time, step time.Duration) ([]*common.TimeSeries, error) {
promBuilder := namer.QueryBuilder().Builder(metricquery.PrometheusMetricSource)
promQuery, err := promBuilder.BuildQuery()
promQuery, err := promBuilder.BuildQuery(querybuilder.BuildQueryBehavior{FederatedClusterScope: p.config.FederatedClusterScope, ClusterLabelName: p.config.ClusterLabelName, ClusterLabelValue: p.config.ClusterLabelValue})
if err != nil {
klog.Errorf("Failed to BuildQuery: %v", err)
return nil, err
Expand All @@ -50,7 +51,7 @@ func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Ti

func (p *prom) QueryLatestTimeSeries(namer metricnaming.MetricNamer) ([]*common.TimeSeries, error) {
promBuilder := namer.QueryBuilder().Builder(metricquery.PrometheusMetricSource)
promQuery, err := promBuilder.BuildQuery()
promQuery, err := promBuilder.BuildQuery(querybuilder.BuildQueryBehavior{FederatedClusterScope: p.config.FederatedClusterScope, ClusterLabelName: p.config.ClusterLabelName, ClusterLabelValue: p.config.ClusterLabelValue})
if err != nil {
klog.Errorf("Failed to BuildQuery: %v", err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/querybuilder-providers/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewQueryBuilder(metric *metricquery.Metric) querybuilder.Builder {
}
}

func (b builder) BuildQuery() (*metricquery.Query, error) {
func (b builder) BuildQuery(behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) {
return gRPCQuery(&metricquery.GenericQuery{Metric: b.metric}), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querybuilder-providers/metricserver/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewMetricServerQueryBuilder(metric *metricquery.Metric) querybuilder.Builde
}
}

func (b builder) BuildQuery() (*metricquery.Query, error) {
func (b builder) BuildQuery(behavior querybuilder.BuildQueryBehavior) (*metricquery.Query, error) {
return metricServerQuery(&metricquery.GenericQuery{Metric: b.metric}), nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/querybuilder-providers/metricserver/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/querybuilder"
)

func TestNewPromQueryBuilder(t *testing.T) {
Expand All @@ -20,7 +21,7 @@ func TestNewPromQueryBuilder(t *testing.T) {
},
}
builder := NewMetricServerQueryBuilder(metric)
_, err := builder.BuildQuery()
_, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{})
if err != nil {
t.Log(err)
}
Expand Down Expand Up @@ -232,7 +233,7 @@ func TestBuildQuery(t *testing.T) {

for _, tc := range testCases {
builder := NewMetricServerQueryBuilder(tc.metric)
query, err := builder.BuildQuery()
query, err := builder.BuildQuery(querybuilder.BuildQueryBehavior{})
if !reflect.DeepEqual(err, tc.err) {
t.Fatalf("tc %v failed, got error: %v, want error: %v", tc.desc, err, tc.err)
}
Expand Down
Loading