-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlonghorn-scheduler-admission.go
195 lines (168 loc) · 5.46 KB
/
longhorn-scheduler-admission.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kwhhttp "github.com/slok/kubewebhook/v2/pkg/http"
kwhlogrus "github.com/slok/kubewebhook/v2/pkg/log/logrus"
kwhmodel "github.com/slok/kubewebhook/v2/pkg/model"
kwhmutating "github.com/slok/kubewebhook/v2/pkg/webhook/mutating"
)
// PVC annotation keys that are consulted for the name of the provisioner
// of a claim.
const (
	// annStorageProvisioner is the non-beta annotation key.
	annStorageProvisioner = "volume.kubernetes.io/storage-provisioner"
	// annBetaStorageProvisioner is the beta-prefixed annotation key.
	annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
)
// config carries the command-line settings of the webhook, as filled in
// by initFlags.
type config struct {
	// certFile and keyFile are the TLS certificate and key files passed
	// to the HTTPS listener.
	certFile, keyFile string
	// driverName is the CSI driver / provisioner name that pod volumes
	// are matched against.
	driverName string
	// schedulerName is the value written into a matching pod's
	// spec.schedulerName.
	schedulerName string
}
// initFlags parses the process arguments (os.Args[1:]) and returns the
// resulting settings. Flags that are not supplied keep the defaults
// declared below.
func initFlags() *config {
	var parsed config

	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	fs.StringVar(&parsed.driverName, "driver", "driver.longhorn.io", "Driver name")
	fs.StringVar(&parsed.schedulerName, "scheduler", "longhorn", "Scheduler name")
	fs.StringVar(&parsed.certFile, "tls-cert-file", "", "TLS certificate file")
	fs.StringVar(&parsed.keyFile, "tls-key-file", "", "TLS key file")

	// The flag set uses flag.ExitOnError, so a parse failure terminates the
	// process inside Parse; the returned error is deliberately discarded.
	_ = fs.Parse(os.Args[1:])

	return &parsed
}
// run parses the command-line flags, builds a mutating admission webhook
// and serves it over TLS on :8080 until the server stops.
//
// The webhook only acts on Pods whose scheduler name is empty or
// "default-scheduler". Such a Pod gets cfg.schedulerName assigned when any
// of the following holds:
//   - one of its volumes is an inline CSI volume using cfg.driverName;
//   - one of its PVCs resolves to cfg.driverName (see pvcUsesDriver);
//   - it carries the label longhorn.io/component=share-manager.
//
// cli is the Kubernetes client used to look up PVCs, StorageClasses and
// PVs. The returned error is non-nil when the webhook or its HTTP handler
// cannot be created, or when the HTTPS server returns.
func run(cli kubernetes.Interface) error {
	logrusLogEntry := logrus.NewEntry(logrus.New())
	logrusLogEntry.Logger.SetLevel(logrus.DebugLevel)
	logger := kwhlogrus.NewLogrus(logrusLogEntry)

	cfg := initFlags()

	// Create mutator.
	mt := kwhmutating.MutatorFunc(func(ctx context.Context, _ *kwhmodel.AdmissionReview, obj metav1.Object) (*kwhmutating.MutatorResult, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return &kwhmutating.MutatorResult{}, nil
		}

		// A non-default scheduler is already assigned; leave the pod alone.
		if pod.Spec.SchedulerName != "" && pod.Spec.SchedulerName != "default-scheduler" {
			return &kwhmutating.MutatorResult{}, nil
		}

		// Collect all PVCs attached to the pod.
		var pvcNames []string
		for _, volume := range pod.Spec.Volumes {
			// An inline CSI volume with our driver settles the question
			// without any API lookups.
			if volume.CSI != nil && volume.CSI.Driver == cfg.driverName {
				pod.Spec.SchedulerName = cfg.schedulerName
				return &kwhmutating.MutatorResult{MutatedObject: pod}, nil
			}
			// Volume is not a PVC, it does not interest us.
			if volume.PersistentVolumeClaim == nil {
				continue
			}
			pvcNames = append(pvcNames, volume.PersistentVolumeClaim.ClaimName)
		}

		// Check PVCs; the first one backed by our driver is enough.
		for _, pvcName := range pvcNames {
			matches, err := pvcUsesDriver(ctx, cli, pod.Namespace, pvcName, cfg.driverName)
			if err != nil {
				return &kwhmutating.MutatorResult{}, err
			}
			if matches {
				pod.Spec.SchedulerName = cfg.schedulerName
				break
			}
		}

		// Check if we're a RWX share-manager. Reading a nil label map is
		// safe and yields "".
		if pod.Labels["longhorn.io/component"] == "share-manager" {
			pod.Spec.SchedulerName = cfg.schedulerName
		}

		return &kwhmutating.MutatorResult{MutatedObject: pod}, nil
	})

	// Create webhook.
	mcfg := kwhmutating.WebhookConfig{
		ID:      "longhorn-scheduler-admission",
		Mutator: mt,
		Logger:  logger,
	}
	wh, err := kwhmutating.NewWebhook(mcfg)
	if err != nil {
		return fmt.Errorf("error creating webhook: %w", err)
	}

	// Get HTTP handler from webhook.
	whHandler, err := kwhhttp.HandlerFor(kwhhttp.HandlerConfig{Webhook: wh, Logger: logger})
	if err != nil {
		return fmt.Errorf("error creating webhook handler: %w", err)
	}

	// Serve.
	logger.Infof("Listening on :8080")
	err = http.ListenAndServeTLS(":8080", cfg.certFile, cfg.keyFile, whHandler)
	if err != nil {
		return fmt.Errorf("error serving webhook: %w", err)
	}

	return nil
}

