forked from kubernetes-retired/heapster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
driver.go
275 lines (245 loc) · 8.52 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcmautoscaling
import (
"fmt"
"net/url"
"time"
"github.com/GoogleCloudPlatform/heapster/extpoints"
"github.com/GoogleCloudPlatform/heapster/sinks/gcm"
"github.com/golang/glog"
sink_api "github.com/GoogleCloudPlatform/heapster/sinks/api/v1"
kube_api "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
var (
	// LabelHostname is the autoscaling label carrying the node's hostname.
	LabelHostname = sink_api.LabelDescriptor{
		Key:         "hostname",
		Description: "Hostname where the container ran",
	}

	// LabelGCEResourceID is the autoscaling label carrying the node's GCE
	// resource id.
	LabelGCEResourceID = sink_api.LabelDescriptor{
		Key:         "compute.googleapis.com/resource_id",
		Description: "Resource id for nodes specific for GCE.",
	}

	// LabelGCEResourceType is the autoscaling label carrying the node's GCE
	// resource type.
	LabelGCEResourceType = sink_api.LabelDescriptor{
		Key:         "compute.googleapis.com/resource_type",
		Description: "Resource types for nodes specific for GCE.",
	}

	// Names of the incoming heapster metrics from which the autoscaling
	// metrics are derived.
	cpuUsage, cpuLimit = "cpu/usage", "cpu/limit"
	memUsage, memLimit = "memory/usage", "memory/limit"
)
// autoscalingLabels is the label set declared for every metric this sink
// registers; getLabels fills in the matching values for each exported point.
var autoscalingLabels = []sink_api.LabelDescriptor{LabelHostname, LabelGCEResourceID, LabelGCEResourceType}
// utilizationMetric describes one derived metric exported by this sink: the
// short name it is registered under and its human-readable description.
type utilizationMetric struct {
	name, description string
}
// autoscalingMetrics maps each incoming heapster metric name to the derived
// metric that StoreTimeseries exports in its place.
var autoscalingMetrics = map[string]utilizationMetric{
	// Usage relative to node capacity.
	cpuUsage: {name: "cpu/node_utilization", description: "Cpu utilization as a share of node capacity"},
	memUsage: {name: "memory/node_utilization", description: "Memory utilization as a share of memory capacity"},

	// Summed pod limits relative to node capacity.
	cpuLimit: {name: "cpu/node_reservation", description: "Share of cpu that is reserved on the node"},
	memLimit: {name: "memory/node_reservation", description: "Share of memory that is reserved on the node"},
}
// hostTime keys per-node statistics by hostname and sample end time. The input
// may contain data from several time windows, and this key keeps each window's
// capacity and reservation figures separate.
//
// NOTE(review): Go's == on time.Time also compares the Location and the
// monotonic clock reading, not only the instant. Node and pod points therefore
// only match if every producer stamps End identically. Consider keying on
// End.UnixNano() if that cannot be guaranteed.
type hostTime struct {
	host string    // value of the point's hostname label
	time time.Time // the point's End timestamp
}
// gcmAutocalingSink is an external sink that exports node-level CPU and memory
// utilization and reservation ratios to Google Cloud Monitoring for
// autoscaling. The per-node maps are rebuilt from scratch by
// updateMachineCapacityAndReservation on every StoreTimeseries call.
type gcmAutocalingSink struct {
	core *gcm.GcmCore

	// Keyed by node and sample time: the node's CPU capacity, and the sum of
	// pod container CPU limits on it. Both are in millicores.
	cpuCapacity, cpuReservation map[hostTime]int64

	// Keyed by node and sample time: the node's memory capacity, and the sum
	// of pod container memory limits on it. Both are in bytes.
	memCapacity, memReservation map[hostTime]int64
}
// Register adds the autoscaling metric descriptors, or updates them if they
// already exist. The descriptors supplied by the caller are ignored; the sink
// always registers the fixed set in autoscalingMetrics, each as a
// double-valued gauge labelled with autoscalingLabels. It returns the first
// error reported by the core.
func (self gcmAutocalingSink) Register(_ []sink_api.MetricDescriptor) error {
	kind, valueType := sink_api.MetricGauge.String(), sink_api.ValueDouble.String()
	for _, m := range autoscalingMetrics {
		err := self.core.Register(m.name, m.description, kind, valueType, autoscalingLabels)
		if err != nil {
			return err
		}
	}
	return nil
}
// Unregister removes every autoscaling metric descriptor, ignoring the
// descriptors supplied by the caller. It stops at the first error reported by
// the core and returns it; metrics not yet visited are left untouched.
func (self gcmAutocalingSink) Unregister(_ []sink_api.MetricDescriptor) error {
	var err error
	for _, m := range autoscalingMetrics {
		if err = self.core.Unregister(m.name); err != nil {
			break
		}
	}
	return err
}
// StoreEvents is a no-op. Google Cloud Monitoring does not store events, so
// they are dropped and nil is always returned.
func (self gcmAutocalingSink) StoreEvents(_ []kube_api.Event) error {
	return nil
}
// isNode reports whether the point describes a whole node, identified by a
// container-name label equal to "/" (presumably the root container).
func isNode(metric *sink_api.Point) bool {
	containerName := metric.Labels[sink_api.LabelContainerName.Key]
	return containerName == "/"
}
// isPodContainer reports whether the point belongs to a container in a pod,
// i.e. whether its pod-name label is non-empty.
func isPodContainer(metric *sink_api.Point) bool {
	return metric.Labels[sink_api.LabelPodName.Key] != ""
}
// updateMachineCapacityAndReservation discards the previous per-node figures
// and recomputes them from the cpu/limit and memory/limit points in input:
//   - on a node-level point, the limit is recorded as that node's capacity;
//   - on a pod container point, the limit is added to that node's reservation.
//
// Points whose value is not a positive int64 are ignored. Entries are keyed by
// the point's hostname label and End time.
func (self *gcmAutocalingSink) updateMachineCapacityAndReservation(input []sink_api.Timeseries) {
	self.cpuCapacity = make(map[hostTime]int64)
	self.cpuReservation = make(map[hostTime]int64)
	self.memCapacity = make(map[hostTime]int64)
	self.memReservation = make(map[hostTime]int64)

	for i := range input {
		point := input[i].Point

		// Pick the capacity/reservation pair this limit contributes to.
		var capacity, reservation map[hostTime]int64
		switch point.Name {
		case cpuLimit:
			capacity, reservation = self.cpuCapacity, self.cpuReservation
		case memLimit:
			capacity, reservation = self.memCapacity, self.memReservation
		default:
			continue
		}

		value, isInt := point.Value.(int64)
		if !isInt || value < 1 {
			continue
		}

		key := hostTime{point.Labels[sink_api.LabelHostname.Key], point.End}
		switch {
		case isNode(point):
			capacity[key] = value
		case isPodContainer(point):
			reservation[key] += value
		}
	}
}
// getLabels builds the minimal label set that autoscaling requires for the
// given point: its hostname, its GCE resource id (taken from the point's
// host-id label), and a fixed resource type of "instance".
// See more: https://cloud.google.com/compute/docs/autoscaler/scaling-cloud-monitoring-metrics#custom_metrics_beta
func getLabels(metric *sink_api.Point) map[string]string {
	labels := make(map[string]string, 3)
	labels[gcm.FullLabelName(LabelHostname.Key)] = metric.Labels[sink_api.LabelHostname.Key]
	labels[gcm.FullLabelName(LabelGCEResourceID.Key)] = metric.Labels[sink_api.LabelHostID.Key]
	labels[gcm.FullLabelName(LabelGCEResourceType.Key)] = "instance"
	return labels
}
// getNewValue converts a node-level source point into its autoscaling ratio,
// using the per-node figures gathered by updateMachineCapacityAndReservation:
//   - cpu/usage:    ts's double value / node CPU capacity
//   - cpu/limit:    summed pod CPU limits / node CPU capacity
//   - memory/usage: ts's int64 value / node memory capacity
//   - memory/limit: summed pod memory limits / node memory capacity
//
// Lookups are keyed by the point's hostname label and End time. It returns nil
// in three cases: the metric is not one of these four, the required capacity
// or reservation is unknown, or ts lacks the expected value.
func (self *gcmAutocalingSink) getNewValue(metric *sink_api.Point, ts *gcm.Timeseries) *float64 {
	key := hostTime{metric.Labels[sink_api.LabelHostname.Key], metric.End}

	var capacities map[hostTime]int64
	switch metric.Name {
	case cpuUsage, cpuLimit:
		capacities = self.cpuCapacity
	case memUsage, memLimit:
		capacities = self.memCapacity
	default:
		return nil
	}
	capacity, known := capacities[key]
	if !known || capacity < 1 {
		return nil
	}

	var numerator float64
	switch metric.Name {
	case cpuUsage:
		if ts.Point.DoubleValue == nil {
			return nil
		}
		numerator = *ts.Point.DoubleValue
	case memUsage:
		if ts.Point.Int64Value == nil {
			return nil
		}
		numerator = float64(*ts.Point.Int64Value)
	case cpuLimit:
		reserved, found := self.cpuReservation[key]
		if !found {
			return nil
		}
		numerator = float64(reserved)
	case memLimit:
		reserved, found := self.memReservation[key]
		if !found {
			return nil
		}
		numerator = float64(reserved)
	}

	ratio := numerator / float64(capacity)
	return &ratio
}
// StoreTimeseries derives node utilization and reservation ratios from input
// and pushes them to Google Cloud Monitoring. The metrics must already exist.
// Only node-level cpu/usage, cpu/limit, memory/usage and memory/limit points
// are exported. Each is renamed to its autoscalingMetrics counterpart, given
// the double value computed by getNewValue, and relabelled with getLabels.
// Points that cannot be converted are logged and skipped.
//
// The receiver is a copy, so the capacity/reservation maps computed here live
// only for the duration of this call.
func (self gcmAutocalingSink) StoreTimeseries(input []sink_api.Timeseries) error {
	self.updateMachineCapacityAndReservation(input)

	byName := make(map[string][]gcm.Timeseries)
	for _, entry := range input {
		point := entry.Point
		if !isNode(point) {
			// Autoscaling consumes node-level metrics only.
			continue
		}

		var (
			ts  *gcm.Timeseries
			err error
		)
		switch point.Name {
		case cpuUsage:
			// CPU usage is exported as a rate.
			ts, err = self.core.GetEquivalentRateMetric(point)
		case cpuLimit, memUsage, memLimit:
			ts, err = self.core.GetMetric(point)
		default:
			continue
		}

		gauge := autoscalingMetrics[point.Name].name
		hostname := point.Labels[sink_api.LabelHostname.Key]
		if err != nil || ts == nil {
			glog.Infof("Failed to create Timeseries for metric %v, host %v. Error %v.", gauge, hostname, err)
			continue
		}

		ratio := self.getNewValue(point, ts)
		if ratio == nil {
			glog.Infof("Failed to compute new value for metric %v, host %v.", gauge, hostname)
			continue
		}

		ts.Point.Int64Value, ts.Point.DoubleValue = nil, ratio
		fullName := gcm.FullMetricName(gauge)
		ts.TimeseriesDescriptor.Metric = fullName
		ts.TimeseriesDescriptor.Labels = getLabels(point)
		byName[fullName] = append(byName[fullName], *ts)
	}
	return self.core.StoreTimeseries(byName)
}
// DebugInfo returns a short static description of the sink type.
func (self gcmAutocalingSink) DebugInfo() string {
	const info = "Sink Type: GCM Autoscaling"
	return info
}
// Name returns the human-readable name of this sink.
func (self gcmAutocalingSink) Name() string {
	const sinkName = "Google Cloud Monitoring Sink for Autoscaling"
	return sinkName
}
// init registers CreateGCMScalingSink with extpoints.SinkFactories under the
// name "gcmautoscaling".
func init() {
	const sinkType = "gcmautoscaling"
	extpoints.SinkFactories.Register(CreateGCMScalingSink, sinkType)
}
// CreateGCMScalingSink is the factory for the "gcmautoscaling" sink.
//
// The sink takes no arguments, so any non-empty URI is rejected; a nil URI is
// treated as empty. If the underlying Google Cloud Monitoring core cannot be
// created, the error is returned and no sink is built.
func CreateGCMScalingSink(uri *url.URL) ([]sink_api.ExternalSink, error) {
	if uri != nil && *uri != (url.URL{}) {
		return nil, fmt.Errorf("gcmautoscaling sinks don't take arguments")
	}
	core, err := gcm.NewCore()
	if err != nil {
		// Do not wrap a failed core in a sink, and do not claim success.
		return nil, err
	}
	sink := gcmAutocalingSink{core: core}
	glog.Infof("created GCM Autoscaling sink")
	return []sink_api.ExternalSink{sink}, nil
}