/
health_server.go
124 lines (102 loc) · 3.16 KB
/
health_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package healthserver
import (
"context"
"errors"
"fmt"
"os"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v2"
"github.com/dell/csi-baremetal/pkg/scheduler/extender/healthserver/common"
)
// ExtenderHealthServer provides the endpoint used for the extender readiness check.
// Its Check method reports whether this node's entry in the readiness status
// list marks the kube-scheduler as restarted after patching.
type ExtenderHealthServer struct {
// logger is used by Check for per-method logging.
logger *logrus.Logger
// reader supplies the readiness status list consulted by Check.
reader yamlReader
// nodeName selects this node's entry in the status list; NewExtenderHealthServer
// populates it from the KUBE_NODE_NAME environment variable.
nodeName string
// isPatchingEnabled: when false, Check reports SERVING without reading any statuses.
isPatchingEnabled bool
}
// yamlReader abstracts the source of the readiness status list used by Check.
// NOTE(review): presumably an interface so the file-backed yamlReaderImpl can
// be replaced (e.g. in tests) — confirm against the package's tests.
type yamlReader interface {
// getStatuses returns the current readiness status list, or an error if it
// cannot be obtained.
getStatuses() (*common.ReadinessStatusList, error)
}
// yamlReaderImpl is the file-backed yamlReader: it reads and decodes a YAML
// file on every getStatuses call.
type yamlReaderImpl struct {
// statusFilePath is the YAML file to read; NewExtenderHealthServer sets it to
// common.ExtenderConfigMapFullPath.
statusFilePath string
}
// NewExtenderHealthServer builds an ExtenderHealthServer for the extender pod.
//
// The node name is taken from the KUBE_NODE_NAME environment variable; an
// unset or empty value is rejected with an error. Readiness statuses are read
// from the YAML file at common.ExtenderConfigMapFullPath. When
// isPatchingEnabled is false, Check always reports SERVING.
func NewExtenderHealthServer(logger *logrus.Logger, isPatchingEnabled bool) (*ExtenderHealthServer, error) {
	name := os.Getenv("KUBE_NODE_NAME")
	if len(name) == 0 {
		return nil, errors.New("nodeName parameter is empty")
	}

	fileReader := &yamlReaderImpl{statusFilePath: common.ExtenderConfigMapFullPath}
	srv := &ExtenderHealthServer{
		logger:            logger,
		reader:            fileReader,
		nodeName:          name,
		isPatchingEnabled: isPatchingEnabled,
	}
	return srv, nil
}
// Outcomes of looking up this node in the readiness status list (used by Check).
// NOTE(review): the value 2 is intentionally skipped to keep the original numbering.
const (
	ready    = iota // entry found and kube-scheduler restarted (0)
	notReady        // entry found but kube-scheduler not yet restarted (1)
	notFound = 3    // no entry for this node in the list
)
// readyResponse is the SERVING response returned by Check. It is a shared
// package-level pointer, so it must not be modified by callers.
var readyResponse = &grpc_health_v1.HealthCheckResponse{
	Status: grpc_health_v1.HealthCheckResponse_SERVING,
}

// notReadyResponse is the NOT_SERVING response returned by Check. It is a
// shared package-level pointer, so it must not be modified by callers.
var notReadyResponse = &grpc_health_v1.HealthCheckResponse{
	Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
}
// Check implements the gRPC health Check RPC for the extender pod.
//
// If patching is disabled, the extender is always reported as SERVING.
// Otherwise the readiness status list is fetched through e.reader. The first
// entry whose NodeName equals e.nodeName decides the result:
//   - Restarted is true: SERVING with a nil error;
//   - Restarted is false: NOT_SERVING with an error naming the kube-scheduler;
//   - no matching entry (or a nil list): NOT_SERVING with an error naming the node.
//
// An error from the reader is returned unchanged together with NOT_SERVING.
func (e *ExtenderHealthServer) Check(context.Context, *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	ll := e.logger.WithFields(logrus.Fields{
		"method": "Check",
	})
	if !e.isPatchingEnabled {
		ll.Debugf("Patcher is not enabled")
		return readyResponse, nil
	}

	readinessStatuses, err := e.reader.getStatuses()
	if err != nil {
		return notReadyResponse, err
	}

	var scheduler string
	isReady := notFound
	// yamlReader is an interface, so a nil list with a nil error is possible.
	// Treat it like an empty list instead of dereferencing a nil pointer.
	if readinessStatuses != nil {
		for _, nodeStatus := range readinessStatuses.Items {
			if nodeStatus.NodeName != e.nodeName {
				continue
			}
			scheduler = nodeStatus.KubeScheduler
			isReady = notReady
			if nodeStatus.Restarted {
				isReady = ready
			}
			// Only the first entry for this node is considered.
			break
		}
	}

	switch isReady {
	case ready:
		return readyResponse, nil
	case notReady:
		return notReadyResponse, fmt.Errorf("kube-scheduler %s is not restarted after patching", scheduler)
	default:
		return notReadyResponse, fmt.Errorf("node %s is not found in extenders status list", e.nodeName)
	}
}
// Watch would stream health status changes to clients. It exists only to
// satisfy the gRPC health service interface: streaming is not supported, so
// every call fails with codes.Unimplemented.
func (e *ExtenderHealthServer) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
	unsupported := status.Error(codes.Unimplemented, "method Watch not implemented")
	return unsupported
}
// getStatuses reads the YAML file at y.statusFilePath and decodes it into a
// ReadinessStatusList. Read and decode errors are returned as-is with a nil list.
func (y *yamlReaderImpl) getStatuses() (*common.ReadinessStatusList, error) {
	content, err := os.ReadFile(y.statusFilePath)
	if err != nil {
		return nil, err
	}

	statuses := &common.ReadinessStatusList{}
	if err := yaml.Unmarshal(content, statuses); err != nil {
		return nil, err
	}
	return statuses, nil
}