forked from dolthub/vitess
/
websocket.go
118 lines (103 loc) · 2.87 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package workflow
import (
"encoding/json"
"fmt"
"net/http"
log "github.com/golang/glog"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/acl"
)
// upgrader turns incoming HTTP requests into WebSocket connections.
// Its zero value is used as-is, i.e. the library's default options.
var upgrader websocket.Upgrader
// HandleHTTPWebSocket registers the WebSocket handler.
//
// The handler is installed for pattern through http.HandleFunc. For each
// request it:
//   - checks that the caller has acl.ADMIN access, replying with
//     http.StatusUnauthorized otherwise;
//   - upgrades the connection to a WebSocket;
//   - calls getAndWatchFullTree and sends the returned tree as the first
//     text message;
//   - if a notification channel was returned, forwards every notification
//     to the client while decoding incoming text messages as
//     ActionParameters and passing them to the NodeManager's Action.
//
// The handler returns, closing the WebSocket and the watcher, as soon as
// the client connection fails, an incoming message cannot be JSON-decoded,
// a write fails, or the notification channel is closed.
func (m *Manager) HandleHTTPWebSocket(pattern string) {
	log.Infof("workflow Manager listening to websocket traffic at %v", pattern)
	http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
		// upgraded records whether the connection was handed over to the
		// WebSocket layer. Once that has happened, a plain HTTP error
		// response can no longer be written on w.
		upgraded := false
		defer func() {
			if x := recover(); x != nil {
				errMsg := fmt.Sprintf("uncaught panic: %v", x)
				log.Error(errMsg)
				if !upgraded {
					http.Error(w, errMsg, http.StatusInternalServerError)
				}
			}
		}()

		// Check ACL.
		if err := acl.CheckAccessHTTP(r, acl.ADMIN); err != nil {
			msg := fmt.Sprintf("WorkflowManager acl.CheckAccessHTTP failed: %v", err)
			log.Error(msg)
			http.Error(w, msg, http.StatusUnauthorized)
			return
		}

		// Upgrade to WebSocket.
		c, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Errorf("upgrade error: %v", err)
			return
		}
		upgraded = true
		defer c.Close()

		// Register the handler.
		tree, notifications, i, err := m.getAndWatchFullTree(r.URL)
		if err != nil {
			log.Warningf("GetAndWatchFullTree failed: %v", err)
			return
		}
		if notifications != nil {
			defer m.NodeManager().CloseWatcher(i)
		}

		// First we send the full dump.
		if err := c.WriteMessage(websocket.TextMessage, tree); err != nil {
			log.Warningf("WriteMessage(tree) failed: %v", err)
			return
		}

		// If we didn't get a channel back (redirect case), we're done.
		// We will just return the redirect, and close the websocket.
		if notifications == nil {
			return
		}

		// done is closed when this handler returns. It releases the reader
		// goroutine below if it is blocked handing over a message that
		// nobody will ever receive.
		done := make(chan struct{})
		defer close(done)

		// Start a go routine to get messages, send them to a channel.
		// The goroutine is the only sender on recv and closes it on exit.
		recv := make(chan *ActionParameters, 10)
		go func() {
			defer close(recv)
			for {
				mt, message, err := c.ReadMessage()
				if err != nil {
					log.Warningf("failed to read message from websocket: %v", err)
					return
				}
				if mt != websocket.TextMessage {
					// Only warn: the payload is still decoded below.
					log.Warningf("weird message type: %v", mt)
				}

				ap := &ActionParameters{}
				if err := json.Unmarshal(message, ap); err != nil {
					log.Warningf("failed to JSON-decode message from websocket: %v", err)
					return
				}

				select {
				case recv <- ap:
				case <-done:
					// The handler is gone; don't block forever on a full
					// channel.
					return
				}
			}
		}()

		// Let's listen to the channels until we're done.
		for {
			select {
			case ap, ok := <-recv:
				if !ok {
					// The websocket was most likely closed.
					return
				}

				ctx := context.TODO()
				if err := m.NodeManager().Action(ctx, ap); err != nil {
					log.Warningf("Action failed: %v", err)
				}
			case message, ok := <-notifications:
				if !ok {
					// We ran out of space on the update
					// channel, so we had to close it.
					return
				}
				if err := c.WriteMessage(websocket.TextMessage, message); err != nil {
					log.Warningf("WriteMessage(notification) failed: %v", err)
					return
				}
			}
		}
	})
}