Skip to content

Commit

Permalink
CloudWatch: Datasource improvements (#20268)
Browse files Browse the repository at this point in the history
* CloudWatch: Datasource improvements

* Add statistic as template variale

* Add wildcard to list of values

* Template variable intercept dimension key

* Return row specific errors when transformation error occured

* Add meta feedback

* Make it possible to retrieve values without known metrics

* Add curated dashboard for EC2

* Fix broken tests

* Use correct dashboard name

* Display alert in case multi template var is being used for some certain props in the cloudwatch query

* Minor fixes after feedback

* Update dashboard json

* Update snapshot test

* Make sure region default is intercepted in cloudwatch link

* Update dashboards

* Include ec2 dashboard in ds

* Do not include ec2 dashboard in beta1

* Display actual region

(cherry picked from commit 00bef91)
  • Loading branch information
sunker authored and kylebrandt committed Nov 14, 2019
1 parent 6686611 commit 1f3c557
Show file tree
Hide file tree
Showing 62 changed files with 7,527 additions and 1,512 deletions.
34 changes: 3 additions & 31 deletions pkg/tsdb/cloudwatch/annotation_query.go
Expand Up @@ -24,7 +24,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
namespace := parameters.Get("namespace").MustString("")
metricName := parameters.Get("metricName").MustString("")
dimensions := parameters.Get("dimensions").MustMap()
statistics, extendedStatistics, err := parseStatistics(parameters)
statistics, err := parseStatistics(parameters)
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
if err != nil {
return nil, errors.New("Failed to call cloudwatch:DescribeAlarms")
}
alarmNames = filterAlarms(resp, namespace, metricName, dimensions, statistics, extendedStatistics, period)
alarmNames = filterAlarms(resp, namespace, metricName, dimensions, statistics, period)
} else {
if region == "" || namespace == "" || metricName == "" || len(statistics) == 0 {
return result, nil
Expand Down Expand Up @@ -82,22 +82,6 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
alarmNames = append(alarmNames, alarm.AlarmName)
}
}
for _, s := range extendedStatistics {
params := &cloudwatch.DescribeAlarmsForMetricInput{
Namespace: aws.String(namespace),
MetricName: aws.String(metricName),
Dimensions: qd,
ExtendedStatistic: aws.String(s),
Period: aws.Int64(period),
}
resp, err := svc.DescribeAlarmsForMetric(params)
if err != nil {
return nil, errors.New("Failed to call cloudwatch:DescribeAlarmsForMetric")
}
for _, alarm := range resp.MetricAlarms {
alarmNames = append(alarmNames, alarm.AlarmName)
}
}
}

startTime, err := queryContext.TimeRange.ParseFrom()
Expand Down Expand Up @@ -158,7 +142,7 @@ func transformAnnotationToTable(data []map[string]string, result *tsdb.QueryResu
result.Meta.Set("rowCount", len(data))
}

func filterAlarms(alarms *cloudwatch.DescribeAlarmsOutput, namespace string, metricName string, dimensions map[string]interface{}, statistics []string, extendedStatistics []string, period int64) []*string {
func filterAlarms(alarms *cloudwatch.DescribeAlarmsOutput, namespace string, metricName string, dimensions map[string]interface{}, statistics []string, period int64) []*string {
alarmNames := make([]*string, 0)

for _, alarm := range alarms.MetricAlarms {
Expand Down Expand Up @@ -197,18 +181,6 @@ func filterAlarms(alarms *cloudwatch.DescribeAlarmsOutput, namespace string, met
}
}

if len(extendedStatistics) != 0 {
found := false
for _, s := range extendedStatistics {
if *alarm.Statistic == s {
found = true
}
}
if !found {
continue
}
}

if period != 0 && *alarm.Period != period {
continue
}
Expand Down
176 changes: 2 additions & 174 deletions pkg/tsdb/cloudwatch/cloudwatch.go
Expand Up @@ -2,18 +2,13 @@ package cloudwatch

import (
"context"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/sync/errgroup"
)

type CloudWatchExecutor struct {
Expand All @@ -38,21 +33,13 @@ func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, e
}

var (
plog log.Logger
standardStatistics map[string]bool
aliasFormat *regexp.Regexp
plog log.Logger
aliasFormat *regexp.Regexp
)

func init() {
plog = log.New("tsdb.cloudwatch")
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
standardStatistics = map[string]bool{
"Average": true,
"Maximum": true,
"Minimum": true,
"Sum": true,
"SampleCount": true,
}
aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
}

Expand All @@ -75,162 +62,3 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc

return result, err
}

func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
results := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))

