From 72ec42a1f3f6d5080a12797cc16021431a97a5a9 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Wed, 24 Oct 2018 17:22:25 +0900 Subject: [PATCH 1/2] Check device existence --- server.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 26b399d..7ede9be 100644 --- a/server.go +++ b/server.go @@ -45,10 +45,18 @@ type HostDevicePlugin struct { func NewHostDevicePlugin(config HostDevicePluginConfig) *HostDevicePlugin { var devs = make([]*pluginapi.Device, config.NumDevices) + health := pluginapi.Healthy + for _, device := range config.HostDevices { + if _, err := os.Stat(device.HostPath); os.IsNotExist(err) { + health = pluginapi.Unhealthy + log.Println("HostPath '%s' is not found.", device.HostPath) + } + } + for i, _ := range devs { devs[i] = &pluginapi.Device{ ID: fmt.Sprint(i), - Health: pluginapi.Healthy, + Health: health, } } From 259338ddfe1ae722f5ef73c9d5b7082471de90cb Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Thu, 25 Oct 2018 20:42:55 +0900 Subject: [PATCH 2/2] Implemente health check --- server.go | 74 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/server.go b/server.go index 7ede9be..3449fd4 100644 --- a/server.go +++ b/server.go @@ -13,6 +13,10 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" ) +const ( + defaultHealthCheckIntervalSeconds = time.Duration(60) +) + type HostDevice struct { HostPath string `json:"hostPath"` ContainerPath string `json:"containerPath"` @@ -21,19 +25,21 @@ type HostDevice struct { // HostDevicePlugin implements the Kubernetes device plugin API type HostDevicePluginConfig struct { - ResourceName string `json:"resourceName"` - SocketName string `json:"socketName"` - HostDevices []*HostDevice `json:"hostDevices"` - NumDevices int `json:"numDevices"` + ResourceName string `json:"resourceName"` + SocketName string `json:"socketName"` + HostDevices []*HostDevice `json:"hostDevices"` + NumDevices int `json:"numDevices"` + HealthCheckIntervalSeconds time.Duration `json:"healthCheckIntervalSeconds"` } type HostDevicePlugin struct { - resourceName string - socket string - devs []*pluginapi.Device + resourceName string + socket string + healthCheckIntervalSeconds time.Duration + devs []*pluginapi.Device stop chan interface{} - health chan *pluginapi.Device + health chan string // this device files will be mounted to container hostDevices []*HostDevice @@ -45,14 +51,7 @@ type HostDevicePlugin struct { func NewHostDevicePlugin(config HostDevicePluginConfig) *HostDevicePlugin { var devs = make([]*pluginapi.Device, config.NumDevices) - health := pluginapi.Healthy - for _, device := range config.HostDevices { - if _, err := os.Stat(device.HostPath); os.IsNotExist(err) { - health = pluginapi.Unhealthy - log.Println("HostPath '%s' is not found.", device.HostPath) - } - } - + health := getHostDevicesHealth(config.HostDevices) for i, _ := range devs { devs[i] = &pluginapi.Device{ ID: fmt.Sprint(i), @@ -60,15 +59,21 @@ func NewHostDevicePlugin(config HostDevicePluginConfig) *HostDevicePlugin { } } + healthCheckIntervalSeconds := defaultHealthCheckIntervalSeconds + if config.HealthCheckIntervalSeconds > 0 { + healthCheckIntervalSeconds = config.HealthCheckIntervalSeconds + } + return &HostDevicePlugin{ resourceName: config.ResourceName, socket: pluginapi.DevicePluginPath + config.SocketName, + healthCheckIntervalSeconds: healthCheckIntervalSeconds, devs: devs, hostDevices: config.HostDevices, stop: make(chan interface{}), - health: make(chan *pluginapi.Device), + health: make(chan string), } } @@ -88,6 +93,17 @@ func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error return c, nil } +func getHostDevicesHealth(hostDevices []*HostDevice) string { + health := pluginapi.Healthy + for _, device := range hostDevices { + if _, err := os.Stat(device.HostPath); os.IsNotExist(err) { + health = pluginapi.Unhealthy + log.Printf("HostPath not found: %s", device.HostPath) + } + } + return health +} + // Start starts the gRPC server of the device plugin func (m *HostDevicePlugin) Start() error { err := m.cleanup() @@ -112,7 +128,7 @@ func (m *HostDevicePlugin) Start() error { } conn.Close() - // go m.healthcheck() + go m.healthCheck() return nil } @@ -161,16 +177,28 @@ func (m *HostDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePl select { case <-m.stop: return nil - case d := <-m.health: - // FIXME: there is no way to recover from the Unhealthy state. - d.Health = pluginapi.Unhealthy + case health := <-m.health: + // Update health of devices only in this thread. + for _, dev := range m.devs { + dev.Health = health + } s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}) } } } -func (m *HostDevicePlugin) unhealthy(dev *pluginapi.Device) { - m.health <- dev +func (m *HostDevicePlugin) healthCheck() { + log.Printf("Starting health check every %d seconds", m.healthCheckIntervalSeconds) + ticker := time.NewTicker(m.healthCheckIntervalSeconds * time.Second) + for { + select { + case <-ticker.C: + m.health <- getHostDevicesHealth(m.hostDevices) + case <-m.stop: + ticker.Stop() + return + } + } } // Allocate which return list of devices.