/*
Copyright 2017 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package flexvolume manages Kubernetes storage attach events.
package flexvolume
import (
"fmt"
"os"
"path"
"path/filepath"
"strings"

"github.com/coreos/pkg/capnslog"
rookalpha "github.com/rook/rook/pkg/apis/rook.io/v1alpha2"
"github.com/rook/rook/pkg/clusterd"
"github.com/rook/rook/pkg/daemon/ceph/agent/flexvolume/attachment"
"github.com/rook/rook/pkg/operator/ceph/agent"
"github.com/rook/rook/pkg/operator/ceph/cluster"
"github.com/rook/rook/pkg/operator/ceph/cluster/mon"
"github.com/rook/rook/pkg/operator/k8sutil"
"github.com/rook/rook/pkg/util/display"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
)
const (
// ClusterNamespaceKey key for cluster namespace option.
ClusterNamespaceKey = "clusterNamespace"
// ClusterNameKey key for cluster name option (deprecated).
ClusterNameKey = "clusterName"
// StorageClassKey key for storage class name option.
StorageClassKey = "storageClass"
// PoolKey key for pool name option.
PoolKey = "pool"
// BlockPoolKey key for blockPool name option.
BlockPoolKey = "blockPool"
// ImageKey key for image name option.
ImageKey = "image"
// DataBlockPoolKey key for data pool name option.
DataBlockPoolKey = "dataBlockPool"
kubeletDefaultRootDir = "/var/lib/kubelet"
)
var driverLogger = capnslog.NewPackageLogger("github.com/rook/rook", "flexdriver")
// Controller handles all events from the Flexvolume driver
type Controller struct {
context *clusterd.Context // shared clients and execution context for the daemon
volumeManager VolumeManager // maps and unmaps volumes on this node
volumeAttachment attachment.Attachment // CRUD access to Volume attachment CRDs
mountSecurityMode string // agent.MountSecurityModeAny or agent.MountSecurityModeRestricted
}
// ClientAccessInfo holds info for Ceph access
type ClientAccessInfo struct {
MonAddresses []string `json:"monAddresses"`
UserName string `json:"userName"`
SecretKey string `json:"secretKey"`
}
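// Serialized for the driver, a ClientAccessInfo looks like the following
// (illustrative values only):
//   {"monAddresses":["10.1.2.3:6789"],"userName":"admin","secretKey":"<base64 key>"}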
// NewController creates a new controller to handle events from the flexvolume driver
func NewController(context *clusterd.Context, volumeAttachment attachment.Attachment, manager VolumeManager, mountSecurityMode string) *Controller {
return &Controller{
context: context,
volumeAttachment: volumeAttachment,
volumeManager: manager,
mountSecurityMode: mountSecurityMode,
}
}
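// A minimal usage sketch (hypothetical wiring; in practice the flexvolume
// driver invokes these methods through its RPC mechanism):
//   ctrl := NewController(ctx, volAttachment, volManager, agent.MountSecurityModeAny)
//   var devicePath string
//   err := ctrl.Attach(AttachOptions{VolumeName: "pvc-fd736cbb"}, &devicePath)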
// Attach attaches a rook volume to the node.
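// The flow is: fetch or create the Volume CRD that records attachments for
// this PV, reconcile any existing attachment records (orphans, retries, and
// multiple read-only readers), and only then ask the volume manager to map
// the image on this node.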
func (c *Controller) Attach(attachOpts AttachOptions, devicePath *string) error {
namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
node := os.Getenv(k8sutil.NodeNameEnvVar)
// The name of the CRD is the PV name, so that the CRD can be used for fencing
crdName := attachOpts.VolumeName
// Check if this volume has been attached
volumeattachObj, err := c.volumeAttachment.Get(namespace, crdName)
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get volume CRD %s. %+v", crdName, err)
}
// No volumeattach CRD for this volume found. Create one
volumeattachObj = rookalpha.NewVolume(
crdName,
namespace,
node,
attachOpts.PodNamespace,
attachOpts.Pod,
attachOpts.ClusterNamespace,
attachOpts.MountDir,
strings.ToLower(attachOpts.RW) == ReadOnly,
)
logger.Infof("creating Volume attach Resource %s/%s: %+v", volumeattachObj.Namespace, volumeattachObj.Name, attachOpts)
err = c.volumeAttachment.Create(volumeattachObj)
if err != nil {
if !errors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create volume CRD %s. %+v", crdName, err)
}
// Another attacher won the race for this volume; Kubernetes will retry.
return fmt.Errorf("failed to attach volume %s for pod %s/%s. Volume is already attached by a different pod",
crdName, attachOpts.PodNamespace, attachOpts.Pod)
}
} else {
// The volume has already been attached.
// Check whether an attachment record was previously created for this mount.
// This can happen with multiple read-only attachments, or when the Volume
// record was created earlier, the attach operation failed, and Kubernetes
// retried.
found := false
for _, a := range volumeattachObj.Attachments {
if a.MountDir == attachOpts.MountDir {
found = true
}
}
if !found {
// Check if there is already an attachment with RW.
index := getPodRWAttachmentObject(volumeattachObj)
if index != -1 {
// check if the RW attachment is orphaned.
attachment := &volumeattachObj.Attachments[index]
logger.Infof("volume attachment record %s/%s exists for pod: %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachment.PodNamespace, attachment.PodName)
// Note: this could return a reference to the requesting pod itself if it has the same name as the pod in the attachment record.
allowAttach := false
pod, err := c.context.Clientset.CoreV1().Pods(attachment.PodNamespace).Get(attachment.PodName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get pod CRD %s/%s. %+v", attachment.PodNamespace, attachment.PodName, err)
}
allowAttach = true
logger.Infof("volume attachment record %s/%s is orphaned. Updating record with new attachment information for pod %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachOpts.PodNamespace, attachOpts.Pod)
}
if err == nil && (attachment.PodNamespace == attachOpts.PodNamespace && attachment.PodName == attachOpts.Pod && attachment.Node == node) {
allowAttach = true
logger.Infof("volume attachment record %s/%s is starting on the same node. Updating record with new attachment information for pod %s/%s", volumeattachObj.Namespace, volumeattachObj.Name, attachOpts.PodNamespace, attachOpts.Pod)
}
if allowAttach {
// Update attachment record and proceed with attaching
attachment.Node = node
attachment.MountDir = attachOpts.MountDir
attachment.PodNamespace = attachOpts.PodNamespace
attachment.PodName = attachOpts.Pod
attachment.ClusterName = attachOpts.ClusterNamespace
attachment.ReadOnly = attachOpts.RW == ReadOnly
err = c.volumeAttachment.Update(volumeattachObj)
if err != nil {
return fmt.Errorf("failed to update volume CRD %s. %+v", crdName, err)
}
} else {
// Attachment is not orphaned. Original pod still exists. Don't attach.
return fmt.Errorf("failed to attach volume %s for pod %s/%s. Volume is already attached by pod %s/%s. Status %+v",
crdName, attachOpts.PodNamespace, attachOpts.Pod, attachment.PodNamespace, attachment.PodName, pod.Status.Phase)
}
} else {
// No RW attachment found. Check if this is a RW attachment request.
// Only a single RW attachment is supported, with no RO attachments mixed in.
if attachOpts.RW == "rw" && len(volumeattachObj.Attachments) > 0 {
return fmt.Errorf("failed to attach volume %s for pod %s/%s. Volume is already attached by one or more pods",
crdName, attachOpts.PodNamespace, attachOpts.Pod)
}
// Create a new attachment record and proceed with attaching
newAttach := rookalpha.Attachment{
Node: node,
PodNamespace: attachOpts.PodNamespace,
PodName: attachOpts.Pod,
ClusterName: attachOpts.ClusterNamespace,
MountDir: attachOpts.MountDir,
ReadOnly: attachOpts.RW == ReadOnly,
}
volumeattachObj.Attachments = append(volumeattachObj.Attachments, newAttach)
err = c.volumeAttachment.Update(volumeattachObj)
if err != nil {
return fmt.Errorf("failed to update volume CRD %s. %+v", crdName, err)
}
}
}
}
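// The attachment record now reflects this pod, so perform the actual mapping
// of the image on this node.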
*devicePath, err = c.volumeManager.Attach(attachOpts.Image, attachOpts.BlockPool, attachOpts.MountUser, attachOpts.MountSecret, attachOpts.ClusterNamespace)
if err != nil {
return fmt.Errorf("failed to attach volume %s/%s: %+v", attachOpts.BlockPool, attachOpts.Image, err)
}
return nil
}
// Expand expands the RBD image backing the volume to the requested size
func (c *Controller) Expand(expandArgs ExpandArgs, _ *struct{}) error {
expandOpts := expandArgs.ExpandOptions
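// The driver passes the requested size in bytes; the volume manager expects megabytes.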
sizeInMb := display.BToMb(expandArgs.Size)
err := c.volumeManager.Expand(expandOpts.Image, expandOpts.Pool, expandOpts.ClusterNamespace, sizeInMb)
if err != nil {
return fmt.Errorf("failed to resize volume %s/%s: %+v", expandOpts.Pool, expandOpts.Image, err)
}
return nil
}
// Detach detaches a rook volume from the node
func (c *Controller) Detach(detachOpts AttachOptions, _ *struct{} /* void reply */) error {
return c.doDetach(detachOpts, false /* force */)
}
// DetachForce forces a detach of a rook volume from the node
func (c *Controller) DetachForce(detachOpts AttachOptions, _ *struct{} /* void reply */) error {
return c.doDetach(detachOpts, true /* force */)
}
func (c *Controller) doDetach(detachOpts AttachOptions, force bool) error {
if err := c.volumeManager.Detach(
detachOpts.Image,
detachOpts.BlockPool,
detachOpts.MountUser,
detachOpts.MountSecret,
detachOpts.ClusterNamespace,
force,
); err != nil {
return fmt.Errorf("failed to detach volume %s/%s: %+v", detachOpts.BlockPool, detachOpts.Image, err)
}
namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
crdName := detachOpts.VolumeName
volumeAttach, err := c.volumeAttachment.Get(namespace, crdName)
if err != nil {
return fmt.Errorf("failed to get VolumeAttachment for %s in namespace %s. %+v", crdName, namespace, err)
}
if len(volumeAttach.Attachments) == 0 {
logger.Infof("Deleting Volume CRD %s/%s", namespace, crdName)
return c.volumeAttachment.Delete(namespace, crdName)
}
return nil
}
// RemoveAttachmentObject removes the attachment from the Volume CRD and returns whether the volume is safe to detach
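// Only the record matching this node and mountDir is removed; safeToDetach is
// set to true when that record was the last attachment on this node.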
func (c *Controller) RemoveAttachmentObject(detachOpts AttachOptions, safeToDetach *bool) error {
namespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
crdName := detachOpts.VolumeName
logger.Infof("deleting attachment for mountDir %s from Volume attach CRD %s/%s", detachOpts.MountDir, namespace, crdName)
volumeAttach, err := c.volumeAttachment.Get(namespace, crdName)
if err != nil {
return fmt.Errorf("failed to get Volume attach CRD %s/%s: %+v", namespace, crdName, err)
}
node := os.Getenv(k8sutil.NodeNameEnvVar)
nodeAttachmentCount := 0
removeIndex := -1
for i, v := range volumeAttach.Attachments {
if v.Node == node {
nodeAttachmentCount++
if v.MountDir == detachOpts.MountDir {
removeIndex = i
}
}
}
if removeIndex != -1 {
// Remove the matching attachment only after the loop finishes, so the slice
// is not mutated while it is being ranged over.
volumeAttach.Attachments = append(volumeAttach.Attachments[:removeIndex], volumeAttach.Attachments[removeIndex+1:]...)
// If this node held only the one attachment that was just removed, the
// volume is safe to detach.
if nodeAttachmentCount == 1 {
*safeToDetach = true
}
return c.volumeAttachment.Update(volumeAttach)
}
return fmt.Errorf("volume CRD %s found but attachment to the mountDir %s was not found", crdName, detachOpts.MountDir)
}
// Log logs messages from the driver
func (c *Controller) Log(message LogMessage, _ *struct{} /* void reply */) error {
if message.IsError {
driverLogger.Error(message.Message)
} else {
driverLogger.Info(message.Message)
}
return nil
}
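// parseClusterNamespace returns the Ceph cluster namespace recorded in the
// named StorageClass, falling back to the deprecated clusterName parameter and
// then to the default cluster name. An illustrative parameters map handled
// here:
//   Parameters: map[string]string{"blockPool": "replicapool", "clusterNamespace": "rook-ceph"}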
func (c *Controller) parseClusterNamespace(storageClassName string) (string, error) {
sc, err := c.context.Clientset.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{})
if err != nil {
return "", err
}
clusterNamespace, ok := sc.Parameters["clusterNamespace"]
if !ok {
// Check for the older form of the parameter, clusterName, if clusterNamespace was not found
logger.Infof("clusterNamespace not specified in the storage class %s. Checking for clusterName", storageClassName)
clusterNamespace, ok = sc.Parameters["clusterName"]
if !ok {
// Default to the default cluster name if neither parameter is found
logger.Infof("clusterNamespace not specified in the storage class %s. Defaulting to '%s'", storageClassName, cluster.DefaultClusterName)
return cluster.DefaultClusterName, nil
}
return clusterNamespace, nil
}
return clusterNamespace, nil
}
// GetAttachInfoFromMountDir obtains pod and volume information from the mountDir.
// Kubernetes does not provide all of the information needed to detach a volume
// (https://github.com/kubernetes/kubernetes/issues/52590), so this works around
// that by parsing the information out of the mountDir.
func (c *Controller) GetAttachInfoFromMountDir(mountDir string, attachOptions *AttachOptions) error {
if attachOptions.PodID == "" {
podID, pvName, err := getPodAndPVNameFromMountDir(mountDir)
if err != nil {
return err
}
attachOptions.PodID = podID
attachOptions.VolumeName = pvName
}
pv, err := c.context.Clientset.CoreV1().PersistentVolumes().Get(attachOptions.VolumeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get persistent volume %s: %+v", attachOptions.VolumeName, err)
}
if attachOptions.PodNamespace == "" {
// pod namespace should be the same as the PVC namespace
attachOptions.PodNamespace = pv.Spec.ClaimRef.Namespace
}
node := os.Getenv(k8sutil.NodeNameEnvVar)
if attachOptions.Pod == "" {
// Find all pods scheduled to this node
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", node).String(),
}
pods, err := c.context.Clientset.CoreV1().Pods(attachOptions.PodNamespace).List(opts)
if err != nil {
return fmt.Errorf("failed to get pods in namespace %s: %+v", attachOptions.PodNamespace, err)
}
pod := findPodByID(pods, types.UID(attachOptions.PodID))
if pod != nil {
attachOptions.Pod = pod.GetName()
}
}
if attachOptions.Image == "" {
attachOptions.Image = pv.Spec.PersistentVolumeSource.FlexVolume.Options[ImageKey]
}
if attachOptions.BlockPool == "" {
attachOptions.BlockPool = pv.Spec.PersistentVolumeSource.FlexVolume.Options[BlockPoolKey]
if attachOptions.BlockPool == "" {
// fall back to the "pool" if the "blockPool" is not set
attachOptions.BlockPool = pv.Spec.PersistentVolumeSource.FlexVolume.Options[PoolKey]
}
}
if attachOptions.StorageClass == "" {
attachOptions.StorageClass = pv.Spec.PersistentVolumeSource.FlexVolume.Options[StorageClassKey]
}
if attachOptions.MountUser == "" {
attachOptions.MountUser = "admin"
}
attachOptions.ClusterNamespace, err = c.parseClusterNamespace(attachOptions.StorageClass)
if err != nil {
return fmt.Errorf("failed to parse clusterNamespace from storageClass %s: %+v", attachOptions.StorageClass, err)
}
return nil
}
// GetGlobalMountPath generates the global mount path where the device path is mounted.
// It is based on the kubelet root dir, which defaults to /var/lib/kubelet
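// For example, with the default kubelet root dir and a flex driver installed
// as vendor "rook.io", driver "rook" (hypothetical values), volume "pvc-fd736cbb"
// resolves to /var/lib/kubelet/plugins/rook.io/rook/mounts/pvc-fd736cbb.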
func (c *Controller) GetGlobalMountPath(input GlobalMountPathInput, globalMountPath *string) error {
vendor, driver, err := getFlexDriverInfo(input.DriverDir)
if err != nil {
return err
}
*globalMountPath = path.Join(c.getKubeletRootDir(), "plugins", vendor, driver, "mounts", input.VolumeName)
return nil
}
// GetClientAccessInfo obtains the cluster monitor endpoints, username and secret
func (c *Controller) GetClientAccessInfo(args []string, clientAccessInfo *ClientAccessInfo) error {
// args: 0 ClusterNamespace, 1 PodNamespace, 2 MountUser, 3 MountSecret
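// e.g. args = []string{"rook-ceph", "default", "admin", "rook-admin-secret"} (illustrative values)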
clusterNamespace := args[0]
clusterInfo, _, _, err := mon.LoadClusterInfo(c.context, clusterNamespace)
if err != nil {
return fmt.Errorf("failed to load cluster information from clusters namespace %s: %+v", clusterNamespace, err)
}
monEndpoints := make([]string, 0, len(clusterInfo.Monitors))
for _, monitor := range clusterInfo.Monitors {
monEndpoints = append(monEndpoints, monitor.Endpoint)
}
clientAccessInfo.MonAddresses = monEndpoints
podNamespace := args[1]
clientAccessInfo.UserName = args[2]
clientAccessInfo.SecretKey = args[3]
if c.mountSecurityMode == agent.MountSecurityModeRestricted && (clientAccessInfo.UserName == "" || clientAccessInfo.SecretKey == "") {
return fmt.Errorf("no mount user and/or mount secret given")
}
if c.mountSecurityMode == agent.MountSecurityModeAny && clientAccessInfo.UserName == "" {
clientAccessInfo.UserName = "admin"
}
if clientAccessInfo.SecretKey != "" {
secret, err := c.context.Clientset.CoreV1().Secrets(podNamespace).Get(clientAccessInfo.SecretKey, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("unable to get mount secret %s from pod namespace %s. %+v", clientAccessInfo.SecretKey, podNamespace, err)
}
if len(secret.Data) != 1 {
return fmt.Errorf("expected exactly one data entry in mount secret %s in namespace %s, but found %d", clientAccessInfo.SecretKey, podNamespace, len(secret.Data))
}
var secretValue string
for _, value := range secret.Data {
secretValue = string(value)
break
}
clientAccessInfo.SecretKey = secretValue
} else if c.mountSecurityMode == agent.MountSecurityModeAny && clientAccessInfo.SecretKey == "" {
clientAccessInfo.SecretKey = clusterInfo.AdminSecret
}
return nil
}
// GetKernelVersion returns the kernel version of the current node.
func (c *Controller) GetKernelVersion(_ *struct{} /* no inputs */, kernelVersion *string) error {
nodeName := os.Getenv(k8sutil.NodeNameEnvVar)
node, err := c.context.Clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get kernel version from node information for node %s: %+v", nodeName, err)
}
*kernelVersion = node.Status.NodeInfo.KernelVersion
return nil
}
// getKubeletRootDir returns the kubelet root dir, which defaults to /var/lib/kubelet
func (c *Controller) getKubeletRootDir() string {
// in k8s 1.8 it does not appear possible to change the default root dir
// see https://github.com/rook/rook/issues/1282
return kubeletDefaultRootDir
}
// getPodAndPVNameFromMountDir parses pod information from the mountDir
func getPodAndPVNameFromMountDir(mountDir string) (string, string, error) {
// mountDir is in the form of <rootDir>/pods/<podID>/volumes/rook.io~rook/<pv name>
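// For example (hypothetical values), a mountDir of
//   /var/lib/kubelet/pods/f8e36054-af7d-11e9-a680-52fdfc072182/volumes/rook.io~rook/pvc-fd736cbb
// yields podID "f8e36054-af7d-11e9-a680-52fdfc072182" and PV name "pvc-fd736cbb".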
mountDir = filepath.Clean(mountDir)
token := strings.Split(mountDir, string(filepath.Separator))
// The path must contain at least 5 elements
length := len(token)
if length < 5 {
return "", "", fmt.Errorf("failed to parse mountDir %s for CRD name and podID", mountDir)
}
return token[length-4], token[length-1], nil
}
func findPodByID(pods *v1.PodList, podUID types.UID) *v1.Pod {
for i := range pods.Items {
if pods.Items[i].GetUID() == podUID {
return &(pods.Items[i])
}
}
return nil
}
// getPodRWAttachmentObject loops through the attachments of the Volume
// resource and returns the index of the first read-write attachment, or -1 if
// none is found
func getPodRWAttachmentObject(volumeAttachmentObject *rookalpha.Volume) int {
for i, a := range volumeAttachmentObject.Attachments {
if !a.ReadOnly {
return i
}
}
return -1
}