-
Notifications
You must be signed in to change notification settings - Fork 450
/
kubelet.go
260 lines (228 loc) · 9.3 KB
/
kubelet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0
package healthcheck
import (
"context"
"fmt"
"net"
"net/http"
"net/netip"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants"
"github.com/gardener/gardener/pkg/nodeagent/dbus"
)
const (
	// defaultKubeletHealthEndpoint is the health endpoint of the kubelet.
	// It is only overridden via SetKubeletHealthEndpoint in tests.
	defaultKubeletHealthEndpoint = "http://127.0.0.1:10248/healthz"
	// maxToggles defines how often the kubelet can change the readiness during toggleTimeSpan until the node will be rebooted.
	maxToggles = 5
	// toggleTimeSpan is a floating time window where the kubelet readiness toggles are considered harmful.
	toggleTimeSpan = 10 * time.Minute
)
// KubeletHealthChecker configures the kubelet healthcheck.
type KubeletHealthChecker struct {
	// Clock exported for testing.
	Clock clock.Clock
	// KubeletReadinessToggles contains an entry for every toggle between NotReady->Ready state.
	KubeletReadinessToggles []time.Time
	// NodeReady indicates if the node is ready. Exported for testing.
	NodeReady bool
	// client is used to update the node status when the internal IP got lost.
	client client.Client
	// httpClient performs the HTTP probe against the kubelet health endpoint.
	httpClient *http.Client
	// firstFailure is the time of the first unsuccessful health check since
	// the kubelet was last seen healthy; nil while the kubelet is healthy.
	firstFailure *time.Time
	// dbus is used to restart the kubelet unit or reboot the node.
	dbus dbus.DBus
	// recorder emits Kubernetes events on the node object.
	recorder record.EventRecorder
	// lastInternalIP is the last known node InternalIP, kept to restore it if lost.
	lastInternalIP netip.Addr
	// getAddresses lists the IP addresses of the local network interfaces.
	getAddresses func() ([]net.Addr, error)
	// kubeletHealthEndpoint is the URL probed by Check; defaults to defaultKubeletHealthEndpoint.
	kubeletHealthEndpoint string
}
// NewKubeletHealthChecker creates an instance of a kubelet health check.
// The returned checker probes the default local kubelet health endpoint with a
// 10s HTTP timeout and starts with an empty readiness-toggle history.
func NewKubeletHealthChecker(client client.Client, clock clock.Clock, dbus dbus.DBus, recorder record.EventRecorder, getAddresses func() ([]net.Addr, error)) *KubeletHealthChecker {
	checker := &KubeletHealthChecker{
		Clock:                   clock,
		KubeletReadinessToggles: []time.Time{},
		client:                  client,
		httpClient:              &http.Client{Timeout: 10 * time.Second},
		dbus:                    dbus,
		recorder:                recorder,
		getAddresses:            getAddresses,
		kubeletHealthEndpoint:   defaultKubeletHealthEndpoint,
	}
	return checker
}
// Name returns the name of this health check.
func (*KubeletHealthChecker) Name() string {
	const checkName = "kubelet"
	return checkName
}
// HasLastInternalIP returns true if the node.InternalIP was stored.
// Exported for testing.
func (k *KubeletHealthChecker) HasLastInternalIP() bool {
	// The zero netip.Addr is invalid, so validity doubles as "was ever set".
	stored := k.lastInternalIP.IsValid()
	return stored
}
// SetKubeletHealthEndpoint sets the URL that Check probes for kubelet health.
// Exported for testing.
func (k *KubeletHealthChecker) SetKubeletHealthEndpoint(endpoint string) {
	k.kubeletHealthEndpoint = endpoint
}
// Check performs the actual health check for the kubelet.
// It first verifies the node's readiness condition and ensures the node's
// internal IP is present, then probes the kubelet's local health endpoint.
// The first failure is remembered; if the kubelet stays unhealthy for longer
// than maxFailureDuration, the kubelet systemd unit is restarted.
func (k *KubeletHealthChecker) Check(ctx context.Context, node *corev1.Node) error {
	log := logf.FromContext(ctx).WithName(k.Name())

	if err := k.verifyNodeReady(log, node); err != nil {
		return err
	}

	if err := k.ensureNodeInternalIP(ctx, node); err != nil {
		return err
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodGet, k.kubeletHealthEndpoint, nil)
	if err != nil {
		log.Error(err, "Creating request to kubelet health endpoint failed")
		return err
	}

	response, err := k.httpClient.Do(request)
	if err != nil {
		log.Error(err, "HTTP request to kubelet health endpoint failed")
	} else {
		// Close the body so the transport can reuse the connection; previously it leaked.
		defer response.Body.Close()
	}

	if err == nil && response.StatusCode == http.StatusOK {
		if k.firstFailure != nil {
			log.Info("Kubelet is healthy again", "statusCode", response.StatusCode)
			k.recorder.Event(node, corev1.EventTypeNormal, "kubelet", "Kubelet is healthy")
			k.firstFailure = nil
		}
		return nil
	}

	// Normalize the failure into a non-nil error: a reachable endpoint returning a
	// non-200 status left err nil, and the err.Error() calls below panicked.
	healthErr := err
	if healthErr == nil {
		healthErr = fmt.Errorf("kubelet health endpoint returned status code %d", response.StatusCode)
	}

	if k.firstFailure == nil {
		now := k.Clock.Now()
		k.firstFailure = &now
		log.Error(healthErr, "Kubelet is unhealthy")
		k.recorder.Eventf(node, corev1.EventTypeWarning, "kubelet", "Kubelet is unhealthy, health check error: %s", healthErr.Error())
	}

	// Use the injected clock (not time.Since) so the grace period is testable,
	// consistent with the k.Clock.Now() call above.
	if k.Clock.Since(*k.firstFailure).Abs() < maxFailureDuration {
		return nil
	}

	log.Error(healthErr, "Kubelet is unhealthy, restarting it", "failureDuration", maxFailureDuration)
	k.recorder.Eventf(node, corev1.EventTypeWarning, "kubelet", "Kubelet is unhealthy for more than %s, restarting it. Health check error: %s", maxFailureDuration, healthErr.Error())

	if err := k.dbus.Restart(ctx, k.recorder, node, v1beta1constants.OperatingSystemConfigUnitNameKubeletService); err != nil {
		return err
	}
	k.firstFailure = nil
	return nil
}
// ensureNodeInternalIP restores the internalIP of the node if this was initially set but lost in the process.
// This happens if Kubelet runs into a timeout when contacting the cloud provider API during start-up, see https://github.com/gardener/gardener/commit/1311de43a1745cbc8cf65d57c72e9ed0a2c5e586#diff-738db1352694482843441061260a6f02.
// When the node has neither an internal nor an external IP, the local network
// interfaces are scanned for the last known internal IP and, if found, the node
// status is updated with it. On a non-conflict update error the kubelet unit is
// restarted as a fallback.
func (k *KubeletHealthChecker) ensureNodeInternalIP(ctx context.Context, node *corev1.Node) error {
	var (
		log        = logf.FromContext(ctx).WithName(k.Name())
		externalIP string
		internalIP string
	)

	for _, addr := range node.Status.Addresses {
		switch addr.Type {
		case corev1.NodeExternalIP:
			externalIP = addr.Address
		case corev1.NodeInternalIP:
			internalIP = addr.Address
		default:
			// ignore
		}
	}

	if externalIP == "" && internalIP == "" {
		if k.lastInternalIP.IsValid() {
			k.recorder.Eventf(node, corev1.EventTypeWarning, "kubelet", "Node status does neither have an internal nor an external IP, try to recover from last known internal IP:%s", k.lastInternalIP)
		} else {
			k.recorder.Event(node, corev1.EventTypeWarning, "kubelet", "Node status does neither have an internal nor an external IP")
		}

		addresses, err := k.getAddresses()
		if err != nil {
			// Wrap the underlying error instead of discarding it.
			return fmt.Errorf("unable to list all network interface IP addresses: %w", err)
		}

		for _, addr := range addresses {
			parsed, err := netip.ParsePrefix(addr.String())
			if err != nil {
				return fmt.Errorf("unable to parse IP address: %w", err)
			}

			parsedIP := parsed.Addr()
			if parsedIP.Compare(k.lastInternalIP) != 0 {
				continue
			}

			// One of the ip addresses on the node matches the previous set internalIP of the node which is now gone, set it again.
			node.Status.Addresses = []corev1.NodeAddress{
				{
					Type:    corev1.NodeInternalIP,
					Address: k.lastInternalIP.String(),
				},
			}

			err = k.client.Status().Update(ctx, node)
			if err != nil {
				if !apierrors.IsConflict(err) {
					log.Error(err, "Unable to update node status with internal IP")
					k.recorder.Eventf(node, corev1.EventTypeWarning, "kubelet", "Unable to update node status with internal IP: %s", err.Error())
					// Restarting the kubelet is the fallback to make it re-register its addresses.
					return k.dbus.Restart(ctx, k.recorder, node, v1beta1constants.OperatingSystemConfigUnitNameKubeletService)
				}
				// Conflicts are returned as-is so the next reconciliation retries.
				return err
			}

			log.Info("Updated internal IP address of node status", "ip", k.lastInternalIP.String())
			k.recorder.Eventf(node, corev1.EventTypeNormal, "kubelet", "Updated the lost internal IP address of node status to the previous known: %s ", k.lastInternalIP.String())
		}
	} else if internalIP != "" {
		// Remember the internal IP so it can be restored if it gets lost later.
		var err error
		k.lastInternalIP, err = netip.ParseAddr(internalIP)
		if err != nil {
			return fmt.Errorf("unable to parse internal IP address: %w", err)
		}
	}

	return nil
}
// verifyNodeReady verifies the NodeReady condition of a node.
// If the condition changes 5 times within 10 minutes from NotReady->Ready the node will be rebooted.
// A failed reboot reverts the recorded toggle and returns an error without
// updating the cached NodeReady state.
func (k *KubeletHealthChecker) verifyNodeReady(log logr.Logger, node *corev1.Node) error {
	// Evaluate the condition once instead of twice as before; the node is not
	// mutated in between, so the result is identical.
	ready := isNodeReady(node)

	if ready && !k.NodeReady {
		needsReboot := k.ToggleKubeletState()
		log.Info("Kubelet became Ready", "readinessChanges", len(k.KubeletReadinessToggles), "timespan", toggleTimeSpan)

		if needsReboot {
			log.Info("Kubelet toggled between NotReady and Ready too often. Rebooting the node now")
			k.recorder.Eventf(node, corev1.EventTypeWarning, "kubelet", "Kubelet toggled between NotReady and Ready at least %d times in a %s time window. Rebooting the node now", maxToggles, toggleTimeSpan)

			if err := k.dbus.Reboot(); err != nil {
				k.RevertToggleKubeletState()
				k.recorder.Event(node, corev1.EventTypeWarning, "kubelet", "Rebooting the node failed")
				return fmt.Errorf("rebooting the node failed %w", err)
			}
		}
	}

	k.NodeReady = ready
	return nil
}
// ToggleKubeletState should be triggered if the state of the kubelet changed from NotReady -> Ready.
// It records the toggle time, prunes entries that fell out of toggleTimeSpan,
// and returns true if a reboot of the node should be triggered because at
// least maxToggles toggles happened within the window.
func (k *KubeletHealthChecker) ToggleKubeletState() bool {
	// Entries are appended chronologically, so scan from the newest backwards
	// and cut everything up to (and including) the first expired entry found.
	for idx := len(k.KubeletReadinessToggles); idx > 0; idx-- {
		if k.Clock.Since(k.KubeletReadinessToggles[idx-1]).Abs() > toggleTimeSpan.Abs() {
			k.KubeletReadinessToggles = k.KubeletReadinessToggles[idx:]
			break
		}
	}

	k.KubeletReadinessToggles = append(k.KubeletReadinessToggles, k.Clock.Now())
	return len(k.KubeletReadinessToggles) >= maxToggles
}
// RevertToggleKubeletState removes the last entry created by ToggleKubeletState().
// It is a no-op when no toggles are recorded.
func (k *KubeletHealthChecker) RevertToggleKubeletState() {
	if count := len(k.KubeletReadinessToggles); count > 0 {
		k.KubeletReadinessToggles = k.KubeletReadinessToggles[:count-1]
	}
}
// isNodeReady returns true if a node is ready; false otherwise.
// A node with no NodeReady condition at all is treated as not ready.
func isNodeReady(node *corev1.Node) bool {
	for _, condition := range node.Status.Conditions {
		if condition.Type != corev1.NodeReady {
			continue
		}
		return condition.Status == corev1.ConditionTrue
	}
	return false
}