/
actions.go
354 lines (314 loc) · 11.5 KB
/
actions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
package controllers
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
cnsbench "github.com/cnsbench/cnsbench/api/v1alpha1"
snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/v2/pkg/apis/volumesnapshot/v1beta1"
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/v2/pkg/client/clientset/versioned/scheme"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/kubernetes/scheme"
utilptr "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// createObj creates obj in the cluster and registers a watch on its kind so
// that updates to it re-trigger reconciliation of the owning Benchmark.
// If makeOwner is true the object is made a child of bm (both controller and
// owner references), so it is garbage-collected when the Benchmark is deleted.
// Creation of an object that already exists is treated as success.
func (r *BenchmarkReconciler) createObj(bm *cnsbench.Benchmark, obj client.Object, makeOwner bool) error {
	// Check each accessor error individually: previously only the last err
	// was inspected, silently discarding failures from Name() and Kind().
	name, err := meta.NewAccessor().Name(obj)
	if err != nil {
		r.Log.Error(err, "Error getting name from obj")
		return err
	}
	kind, err := meta.NewAccessor().Kind(obj)
	if err != nil {
		r.Log.Error(err, "Error getting kind from obj", "name", name)
		return err
	}
	objMeta, err := meta.Accessor(obj)
	if err != nil {
		r.Log.Error(err, "Error getting ObjectMeta from obj", "name", name)
		return err
	}
	if makeOwner {
		if err := controllerutil.SetControllerReference(bm, objMeta, r.Scheme); err != nil {
			r.Log.Error(err, "Error making object child of Benchmark", "name", name)
			return err
		}
		if err := controllerutil.SetOwnerReference(bm, objMeta, r.Scheme); err != nil {
			r.Log.Error(err, "Error making object child of Benchmark")
			return err
		}
	}
	// Debug aid: dump the object as YAML to stdout so the exact spec being
	// created is visible. Encoding failure is logged but not fatal.
	for _, x := range scheme.Codecs.SupportedMediaTypes() {
		if x.MediaType == "application/yaml" {
			ptBytes, err := runtime.Encode(x.Serializer, obj)
			if err != nil {
				r.Log.Error(err, "Error encoding spec")
			}
			fmt.Println(string(ptBytes))
		}
	}
	if err := r.Client.Create(context.TODO(), obj); err != nil {
		if errors.IsAlreadyExists(err) {
			// Benign: a previous reconcile pass may already have created it.
			r.Log.Info("Object already exists, proceeding", "name", name)
		} else {
			return err
		}
	}
	r.metric(bm, "createObj", "name", name, "kind", kind)
	// Watch objects of this kind owned (not necessarily controlled) by a
	// Benchmark so their changes enqueue the Benchmark for reconciliation.
	err = r.controller.Watch(&source.Kind{Type: obj}, &handler.EnqueueRequestForOwner{
		IsController: false,
		OwnerType:    &cnsbench.Benchmark{},
	})
	if err != nil {
		r.Log.Error(err, "Watching")
		return err
	}
	return nil
}
// CreateVolume creates vol.Count PVCs in the "default" namespace from the
// spec embedded in vol. When more than one is requested, each PVC name gets
// a "-<index>" suffix; all of them carry a "volumename" label with the base
// name so they can be selected as a group later (e.g. for snapshots).
func (r *BenchmarkReconciler) CreateVolume(bm *cnsbench.Benchmark, vol cnsbench.Volume) {
	// XXX This might be called because a rate fired, in which case there might
	// already be a volume - need to check what the last volume number is and
	// count from <last vol> to count+<last vol>
	for i := 0; i < vol.Count; i++ {
		pvcName := vol.Name
		if vol.Count > 1 {
			pvcName = fmt.Sprintf("%s-%d", vol.Name, i)
		}
		claim := corev1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pvcName,
				Namespace: "default",
				Labels: map[string]string{
					"volumename": vol.Name,
				},
			},
			Spec: vol.Spec,
		}
		if err := r.createObj(bm, client.Object(&claim), true); err != nil {
			r.Log.Error(err, "Creating volume")
		}
	}
}
// RunWorkload instantiates workload a from its library ConfigMap (looked up
// by a.Workload in LIBRARY_NAMESPACE). Parser entries (keys containing
// "parse") are created first because workload objects may reference them;
// every other entry is instantiated a.Count times with a monotonically
// increasing per-workload instance number so names stay unique across
// repeated invocations (e.g. when a rate re-fires the action).
func (r *BenchmarkReconciler) RunWorkload(bm *cnsbench.Benchmark, a cnsbench.Workload, workloadName string) error {
	cm := &corev1.ConfigMap{}
	err := r.Client.Get(context.TODO(), client.ObjectKey{Name: a.Workload, Namespace: LIBRARY_NAMESPACE}, cm)
	if err != nil {
		// logr requires alternating key/value pairs; the original passed the
		// bare value a.Workload with no key.
		r.Log.Error(err, "Error getting ConfigMap", "configmap", a.Workload)
		return err
	}
	// hack to make sure parsers are created first, they need to exist before any workload that
	// uses them is created
	for k := range cm.Data {
		if !strings.Contains(k, "parse") {
			continue
		}
		if err := r.prepareAndRun(bm, 0, workloadName, a, cm, []byte(cm.Data[k])); err != nil {
			return err
		}
	}
	for k := range cm.Data {
		if strings.Contains(k, "parse") {
			continue
		}
		// Instance counter starts at -1 so the first increment yields 0.
		if _, ok := r.workloadInstance[a.Workload]; !ok {
			r.workloadInstance[a.Workload] = -1
		}
		for w := 0; w < a.Count; w++ {
			r.workloadInstance[a.Workload] += 1
			if err := r.prepareAndRun(bm, r.workloadInstance[a.Workload], workloadName, a, cm, []byte(cm.Data[k])); err != nil {
				return err
			}
		}
	}
	return nil
}
// CreateSnapshot takes a VolumeSnapshot of every PVC in the "default"
// namespace selected by s: by "workloadname" label when s.WorkloadName is
// set, otherwise by "volumename" label when s.VolumeName is set (an empty
// selector — matching all PVCs — if neither is set). Snapshot names are
// generated from the Benchmark name, and each snapshot is labeled with
// actionName so it can be targeted by later Delete ops.
func (r *BenchmarkReconciler) CreateSnapshot(bm *cnsbench.Benchmark, s cnsbench.Snapshot, actionName string) error {
	ls := &metav1.LabelSelector{}
	if s.WorkloadName != "" {
		ls = metav1.AddLabelToSelector(ls, "workloadname", s.WorkloadName)
	} else if s.VolumeName != "" {
		ls = metav1.AddLabelToSelector(ls, "volumename", s.VolumeName)
	}
	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		return err
	}
	pvcs := &corev1.PersistentVolumeClaimList{}
	if err := r.Client.List(context.TODO(), pvcs, &client.ListOptions{Namespace: "default", LabelSelector: selector}); err != nil {
		return err
	}
	// Register the snapshot types with the scheme so createObj can encode
	// them. The original ignored this error (errcheck/go vet violation).
	if err := snapshotscheme.AddToScheme(scheme.Scheme); err != nil {
		r.Log.Error(err, "Error adding snapshot types to scheme")
		return err
	}
	// Takes a snapshot of every volume matching the given selector
	for _, pvc := range pvcs.Items {
		// Copy the loop variable: the snapshot spec holds a pointer to it.
		volName := pvc.Name
		name := names.NameGenerator.GenerateName(names.SimpleNameGenerator, bm.ObjectMeta.Name+"-snapshot-")
		snap := snapshotv1beta1.VolumeSnapshot{
			ObjectMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: "default",
				Labels: map[string]string{
					"workloadname": actionName,
				},
			},
			Spec: snapshotv1beta1.VolumeSnapshotSpec{
				VolumeSnapshotClassName: &s.SnapshotClass,
				Source: snapshotv1beta1.VolumeSnapshotSource{
					PersistentVolumeClaimName: &volName,
				},
			},
		}
		if err := r.createObj(bm, client.Object(&snap), false); err != nil {
			r.Log.Error(err, "Creating snapshot")
		}
	}
	return nil
}
// DeleteObj deletes the oldest object (by creation timestamp) in the
// "default" namespace that matches d's APIVersion, Kind, and label selector.
// Listing is done through an unstructured list so any resource kind works.
// If nothing matches, this is a no-op.
func (r *BenchmarkReconciler) DeleteObj(bm *cnsbench.Benchmark, d cnsbench.Delete) error {
	// TODO: Generalize to more than just snapshots. I think we need to get all the api groups,
	// then get all the kinds in those groups, then just iterate through those kinds searching
	// for objects matching the spec
	// See https://godoc.org/k8s.io/client-go/discovery
	r.Log.Info("Delete object")

	labelSelector, err := metav1.LabelSelectorAsSelector(&d.Selector)
	if err != nil {
		return err
	}

	candidates := &unstructured.UnstructuredList{}
	candidates.SetAPIVersion(d.APIVersion)
	candidates.SetKind(d.Kind)
	if err := r.Client.List(context.TODO(), candidates, &client.ListOptions{Namespace: "default", LabelSelector: labelSelector}); err != nil {
		return err
	}

	// Nothing matched the selector: nothing to delete.
	if len(candidates.Items) == 0 {
		r.Log.Info("No objects found")
		return nil
	}

	// Oldest first, so we always delete the longest-lived matching object.
	sort.Slice(candidates.Items, func(a, b int) bool {
		return candidates.Items[a].GetCreationTimestamp().Unix() < candidates.Items[b].GetCreationTimestamp().Unix()
	})
	oldest := &candidates.Items[0]
	r.Log.Info("Deleting first item", "name", oldest.GetName(), "createtime", oldest.GetCreationTimestamp().Unix())
	return r.Client.Delete(context.TODO(), oldest)
}
// ScaleObj scales the object named s.ObjName to numReplicas by launching a
// helper pod that runs a user-provided scale script. The script lives in a
// library ConfigMap (s.ScaleScripts), which is cloned into the default
// namespace and mounted executable into the pod at /scripts/. The pod is
// made a child of bm so it is cleaned up with the benchmark.
func (r *BenchmarkReconciler) ScaleObj(bm *cnsbench.Benchmark, s cnsbench.Scale, numReplicas int) error {
	// For now, the way this works is: a configmap in the library namespace
	// contains scripts for scaling up/down an object. In a Scale control
	// op spec, the user specifies the name of the object they want to
	// scale and the name of this configmap in the library. We clone the
	// configmap in to the default namespace, create a pod that attaches
	// that configmap, and run the scale script in that pod.
	//
	// Once we are able to include more complicated objects (i.e.,
	// non-default resource types) in a workload, then we will add a field
	// to the Scale spec that references a Workload in the Benchmark (like
	// the WorkloadName field of a Snapshot control op spec). That will
	// let us lookup the target object (so the user doesn't need to specify
	// the actual name of the object to be scaled), and the workload spec
	// should include the scale scripts so we can look those up as well.
	//
	// Summary: Currently, a user must supply both the name of the object
	// to be scaled and the name of a configmap in the library namespace
	// that has scripts for scaling that object. In the future, they will
	// be able to instead just supply the name of a workload that CNSBench
	// has already instantiated, and CNSBench will be able to lookup both
	// the scale scripts and the target object from that alone.

	// TODO: Check to see if a copy of the scale script configmap already
	// exists, use that if so.
	scriptsCMName, err := r.cloneParser(bm, s.ScaleScripts)
	if err != nil {
		return err
	}

	podName := names.NameGenerator.GenerateName(names.SimpleNameGenerator, "scale-pod-")
	// Mode 0777 so the mounted scale.sh is executable inside the container.
	scriptMode := utilptr.Int32Ptr(0777)
	helperPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: "default",
			Labels: map[string]string{
				"app": "scale-pod",
			},
		},
		Spec: corev1.PodSpec{
			RestartPolicy:      "Never",
			ServiceAccountName: s.ServiceAccountName,
			Containers: []corev1.Container{
				{
					Name:    "scale-container",
					Image:   "cnsbench/kubectl-container",
					Command: []string{"/scripts/scale.sh", s.ObjName, strconv.Itoa(numReplicas)},
					VolumeMounts: []corev1.VolumeMount{
						{
							MountPath: "/scripts/",
							Name:      "scale-script",
						},
					},
				},
			},
			Volumes: []corev1.Volume{
				{
					Name: "scale-script",
					VolumeSource: corev1.VolumeSource{
						ConfigMap: &corev1.ConfigMapVolumeSource{
							LocalObjectReference: corev1.LocalObjectReference{Name: scriptsCMName},
							DefaultMode:          scriptMode,
						},
					},
				},
			},
		},
	}

	// Tie the helper pod's lifetime to the Benchmark.
	if err := controllerutil.SetControllerReference(bm, helperPod, r.Scheme); err != nil {
		r.Log.Error(err, "Error making object child of Benchmark", "name", podName)
		return err
	}
	if err := controllerutil.SetOwnerReference(bm, helperPod, r.Scheme); err != nil {
		r.Log.Error(err, "Error making object child of Benchmark")
		return err
	}
	if err := r.Client.Create(context.TODO(), helperPod); err != nil {
		return err
	}
	return nil
}
// ReconcileInstances tops up running workload instances: for each workload
// in workloads it counts completed and still-running instances, and if fewer
// than a.Count instances are outstanding it creates replacements — but only
// for workload objects whose "duplicate" annotation is "true". This keeps a
// steady population of a.Count active instances as instances finish.
func (r *BenchmarkReconciler) ReconcileInstances(bm *cnsbench.Benchmark, workloads []cnsbench.Workload) error {
	var err error
	cm := &corev1.ConfigMap{}
	accessor := meta.NewAccessor()

	for _, a := range workloads {
		// Check how many workloads are complete and how many exist (running or otherwise) but aren't complete
		var workloadsComplete, workloadsNotComplete int
		if workloadsComplete, workloadsNotComplete, err = CountCompletions(r.Client, a.Name); err != nil {
			return err
		}
		// Removed a leftover debug fmt.Println(a) and flattened the else
		// branch: the deficit is simply count minus still-outstanding.
		workloadsNeeded := a.Count - workloadsNotComplete
		if workloadsNeeded <= 0 {
			continue
		}
		r.Log.Info("ReconcileInstances", "Workloads needed", workloadsNeeded, "complete", workloadsComplete, "not complete", workloadsNotComplete)

		// Fewer non-complete workloads exist than "count", so need to create more workload instances
		if err := r.Client.Get(context.TODO(), client.ObjectKey{Name: a.Workload, Namespace: LIBRARY_NAMESPACE}, cm); err != nil {
			return err
		}
		for k := range cm.Data {
			// We need to decode the configmap to get the workload object's annotations
			// But we won't actually use the decoded yaml to instantiate anything, so
			// just use "0" for the workload number
			cmString := r.replaceVars(cm.Data[k], a, 0, a.Count, a.Name, cm)
			if obj, err := r.decodeConfigMap(cmString); err != nil {
				return err
			} else {
				// Only create a new instance of this workload if it's "duplicate" annotation is "true"
				if objAnnotations, err := accessor.Annotations(obj); err != nil {
					return err
				} else if duplicate, exists := objAnnotations["duplicate"]; !exists || duplicate != "true" {
					continue
				}
			}
			// Number new instances after all existing ones so names are unique.
			for w := workloadsComplete + workloadsNotComplete; w < workloadsComplete+a.Count; w++ {
				if err := r.prepareAndRun(bm, w, a.Name, a, cm, []byte(cm.Data[k])); err != nil {
					return err
				}
			}
		}
	}
	return nil
}