-
Notifications
You must be signed in to change notification settings - Fork 1
/
device_plugin.go
136 lines (119 loc) · 4.85 KB
/
device_plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"context"
"golang.org/x/exp/slices"
"github.com/brightbox/brightbox-volume-device-plugin/volwatch"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
type volumeDevicePlugin struct {
volumeID string
volumeUpdate chan Completion
volLister *VolumeLister
}
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
func (vdp *volumeDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
glog.V(3).Info("Volume GetDevicePluginOptions Called")
return &pluginapi.DevicePluginOptions{}, nil
}
func isRemoved(event fsnotify.Event) bool {
return event.Op == fsnotify.Remove
}
var volMissing = &pluginapi.ListAndWatchResponse{Devices: []*pluginapi.Device{}}
// Start is executed by Manager after plugin instantiation but before registration with kubelet
func (vdp *volumeDevicePlugin) Start() error {
vdp.volLister.Subscribe(vdp.volumeID, vdp.volumeUpdate)
return nil
}
// Stop is executred by Manager after the plugin is unregistered with kubelet
func (vdp *volumeDevicePlugin) Stop() error {
vdp.volLister.Unsubscribe(vdp.volumeID)
return nil
}
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (vdp *volumeDevicePlugin) ListAndWatch(empty *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
glog.V(3).Info("Volume ListAndWatch Called")
glog.V(3).Infof("Volume %s: Notifying kubelet", vdp.volumeID)
volPresent := &pluginapi.ListAndWatchResponse{
Devices: []*pluginapi.Device{
&pluginapi.Device{
ID: vdp.volumeID,
Health: pluginapi.Healthy,
},
},
}
if err := srv.Send(volPresent); err != nil {
glog.V(3).Infof("Volume %s: Failed to send volume present: %s", vdp.volumeID, err)
return err
}
glog.V(3).Infof("Volume %s: Waiting for updates", vdp.volumeID)
for {
select {
case <-vdp.volLister.Done():
glog.V(3).Infof("Volume %s: Exiting ListAndWatch: %s\n", vdp.volumeID, vdp.volLister.Err())
err := srv.Send(volMissing)
if err != nil {
glog.V(3).Infof("Volume %s: Failed to send volume missing: %s", vdp.volumeID, err)
return err
}
return vdp.volLister.Err()
case completion, ok := <-vdp.volumeUpdate:
glog.V(3).Infof("Volume %s: Received update", vdp.volumeID)
if !(ok && slices.Contains(completion.Volumes, vdp.volumeID)) {
glog.V(3).Infof("Volume %s: missing from list, updating and exiting", vdp.volumeID)
err := srv.Send(volMissing)
completion.CompleteFunc()
if err != nil {
glog.V(3).Infof("Volume %s: Failed to send volume missing: %s", vdp.volumeID, err)
return err
}
return nil
}
completion.CompleteFunc()
glog.V(3).Infof("Volume %s: still in list", vdp.volumeID)
glog.V(3).Infof("Volume %s: Waiting for updates", vdp.volumeID)
}
}
}
// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
func (vdp *volumeDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return nil, nil
}
// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (vdp *volumeDevicePlugin) Allocate(ctx context.Context, request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
glog.V(3).Info("Volume Allocate Called")
glog.V(4).Infof("Request is %#v", request.ContainerRequests)
resp := new(pluginapi.AllocateResponse)
for _, container := range request.ContainerRequests {
containerResponse := new(pluginapi.ContainerAllocateResponse)
for _, id := range container.DevicesIDs {
idMountPath := volwatch.IDDevicePath(id)
glog.V(4).Infof("supplying mount at %q", idMountPath)
containerResponse.Devices = append(containerResponse.Devices,
&pluginapi.DeviceSpec{
ContainerPath: idMountPath,
HostPath: idMountPath,
Permissions: "rw",
},
)
}
resp.ContainerResponses = append(resp.ContainerResponses, containerResponse)
}
return resp, nil
}
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
func (vdp *volumeDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return nil, nil
}