// pvcUsesDriver reports whether the PVC namespace/name is backed by
// driverName. The provisioner is resolved from, in order:
//  1. the PVC's storage-provisioner annotations (the beta key wins when
//     both are present);
//  2. the PVC's StorageClass, but only when its provisioner equals
//     driverName;
//  3. the CSI driver of the PV bound to the PVC.
//
// A PVC, StorageClass or PV that does not exist is skipped rather than
// treated as an error; a missing PVC yields (false, nil). Any other API
// error is returned wrapped.
func pvcUsesDriver(ctx context.Context, cli kubernetes.Interface, namespace, name, driverName string) (bool, error) {
	pvc, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		// The claim may not exist yet at admission time. Bail out before
		// touching pvc: it is not guaranteed to be non-nil on error.
		if errors.IsNotFound(err) {
			return false, nil
		}
		return false, fmt.Errorf("getting PVC %s/%s: %w", namespace, name, err)
	}

	// Try to gather the provisioner name from annotations.
	provisioner := pvc.Annotations[annStorageProvisioner]
	if beta, ok := pvc.Annotations[annBetaStorageProvisioner]; ok {
		provisioner = beta
	}

	// Try to gather the provisioner name from the associated StorageClass.
	if provisioner == "" && pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != "" {
		sc, err := cli.StorageV1().StorageClasses().Get(ctx, *pvc.Spec.StorageClassName, metav1.GetOptions{})
		switch {
		case err == nil:
			// Only a matching class is recorded, so that a foreign class
			// still lets the PV lookup below run.
			if sc.Provisioner == driverName {
				provisioner = sc.Provisioner
			}
		case !errors.IsNotFound(err):
			return false, fmt.Errorf("getting StorageClass %s: %w", *pvc.Spec.StorageClassName, err)
		}
	}

	// Try to gather the provisioner name from the associated PV.
	if provisioner == "" && pvc.Spec.VolumeName != "" {
		pv, err := cli.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
		switch {
		case err == nil:
			if pv.Spec.CSI != nil {
				provisioner = pv.Spec.CSI.Driver
			}
		case !errors.IsNotFound(err):
			return false, fmt.Errorf("getting PV %s: %w", pvc.Spec.VolumeName, err)
		}
	}

	return provisioner == driverName, nil
}
// main builds the Kubernetes client and runs the webhook server. Any
// failure is reported on stderr and the process exits with status 1.
func main() {
	cli, err := GetK8sSTDClients()
	if err != nil {
		// Trailing newline so the message does not run into whatever is
		// printed next on the terminal or in the container log.
		fmt.Fprintf(os.Stderr, "error getting kubernetes client: %s\n", err)
		os.Exit(1)
	}

	if err := run(cli); err != nil {
		fmt.Fprintf(os.Stderr, "error running app: %s\n", err)
		os.Exit(1)
	}
}
// GetK8sSTDClients returns a Kubernetes client built from the in-cluster
// configuration.
//
// It returns an error when the in-cluster configuration cannot be loaded
// or when the client cannot be constructed from it. Failures are reported
// through the error return rather than by panicking, so the caller decides
// how to terminate.
func GetK8sSTDClients() (kubernetes.Interface, error) {
	// Create the in-cluster config. The local is not called "config" so
	// that it does not shadow the file's config type.
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("loading in-cluster config: %w", err)
	}

	// Get the client.
	k8sCli, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("creating kubernetes client: %w", err)
	}

	return k8sCli, nil
}