/
utils.go
225 lines (199 loc) · 7.75 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package server
import (
"context"
b64 "encoding/base64"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
encconfig "github.com/containers/ocicrypt/config"
cryptUtils "github.com/containers/ocicrypt/utils"
"github.com/containers/storage/pkg/mount"
"github.com/cri-o/cri-o/internal/log"
"github.com/cri-o/cri-o/server/metrics"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
types "k8s.io/cri-api/pkg/apis/runtime/v1"
kubeletTypes "k8s.io/kubelet/pkg/types"
)
const (
	// maxLabelSize caps the combined byte length of a single label's
	// key and value.
	maxLabelSize = 4096
)

// validateLabels checks every entry in labels and returns an error as soon as
// one key/value pair's combined length exceeds maxLabelSize. The key reported
// in the error message is truncated to its first 10 bytes to keep the message
// short.
func validateLabels(labels map[string]string) error {
	for key, value := range labels {
		if len(key)+len(value) <= maxLabelSize {
			continue
		}
		displayKey := key
		if len(displayKey) > 10 {
			displayKey = displayKey[:10]
		}
		return fmt.Errorf("label key and value greater than maximum size (%d bytes), key: %s", maxLabelSize, displayKey)
	}
	return nil
}
// mergeEnvs merges the default environment variables from the image
// configuration with the ones requested through the CRI. When the kubelet
// supplied no environment at all (kubeEnvs == nil), the image's environment is
// returned verbatim. Otherwise the kubelet-provided variables take precedence,
// and image variables are appended only for keys the kubelet did not set.
// Entries with empty keys, and image entries without a "=" separator, are
// skipped.
func mergeEnvs(imageConfig *v1.Image, kubeEnvs []*types.KeyValue) []string {
	envs := []string{}
	if kubeEnvs == nil && imageConfig != nil {
		envs = imageConfig.Config.Env
	} else {
		// Track the keys the kubelet set so the image-env pass below is a
		// constant-time lookup instead of a nested scan over envs.
		kubeKeys := make(map[string]struct{}, len(kubeEnvs))
		for _, item := range kubeEnvs {
			if item.Key == "" {
				continue
			}
			envs = append(envs, item.Key+"="+item.Value)
			kubeKeys[item.Key] = struct{}{}
		}
		if imageConfig != nil {
			for _, imageEnv := range imageConfig.Config.Env {
				parts := strings.SplitN(imageEnv, "=", 2)
				// Skip malformed entries: no "=" separator or an empty key.
				if len(parts) != 2 || parts[0] == "" {
					continue
				}
				if _, setByKubelet := kubeKeys[parts[0]]; !setByKubelet {
					envs = append(envs, imageEnv)
				}
			}
		}
	}
	return envs
}
// Translate container labels to a description of the container
func translateLabelsToDescription(labels map[string]string) string {
return fmt.Sprintf("%s/%s/%s", labels[kubeletTypes.KubernetesPodNamespaceLabel], labels[kubeletTypes.KubernetesPodNameLabel], labels[kubeletTypes.KubernetesContainerNameLabel])
}
// getDecryptionKeys walks keysPath and assembles a decryption configuration
// from every regular file found there. Each file's contents are treated as a
// private key, base64-encoded, and handed to ocicrypt to be sorted into a
// DecryptConfig. Symlinks are rejected outright. A non-existent keysPath is
// not an error: an empty DecryptConfig is returned instead.
func getDecryptionKeys(keysPath string) (*encconfig.DecryptConfig, error) {
	if _, err := os.Stat(keysPath); os.IsNotExist(err) {
		logrus.Debugf("Skipping non-existing decryption_keys_path: %s", keysPath)
		return &encconfig.DecryptConfig{}, nil
	}

	base64Keys := []string{}
	err := filepath.Walk(keysPath, func(path string, info os.FileInfo, err error) error {
		switch {
		case err != nil:
			return err
		case info.IsDir():
			// Descend into subdirectories; only files carry keys.
			return nil
		case info.Mode()&os.ModeSymlink == os.ModeSymlink:
			return errors.New("symbolic links not supported in decryption keys paths")
		}

		privateKey, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("read private key file: %w", err)
		}

		base64Keys = append(base64Keys, b64.StdEncoding.EncodeToString(privateKey))
		return nil
	})
	if err != nil {
		return nil, err
	}

	sortedDc, err := cryptUtils.SortDecryptionKeys(strings.Join(base64Keys, ","))
	if err != nil {
		return nil, err
	}

	return encconfig.InitDecryption(sortedDc).DecryptConfig, nil
}
// getSourceMount returns the mountpoint and mount options of the mount that
// contains source, chosen as the deepest (longest) mountpoint that is source
// itself or a path-component-wise ancestor of it.
func getSourceMount(source string, mountinfos []*mount.Info) (path, optional string, _ error) {
	var res *mount.Info
	for _, mi := range mountinfos {
		// Match on whole path components: a raw prefix check would wrongly
		// treat "/foo" as a parent of "/foobar".
		if !isPathAncestor(mi.Mountpoint, source) {
			continue
		}
		// Keep the longest (most specific) matching mountpoint.
		if res == nil || len(mi.Mountpoint) > len(res.Mountpoint) {
			res = mi
		}
	}
	if res == nil {
		return "", "", fmt.Errorf("could not find source mount of %s", source)
	}
	return res.Mountpoint, res.Optional, nil
}

