package idler
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
client "sigs.k8s.io/controller-runtime/pkg/client"
prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
prometheusmodel "github.com/prometheus/common/model"
)
// KubernetesServiceIdler checks a namespace for running builds, pod uptime beyond the configured
// interval, and recent router hits, and scales the environment's deployments to zero if it should be idled.
func (h *Idler) KubernetesServiceIdler(ctx context.Context, opLog logr.Logger, namespace corev1.Namespace, lagoonProject string, forceIdle, forceScale bool) {
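// build the list options used to find build pods in this namespace from the configured build selectors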
labelRequirements := generateLabelRequirements(h.Selectors.Service.Builds)
listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{
client.InNamespace(namespace.ObjectMeta.Name),
client.MatchingLabelsSelector{
Selector: labels.NewSelector().Add(labelRequirements...),
},
})
podIntervalCheck := h.PodCheckInterval
prometheusIntervalCheck := h.PrometheusCheckInterval
// allow the namespace annotations to override the default check intervals
if podinterval, ok := namespace.ObjectMeta.Annotations["idling.amazee.io/pod-interval"]; ok {
t, err := time.ParseDuration(podinterval)
if err == nil {
podIntervalCheck = t
}
}
if prometheusinterval, ok := namespace.ObjectMeta.Annotations["idling.amazee.io/prometheus-interval"]; ok {
t, err := time.ParseDuration(prometheusinterval)
if err == nil {
prometheusIntervalCheck = t
}
}
builds := &corev1.PodList{}
runningBuild := false
if !h.Selectors.Service.SkipBuildCheck {
if err := h.Client.List(ctx, builds, listOption); err != nil {
opLog.Error(err, fmt.Sprintf("Error getting running builds for namespace %s", namespace.ObjectMeta.Name))
} else {
for _, build := range builds.Items {
if build.Status.Phase == "Running" || build.Status.Phase == "Pending" {
// if there is a running or pending build, break out of this loop and skip this namespace
opLog.Info("Environment has a running build, skipping")
runningBuild = true
break
}
}
}
}
// if there are no running builds, check all the deployments that match our label selectors
if !runningBuild {
labelRequirements := generateLabelRequirements(h.Selectors.Service.Deployments)
listOption = (&client.ListOptions{}).ApplyOptions([]client.ListOption{
client.InNamespace(namespace.ObjectMeta.Name),
client.MatchingLabelsSelector{
Selector: labels.NewSelector().Add(labelRequirements...),
},
})
idle := false
deployments := &appsv1.DeploymentList{}
if err := h.Client.List(ctx, deployments, listOption); err != nil {
// if we can't get any deployments for this namespace, log it and move on to the next
opLog.Error(err, "Error getting deployments")
return
}
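// check each deployment that still has replicas to see whether its pods have been running longer than the pod check interval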
for _, deployment := range deployments.Items {
checkPods := false
// compare the replica count by value; comparing the pointers directly would always report a difference
if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas != 0 {
opLog.Info(fmt.Sprintf("Deployment %s has %d running replicas", deployment.ObjectMeta.Name, *deployment.Spec.Replicas))
checkPods = true
} else {
if h.Debug {
opLog.Info(fmt.Sprintf("Deployment %s already idled", deployment.ObjectMeta.Name))
}
}
if checkPods {
pods := &corev1.PodList{}
// pods in kubernetes carry the label named by h.Selectors.ServiceName, set to the name of their deployment
listOption = (&client.ListOptions{}).ApplyOptions([]client.ListOption{
client.InNamespace(namespace.ObjectMeta.Name),
client.MatchingLabels(map[string]string{h.Selectors.ServiceName: deployment.ObjectMeta.Name}),
})
if err := h.Client.List(ctx, pods, listOption); err != nil {
// if we can't list the pods for this deployment, log it and stop checking this namespace
opLog.Error(err, "Error listing pods")
break
}
for _, pod := range pods.Items {
// check if the runtime of the pod is more than our interval
if pod.Status.StartTime != nil {
hs := time.Since(pod.Status.StartTime.Time)
if h.Debug {
opLog.Info(fmt.Sprintf("Pod %s has been running for %v", pod.ObjectMeta.Name, hs))
}
if hs > podIntervalCheck {
// if it is, set the idle flag
idle = true
}
}
}
}
}
// if the idle flag is set, or idling/scaling is forced, check the router logs and eventually idle the environment
if idle || forceIdle || forceScale {
numHits := 0
if !h.Selectors.Service.SkipHitCheck && !forceIdle && !forceScale {
opLog.Info(fmt.Sprintf("Environment marked for idling, checking routerlogs for hits"))
// query prometheus for hits to ingress resources in this namespace
v1api := prometheusapiv1.NewAPI(h.PrometheusClient)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// get the number of successful (status 200) requests to any ingress in this namespace over the check interval
promQuery := fmt.Sprintf(
`round(sum(increase(nginx_ingress_controller_requests{exported_namespace="%s",status="200"}[%s])) by (status))`,
namespace.ObjectMeta.Name,
prometheusIntervalCheck,
)
result, warnings, err := v1api.Query(ctx, promQuery, time.Now())
if err != nil {
opLog.Error(err, "Error querying Prometheus")
return
}
if len(warnings) > 0 {
opLog.Info(fmt.Sprintf("Warnings: %v", warnings))
}
// sum the values across the returned series to determine the total hit count
if result.Type() == prometheusmodel.ValVector {
resultVal := result.(prometheusmodel.Vector)
for _, elem := range resultVal {
hits, _ := strconv.Atoi(elem.Value.String())
numHits = numHits + hits
}
}
// if the hits are not 0, then the environment doesn't need to be idled
opLog.Info(fmt.Sprintf("Environment has had %d hits in the last %s", numHits, prometheusInternalCheck))
if numHits != 0 {
opLog.Info(fmt.Sprintf("Environment does not need idling"))
return
}
}
// patch the ingress resources before scaling anything down; if that step fails the environment
// must not be idled, because without the ingress annotation it would never unidle again
err := h.patchIngress(ctx, opLog, namespace)
if err != nil {
// if patching the ingress resources fail, then don't idle the environment
opLog.Info(fmt.Sprintf("Environment not idled due to errors patching ingress"))
return
}
opLog.Info(fmt.Sprintf("Environment will be idled"))
h.idleDeployments(ctx, opLog, deployments, forceIdle, forceScale)
}
}
}
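// idleDeployments scales the given deployments to zero replicas, labelling them for the unidler
// and recording the previous replica count in the idling.amazee.io/unidle-replicas annotation.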
func (h *Idler) idleDeployments(ctx context.Context, opLog logr.Logger, deployments *appsv1.DeploymentList, forceIdle, forceScale bool) {
d := []string{}
for _, deployment := range deployments.Items {
d = append(d, deployment.ObjectMeta.Name)
// @TODO: use the patch method from the k8s client for now, this seems to work just fine
if !h.DryRun {
// default the recorded unidle replica count to 1 so that a deployment is never
// incorrectly told to unidle back to 0 replicas
idleReplicas := new(int32)
*idleReplicas = 1
if *deployment.Spec.Replicas > 0 {
// and override it with whatever is in the deployment if it is greater than 0
idleReplicas = deployment.Spec.Replicas
}
scaleDeployment := deployment.DeepCopy()
labels := map[string]string{
// add the watch label so that the unidler knows to look at it
"idling.amazee.io/watch": "true",
"idling.amazee.io/idled": "true",
}
if forceIdle {
labels["idling.amazee.io/force-idled"] = "true"
}
if forceScale {
labels["idling.amazee.io/force-scaled"] = "true"
}
mergePatch, _ := json.Marshal(map[string]interface{}{
"spec": map[string]interface{}{
"replicas": 0,
},
"metadata": map[string]interface{}{
"labels": labels,
"annotations": map[string]string{
// record when the deployment was idled and how many replicas to restore when it is unidled
"idling.amazee.io/idled-at": time.Now().Format(time.RFC3339),
"idling.amazee.io/unidle-replicas": strconv.FormatInt(int64(*idleReplicas), 10),
},
},
})
if err := h.Client.Patch(ctx, scaleDeployment, client.RawPatch(types.MergePatchType, mergePatch)); err != nil {
// log it but try and scale the rest of the deployments anyway (some idled is better than none?)
opLog.Info(fmt.Sprintf("Error scaling deployment %s", deployment.ObjectMeta.Name))
} else {
opLog.Info(fmt.Sprintf("Deployment %s scaled to 0", deployment.ObjectMeta.Name))
}
} else {
opLog.Info(fmt.Sprintf("Deployment %s would be scaled to 0", deployment.ObjectMeta.Name))
}
}
}
/*
patchIngress adds the `custom-http-errors` annotation to any ingress with matching labels.
The unidler relies on this annotation so that the correct information is passed to the custom backend of
the nginx ingress controller and unidling of the environment can be handled properly.
*/
func (h *Idler) patchIngress(ctx context.Context, opLog logr.Logger, namespace corev1.Namespace) error {
if !h.Selectors.Service.SkipIngressPatch {
labelRequirements := generateLabelRequirements(h.Selectors.Service.Ingress)
listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{
client.InNamespace(namespace.ObjectMeta.Name),
client.MatchingLabelsSelector{
Selector: labels.NewSelector().Add(labelRequirements...),
},
})
ingressList := &networkv1.IngressList{}
if err := h.Client.List(ctx, ingressList, listOption); err != nil {
// if we can't get any ingress for this namespace, log it and move on to the next
opLog.Error(err, fmt.Sprintf("Error getting ingress"))
return fmt.Errorf("Error getting ingress")
}
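// track whether any ingress was actually patched so the namespace is only labelled as idled when something changed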
patched := false
for _, ingress := range ingressList.Items {
if !h.DryRun {
ingressCopy := ingress.DeepCopy()
mergePatch, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]string{
"idling.amazee.io/idled": "true",
},
"annotations": map[string]string{
// add the custom-http-errors annotation so that the unidler knows to handle this ingress
"nginx.ingress.kubernetes.io/custom-http-errors": "503",
},
},
})
if err := h.Client.Patch(ctx, ingressCopy, client.RawPatch(types.MergePatchType, mergePatch)); err != nil {
// if an ingress can't be patched, log it and return the error so the environment is not idled
opLog.Info(fmt.Sprintf("Error patching ingress %s", ingress.ObjectMeta.Name))
return fmt.Errorf("Error patching ingress %s", ingress.ObjectMeta.Name)
}
opLog.Info(fmt.Sprintf("Ingress %s patched", ingress.ObjectMeta.Name))
patched = true
} else {
opLog.Info(fmt.Sprintf("Ingress %s would be patched", ingress.ObjectMeta.Name))
}
}
if patched {
// update the namespace to indicate it is idled
namespaceCopy := namespace.DeepCopy()
mergePatch, _ := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]string{
"idling.amazee.io/idled": "true",
},
},
})
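// increment the service idling events metric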
serviceIdleEvents.Inc()
if err := h.Client.Patch(ctx, namespaceCopy, client.RawPatch(types.MergePatchType, mergePatch)); err != nil {
return fmt.Errorf(fmt.Sprintf("Error patching namespace %s", namespace.Name))
}
}
}
return nil
}