forked from amazon-archives/k8s-cloudwatch-adapter
/
handler.go
117 lines (98 loc) · 3.32 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package controller
import (
"fmt"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
listers "github.com/awslabs/k8s-cloudwatch-adapter/pkg/client/listers/metrics/v1alpha1"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/metriccache"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
)
// Handler processes the events from the controler for external metrics
type Handler struct {
externalmetricLister listers.ExternalMetricLister
metriccache *metriccache.MetricCache
}
// NewHandler created a new handler
func NewHandler(externalmetricLister listers.ExternalMetricLister, metricCache *metriccache.MetricCache) Handler {
return Handler{
externalmetricLister: externalmetricLister,
metriccache: metricCache,
}
}
// ControllerHandler is a handler to process resource items
type ControllerHandler interface {
Process(queueItem namespacedQueueItem) error
}
// Process validates the item exists then stores updates the metric cached used to make requests to
// cloudwatch
func (h *Handler) Process(queueItem namespacedQueueItem) error {
ns, name, err := cache.SplitMetaNamespaceKey(queueItem.namespaceKey)
if err != nil {
// not a valid key do not put back on queue
runtime.HandleError(fmt.Errorf("expected namespace/name key in workqueue but got %s", queueItem.namespaceKey))
return err
}
switch queueItem.kind {
case "ExternalMetric":
return h.handleExternalMetric(ns, name, queueItem)
}
return nil
}
func (h *Handler) handleExternalMetric(ns, name string, queueItem namespacedQueueItem) error {
// check if item exists
glog.V(2).Infof("processing item '%s' in namespace '%s'", name, ns)
externalMetricInfo, err := h.externalmetricLister.ExternalMetrics(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// Then this we should remove
glog.V(2).Infof("removing item from cache '%s' in namespace '%s'", name, ns)
h.metriccache.Remove(queueItem.Key())
return nil
}
return err
}
glog.V(2).Infof("externalMetricInfo: %v", externalMetricInfo)
queries := externalMetricInfo.Spec.Queries
// If changing logic in this block ensure changes are duplicated in
// `pkg/client.Query()`
cwMetricQueries := make([]cloudwatch.MetricDataQuery, len(queries))
for i, q := range queries {
q := q
mdq := cloudwatch.MetricDataQuery{
Id: &q.ID,
Label: &q.Label,
ReturnData: &q.ReturnData,
}
if len(q.Expression) == 0 {
dimensions := make([]cloudwatch.Dimension, len(q.MetricStat.Metric.Dimensions))
for j, d := range q.MetricStat.Metric.Dimensions {
dimensions[j] = cloudwatch.Dimension{
Name: &d.Name,
Value: &d.Value,
}
}
metric := &cloudwatch.Metric{
Dimensions: dimensions,
MetricName: &q.MetricStat.Metric.MetricName,
Namespace: &q.MetricStat.Metric.Namespace,
}
mdq.MetricStat = &cloudwatch.MetricStat{
Metric: metric,
Period: &q.MetricStat.Period,
Stat: &q.MetricStat.Stat,
Unit: cloudwatch.StandardUnit(q.MetricStat.Unit),
}
} else {
mdq.Expression = &q.Expression
}
cwMetricQueries[i] = mdq
}
cwQuery := cloudwatch.GetMetricDataInput{
MetricDataQueries: cwMetricQueries,
}
glog.V(2).Infof("adding to cache item '%s' in namespace '%s'", name, ns)
h.metriccache.Update(queueItem.Key(), name, cwQuery)
return nil
}