forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstatus.go
202 lines (175 loc) · 6.53 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
"fmt"
"sync"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
"k8s.io/kubernetes/pkg/kubelet/metrics"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
const (
// LoadError indicates that the Kubelet failed to load the config checkpoint
LoadError = "failed to load config, see Kubelet log for details"
// ValidateError indicates that the Kubelet failed to validate the config checkpoint
ValidateError = "failed to validate config, see Kubelet log for details"
// AllNilSubfieldsError is used when no subfields are set
// This could happen in the case that an old client tries to read an object from a newer API server with a set subfield it does not know about
AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
// DownloadError is used when the download fails, e.g. due to network issues
DownloadError = "failed to download config, see Kubelet log for details"
// InternalError indicates that some internal error happened while trying to sync config, e.g. filesystem issues
InternalError = "internal failure, see Kubelet log for details"
// SyncErrorFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
SyncErrorFmt = "failed to sync: %s"
)
// NodeConfigStatus represents Node.Status.Config
type NodeConfigStatus interface {
// SetActive sets the active source in the status
SetActive(source *apiv1.NodeConfigSource)
// SetAssigned sets the assigned source in the status
SetAssigned(source *apiv1.NodeConfigSource)
// SetLastKnownGood sets the last-known-good source in the status
SetLastKnownGood(source *apiv1.NodeConfigSource)
// SetError sets the error associated with the status
SetError(err string)
// SetErrorOverride sets an error that overrides the base error set by SetError.
// If the override is set to the empty string, the base error is reported in
// the status, otherwise the override is reported.
SetErrorOverride(err string)
// Sync patches the current status into the Node identified by `nodeName` if an update is pending
Sync(client clientset.Interface, nodeName string)
}
type nodeConfigStatus struct {
// status is the core NodeConfigStatus that we report
status apiv1.NodeConfigStatus
// mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status
mux sync.Mutex
// errorOverride is sent in place of the usual error if it is non-empty
errorOverride string
// syncCh; write to this channel to indicate that the status needs to be synced to the API server
syncCh chan bool
}
// NewNodeConfigStatus returns a new NodeConfigStatus interface
func NewNodeConfigStatus() NodeConfigStatus {
// channels must have capacity at least 1, since we signal with non-blocking writes
syncCh := make(chan bool, 1)
// prime new status managers to sync with the API server on the first call to Sync
syncCh <- true
return &nodeConfigStatus{
syncCh: syncCh,
}
}
// transact grabs the lock, performs the fn, records the need to sync, and releases the lock
func (s *nodeConfigStatus) transact(fn func()) {
s.mux.Lock()
defer s.mux.Unlock()
fn()
s.sync()
}
func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Assigned = source
})
}
func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Active = source
})
}
func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.LastKnownGood = source
})
}
func (s *nodeConfigStatus) SetError(err string) {
s.transact(func() {
s.status.Error = err
})
}
func (s *nodeConfigStatus) SetErrorOverride(err string) {
s.transact(func() {
s.errorOverride = err
})
}
// sync notes that the status needs to be synced to the API server
func (s *nodeConfigStatus) sync() {
select {
case s.syncCh <- true:
default:
}
}
// Sync attempts to sync the status with the Node object for this Kubelet,
// if syncing fails, an error is logged, and work is queued for retry.
func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
select {
case <-s.syncCh:
default:
// no work to be done, return
return
}
utillog.Infof("updating Node.Status.Config")
// grab the lock
s.mux.Lock()
defer s.mux.Unlock()
// if the sync fails, we want to retry
var err error
defer func() {
if err != nil {
utillog.Errorf(err.Error())
s.sync()
}
}()
// get the Node so we can check the current status
oldNode, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err)
return
}
status := &s.status
// override error, if necessary
if len(s.errorOverride) > 0 {
// copy the status, so we don't overwrite the prior error
// with the override
status = status.DeepCopy()
status.Error = s.errorOverride
}
// update metrics based on the status we will sync
metrics.SetConfigError(len(status.Error) > 0)
err = metrics.SetAssignedConfig(status.Assigned)
if err != nil {
err = fmt.Errorf("failed to update Assigned config metric, error: %v", err)
return
}
err = metrics.SetActiveConfig(status.Active)
if err != nil {
err = fmt.Errorf("failed to update Active config metric, error: %v", err)
return
}
err = metrics.SetLastKnownGoodConfig(status.LastKnownGood)
if err != nil {
err = fmt.Errorf("failed to update LastKnownGood config metric, error: %v", err)
return
}
// apply the status to a copy of the node so we don't modify the object in the informer's store
newNode := oldNode.DeepCopy()
newNode.Status.Config = status
// patch the node with the new status
if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil {
utillog.Errorf("failed to patch node status, error: %v", err)
}
}