-
Notifications
You must be signed in to change notification settings - Fork 9
/
operations.go
499 lines (438 loc) · 20 KB
/
operations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
/*
This file is part of Astarte.
Copyright 2020 Ispirata Srl
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package upgrade
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/go-logr/logr"
"github.com/openlyinc/pointy"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/transport/spdy"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
commontypes "github.com/astarte-platform/astarte-kubernetes-operator/apis/api/commontypes"
apiv1alpha1 "github.com/astarte-platform/astarte-kubernetes-operator/apis/api/v1alpha1"
"github.com/astarte-platform/astarte-kubernetes-operator/lib/misc"
"github.com/astarte-platform/astarte-kubernetes-operator/lib/reconcile"
)
// ForceRunModeEnv indicates if the operator should be forced to run in either local
// or cluster mode (currently only used for local mode)
var ForceRunModeEnv = "OSDK_FORCE_RUN_MODE"

// RunModeType describes where the operator process runs.
type RunModeType string

const (
	// LocalRunMode means the operator runs outside the cluster (e.g. during development).
	LocalRunMode RunModeType = "local"
	// ClusterRunMode means the operator runs as a pod inside the cluster.
	ClusterRunMode RunModeType = "cluster"
)

// isRunModeLocal reports whether the run mode has been forced to local
// through the ForceRunModeEnv environment variable.
func isRunModeLocal() bool {
	return RunModeType(os.Getenv(ForceRunModeEnv)) == LocalRunMode
}
// ErrNoNamespace indicates that a namespace could not be found for the current
// environment (the in-cluster service account namespace file does not exist).
var ErrNoNamespace = fmt.Errorf("namespace not found for current environment")
// ErrRunLocal indicates that the operator is set to run in local mode (this error
// is returned by functions that only work on operators running in cluster mode).
var ErrRunLocal = fmt.Errorf("operator run mode forced to local")
// GetOperatorNamespace returns the namespace the operator should be running in,
// as read from the in-cluster service account mount. It returns ErrRunLocal
// when the run mode has been forced to local, and ErrNoNamespace when the
// namespace file is not present (i.e. we're not running inside a cluster).
func GetOperatorNamespace() (string, error) {
	if isRunModeLocal() {
		return "", ErrRunLocal
	}
	data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
	switch {
	case os.IsNotExist(err):
		return "", ErrNoNamespace
	case err != nil:
		return "", err
	}
	namespace := strings.TrimSpace(string(data))
	log.V(1).Info("Found namespace", "Namespace", namespace)
	return namespace, nil
}
// shutdownVerneMQ scales the VerneMQ StatefulSet down to zero replicas and
// blocks until no replicas are left, so the broker is fully stopped before the
// upgrade proceeds. Progress is reported through events on the Astarte resource.
func shutdownVerneMQ(cr *apiv1alpha1.Astarte, c client.Client, recorder record.EventRecorder) error {
	logger := log.WithValues("Request.Namespace", cr.Namespace, "Request.Name", cr.Name)
	// First, bring down VerneMQ by putting its replicas to 0, and wait until it is settled.
	statefulSetName := types.NamespacedName{Name: cr.Name + "-vernemq", Namespace: cr.Namespace}
	statefulSet := &appsv1.StatefulSet{}
	if err := c.Get(context.TODO(), statefulSetName, statefulSet); err != nil {
		return fmt.Errorf("Could not retrieve VerneMQ statefulset: %v", err)
	}
	statefulSet.Spec.Replicas = pointy.Int32(0)
	logger.Info("Bringing down the broker to prevent data loss and mismatches. Devices won't be able to connect until the next reconciliation.")
	if err := c.Update(context.TODO(), statefulSet); err != nil {
		return fmt.Errorf("Could not downscale VerneMQ statefulset: %v", err)
	}
	recorder.Event(cr, "Normal", commontypes.AstarteResourceEventUpgrade.String(),
		"Bringing down the broker to prevent data loss and mismatches. Devices won't be able to connect until the next reconciliation")
	logger.Info("Waiting for the broker to go down...")
	// Poll until the StatefulSet reports zero replicas.
	waitErr := wait.Poll(retryInterval, timeout, func() (bool, error) {
		current := &appsv1.StatefulSet{}
		if err := c.Get(context.TODO(), statefulSetName, current); err != nil {
			return false, err
		}
		return current.Status.Replicas == 0, nil
	})
	if waitErr != nil {
		recorder.Event(cr, "Warning", commontypes.AstarteResourceEventUpgradeError.String(),
			"Could not bring down the Broker. Upgrade will be retried")
		return fmt.Errorf("Failed in waiting for VerneMQ statefulset to shutdown: %v", waitErr)
	}
	return nil
}
// drainRabbitMQQueues waits until all Astarte data queues in RabbitMQ are
// empty, so no in-flight messages are lost across the upgrade. When the
// operator runs outside the cluster, a port forward towards the RabbitMQ pod
// is opened first (and always torn down before returning).
func drainRabbitMQQueues(cr *apiv1alpha1.Astarte, c client.Client, recorder record.EventRecorder) error {
	reqLogger := log.WithValues("Request.Namespace", cr.Namespace, "Request.Name", cr.Name)
	// We might also find out whether the queue has been entirely drained, so we don't lose
	// data. If we're deployed externally, we have to initiate a port forward.
	rmqHost, _, rmqUser, rmqPass, err := misc.GetRabbitMQCredentialsFor(cr, c)
	if err != nil {
		reqLogger.Error(err, "Could not fetch RabbitMQ credentials. Skipping RabbitMQ queue checks.")
		return err
	}
	// If we need to port forward, a connection will be opened. Replace the host, in that case. We assume
	// RabbitMQ Management is enabled (it's a requirement, anyway)
	var stopChannel chan struct{}
	if newHost, theStopChannel, err := openRabbitMQPortForward(cr); err == nil && newHost != "" {
		stopChannel = theStopChannel
		rmqHost = newHost
	} else if err != nil {
		return err
	}
	// Tear down the port forwarder on every return path. Previously it was
	// closed only on success, leaking the forwarder whenever the drain below
	// timed out or failed.
	if stopChannel != nil {
		defer close(stopChannel)
	}
	recorder.Event(cr, "Normal", commontypes.AstarteResourceEventUpgrade.String(),
		"Draining RabbitMQ Data Queues")
	// Get the queue state through the RabbitMQ Management HTTP API.
	httpClient := &http.Client{}
	req, _ := http.NewRequest("GET", "http://"+rmqHost+":15672/api/queues", nil)
	req.SetBasicAuth(rmqUser, rmqPass)
	// Wait up to a minute, otherwise restart
	if err := wait.Poll(5*time.Second, time.Minute, func() (done bool, err error) {
		resp, e := httpClient.Do(req)
		if e != nil {
			// Transient network errors just trigger another poll iteration.
			reqLogger.Error(e, "Could not query RabbitMQ Management, retrying...")
			return false, nil
		}
		defer resp.Body.Close()
		respBody, _ := ioutil.ReadAll(resp.Body)
		respJSON := []map[string]interface{}{}
		if e2 := json.Unmarshal(respBody, &respJSON); e2 != nil {
			// A malformed reply is unrecoverable: abort the poll.
			recorder.Event(cr, "Warning", commontypes.AstarteResourceEventCriticalError.String(),
				"Unrecoverable error in querying RabbitMQ Management. Upgrade will be retried, but manual intervention is likely required")
			reqLogger.Error(e2, "Unrecoverable error in querying RabbitMQ Management")
			return false, e2
		}
		for _, queueState := range respJSON {
			// Check if it matches one of the data queues from 0.10 onwards
			if ok, e3 := checkRabbitMQQueue(queueState, reqLogger); !ok {
				return false, e3
			}
		}
		return true, nil
	}); err != nil {
		recorder.Event(cr, "Warning", commontypes.AstarteResourceEventCriticalError.String(),
			"Timed out while waiting for queues to drain. Upgrade will be retried, but manual intervention is likely required")
		reqLogger.Error(err, "Failed in waiting for RabbitMQ queues to be drained")
		return err
	}
	reqLogger.Info("RabbitMQ Data Queue(s) drained")
	recorder.Event(cr, "Normal", commontypes.AstarteResourceEventUpgrade.String(),
		"RabbitMQ Data Queues successfully drained")
	return nil
}
// checkRabbitMQQueue inspects a single queue entry from the RabbitMQ Management
// API. It returns true when the queue is either irrelevant to the upgrade or
// already drained, false (with a nil error) while messages are still pending,
// and a non-nil error when the JSON payload is malformed.
func checkRabbitMQQueue(queueState map[string]interface{}, reqLogger logr.Logger) (bool, error) {
	name, ok := queueState["name"].(string)
	if !ok {
		return false, fmt.Errorf("Malformed JSON reply from RabbitMQ management")
	}
	// Break false positives: this RPC queue would otherwise match the data prefix.
	if name == "astarte_data_updater_plant_rpc" {
		return true, nil
	}
	// Only the VerneMQ queue and the 0.10+ data queues are relevant here.
	if name != "vmq_all" && !strings.HasPrefix(name, "astarte_data_") {
		return true, nil
	}
	// float64 is how numbers are decoded by encoding/json into interface{}.
	pending, ok := queueState["messages_ready"].(float64)
	if !ok {
		return false, fmt.Errorf("Malformed JSON reply from RabbitMQ management")
	}
	if pending > 0 {
		reqLogger.Info("Waiting for RabbitMQ Data Queues to drain.",
			"MessagesLeft", pending, "QueueName", name)
		return false, nil
	}
	return true, nil
}
// openRabbitMQPortForward opens a port forward towards the RabbitMQ pod when
// the operator is running outside the cluster. On success it returns the host
// to connect to ("localhost") and a channel which, once closed, tears the
// forwarder down. When running inside the cluster it returns empty values, as
// no forwarding is needed.
func openRabbitMQPortForward(cr *apiv1alpha1.Astarte) (string, chan struct{}, error) {
	reqLogger := log.WithValues("Request.Namespace", cr.Namespace, "Request.Name", cr.Name)
	// Note that we're trying to find out whether the operator is running outside the cluster
	if _, err := GetOperatorNamespace(); err != nil {
		if err == ErrNoNamespace || err == ErrRunLocal {
			reqLogger.Info("Not running in a cluster - trying to forward RabbitMQ port")
			restConfig, e := config.GetConfig()
			if e != nil {
				return "", nil, e
			}
			path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s-rabbitmq-0/portforward", restConfig.Host, cr.Namespace, cr.Name)
			// Keep the parsed URL in its own variable instead of shadowing the url package.
			forwardURL, e := url.Parse(path)
			if e != nil {
				return "", nil, e
			}
			transport, upgrader, e := spdy.RoundTripperFor(restConfig)
			if e != nil {
				return "", nil, e
			}
			dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", forwardURL)
			stopChannel := make(chan struct{}, 1)
			readyChannel := make(chan struct{})
			// Buffered so the goroutine can report a late ForwardPorts error
			// without blocking forever after we have already returned success.
			errChannel := make(chan error, 1)
			// Well, Go!
			go func() {
				fw, ferr := portforward.New(dialer, []string{"15672:15672"}, stopChannel, readyChannel, nil, nil)
				if ferr != nil {
					// Previously execution fell through to fw.ForwardPorts()
					// with a nil fw here, causing a nil pointer dereference.
					errChannel <- ferr
					return
				}
				if ferr := fw.ForwardPorts(); ferr != nil {
					errChannel <- ferr
				}
			}()
			select {
			case <-readyChannel:
			case e := <-errChannel:
				return "", nil, e
			}
			return "localhost", stopChannel, nil
		}
		return "", nil, err
	}
	return "", nil, nil
}
// waitForQueueLayoutMigration deploys Data Updater Plant at the given version
// (with a Recreate strategy so no old pods linger) and waits until at least
// one replica is ready, which signals the queue layout migration has happened.
func waitForQueueLayoutMigration(version string, cr *apiv1alpha1.Astarte, c client.Client, scheme *runtime.Scheme) error {
	dup := cr.Spec.Components.DataUpdaterPlant.DeepCopy()
	dup.Version = version
	// Ensure the policy is Replace. We don't want to have old pods hanging around.
	dup.DeploymentStrategy = &appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType}
	if err := reconcile.EnsureAstarteGenericBackend(cr, dup.AstarteGenericClusteredResource, commontypes.DataUpdaterPlant, c, scheme); err != nil {
		return err
	}
	// The operation should be pretty normal and quick enough. Wait with standard timeouts here
	target := types.NamespacedName{Name: cr.Name + "-data-updater-plant", Namespace: cr.Namespace}
	return wait.Poll(retryInterval, timeout, func() (bool, error) {
		deployment := &appsv1.Deployment{}
		if err := c.Get(context.TODO(), target, deployment); err != nil {
			return false, err
		}
		return deployment.Status.ReadyReplicas > 0, nil
	})
}
// waitForHousekeepingUpgrade polls (for up to an hour) until the Housekeeping
// API Deployment has a ready replica, meaning Database Migrations completed.
// Transient monitoring errors are tolerated up to a threshold; a Housekeeping
// pod stuck in CrashLoopBackOff aborts the wait, as that means the migration
// itself is failing and manual intervention is needed.
func waitForHousekeepingUpgrade(cr *apiv1alpha1.Astarte, c client.Client, recorder record.EventRecorder) error {
	weirdFailuresCount := 0
	weirdFailuresThreshold := 10
	// reportWeirdFailure centralizes the previously copy-pasted handling of
	// unexpected monitoring failures: retry silently until the threshold is
	// exceeded, then give up with an event and an error.
	reportWeirdFailure := func(cause error, what string) (bool, error) {
		weirdFailuresCount++
		if weirdFailuresCount > weirdFailuresThreshold {
			// Something is off.
			recorder.Event(cr, "Warning", commontypes.AstarteResourceEventCriticalError.String(),
				"Repeated errors in monitoring Database Migration. Manual intervention is likely required")
			return false, fmt.Errorf("%s. Most likely, manual intervention is required. %v", what, cause)
		}
		// Something is off.
		log.Error(cause, fmt.Sprintf("%s. This might be a temporary problem - will retry", what))
		return false, nil
	}
	return wait.Poll(retryInterval, time.Hour, func() (done bool, err error) {
		deployment := &appsv1.Deployment{}
		if err = c.Get(context.TODO(), types.NamespacedName{Name: cr.Name + "-housekeeping-api", Namespace: cr.Namespace}, deployment); err != nil {
			return reportWeirdFailure(err, "Failed in looking up Housekeeping API Deployment")
		}
		if deployment.Status.ReadyReplicas >= 1 {
			// That's it bros.
			return true, nil
		}
		// Ensure we aren't in the position where Housekeeping itself is crashing.
		housekeepingComponent := commontypes.Housekeeping
		podList := &v1.PodList{}
		if err = c.List(context.TODO(), podList, client.InNamespace(cr.Namespace),
			client.MatchingLabels{"astarte-component": housekeepingComponent.DashedString()}); err != nil {
			return reportWeirdFailure(err, "Failed in looking up Housekeeping pods")
		}
		// Inspect the list! Exactly one Housekeeping pod with exactly one
		// container is expected; anything else is treated as a (possibly
		// transient) monitoring failure. err is nil in these branches.
		if len(podList.Items) != 1 {
			return reportWeirdFailure(err, fmt.Sprintf("%v Housekeeping pods found", len(podList.Items)))
		}
		containerStatuses := podList.Items[0].Status.ContainerStatuses
		if len(containerStatuses) != 1 {
			return reportWeirdFailure(err, fmt.Sprintf("%v Container Statuses retrieved", len(containerStatuses)))
		}
		// Kubernetes reports this waiting reason as "CrashLoopBackOff"
		// (capital O): the previous "CrashLoopBackoff" spelling never matched.
		if waiting := containerStatuses[0].State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
			recorder.Event(cr, "Warning", commontypes.AstarteResourceEventCriticalError.String(),
				"Database Migration failed. Manual intervention is likely required")
			return true, fmt.Errorf("Housekeeping is crashing repeatedly. There has to be a problem in handling Database migrations. Please take manual action as soon as possible")
		}
		return false, nil
	})
}
// upgradeHousekeeping redeploys Housekeeping (backend and API) at the given
// version with a single replica and a migration-friendly probe, so Database
// Migrations can run. When drainVerneMQResources is true, the resources of the
// (stopped) broker are temporarily granted to the Housekeeping backend. The
// modified backend spec is returned for the subsequent scale down.
func upgradeHousekeeping(version string, drainVerneMQResources bool, cr *apiv1alpha1.Astarte, c client.Client, scheme *runtime.Scheme,
	recorder record.EventRecorder) (*commontypes.AstarteGenericClusteredResource, error) {
	housekeepingBackend := cr.Spec.Components.Housekeeping.Backend.DeepCopy()
	housekeepingBackend.Version = version
	housekeepingBackend.Replicas = pointy.Int32(1)
	// Ensure the policy is Replace. We don't want to have old pods hanging around.
	housekeepingBackend.DeploymentStrategy = &appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType}
	if cr.Spec.VerneMQ.Resources != nil && drainVerneMQResources {
		resourceRequirements := misc.GetResourcesForAstarteComponent(cr, housekeepingBackend.Resources, commontypes.Housekeeping)
		// ResourceList.Cpu()/.Memory() return a pointer to a *copy* of the map
		// entry, so calling Add through them (as the previous code did) never
		// updated the requirements. Read the quantity, add, and store it back.
		if resourceRequirements.Requests == nil {
			resourceRequirements.Requests = v1.ResourceList{}
		}
		if resourceRequirements.Limits == nil {
			resourceRequirements.Limits = v1.ResourceList{}
		}
		vmqResources := cr.Spec.VerneMQ.Resources
		cpuRequest := resourceRequirements.Requests[v1.ResourceCPU]
		cpuRequest.Add(*vmqResources.Requests.Cpu())
		resourceRequirements.Requests[v1.ResourceCPU] = cpuRequest
		memoryRequest := resourceRequirements.Requests[v1.ResourceMemory]
		memoryRequest.Add(*vmqResources.Requests.Memory())
		resourceRequirements.Requests[v1.ResourceMemory] = memoryRequest
		cpuLimit := resourceRequirements.Limits[v1.ResourceCPU]
		cpuLimit.Add(*vmqResources.Limits.Cpu())
		resourceRequirements.Limits[v1.ResourceCPU] = cpuLimit
		memoryLimit := resourceRequirements.Limits[v1.ResourceMemory]
		memoryLimit.Add(*vmqResources.Limits.Memory())
		resourceRequirements.Limits[v1.ResourceMemory] = memoryLimit
		// This way, on the next call to GetResourcesForAstarteComponent, these resources will be returned as explicitly stated
		// in the original spec.
		housekeepingBackend.Resources = &resourceRequirements
	}
	// Add a custom, more permissive probe to the Backend
	if err := reconcile.EnsureAstarteGenericBackendWithCustomProbe(cr, *housekeepingBackend, commontypes.Housekeeping,
		c, scheme, getSpecialHousekeepingMigrationProbe("/health")); err != nil {
		recorder.Event(cr, "Warning", commontypes.AstarteResourceEventUpgradeError.String(),
			"Could not initiate Database Migration. Upgrade will be retried")
		return nil, err
	}
	housekeepingAPI := cr.Spec.Components.Housekeeping.API.DeepCopy()
	housekeepingAPI.Replicas = pointy.Int32(1)
	housekeepingAPI.Version = version
	// Ensure the policy is Replace. We don't want to have old pods hanging around.
	housekeepingAPI.DeploymentStrategy = &appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType}
	if err := reconcile.EnsureAstarteGenericAPIWithCustomProbe(cr, *housekeepingAPI, commontypes.HousekeepingAPI, c,
		scheme, getSpecialHousekeepingMigrationProbe("/health")); err != nil {
		recorder.Event(cr, "Warning", commontypes.AstarteResourceEventUpgradeError.String(),
			"Could not initiate Database Migration. Upgrade will be retried")
		return nil, err
	}
	return housekeepingBackend, nil
}
// scaleDownHousekeeping brings the Housekeeping backend down to zero replicas
// and waits until no replica is ready anymore, so the migration deployment is
// fully gone before the normal reconciliation takes over again.
func scaleDownHousekeeping(housekeepingBackend *commontypes.AstarteGenericClusteredResource, cr *apiv1alpha1.Astarte,
	c client.Client, scheme *runtime.Scheme) error {
	housekeepingBackend.Replicas = pointy.Int32(0)
	if err := reconcile.EnsureAstarteGenericBackend(cr, *housekeepingBackend, commontypes.Housekeeping, c, scheme); err != nil {
		return err
	}
	// Wait for it to go down, then we should be good to go.
	target := types.NamespacedName{Name: cr.Name + "-housekeeping", Namespace: cr.Namespace}
	return wait.Poll(retryInterval, timeout, func() (bool, error) {
		deployment := &appsv1.Deployment{}
		if err := c.Get(context.TODO(), target, deployment); err != nil {
			return false, err
		}
		return deployment.Status.ReadyReplicas == 0, nil
	})
}
// getSpecialHousekeepingMigrationProbe builds an HTTP probe on the given path
// that tolerates long-running database migrations: migrations can take an
// insane amount of time, so the probe is far more permissive than usual.
func getSpecialHousekeepingMigrationProbe(path string) *v1.Probe {
	probe := &v1.Probe{
		Handler: v1.Handler{
			HTTPGet: &v1.HTTPGetAction{
				Path: path,
				Port: intstr.FromString("http"),
			},
		},
	}
	// Start checking after 30 seconds, then probe every 30 seconds with a
	// 5 second timeout. With 120 allowed failures, the component gets up to
	// an hour to complete its migrations before the probe gives up.
	probe.InitialDelaySeconds = 30
	probe.TimeoutSeconds = 5
	probe.PeriodSeconds = 30
	probe.FailureThreshold = 120
	return probe
}
// tearDownCFSSLStatefulSet deletes the CFSSL StatefulSet and waits until it is
// actually gone, emitting events on the Astarte resource along the way.
func tearDownCFSSLStatefulSet(cr *apiv1alpha1.Astarte, c client.Client, recorder record.EventRecorder) error {
	reqLogger := log.WithValues("Request.Namespace", cr.Namespace, "Request.Name", cr.Name)
	// First, delete CFSSL StatefulSet and wait until it is done
	CFSSLStatefulSetName := cr.Name + "-cfssl"
	CFSSLStatefulSet := &appsv1.StatefulSet{}
	if err := c.Get(context.TODO(), types.NamespacedName{Name: CFSSLStatefulSetName, Namespace: cr.Namespace}, CFSSLStatefulSet); err != nil {
		return fmt.Errorf("Cannot retrieve CFSSL StatefulSet: %v", err)
	}
	reqLogger.Info("Tearing down the CFSSL StatefulSet.")
	if err := c.Delete(context.TODO(), CFSSLStatefulSet); err != nil {
		return fmt.Errorf("Could not tear down CFSSL StatefulSet: %v", err)
	}
	recorder.Event(cr, "Normal", commontypes.AstarteResourceEventUpgrade.String(), "Tearing down the CFSSL StatefulSet.")
	reqLogger.Info("Waiting for the CFSSL StatefulSet to go down...")
	// Now wait. Note: returning a non-nil error from the poll function aborts
	// wait.Poll immediately - the previous implementation did exactly that
	// while the StatefulSet was still present, so it never actually waited.
	if err := wait.Poll(retryInterval, timeout, func() (done bool, err error) {
		statefulSet := &appsv1.StatefulSet{}
		if err := c.Get(context.TODO(), types.NamespacedName{Name: CFSSLStatefulSetName, Namespace: cr.Namespace}, statefulSet); err != nil {
			// The Get failing (typically with NotFound) means the StatefulSet is gone.
			return true, nil
		}
		// Still there - keep polling until the timeout expires.
		return false, nil
	}); err != nil {
		recorder.Event(cr, "Warning", commontypes.AstarteResourceEventUpgradeError.String(),
			"Could not tear down the CFSSL StatefulSet. Upgrade will be retried")
		return err
	}
	return nil
}