// isPathAncestor reports whether parent is "/", equal to child, or an ancestor
// directory of child, matching on full path components rather than raw string
// prefixes.
func isPathAncestor(parent, child string) bool {
	if parent == "/" || parent == child {
		return true
	}
	return strings.HasPrefix(child, parent+"/")
}
func isContextError(err error) bool {
return err == context.Canceled || err == context.DeadlineExceeded
}
// getResourceOrWait returns the ID of the named resource of resourceType if it
// is already present in the resource cache. Otherwise it blocks until the
// request context ends, an internal timeout fires, or the in-flight creation
// finishes — and in every one of those cases returns an error so the kubelet
// retries; by the retry, the resource should be available from the cache.
func (s *Server) getResourceOrWait(ctx context.Context, name, resourceType string) (string, error) {
	ctx, span := log.StartSpan(ctx)
	defer span.End()
	// In 99% of cases, we shouldn't hit this timeout. Instead, the context should be cancelled.
	// This is really to catch an unlikely case where the kubelet doesn't cancel the context.
	// Adding on top of the specified deadline ensures this deadline will be respected, regardless of
	// how Kubelet's runtime-request-timeout changes.
	resourceCreationWaitTime := time.Minute * 4
	if initialDeadline, ok := ctx.Deadline(); ok {
		resourceCreationWaitTime += time.Until(initialDeadline)
	}
	if cachedID := s.resourceStore.Get(name); cachedID != "" {
		log.Infof(ctx, "Found %s %s with ID %s in resource cache; using it", resourceType, name, cachedID)
		return cachedID, nil
	}
	// Not cached: watch for the in-flight creation of this resource to finish.
	watcher, stage := s.resourceStore.WatcherForResource(name)
	if watcher == nil {
		return "", fmt.Errorf("error attempting to watch for %s %s: no longer found", resourceType, name)
	}
	log.Infof(ctx, "Creation of %s %s not yet finished. Currently at stage %v. Waiting up to %v for it to finish", resourceType, name, stage, resourceCreationWaitTime)
	metrics.Instance().MetricResourcesStalledAtStage(stage)
	var err error
	select {
	// We should wait as long as we can (within reason), thus stalling the kubelet's sync loop.
	// This will prevent "name is reserved" errors popping up every two seconds.
	case <-ctx.Done():
		err = ctx.Err()
	// This is probably overly cautious, but it doesn't hurt to have a way to terminate
	// independent of the kubelet's signal.
	case <-time.After(resourceCreationWaitTime):
		err = fmt.Errorf("waited too long for request to timeout or %s %s to be created", resourceType, name)
	// If the resource becomes available while we're watching for it, we still need to error on this request.
	// When we pull the resource from the cache after waiting, we won't run the cleanup funcs.
	// However, we don't know how long we've been making the kubelet wait for the request, and the request could time out
	// after we stop paying attention. This would cause CRI-O to attempt to send back a resource that the kubelet
	// will not receive, causing a resource leak.
	case <-watcher:
		// We need to wait again here. If we error out to the Kubelet before it times out
		// it will bump the attempt number, nullifying all of the work we've done so far.
		// Just the same as above, use resourceCreationWaitTime to make sure we catch cases where the context
		// is never done.
		select {
		case <-time.After(resourceCreationWaitTime):
		case <-ctx.Done():
		}
		err = fmt.Errorf("the requested %s %s is now ready and will be provided to the kubelet on next retry", resourceType, name)
	}
	return "", fmt.Errorf("kubelet may be retrying requests that are timing out in CRI-O due to system load. Currently at stage %v: %w", stage, err)
}
// FilterDisallowedAnnotations is a common place to have a map of annotations filtered for both runtimes and workloads.
// This function exists until the support for runtime level allowed annotations is dropped.
// toFind is used to find the workload for the specific pod or container, toFilter are the annotations
// for which disallowed annotations will be filtered. They may be the same.
// After this function, toFilter will no longer contain disallowed annotations.
func (s *Server) FilterDisallowedAnnotations(toFind, toFilter map[string]string, runtimeHandler string) error {
	// Only one of these Filter* will actually do any filtering, as the runtime DisallowedAnnotations
	// were scrubbed at the config validation step if there were workload AllowedAnnotations configured.
	// When runtime level allowed annotations are deprecated, this will be dropped.
	// TODO: eventually, this should be in the container package, but it's going through a lot of churn
	// and SpecAddAnnotations is already passed too many arguments
	allowed, err := s.Runtime().AllowedAnnotations(runtimeHandler)
	if err != nil {
		return err
	}
	// Union of runtime-level and workload-level allowed annotations; everything
	// else is stripped from toFilter.
	allowed = append(allowed, s.config.Workloads.AllowedAnnotations(toFind)...)
	return s.config.Workloads.FilterDisallowedAnnotations(allowed, toFilter)
}