Skip to content

Commit

Permalink
Instant query functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <rsevilla@redhat.com>
  • Loading branch information
rsevilla87 committed Nov 14, 2020
1 parent 9b401dd commit 72ab840
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 34 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ metrics:
metricName: nodeCPU
```


It's also possible to execute instant queries from kube-burner by adding the flag instant to the desired metric. These kind of queries are useful to get only one sample for a static metric such as the number of nodes or the kube-apiserver version.

```yaml
metrics:
- query: kube_node_role
metricName: nodeRoles
instant: true
```

### Job Summary

In case indexing is enabled, at the end of each job, a document holding the job summary is indexed. This is useful to identify the parameters the job was executed with:
Expand Down Expand Up @@ -331,7 +341,6 @@ This document looks like:
"podWait": false,
"waitWhenFinished": true,
"cleanup": true,
"namespaced": false,
"namespacedIterations": false,
"verifyObjects": true,
"errorOnVerify": false
Expand Down
12 changes: 6 additions & 6 deletions cmd/kube-burner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func indexCmd() *cobra.Command {
}
var indexer *indexers.Indexer
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexer = indexers.NewIndexer(config.ConfigSpec.GlobalConfig.IndexerConfig)
indexer = indexers.NewIndexer()
} else {
log.Fatal("Indexing is disabled in the configuration")
}
Expand All @@ -146,7 +146,7 @@ func indexCmd() *cobra.Command {
startTime := time.Unix(start, 0)
endTime := time.Unix(end, 0)
log.Infof("Indexing metrics with UUID %s", uuid)
if err := p.ScrapeMetrics(startTime, endTime, config.ConfigSpec, indexer); err != nil {
if err := p.ScrapeMetrics(startTime, endTime, indexer); err != nil {
log.Error(err)
}
},
Expand Down Expand Up @@ -207,15 +207,15 @@ func steps(uuid string, p *prometheus.Prometheus, prometheusStep time.Duration)
var rc int
var indexer *indexers.Indexer
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexer = indexers.NewIndexer(config.ConfigSpec.GlobalConfig.IndexerConfig)
indexer = indexers.NewIndexer()
}
for _, job := range burner.NewExecutorList(uuid) {
// Run execution
switch job.Config.JobType {
case config.CreationJob:
job.Cleanup()
measurements.NewMeasurementFactory(burner.RestConfig, config.ConfigSpec.GlobalConfig, job.Config, uuid, indexer)
measurements.Register(config.ConfigSpec.GlobalConfig.Measurements)
measurements.NewMeasurementFactory(burner.RestConfig, job.Config, uuid, indexer)
measurements.Register()
measurements.Start()
job.RunCreateJob()
if job.Config.VerifyObjects {
Expand Down Expand Up @@ -249,7 +249,7 @@ func steps(uuid string, p *prometheus.Prometheus, prometheusStep time.Duration)
if p != nil {
log.Infof("Waiting %v extra before scraping prometheus metrics", prometheusStep*4)
time.Sleep(prometheusStep * 4)
if err := p.ScrapeMetrics(start, time.Now().UTC(), config.ConfigSpec, indexer); err != nil {
if err := p.ScrapeMetrics(start, time.Now().UTC(), indexer); err != nil {
log.Error(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/indexers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type Indexer interface {
var indexerMap = make(map[string]Indexer)

// NewIndexer creates a new Indexer with the specified IndexerConfig
func NewIndexer(cfg config.IndexerConfig) *Indexer {
func NewIndexer() *Indexer {
var indexer Indexer
var exists bool
cfg := config.ConfigSpec.GlobalConfig.IndexerConfig
if indexer, exists = indexerMap[cfg.Type]; exists {
log.Infof("📁 Creating indexer: %s", cfg.Type)
indexer.new(cfg)
Expand Down
10 changes: 5 additions & 5 deletions pkg/measurements/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ var factory measurementFactory
var measurementMap = make(map[string]measurement)

// NewMeasurementFactory initializes the measurement facture
func NewMeasurementFactory(restConfig *rest.Config, globalConfig config.GlobalConfig, config config.Job, uuid string, indexer *indexers.Indexer) {
func NewMeasurementFactory(restConfig *rest.Config, jobConfig config.Job, uuid string, indexer *indexers.Indexer) {
log.Info("📈 Creating measurement factory")
clientSet := kubernetes.NewForConfigOrDie(restConfig)
factory = measurementFactory{
globalConfig: globalConfig,
config: config,
globalConfig: config.ConfigSpec.GlobalConfig,
config: jobConfig,
clientSet: clientSet,
restConfig: restConfig,
createFuncs: make(map[string]measurement),
Expand All @@ -67,8 +67,8 @@ func (mf *measurementFactory) register(measure config.Measurement, measurementFu
}

// Register registers the given list of measurements
func Register(measurementList []config.Measurement) {
for _, measurement := range measurementList {
func Register() {
for _, measurement := range config.ConfigSpec.GlobalConfig.Measurements {
if measurementFunc, exists := measurementMap[measurement.Name]; exists {
factory.register(measurement, measurementFunc)
} else {
Expand Down
86 changes: 65 additions & 21 deletions pkg/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type metricDefinition struct {
Query string `yaml:"query"`
MetricName string `yaml:"metricName"`
IndexName string `yaml:"indexName"`
Instant bool `yaml:"instant"`
}

// MetricsProfile describes what metrics kube-burner collects
Expand All @@ -69,8 +70,8 @@ type metric struct {
Value float64 `json:"value"`
UUID string `json:"uuid"`
Query string `json:"query"`
MetricName string `json:"metricName"`
JobName string `json:"jobName"`
MetricName string `json:"metricName,omitempty"`
JobName string `json:"jobName,omitempty"`
}

func (bat authTransport) RoundTrip(req *http.Request) (*http.Response, error) {
Expand Down Expand Up @@ -138,28 +139,45 @@ func (p *Prometheus) readProfile(metricsFile string) error {
}

// ScrapeMetrics gets all prometheus metrics required and handles them
func (p *Prometheus) ScrapeMetrics(start, end time.Time, cfg config.Spec, indexer *indexers.Indexer) error {
var filename string
func (p *Prometheus) ScrapeMetrics(start, end time.Time, indexer *indexers.Indexer) error {
var filename, jobName string
var err error
var v model.Value
r := apiv1.Range{Start: start, End: end, Step: p.step}
log.Infof("🔍 Scraping prometheus metrics from %s to %s", start, end)
for _, md := range p.metricsProfile.Metrics {
var metrics []interface{}
log.Infof("Querying %s", md.Query)
v, _, err := p.api.QueryRange(context.TODO(), md.Query, r)
if err != nil {
return prometheusError(err)
// IndexCMD can work w/o specifying any job
if len(config.ConfigSpec.Jobs) > 0 {
jobName = config.ConfigSpec.Jobs[0].Name
}
if err := p.parseResponse(md.MetricName, md.Query, v, &metrics); err != nil {
return err
if md.Instant {
log.Infof("Instant query: %s", md.Query)
v, _, err = p.api.Query(context.TODO(), md.Query, time.Now().UTC())
if err != nil {
return prometheusError(err)
}
if err := p.parseVector(md.MetricName, md.Query, jobName, v, &metrics); err != nil {
return err
}
} else {
log.Infof("Range query: %s", md.Query)
v, _, err = p.api.QueryRange(context.TODO(), md.Query, r)
if err != nil {
return prometheusError(err)
}
if err := p.parseMatrix(md.MetricName, md.Query, jobName, v, &metrics); err != nil {
return err
}
}
if cfg.GlobalConfig.WriteToFile {
if config.ConfigSpec.GlobalConfig.WriteToFile {
filename = fmt.Sprintf("%s.json", md.MetricName)
if cfg.GlobalConfig.MetricsDirectory != "" {
err = os.MkdirAll(cfg.GlobalConfig.MetricsDirectory, 0744)
if config.ConfigSpec.GlobalConfig.MetricsDirectory != "" {
err = os.MkdirAll(config.ConfigSpec.GlobalConfig.MetricsDirectory, 0744)
if err != nil {
return fmt.Errorf("Error creating metrics directory %s: ", err)
}
filename = path.Join(cfg.GlobalConfig.MetricsDirectory, filename)
filename = path.Join(config.ConfigSpec.GlobalConfig.MetricsDirectory, filename)
}
log.Infof("Writing to: %s", filename)
f, err := os.Create(filename)
Expand All @@ -174,8 +192,8 @@ func (p *Prometheus) ScrapeMetrics(start, end time.Time, cfg config.Spec, indexe
}
f.Close()
}
if cfg.GlobalConfig.IndexerConfig.Enabled {
indexName := cfg.GlobalConfig.IndexerConfig.DefaultIndex
if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
indexName := config.ConfigSpec.GlobalConfig.IndexerConfig.DefaultIndex
if md.IndexName != "" {
indexName = strings.ToLower(md.IndexName)
}
Expand All @@ -185,14 +203,40 @@ func (p *Prometheus) ScrapeMetrics(start, end time.Time, cfg config.Spec, indexe
return nil
}

func (p *Prometheus) parseResponse(metricName, query string, value model.Value, metrics *[]interface{}) error {
var jobName string
data, ok := value.(model.Matrix)
func (p *Prometheus) parseVector(metricName, query, jobName string, value model.Value, metrics *[]interface{}) error {
data, ok := value.(model.Vector)
if !ok {
return prometheusError(fmt.Errorf("Unsupported result format: %s", value.Type().String()))
}
if len(config.ConfigSpec.Jobs) > 0 {
jobName = config.ConfigSpec.Jobs[0].Name
for _, v := range data {
m := metric{
Labels: make(map[string]string),
UUID: p.uuid,
Query: query,
MetricName: metricName,
JobName: jobName,
}
for k, v := range v.Metric {
if k == "__name__" {
continue
}
m.Labels[string(k)] = string(v)
}
if math.IsNaN(float64(v.Value)) {
m.Value = 0
} else {
m.Value = float64(v.Value)
}
m.Timestamp = v.Timestamp.Time()
*metrics = append(*metrics, m)
}
return nil
}

func (p *Prometheus) parseMatrix(metricName, query, jobName string, value model.Value, metrics *[]interface{}) error {
data, ok := value.(model.Matrix)
if !ok {
return prometheusError(fmt.Errorf("Unsupported result format: %s", value.Type().String()))
}
for _, v := range data {
for _, val := range v.Values {
Expand Down
4 changes: 4 additions & 0 deletions test/metrics-profile.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
metrics:
- query: process_resident_memory_bytes{job="prometheus"}
metricName: prometheusRSS

- query: prometheus_build_info
metricName: prometheusBuildInfo
instant: true

0 comments on commit 72ab840

Please sign in to comment.