eg, ectx := errgroup.WithContext(ctx)

getMetricDataQueries := make(map[string]map[string]*CloudWatchQuery)
for i, model := range queryContext.Queries {
queryType := model.Model.Get("type").MustString()
if queryType != "timeSeriesQuery" && queryType != "" {
continue
}

RefId := queryContext.Queries[i].RefId
query, err := parseQuery(queryContext.Queries[i].Model)
if err != nil {
results.Results[RefId] = &tsdb.QueryResult{
Error: err,
}
return results, nil
}
query.RefId = RefId

if query.Id != "" {
if _, ok := getMetricDataQueries[query.Region]; !ok {
getMetricDataQueries[query.Region] = make(map[string]*CloudWatchQuery)
}
getMetricDataQueries[query.Region][query.Id] = query
continue
}

if query.Id == "" && query.Expression != "" {
results.Results[query.RefId] = &tsdb.QueryResult{
Error: fmt.Errorf("Invalid query: id should be set if using expression"),
}
return results, nil
}

eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &tsdb.QueryResult{
RefId: query.RefId,
Error: theErr,
}
}
}
}()

queryRes, err := e.executeQuery(ectx, query, queryContext)
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
return err
}
if err != nil {
resultChan <- &tsdb.QueryResult{
RefId: query.RefId,
Error: err,
}
return nil
}
resultChan <- queryRes
return nil
})
}

if len(getMetricDataQueries) > 0 {
for region, getMetricDataQuery := range getMetricDataQueries {
q := getMetricDataQuery
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &tsdb.QueryResult{
Error: theErr,
}
}
}
}()

queryResponses, err := e.executeGetMetricDataQuery(ectx, region, q, queryContext)
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
return err
}
for _, queryRes := range queryResponses {
if err != nil {
queryRes.Error = err
}
resultChan <- queryRes
}
return nil
})
}
}

if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
for result := range resultChan {
results.Results[result.RefId] = result
}

return results, nil
}

func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string, label string) string {
region := query.Region
namespace := query.Namespace
metricName := query.MetricName
period := strconv.Itoa(query.Period)
if len(query.Id) > 0 && len(query.Expression) > 0 {
if strings.Index(query.Expression, "SEARCH(") == 0 {
pIndex := strings.LastIndex(query.Expression, ",")
period = strings.Trim(query.Expression[pIndex+1:], " )")
sIndex := strings.LastIndex(query.Expression[:pIndex], ",")
stat = strings.Trim(query.Expression[sIndex+1:pIndex], " '")
} else if len(query.Alias) > 0 {
// expand by Alias
} else {
return query.Id
}
}

data := map[string]string{}
data["region"] = region
data["namespace"] = namespace
data["metric"] = metricName
data["stat"] = stat
data["period"] = period
if len(label) != 0 {
data["label"] = label
}
for k, v := range dimensions {
data[k] = v
}

result := aliasFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := data[labelName]; exists {
return []byte(val)
}

return in
})

if string(result) == "" {
return metricName + "_" + stat
}

return string(result)
}
62 changes: 62 additions & 0 deletions pkg/tsdb/cloudwatch/cloudwatch_query.go
@@ -0,0 +1,62 @@
package cloudwatch

import (
"strings"
)

type cloudWatchQuery struct {
RefId string
Region string
Id string
Namespace string
MetricName string
Stats string
Expression string
ReturnData bool
Dimensions map[string][]string
Period int
Alias string
Identifier string
HighResolution bool
MatchExact bool
UsedExpression string
RequestExceededMaxLimit bool
}

func (q *cloudWatchQuery) isMathExpression() bool {
return q.Expression != "" && !q.isUserDefinedSearchExpression()
}

func (q *cloudWatchQuery) isSearchExpression() bool {
return q.isUserDefinedSearchExpression() || q.isInferredSearchExpression()
}

func (q *cloudWatchQuery) isUserDefinedSearchExpression() bool {
return strings.Contains(q.Expression, "SEARCH(")
}

func (q *cloudWatchQuery) isInferredSearchExpression() bool {
if len(q.Dimensions) == 0 {
return !q.MatchExact
}

if !q.MatchExact {
return true
}

for _, values := range q.Dimensions {
if len(values) > 1 {
return true
}
for _, v := range values {
if v == "*" {
return true
}
}
}
return false
}

func (q *cloudWatchQuery) isMetricStat() bool {
return !q.isSearchExpression() && !q.isMathExpression()
}

0 comments on commit 1f3c557

Please sign in to comment.