create.go
package drainer

import (
	"context"
	"fmt"
	"time"

	"github.com/giantswarm/errors/tenant"
	"github.com/giantswarm/k8sclient/v7/pkg/k8sclient"
	"github.com/giantswarm/microerror"
	"github.com/giantswarm/tenantcluster/v5/pkg/tenantcluster"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	v1alpha1 "github.com/giantswarm/node-operator/api"
	"github.com/giantswarm/node-operator/service/controller/key"
)
const (
	// UnschedulablePatch is the JSON patch applied to nodes via a strategic
	// merge patch in order to cordon them. Setting spec.unschedulable to true
	// is the same change `kubectl cordon <node>` makes.
	UnschedulablePatch = `{"spec":{"unschedulable":true}}`
)
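
// EnsureCreated drains the tenant cluster node referenced by the given
// DrainerConfig. It cordons the node, evicts pods running custom workloads
// first and pods running system workloads after, and sets the drained
// condition once no more evictable pods are found on the node. If draining
// does not finish within 60 minutes, the timeout condition is set instead.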
func (r *Resource) EnsureCreated(ctx context.Context, obj interface{}) error {
	drainerConfig, err := key.ToDrainerConfig(obj)
	if err != nil {
		return microerror.Mask(err)
	}

	if drainerConfig.Status.HasDrainedCondition() {
		r.logger.LogCtx(ctx, "level", "debug", "message", "drainer config status has drained condition")
		r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
		return nil
	}

	if drainerConfig.Status.HasTimeoutCondition() {
		r.logger.LogCtx(ctx, "level", "debug", "message", "drainer config status has timeout condition")
		r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
		return nil
	}
	if drainingTimedOut(drainerConfig, time.Now(), 60*time.Minute) {
		r.logger.LogCtx(ctx, "level", "debug", "message", "drainer config exists for too long without draining being finished")
		r.logger.LogCtx(ctx, "level", "debug", "message", "setting drainer config status of tenant cluster node to timeout condition")

		drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, drainerConfig.Status.NewTimeoutCondition())
		err := r.client.Status().Update(ctx, &drainerConfig)
		if err != nil {
			return microerror.Mask(err)
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", "set drainer config status of tenant cluster node to timeout condition")
		r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
		return nil
	}
	var restConfig *rest.Config
	{
		i := key.ClusterIDFromDrainerConfig(drainerConfig)
		e := key.ClusterEndpointFromDrainerConfig(drainerConfig)

		restConfig, err = r.tenantCluster.NewRestConfig(ctx, i, e)
		if tenantcluster.IsTimeout(err) {
			r.logger.LogCtx(ctx, "level", "debug", "message", "fetching certificates timed out")
			r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
			return nil
		} else if err != nil {
			return microerror.Mask(err)
		}
	}
	var k8sClient kubernetes.Interface
	{
		c := k8sclient.ClientsConfig{
			Logger:     r.logger,
			RestConfig: restConfig,
		}

		k8sClients, err := k8sclient.NewClients(c)
		if tenant.IsAPINotAvailable(err) {
			r.logger.LogCtx(ctx, "level", "debug", "message", "tenant cluster API is not available")
			r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
			return nil
		} else if err != nil {
			return microerror.Mask(err)
		}

		k8sClient = k8sClients.K8sClient()
	}
	{
		r.logger.LogCtx(ctx, "level", "debug", "message", "cordoning tenant cluster node")

		n := key.NodeNameFromDrainerConfig(drainerConfig)
		t := types.StrategicMergePatchType
		p := []byte(UnschedulablePatch)

		_, err := k8sClient.CoreV1().Nodes().Patch(ctx, n, t, p, metav1.PatchOptions{})
		if tenant.IsAPINotAvailable(err) {
			r.logger.LogCtx(ctx, "level", "debug", "message", "tenant cluster API is not available")
			r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
			return nil
		} else if apierrors.IsNotFound(err) {
			// It might happen that the node we want to drain has already been
			// removed, possibly even by human intervention. In case we cannot
			// find the node we assume the draining was successful and set the
			// drainer config status accordingly.
			r.logger.LogCtx(ctx, "level", "debug", "message", "tenant cluster node not found")
			r.logger.LogCtx(ctx, "level", "debug", "message", "setting drainer config status of tenant cluster node to drained condition")

			drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, drainerConfig.Status.NewDrainedCondition())
			err := r.client.Status().Update(ctx, &drainerConfig)
			if err != nil {
				return microerror.Mask(err)
			}

			r.logger.LogCtx(ctx, "level", "debug", "message", "set drainer config status of tenant cluster node to drained condition")
			r.logger.LogCtx(ctx, "level", "debug", "message", "canceling resource")
			return nil
		} else if err != nil {
			return microerror.Mask(err)
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", "cordoned tenant cluster node")
	}
	var customPods []v1.Pod
	var systemPods []v1.Pod
	{
		r.logger.LogCtx(ctx, "level", "debug", "message", "looking for all pods running on the tenant cluster node")

		fieldSelector := fields.SelectorFromSet(fields.Set{
			"spec.nodeName": key.NodeNameFromDrainerConfig(drainerConfig),
		})
		listOptions := metav1.ListOptions{
			FieldSelector: fieldSelector.String(),
		}
		podList, err := k8sClient.CoreV1().Pods(v1.NamespaceAll).List(ctx, listOptions)
		if err != nil {
			return microerror.Mask(err)
		}

		for _, p := range podList.Items {
			if key.IsCriticalPod(p.Name) {
				// Ignore critical pods (api, controller-manager and
				// scheduler). They are static pods, so the kubelet would
				// recreate them anyway, and evicting them can cause other
				// issues.
				continue
			}
			if key.IsDaemonSetPod(p) {
				// Ignore pods owned by a DaemonSet. They are recreated even
				// on an unschedulable node, so draining them does not make
				// sense. This also aligns with the community behavior, as
				// `kubectl drain` ignores them too.
				continue
			}
			if key.IsDeploymentPod(p.Name) {
				// Ignore special deployments (cert-exporter-deployment).
				continue
			}
			if key.IsEvictedPod(p) {
				// We do not need to care about already evicted pods.
				continue
			}

			if p.GetNamespace() == "kube-system" {
				systemPods = append(systemPods, p)
			} else {
				customPods = append(customPods, p)
			}
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("found %d pods running custom workloads", len(customPods)))
		r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("found %d pods running system workloads", len(systemPods)))
	}
	if len(customPods) > 0 {
		r.logger.LogCtx(ctx, "level", "debug", "message", "sending eviction to all pods running custom workloads")

		for _, p := range customPods {
			r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("sending eviction to pod %#q", fmt.Sprintf("%s/%s", p.GetNamespace(), p.GetName())))

			err := evictPod(ctx, k8sClient, p)
			if IsCannotEvictPod(err) {
				r.logger.LogCtx(ctx, "level", "warning", "message", fmt.Sprintf("cannot evict pod %#q due to disruption budget", p.GetName()))
				continue
			} else if err != nil {
				return microerror.Mask(err)
			}

			r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("sent eviction to pod %#q", fmt.Sprintf("%s/%s", p.GetNamespace(), p.GetName())))
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", "sent eviction to all pods running custom workloads")
	} else {
		r.logger.LogCtx(ctx, "level", "debug", "message", "no pods running custom workloads to send evictions to")
	}
	// Only evict systemPods once all customPods have been evicted.
	if len(systemPods) > 0 && len(customPods) == 0 {
		r.logger.LogCtx(ctx, "level", "debug", "message", "sending eviction to all pods running system workloads")

		for _, p := range systemPods {
			r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("sending eviction to pod %#q", fmt.Sprintf("%s/%s", p.GetNamespace(), p.GetName())))

			err := evictPod(ctx, k8sClient, p)
			if IsCannotEvictPod(err) {
				r.logger.LogCtx(ctx, "level", "warning", "message", fmt.Sprintf("cannot evict pod %#q due to disruption budget", p.GetName()))
				continue
			} else if err != nil {
				return microerror.Mask(err)
			}

			r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("sent eviction to pod %#q", fmt.Sprintf("%s/%s", p.GetNamespace(), p.GetName())))
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", "sent eviction to all pods running system workloads")
	} else if len(systemPods) > 0 {
		r.logger.LogCtx(ctx, "level", "debug", "message", "waiting for pods running custom workloads to be evicted before evicting system workloads")
	} else {
		r.logger.LogCtx(ctx, "level", "debug", "message", "no pods running system workloads to send evictions to")
	}
	// When all pods are evicted from the tenant cluster node, set the CR
	// status to the drained condition.
	if len(systemPods) == 0 && len(customPods) == 0 {
		r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("setting drainer config status of node in tenant cluster '%s' to drained condition", key.ClusterIDFromDrainerConfig(drainerConfig)))

		drainerConfig.Status.Conditions = append(drainerConfig.Status.Conditions, drainerConfig.Status.NewDrainedCondition())
		err := r.client.Status().Update(ctx, &drainerConfig)
		if err != nil {
			return microerror.Mask(err)
		}

		r.logger.LogCtx(ctx, "level", "debug", "message", fmt.Sprintf("set drainer config status of node in tenant cluster '%s' to drained condition", key.ClusterIDFromDrainerConfig(drainerConfig)))
	}

	return nil
}
// drainingTimedOut returns true once the given DrainerConfig has existed for
// at least the given timeout, i.e. its creation timestamp plus the timeout is
// not after now.
func drainingTimedOut(drainerConfig v1alpha1.DrainerConfig, now time.Time, timeout time.Duration) bool {
	return !drainerConfig.GetCreationTimestamp().Add(timeout).After(now)
}
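
// evictPod and IsCannotEvictPod are defined elsewhere in this package. As a
// rough, hypothetical sketch of what such helpers can look like, assuming the
// pod eviction subresource is used (EvictV1 exists in client-go since v0.22,
// with policyv1 "k8s.io/api/policy/v1" imported) and that an eviction blocked
// by a PodDisruptionBudget surfaces as HTTP 429 Too Many Requests:
//
//	func evictPod(ctx context.Context, k8sClient kubernetes.Interface, p v1.Pod) error {
//		eviction := &policyv1.Eviction{
//			ObjectMeta: metav1.ObjectMeta{
//				Name:      p.GetName(),
//				Namespace: p.GetNamespace(),
//			},
//		}
//		// EvictV1 posts to /api/v1/namespaces/{ns}/pods/{name}/eviction,
//		// which respects PodDisruptionBudgets, unlike a plain delete.
//		return k8sClient.CoreV1().Pods(p.GetNamespace()).EvictV1(ctx, eviction)
//	}
//
//	func IsCannotEvictPod(err error) bool {
//		// The eviction API answers 429 while a PodDisruptionBudget
//		// disallows the eviction.
//		return apierrors.IsTooManyRequests(err)
//	}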