-
Notifications
You must be signed in to change notification settings - Fork 193
/
multiClusterHealthProbeManager.go
168 lines (147 loc) · 6.4 KB
/
multiClusterHealthProbeManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package controller
import (
log "github.com/F5Networks/k8s-bigip-ctlr/v3/pkg/vlogger"
"net"
"net/http"
"os"
"strings"
"time"
)
// checkPrimaryClusterHealthStatus probes the primary cluster health endpoint
// and reports whether the probe succeeded.
//
// The probe type is taken from PrimaryClusterHealthProbeParams.EndPointType:
// "http" and "tcp" are probed, any other value (including an unset one) is
// logged as unsupported and yields false immediately. A failed probe is
// retried once after retryInterval seconds; false is returned when both
// attempts fail.
func (ctlr *Controller) checkPrimaryClusterHealthStatus() bool {
	const maxAttempts = 2
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var healthy bool
		switch ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPointType {
		case "http":
			healthy = ctlr.getPrimaryClusterHealthStatusFromHTTPEndPoint()
		case "tcp":
			healthy = ctlr.getPrimaryClusterHealthStatusFromTCPEndPoint()
		default:
			// A real default clause: every type other than http/tcp cannot be
			// probed, so fail fast instead of sleeping through the retries.
			log.Debugf("[MultiCluster] unsupported primaryEndPoint specified under highAvailabilityCIS section: %v", ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint)
			return false
		}
		if healthy {
			return true
		}
		// Only wait when another attempt follows; sleeping after the final
		// attempt would just delay reporting the failure.
		if attempt < maxAttempts {
			time.Sleep(time.Duration(ctlr.RequestHandler.PrimaryClusterHealthProbeParams.retryInterval) * time.Second)
		}
	}
	return false
}
// setPrimaryClusterHealthCheckEndPointType derives the probe type from the
// scheme prefix of the configured primary endpoint and stores it in
// PrimaryClusterHealthProbeParams.EndPointType.
//
// A "tcp://" prefix selects "tcp" and an "http://" prefix selects "http". An
// empty endpoint leaves EndPointType untouched. Any other endpoint is a
// configuration error: it is logged and the process exits with status 1.
//
// When CIS runs in primary mode this method should never be called; it should
// be called only when CIS is running in secondary mode.
func (ctlr *Controller) setPrimaryClusterHealthCheckEndPointType() {
	endPoint := ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint
	switch {
	case endPoint == "":
		// Nothing configured, nothing to classify.
		return
	case strings.HasPrefix(endPoint, "tcp://"):
		ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPointType = "tcp"
	case strings.HasPrefix(endPoint, "http://"):
		ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPointType = "http"
	default:
		// Logged at error level: the process terminates right after, and a
		// debug-level message would leave the exit unexplained at normal
		// log verbosity.
		log.Errorf("[MultiCluster] unsupported primaryEndPoint protocol type configured under highAvailabilityCIS section. EndPoint: %v \n "+
			"supported protocols:[http, tcp] ", endPoint)
		os.Exit(1)
	}
}
// getPrimaryClusterHealthStatusFromHTTPEndPoint checks the primary cluster
// health by issuing an HTTP GET against the configured primary endpoint.
//
// It returns true only when the endpoint answers with 200 OK. It returns
// false when the endpoint is empty, does not start with "http://", the request
// cannot be built or sent, or any other status code is received.
//
// The shared ctlr.PostParams.httpClient is used with its Timeout temporarily
// set to 10 seconds; the previous value is restored on return.
// NOTE(review): the timeout swap mutates a shared client without
// synchronization visible here — confirm no concurrent users of httpClient.
func (ctlr *Controller) getPrimaryClusterHealthStatusFromHTTPEndPoint() bool {
	endPoint := ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint
	if endPoint == "" {
		return false
	}
	if !strings.HasPrefix(endPoint, "http://") {
		log.Debugf("[MultiCluster] Error: invalid primaryEndPoint detected under highAvailabilityCIS section: %v", endPoint)
		return false
	}

	req, err := http.NewRequest("GET", endPoint, nil)
	if err != nil {
		log.Errorf("[MultiCluster] Creating new HTTP request error: %v ", err)
		return false
	}

	// Remember the client's timeout so it can be put back once the probe is done.
	timeOut := ctlr.PostParams.httpClient.Timeout
	defer func() {
		ctlr.PostParams.httpClient.Timeout = timeOut
	}()

	if ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusChanged {
		log.Debugf("[MultiCluster] posting GET Check primaryEndPoint Health request on %v", endPoint)
	}

	ctlr.PostParams.httpClient.Timeout = 10 * time.Second
	httpResp := ctlr.httpGetReq(req)
	if httpResp == nil {
		return false
	}
	// The body must be closed or every probe leaks its underlying connection.
	if httpResp.Body != nil {
		defer httpResp.Body.Close()
	}

	switch httpResp.StatusCode {
	case http.StatusOK:
		return true
	case http.StatusNotFound, http.StatusInternalServerError:
		// Log the status text: Request.Response is only populated for
		// redirects and does not describe the failure.
		log.Debugf("[MultiCluster] error fetching primaryEndPoint health status. endPoint:%v, statusCode: %v, error:%v",
			endPoint, httpResp.StatusCode, httpResp.Status)
	}
	return false
}
// getPrimaryClusterHealthStatusFromTCPEndPoint checks the primary cluster
// health by opening a TCP connection to the configured primary endpoint.
//
// The endpoint must have the form "tcp://host:port". It returns true when the
// connection is established, and false when the endpoint is empty, lacks the
// "tcp://" prefix, or the connection attempt fails.
func (ctlr *Controller) getPrimaryClusterHealthStatusFromTCPEndPoint() bool {
	endPoint := ctlr.RequestHandler.PrimaryClusterHealthProbeParams.EndPoint
	if endPoint == "" {
		return false
	}
	if !strings.HasPrefix(endPoint, "tcp://") {
		log.Debugf("[MultiCluster] invalid primaryEndPoint health probe tcp endpoint: %v", endPoint)
		return false
	}

	// TrimPrefix removes exactly the scheme. TrimLeft would treat "tcp://" as
	// a set of characters and also strip leading 't', 'c', 'p', ':' or '/'
	// from the host itself (e.g. "tcp://prod:80" would become "rod:80").
	address := strings.TrimPrefix(endPoint, "tcp://")

	// Bound the connect attempt (10s, the same limit the HTTP probe applies)
	// so an unreachable primary cannot stall the probe for the OS default.
	conn, err := net.DialTimeout("tcp", address, 10*time.Second)
	if err != nil {
		log.Debugf("[MultiCluster] error connecting to primaryEndPoint tcp health probe: %v, error: %v", endPoint, err)
		return false
	}
	// Only the successful connect matters; release the socket right away so
	// repeated probes do not leak file descriptors.
	_ = conn.Close()
	return true
}
// httpGetReq sends request through ctlr.PostParams.httpClient and returns the
// response. When the client reports an error it returns nil; the error is
// logged at debug level only while the probe params' statusChanged flag is set.
func (ctlr *Controller) httpGetReq(request *http.Request) *http.Response {
	resp, doErr := ctlr.PostParams.httpClient.Do(request)
	if doErr == nil {
		return resp
	}
	if ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusChanged {
		log.Debugf("[MultiCluster] REST call error: %v ", doErr)
	}
	return nil
}
// probePrimaryClusterHealthStatus checks the primary cluster health
// periodically. It loops forever and is meant to run as its own thread
// (goroutine).
//
// Probing starts only once ctlr.initState is false. Each iteration delegates
// to getPrimaryClusterHealthStatus, which behaves as follows:
//   - if the cluster was up and is now down, a resource queue event is triggered;
//   - if the cluster was down and is still down, processing is skipped;
//   - if the cluster is up and its status has not changed, processing is skipped.
func (ctlr *Controller) probePrimaryClusterHealthStatus() {
	for {
		if ctlr.initState {
			// Wait before re-checking; a bare continue here would spin the
			// loop at full speed and burn a CPU core until init completes.
			time.Sleep(time.Second)
			continue
		}
		ctlr.getPrimaryClusterHealthStatus()
	}
}
// getPrimaryClusterHealthStatus runs one probe cycle: it checks the primary
// cluster health, records the outcome in PrimaryClusterHealthProbeParams under
// paramLock, and then sleeps for probeInterval seconds.
//
// statusChanged is set to whether the probed status differs from the stored
// statusRunning. On a change, statusRunning is updated, and on an up -> down
// transition ctlr.enqueuePrimaryClusterProbeEvent is called.
func (ctlr *Controller) getPrimaryClusterHealthStatus() {
	healthy := ctlr.checkPrimaryClusterHealthStatus()

	ctlr.RequestHandler.PrimaryClusterHealthProbeParams.paramLock.Lock()
	// A change is either up -> down or down -> up.
	changed := ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning != healthy
	ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusChanged = changed
	if changed {
		ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning = healthy
		if !healthy {
			// Primary CIS went down: post the config.
			ctlr.enqueuePrimaryClusterProbeEvent()
		}
	}
	ctlr.RequestHandler.PrimaryClusterHealthProbeParams.paramLock.Unlock()

	// The lock is released before pausing for the configured probeInterval.
	time.Sleep(time.Duration(ctlr.RequestHandler.PrimaryClusterHealthProbeParams.probeInterval) * time.Second)
}
// firstPollPrimaryClusterHealthStatus performs a single health check of the
// primary cluster, stores the result as statusRunning and marks statusChanged
// as true.
func (ctlr *Controller) firstPollPrimaryClusterHealthStatus() {
	initialStatus := ctlr.checkPrimaryClusterHealthStatus()
	ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusRunning = initialStatus
	ctlr.RequestHandler.PrimaryClusterHealthProbeParams.statusChanged = true
}