/
cloudwatch.go
119 lines (96 loc) · 3.02 KB
/
cloudwatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package backend
import (
"fmt"
"log"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/buildkite/buildkite-metrics/collector"
)
// CloudWatchDimension is a custom dimension to add to metrics. Key is used as
// the CloudWatch dimension name and Value as the dimension value.
type CloudWatchDimension struct {
	Key   string
	Value string
}

// ParseCloudWatchDimensions parses a comma-separated list of key=value pairs
// (for example "Env=prod, Team=ops") into CloudWatch dimensions.
//
// Surrounding whitespace is trimmed from the whole input, from each pair and
// from each key and value. Only the first "=" of a pair separates key from
// value, so values may themselves contain "=". An empty or all-whitespace
// input yields an empty, non-nil slice and no error.
//
// An error is returned (together with a nil slice) when a pair has no "=" or
// when its key or value is empty after trimming.
func ParseCloudWatchDimensions(ds string) ([]CloudWatchDimension, error) {
	dimensions := []CloudWatchDimension{}

	ds = strings.TrimSpace(ds)
	if ds == "" {
		return dimensions, nil
	}

	for _, dimension := range strings.Split(ds, ",") {
		parts := strings.SplitN(strings.TrimSpace(dimension), "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("Failed to parse dimension of %q", dimension)
		}

		// Trim around the "=" as well, so "Env = prod" does not produce a key
		// of "Env " and a value of " prod".
		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])

		// CloudWatch requires dimension names and values to be at least one
		// character long; fail here rather than when the metrics are submitted.
		if key == "" || value == "" {
			return nil, fmt.Errorf("Failed to parse dimension of %q: key and value must not be empty", dimension)
		}

		dimensions = append(dimensions, CloudWatchDimension{
			Key:   key,
			Value: value,
		})
	}

	return dimensions, nil
}
// CloudWatchBackend sends metrics to AWS CloudWatch.
type CloudWatchBackend struct {
// dimensions holds the custom dimensions that Collect attaches to every
// per-queue metric, ahead of the "Queue" dimension.
dimensions []CloudWatchDimension
}
// NewCloudWatchBackend builds a CloudWatchBackend that carries the given
// custom dimensions. The slice is stored as passed in (it is not copied) and
// may be nil or empty when no custom dimensions are wanted.
func NewCloudWatchBackend(dimensions []CloudWatchDimension) *CloudWatchBackend {
	backend := new(CloudWatchBackend)
	backend.dimensions = dimensions
	return backend
}
// Collect converts a collector result into CloudWatch metric data and submits
// it to the "Buildkite" namespace.
//
// The totals are submitted without any dimensions. Each queue's counts are
// submitted with the backend's custom dimensions followed by a "Queue"
// dimension holding the queue name. Metrics are sent in chunks of at most 10
// per PutMetricData call.
//
// The first error encountered, either while creating the AWS session or from
// a PutMetricData call, is returned and any remaining chunks are not sent.
func (cb *CloudWatchBackend) Collect(r *collector.Result) error {
	// NewSession reports configuration problems right away, whereas the
	// deprecated session.New defers them until the first request is made.
	sess, err := session.NewSession()
	if err != nil {
		return err
	}
	svc := cloudwatch.New(sess)

	// The custom dimensions are the same for every queue, so convert them once
	// up front instead of once per queue.
	custom := make([]*cloudwatch.Dimension, 0, len(cb.dimensions))
	for _, d := range cb.dimensions {
		log.Printf("Using custom Cloudwatch dimension of [ %s = %s ]", d.Key, d.Value)
		custom = append(custom, &cloudwatch.Dimension{
			Name: aws.String(d.Key), Value: aws.String(d.Value),
		})
	}

	metrics := []*cloudwatch.MetricDatum{}
	metrics = append(metrics, cloudwatchMetrics(r.Totals, nil)...)

	for name, c := range r.Queues {
		// Build a fresh slice per queue so that no two queues share a backing
		// array when the "Queue" dimension is appended.
		dimensions := make([]*cloudwatch.Dimension, 0, len(custom)+1)
		dimensions = append(dimensions, custom...)
		dimensions = append(dimensions, &cloudwatch.Dimension{
			Name: aws.String("Queue"), Value: aws.String(name),
		})

		metrics = append(metrics, cloudwatchMetrics(c, dimensions)...)
	}

	log.Printf("Extracted %d cloudwatch metrics from results", len(metrics))

	for _, chunk := range chunkCloudwatchMetrics(10, metrics) {
		log.Printf("Submitting chunk of %d metrics to Cloudwatch", len(chunk))
		_, err := svc.PutMetricData(&cloudwatch.PutMetricDataInput{
			MetricData: chunk,
			Namespace:  aws.String("Buildkite"),
		})
		if err != nil {
			return err
		}
	}

	return nil
}
// cloudwatchMetrics builds one MetricDatum per entry in counts. The map key
// becomes the metric name, the count is converted to a float64 value, and the
// unit is always "Count". Every datum shares the dimensions slice that was
// passed in, which may be nil. The result follows map iteration order, so its
// ordering is not stable between calls.
func cloudwatchMetrics(counts map[string]int, dimensions []*cloudwatch.Dimension) []*cloudwatch.MetricDatum {
	data := make([]*cloudwatch.MetricDatum, 0, len(counts))

	for metricName, count := range counts {
		datum := &cloudwatch.MetricDatum{}
		datum.MetricName = aws.String(metricName)
		datum.Dimensions = dimensions
		datum.Value = aws.Float64(float64(count))
		datum.Unit = aws.String("Count")

		data = append(data, datum)
	}

	return data
}
// chunkCloudwatchMetrics splits data into consecutive chunks of at most size
// elements, preserving order. The last chunk may be shorter. Empty input
// yields an empty, non-nil result.
//
// A non-positive size is treated as "do not split": all of data is returned
// as a single chunk. Without this guard a size of 0 would never advance the
// loop and a negative size would panic on an invalid slice range.
//
// The chunks are views into data rather than copies. Each chunk's capacity is
// capped at its length so that appending to one chunk cannot overwrite the
// elements of the chunk that follows it.
func chunkCloudwatchMetrics(size int, data []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum {
	var chunks = [][]*cloudwatch.MetricDatum{}
	if len(data) == 0 {
		return chunks
	}
	if size <= 0 {
		size = len(data)
	}

	for i := 0; i < len(data); i += size {
		end := i + size
		if end > len(data) {
			end = len(data)
		}
		// Three-index slice: limit capacity to the chunk itself.
		chunks = append(chunks, data[i:end:end])
	}

	return chunks
}