forked from kubeedge/kubeedge
/
websockethandler.go
103 lines (91 loc) · 3.19 KB
/
websockethandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package handler
import (
"fmt"
"io"
"net/http"
"time"
bhLog "github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/beehive/pkg/core/model"
hubio "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/io"
emodel "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
// WebSocketHandler handle all request from websocket
var WebSocketHandler *WebsocketHandle
// WebsocketHandle access handler
type WebsocketHandle struct {
EventHandler *EventHandle
NodeLimit int
}
// ServeEvent handle the event coming from websocket
func (wh *WebsocketHandle) ServeEvent(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
projectID := vars["project_id"]
nodeID := vars["node_id"]
if wh.EventHandler.GetNodeCount() >= wh.NodeLimit {
bhLog.LOGGER.Errorf("Fail to serve node %s, reach node limit", nodeID)
http.Error(w, "too many Nodes connected", http.StatusTooManyRequests)
return
}
upgrader := websocket.Upgrader{}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
bhLog.LOGGER.Errorf("Fail to build websocket connection for node %s, reason %s", nodeID, err.Error())
http.Error(w, "failed to upgrade to websocket protocol", http.StatusInternalServerError)
return
}
info := &emodel.HubInfo{ProjectID: projectID, NodeID: nodeID}
hi := &hubio.JSONWSIO{WSConn: conn}
wh.EventHandler.ServeConn(hi, info)
}
// ServeQueueWorkload handle workload from queue
func (wh *WebsocketHandle) ServeQueueWorkload(w http.ResponseWriter, r *http.Request) {
workload, err := wh.EventHandler.GetWorkload()
if err != nil {
bhLog.LOGGER.Errorf("%s", err.Error())
http.Error(w, "Fail to get event queue workload", http.StatusInternalServerError)
return
}
_, err = io.WriteString(w, fmt.Sprintf("%f", workload))
if err != nil {
bhLog.LOGGER.Errorf("Fail to write string, reason: %s", err.Error())
}
}
// returns if the event queue is available or not.
// returns 0 if not available and 1 if available.
func (wh *WebsocketHandle) getEventQueueAvailability() int {
_, err := wh.EventHandler.GetWorkload()
if err != nil {
bhLog.LOGGER.Errorf("eventq is not available, reason %s", err.Error())
return 0
}
return 1
}
// EventReadLoop processes all read requests
func (wh *WebsocketHandle) EventReadLoop(hi hubio.CloudHubIO, info *emodel.HubInfo, stop chan ExitCode) {
for {
var msg model.Message
// set the read timeout as the keepalive interval so that we can disconnect when heart beat is lost
err := hi.SetReadDeadline(time.Now().Add(time.Duration(wh.EventHandler.KeepaliveInterval) * time.Second))
if err != nil {
bhLog.LOGGER.Errorf("SetReadDeadline error, %s", err.Error())
stop <- hubioReadFail
return
}
_, err = hi.ReadData(&msg)
if err != nil {
bhLog.LOGGER.Errorf("read error, connection for node %s will be closed, reason: %s", info.NodeID, err.Error())
stop <- hubioReadFail
return
}
err = wh.EventHandler.Pub2Controller(info, &msg)
if err != nil {
stop <- eventQueueDisconnect
}
}
}
// EventWriteLoop processes all write requests
func (wh *WebsocketHandle) EventWriteLoop(hi hubio.CloudHubIO, info *emodel.HubInfo, stop chan ExitCode) {
wh.EventHandler.eventWriteLoop(hi, info, stop)
}