/
device_plugin.go
116 lines (98 loc) · 3.36 KB
/
device_plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package deviceplugin
import (
"fmt"
"strings"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/klog"
devicepluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/AliyunContainerService/sgx-device-plugin/pkg/sgx"
)
// SGXDevicePlugin implements the Kubernetes device plugin API: DevicePluginServer.
type SGXDevicePlugin struct {
devs []*devicepluginapi.Device
socket string
stop chan interface{}
health chan *devicepluginapi.Device
server *grpc.Server
}
// GetDevicePluginOptions implements DevicePluginServer interface.
// We just do nothing here.
func (m *SGXDevicePlugin) GetDevicePluginOptions(context.Context, *devicepluginapi.Empty) (*devicepluginapi.DevicePluginOptions, error) {
return &devicepluginapi.DevicePluginOptions{}, nil
}
// ListAndWatch lists devices and update that list according to the health status.
// ListAndWatch implements DevicePluginServer interface.
func (m *SGXDevicePlugin) ListAndWatch(e *devicepluginapi.Empty, s devicepluginapi.DevicePlugin_ListAndWatchServer) error {
if err := s.Send(&devicepluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
klog.Errorf("Send ListAndWatchResponse error: %v", err)
}
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = devicepluginapi.Unhealthy
if err := s.Send(&devicepluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
klog.Errorf("Send ListAndWatchResponse error: %v", err)
}
}
}
}
// Allocate which return list of devices.
// Allocate implements DevicePluginServer interface.
func (m *SGXDevicePlugin) Allocate(ctx context.Context, reqs *devicepluginapi.AllocateRequest) (*devicepluginapi.AllocateResponse, error) {
var mounts []*devicepluginapi.Mount
var devices []*devicepluginapi.DeviceSpec
if sgx.EnableAESMSocketAttach {
for path, exist := range sgx.AllMountPoints() {
if path != sgx.AESMSocketDir {
continue
}
if exist {
mounts = append(mounts, &devicepluginapi.Mount{
ContainerPath: path,
HostPath: path,
ReadOnly: true,
})
} else {
klog.Warningf("WARNING: Mount point %s not found", path)
}
}
}
for dev, exist := range sgx.AllDeviceDrivers() {
if exist {
devices = append(devices, &devicepluginapi.DeviceSpec{
ContainerPath: dev,
HostPath: dev,
Permissions: "rw",
})
} else {
klog.Warningf("WARNING: Device %s not found", dev)
}
}
responses := devicepluginapi.AllocateResponse{}
for _, req := range reqs.ContainerRequests {
response := devicepluginapi.ContainerAllocateResponse{
Envs: map[string]string{
"SGX_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
},
Devices: devices,
Mounts: mounts,
}
klog.Infof("[Allocate] %s", req.String())
for _, id := range req.DevicesIDs {
if !sgx.DeviceExists(m.devs, id) {
return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
}
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
return &responses, nil
}
// PreStartContainer implements DevicePluginServer interface.
// We just do nothing here.
func (m *SGXDevicePlugin) PreStartContainer(context.Context, *devicepluginapi.PreStartContainerRequest) (*devicepluginapi.PreStartContainerResponse, error) {
return &devicepluginapi.PreStartContainerResponse{}, nil
}