-
Notifications
You must be signed in to change notification settings - Fork 16
/
fleet_rollout.go
171 lines (146 loc) · 5.61 KB
/
fleet_rollout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package tasks
import (
"context"
"fmt"
api "github.com/flightctl/flightctl/api/v1alpha1"
"github.com/flightctl/flightctl/internal/store"
"github.com/flightctl/flightctl/internal/store/model"
"github.com/flightctl/flightctl/internal/util"
"github.com/flightctl/flightctl/pkg/log"
"github.com/flightctl/flightctl/pkg/reqid"
"github.com/go-chi/chi/v5/middleware"
"github.com/sirupsen/logrus"
)
// FleetRollouts consumes rollout requests from the ChannelFleetRollout channel and
// dispatches each one to FleetRolloutsLogic. It returns when the task manager's
// context is done.
//
// Each request gets a fresh request ID. The logger derived from it is used for every
// message about that request, including failures, so related log lines can be
// correlated.
func FleetRollouts(taskManager TaskManager) {
	for {
		select {
		case <-taskManager.ctx.Done():
			taskManager.log.Info("Received ctx.Done(), stopping")
			return
		case resourceRef := <-taskManager.channels[ChannelFleetRollout]:
			requestID := reqid.NextRequestID()
			ctx := context.WithValue(context.Background(), middleware.RequestIDKey, requestID)
			// Named reqLog so the imported log package is not shadowed.
			reqLog := log.WithReqIDFromCtx(ctx, taskManager.log)

			if resourceRef.Op != FleetRolloutOpUpdate {
				reqLog.Errorf("received unknown op %s", resourceRef.Op)
				continue
			}

			logic := NewFleetRolloutsLogic(taskManager, reqLog, taskManager.store, resourceRef)
			switch resourceRef.Kind {
			case model.FleetKind:
				if err := logic.RolloutFleet(ctx); err != nil {
					reqLog.Errorf("failed rolling out fleet %s/%s: %v", resourceRef.OrgID, resourceRef.Name, err)
				}
			case model.DeviceKind:
				if err := logic.RolloutDevice(ctx); err != nil {
					reqLog.Errorf("failed rolling out device %s/%s: %v", resourceRef.OrgID, resourceRef.Name, err)
				}
			default:
				reqLog.Errorf("FleetRollouts called with incorrect resource kind %s", resourceRef.Kind)
			}
		}
	}
}
// FleetRolloutsLogic bundles what is needed to roll a fleet's newest valid template
// version out to devices, for the single resource named by resourceRef (a fleet or a
// device).
type FleetRolloutsLogic struct {
// taskManager supplies DeviceUpdatedCallback, passed to the device store when a device is persisted.
taskManager TaskManager
// log is the logger used for all rollout messages.
log logrus.FieldLogger
// fleetStore is the fleet sub-store; NOTE(review): not referenced by the methods in this file.
fleetStore store.Fleet
// devStore is used to get, list and create-or-update devices.
devStore store.Device
// tvStore is used to look up the newest valid template version of a fleet.
tvStore store.TemplateVersion
// resourceRef identifies the org, name, kind and op of the resource being rolled out.
resourceRef ResourceReference
// itemsPerPage is initialized to ItemsPerPage by NewFleetRolloutsLogic and can be changed via SetItemsPerPage.
itemsPerPage int
}
// NewFleetRolloutsLogic prepares a FleetRolloutsLogic for resourceRef.
//
// The fleet, device and template-version sub-stores are taken from storeInst, and the
// page size starts at the package default ItemsPerPage.
func NewFleetRolloutsLogic(tm TaskManager, log logrus.FieldLogger, storeInst store.Store, resourceRef ResourceReference) FleetRolloutsLogic {
	var logic FleetRolloutsLogic
	logic.taskManager = tm
	logic.log = log
	logic.fleetStore = storeInst.Fleet()
	logic.devStore = storeInst.Device()
	logic.tvStore = storeInst.TemplateVersion()
	logic.resourceRef = resourceRef
	logic.itemsPerPage = ItemsPerPage
	return logic
}
// SetItemsPerPage replaces the page size stored on f.
//
// NewFleetRolloutsLogic initializes the page size to ItemsPerPage. The pointer receiver
// makes the change visible on the caller's value.
func (f *FleetRolloutsLogic) SetItemsPerPage(n int) {
	f.itemsPerPage = n
}
// RolloutFleet moves every device owned by the referenced fleet to the fleet's newest
// valid template version.
//
// Devices are listed page by page using the configured page size (see
// SetItemsPerPage), falling back to ItemsPerPage when it is not positive. A failure to
// update one device is logged and counted, and does not stop the other devices from
// being updated.
//
// It returns an error if the template version or a page of devices cannot be fetched,
// if a continuation token cannot be parsed, or if any device failed to update.
func (f FleetRolloutsLogic) RolloutFleet(ctx context.Context) error {
	f.log.Infof("Rolling out fleet %s/%s", f.resourceRef.OrgID, f.resourceRef.Name)
	templateVersion, err := f.tvStore.GetNewestValid(ctx, f.resourceRef.OrgID, f.resourceRef.Name)
	if err != nil {
		return fmt.Errorf("failed to get templateVersion: %w", err)
	}

	// Honor SetItemsPerPage. A zero or negative value (e.g. a struct not built by
	// NewFleetRolloutsLogic) falls back to the package default.
	limit := f.itemsPerPage
	if limit <= 0 {
		limit = ItemsPerPage
	}

	failureCount := 0
	owner := util.SetResourceOwner(model.FleetKind, f.resourceRef.Name)
	listParams := store.ListParams{Owner: owner, Limit: limit}
	for {
		devices, err := f.devStore.List(ctx, f.resourceRef.OrgID, listParams)
		if err != nil {
			// TODO: Retry when we have a mechanism that allows it
			return fmt.Errorf("failed fetching devices: %w", err)
		}

		for i := range devices.Items {
			device := &devices.Items[i]
			if err := f.updateDeviceToFleetTemplate(ctx, device, templateVersion); err != nil {
				f.log.Errorf("failed to update target generation for device %s (fleet %s): %v", *device.Metadata.Name, f.resourceRef.Name, err)
				failureCount++
			}
		}

		// No continuation token means this was the last page.
		if devices.Metadata.Continue == nil {
			break
		}
		cont, err := store.ParseContinueString(devices.Metadata.Continue)
		if err != nil {
			return fmt.Errorf("failed to parse continuation for paging: %w", err)
		}
		listParams.Continue = cont
	}

	if failureCount != 0 {
		// TODO: Retry when we have a mechanism that allows it
		return fmt.Errorf("failed updating %d devices", failureCount)
	}
	return nil
}
// RolloutDevice handles a change of the referenced device's owner.
//
// If the device is owned by a fleet, it is moved to that fleet's newest valid template
// version. The device is left untouched if it has no owner, if its owner is not a
// fleet, or if it carries the multiple-owners annotation.
//
// It returns an error if the device, its owner, or the template version cannot be
// resolved, or if persisting the updated device fails.
func (f FleetRolloutsLogic) RolloutDevice(ctx context.Context) error {
	f.log.Infof("Rolling out device %s/%s", f.resourceRef.OrgID, f.resourceRef.Name)
	device, err := f.devStore.Get(ctx, f.resourceRef.OrgID, f.resourceRef.Name)
	if err != nil {
		return fmt.Errorf("failed to get device: %w", err)
	}

	// An unowned device has no fleet template to follow.
	if device.Metadata.Owner == nil || len(*device.Metadata.Owner) == 0 {
		return nil
	}

	// Ownership is ambiguous, so no single fleet template applies. Actually skip the
	// rollout, as the warning says, instead of proceeding with the recorded owner.
	if device.Metadata.Annotations != nil {
		multipleOwners, ok := (*device.Metadata.Annotations)[model.DeviceAnnotationMultipleOwners]
		if ok && len(multipleOwners) > 0 {
			f.log.Warnf("Device has multiple owners, skipping rollout: %s", multipleOwners)
			return nil
		}
	}

	ownerName, isFleetOwner, err := getOwnerFleet(device)
	if err != nil {
		return fmt.Errorf("failed getting device owner: %w", err)
	}
	if !isFleetOwner {
		return nil
	}

	templateVersion, err := f.tvStore.GetNewestValid(ctx, f.resourceRef.OrgID, ownerName)
	if err != nil {
		return fmt.Errorf("failed to get templateVersion: %w", err)
	}
	return f.updateDeviceToFleetTemplate(ctx, device, templateVersion)
}
// updateDeviceToFleetTemplate points device.Spec.TemplateVersion at templateVersion.
//
// The change is persisted with the device store's CreateOrUpdate, passing the task
// manager's DeviceUpdatedCallback. If the device already references that template
// version, it only logs at debug level and returns nil. Otherwise it returns whatever
// error CreateOrUpdate reports.
func (f FleetRolloutsLogic) updateDeviceToFleetTemplate(ctx context.Context, device *api.Device, templateVersion *api.TemplateVersion) error {
	targetName := *templateVersion.Metadata.Name
	deviceName := *device.Metadata.Name
	current := device.Spec.TemplateVersion

	if current == nil || *current != targetName {
		f.log.Infof("Rolling out device %s/%s to templateVersion %s", f.resourceRef.OrgID, deviceName, targetName)
		device.Spec.TemplateVersion = templateVersion.Metadata.Name
		_, _, updateErr := f.devStore.CreateOrUpdate(ctx, f.resourceRef.OrgID, device, nil, false, f.taskManager.DeviceUpdatedCallback)
		return updateErr
	}

	f.log.Debugf("Not rolling out device %s/%s because it is already at templateVersion %s", f.resourceRef.OrgID, deviceName, targetName)
	return nil
}