pod_scaler.go

package manager

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	v1 "github.com/ctrox/zeropod/api/shim/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const (
	// CPUAnnotationKey and MemoryAnnotationKey record the original container
	// requests on the pod so they can be restored on scale-up.
	CPUAnnotationKey    = "zeropod.ctrox.dev/cpu-requests"
	MemoryAnnotationKey = "zeropod.ctrox.dev/memory-requests"
)

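// The annotation values are JSON objects mapping container names to quantity
// strings (resource.Quantity marshals to its canonical string form). For
// example, a pod with containers "app" and "sidecar" might carry the
// following (names and values here are illustrative, not from the source):
//
//	zeropod.ctrox.dev/cpu-requests: {"app":"100m","sidecar":"50m"}
//	zeropod.ctrox.dev/memory-requests: {"app":"128Mi","sidecar":"32Mi"}
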
var (
	// ScaledDownCPU and ScaledDownMemory are the minimal requests applied to
	// a container while it is scaled down.
	ScaledDownCPU    = resource.MustParse("1m")
	ScaledDownMemory = resource.MustParse("1Ki")
)

// containerResource maps a container name to a resource quantity.
type containerResource map[string]resource.Quantity

// PodScaler adjusts pod resource requests in place based on container status
// events.
type PodScaler struct {
	client client.Client
	log    *slog.Logger
}

// NewPodScaler creates a PodScaler using the kubeconfig or in-cluster client
// configuration.
func NewPodScaler() (*PodScaler, error) {
	log := slog.With("component", "podscaler")
	log.Info("init")
	cfg, err := config.GetConfig()
	if err != nil {
		return nil, err
	}
	c, err := client.New(cfg, client.Options{})
	return &PodScaler{client: c, log: log}, err
}

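// A minimal usage sketch; the event loop and the statusEvents channel below
// are assumptions for illustration, not part of this package:
//
//	ps, err := NewPodScaler()
//	if err != nil {
//		return err
//	}
//	for status := range statusEvents { // hypothetical <-chan *v1.ContainerStatus
//		if err := ps.Handle(ctx, status); err != nil {
//			slog.Error("handling status event", "err", err)
//		}
//	}
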
// Handle processes a container status event and updates the pod's resource
// requests to match the reported phase.
func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error {
	clog := ps.log.With("container", status.Name, "pod", status.PodName,
		"namespace", status.PodNamespace, "phase", status.Phase)
	clog.Info("status event")
	pod := &corev1.Pod{}
	podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace}
	if err := ps.client.Get(ctx, podName, pod); err != nil {
		return err
	}
	updatePod := false
	for i, container := range pod.Spec.Containers {
		if container.Name != status.Name {
			continue
		}
		// Skip containers that don't set both requests; there is nothing to
		// scale down or restore.
		_, hasCPU := container.Resources.Requests[corev1.ResourceCPU]
		_, hasMemory := container.Resources.Requests[corev1.ResourceMemory]
		if !hasCPU || !hasMemory {
			clog.Debug("ignoring container without resources")
			continue
		}
		initial, err := ps.initialRequests(container, pod.Annotations)
		if err != nil {
			return fmt.Errorf("getting initial requests from pod failed: %w", err)
		}
		current := container.Resources.Requests
		if ps.isUpToDate(initial, current, status) {
			clog.Debug("container is up to date", "initial", printResources(initial))
			continue
		}
		// Record the original requests before overwriting them so they can
		// be restored when the container runs again.
		if err := ps.setAnnotations(pod); err != nil {
			return err
		}
		newReq := ps.newRequests(initial, current, status)
		pod.Spec.Containers[i].Resources.Requests = newReq
		clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(newReq))
		updatePod = true
	}
	if !updatePod {
		return nil
	}
	if err := ps.updateRequests(ctx, pod); err != nil {
		if errors.IsInvalid(err) {
			clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled")
			return nil
		}
		return err
	}
	return nil
}

// isUpToDate reports whether the current requests already match the target
// values for the given phase.
func (ps *PodScaler) isUpToDate(initial, current corev1.ResourceList, status *v1.ContainerStatus) bool {
	switch status.Phase {
	case v1.ContainerPhase_SCALED_DOWN:
		return current[corev1.ResourceCPU] == ScaledDownCPU &&
			current[corev1.ResourceMemory] == ScaledDownMemory
	case v1.ContainerPhase_RUNNING:
		return current[corev1.ResourceCPU] == initial[corev1.ResourceCPU] &&
			current[corev1.ResourceMemory] == initial[corev1.ResourceMemory]
	default:
		return true
	}
}

// newRequests returns the requests to apply for the given phase: the minimal
// scaled-down values, the initial requests, or the current ones unchanged.
func (ps *PodScaler) newRequests(initial, current corev1.ResourceList, status *v1.ContainerStatus) corev1.ResourceList {
	switch status.Phase {
	case v1.ContainerPhase_SCALED_DOWN:
		current[corev1.ResourceCPU] = ScaledDownCPU
		current[corev1.ResourceMemory] = ScaledDownMemory
		return current
	case v1.ContainerPhase_RUNNING:
		return initial
	default:
		return current
	}
}

// initialRequests returns the container's original requests, preferring the
// values recorded in the pod annotations over the current spec.
func (ps *PodScaler) initialRequests(container corev1.Container, podAnnotations map[string]string) (corev1.ResourceList, error) {
	initial := container.DeepCopy().Resources.Requests
	containerCPUs := containerResource{}
	if cpuReq, ok := podAnnotations[CPUAnnotationKey]; ok {
		if err := json.Unmarshal([]byte(cpuReq), &containerCPUs); err != nil {
			return nil, err
		}
	}
	containerMemory := containerResource{}
	if memoryReq, ok := podAnnotations[MemoryAnnotationKey]; ok {
		if err := json.Unmarshal([]byte(memoryReq), &containerMemory); err != nil {
			return nil, err
		}
	}
	if cpu, ok := containerCPUs[container.Name]; ok {
		initial[corev1.ResourceCPU] = cpu
	}
	if memory, ok := containerMemory[container.Name]; ok {
		initial[corev1.ResourceMemory] = memory
	}
	return initial, nil
}

// setAnnotations stores the current requests of all containers in the pod
// annotations, unless they have already been recorded.
func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error {
	containerCPUs := containerResource{}
	containerMemory := containerResource{}
	for _, container := range pod.Spec.Containers {
		containerCPUs[container.Name] = container.Resources.Requests[corev1.ResourceCPU]
		containerMemory[container.Name] = container.Resources.Requests[corev1.ResourceMemory]
	}
	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	if _, ok := pod.Annotations[CPUAnnotationKey]; !ok {
		val, err := json.Marshal(containerCPUs)
		if err != nil {
			return err
		}
		pod.Annotations[CPUAnnotationKey] = string(val)
	}
	if _, ok := pod.Annotations[MemoryAnnotationKey]; !ok {
		val, err := json.Marshal(containerMemory)
		if err != nil {
			return err
		}
		pod.Annotations[MemoryAnnotationKey] = string(val)
	}
	return nil
}

// updateRequests writes the modified pod back to the API server.
func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error {
	return ps.client.Update(ctx, pod)
}

// printResources renders a resource list as a short human-readable string
// for logging.
func printResources(res corev1.ResourceList) string {
	cpu := res[corev1.ResourceCPU]
	memory := res[corev1.ResourceMemory]
	return fmt.Sprintf("cpu: %s, memory: %s", cpu.String(), memory.String())
}