k6_stopped_jobs.go
package controllers

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"github.com/go-logr/logr"
	"github.com/grafana/k6-operator/api/v1alpha1"
	k6api "go.k6.io/k6/api/v1"
	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

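// isJobRunning probes the k6 REST API (port 6565) behind a runner's Service to
// check whether the test is still executing there. A failed connection is
// treated as "not running"; any other failure while reading or parsing the
// response is treated as "still running", so the operator waits rather than
// tearing runners down prematurely.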
func isJobRunning(log logr.Logger, service *v1.Service) bool {
	resp, err := http.Get(fmt.Sprintf("http://%v:6565/v1/status", service.Spec.ClusterIP))
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	// Response has been received so assume the job is running.
	if resp.StatusCode >= 400 {
		log.Error(err, fmt.Sprintf("status from runner job %v is %d", service.ObjectMeta.Name, resp.StatusCode))
		return true
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error(err, fmt.Sprintf("Error on reading status of the runner job %v", service.ObjectMeta.Name))
		return true
	}

	var status k6api.StatusJSONAPI
	if err := json.Unmarshal(data, &status); err != nil {
		log.Error(err, fmt.Sprintf("Error on parsing status of the runner job %v", service.ObjectMeta.Name))
		return true
	}

	return status.Status().Running
}

// StoppedJobs checks if the runner pods have stopped executing the test.
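// It lists the runner Services (labelled app=k6, runner=true) and polls each
// runner's k6 REST API, counting the runners that still report a running test.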
func StoppedJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (allStopped bool) {
	if len(k6.GetStatus().TestRunID) > 0 {
		log = log.WithValues("testRunId", k6.GetStatus().TestRunID)
	}

	log.Info("Waiting for pods to stop the test run")

	selector := labels.SelectorFromSet(map[string]string{
		"app":    "k6",
		"k6_cr":  k6.NamespacedName().Name,
		"runner": "true",
	})

	opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}

	sl := &v1.ServiceList{}
	if err := r.List(ctx, sl, opts); err != nil {
		log.Error(err, "Could not list services")
		return
	}

	var runningJobs int32
	for _, service := range sl.Items {
		if isJobRunning(log, &service) {
			runningJobs++
		}
	}

	log.Info(fmt.Sprintf("%d/%d runners stopped execution", k6.GetSpec().Parallelism-runningJobs, k6.GetSpec().Parallelism))

	if runningJobs > 0 {
		return
	}

	allStopped = true
	return
}

// KillJobs retrieves all runner jobs and attempts to delete them
// with a propagation policy so that the corresponding pods are deleted as well.
// On failure, an error is returned.
// On success, the error is nil and allDeleted reports whether all retrieved jobs were deleted.
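// Deletion uses background propagation, so the pods owned by each job are
// removed asynchronously by the Kubernetes garbage collector.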
func KillJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (allDeleted bool, err error) {
	if len(k6.GetStatus().TestRunID) > 0 {
		log = log.WithValues("testRunId", k6.GetStatus().TestRunID)
	}

	log.Info("Killing all runner jobs.")

	selector := labels.SelectorFromSet(map[string]string{
		"app":    "k6",
		"k6_cr":  k6.NamespacedName().Name,
		"runner": "true",
	})

	opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}

	jl := &batchv1.JobList{}
	if err = r.List(ctx, jl, opts); err != nil {
		log.Error(err, "Could not list jobs")
		return
	}

	var deleteCount int
	propagationPolicy := client.PropagationPolicy(metav1.DeletePropagationBackground)

	for _, job := range jl.Items {
		if err = r.Delete(ctx, &job, propagationPolicy); err != nil {
			log.Error(err, fmt.Sprintf("Failed to delete runner job %s", job.Name))
			// do we need to retry here?
			continue
		}
		// Count only the jobs whose deletion was accepted by the API server.
		deleteCount++
	}

	return deleteCount == len(jl.Items), nil
}
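
// stopJobsIfFinished is an illustrative sketch, not part of the upstream
// operator: it shows how the two helpers above could be combined from a
// reconcile step. The name and signature are hypothetical; a caller would
// typically requeue while this returns (false, nil).
func stopJobsIfFinished(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) (bool, error) {
	// Wait until every runner reports that it has stopped the test.
	if !StoppedJobs(ctx, log, k6, r) {
		return false, nil
	}

	// Delete the runner jobs; their pods are cleaned up via background propagation.
	allDeleted, err := KillJobs(ctx, log, k6, r)
	if err != nil {
		return false, err
	}

	return allDeleted, nil
}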