This repository has been archived by the owner on Dec 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 62
/
ws_handler.go
142 lines (124 loc) · 3.33 KB
/
ws_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package collector
import (
"net/http"
"sync"
"time"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
"code.cloudfoundry.org/lager"
"github.com/golang/protobuf/proto" //nolint
"github.com/gorilla/websocket"
)
// wsMessageHandler is an http.Handler that upgrades requests to websocket
// connections, decodes each received message as a loggregator v2 envelope and
// forwards it to one of envelopeChannels.
type wsMessageHandler struct {
	// logger receives the upgrade, read, unmarshal and close errors.
	logger lager.Logger
	// envelopeChannels are the destinations for decoded envelopes. The target
	// channel is picked by hashing the envelope's source ID modulo the slice
	// length, so the slice must not be empty.
	envelopeChannels []chan *loggregator_v2.Envelope
	// keepAlive is the interval handed to the KeepAlive started for every
	// connection.
	keepAlive time.Duration
	// lock is a single mutex shared by the KeepAlive instances of all
	// connections served by this handler.
	lock *sync.Mutex
}
// NewWSMessageHandler builds a websocket handler that logs to logger, fans
// decoded envelopes out to envelopeChannels and pings every connection using
// keepAlive as the keep-alive interval. A fresh mutex is allocated for the
// handler.
func NewWSMessageHandler(logger lager.Logger, envelopeChannels []chan *loggregator_v2.Envelope, keepAlive time.Duration) *wsMessageHandler {
	handler := new(wsMessageHandler)
	handler.logger = logger
	handler.envelopeChannels = envelopeChannels
	handler.keepAlive = keepAlive
	handler.lock = new(sync.Mutex)
	return handler
}
// ServeHTTP upgrades the request to a websocket connection, services it until
// the client disconnects or the keep-alive expires, and then attempts to send
// a close frame carrying the resulting close code and message. Upgrade and
// close-frame failures are logged; the connection is always closed on return.
func (h *wsMessageHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
	// Every origin is accepted.
	acceptAnyOrigin := func(*http.Request) bool { return true }
	upgrader := websocket.Upgrader{CheckOrigin: acceptAnyOrigin}

	ws, err := upgrader.Upgrade(rw, r, nil)
	if err != nil {
		h.logger.Error("serve-websocket-upgrade", err)
		return
	}
	defer ws.Close()

	code, reason := h.runWebsocketUntilClosed(ws)
	closeFrame := websocket.FormatCloseMessage(code, reason)
	if err := ws.WriteControl(websocket.CloseMessage, closeFrame, time.Time{}); err != nil {
		h.logger.Error("serve-websocket-close", err)
	}
}
// runWebsocketUntilClosed services ws until either a read fails or the
// keep-alive loop ends, and returns the close code and message that should be
// sent to the peer.
//
// Two goroutines are started:
//   - a reader that decodes every received message as a loggregator v2
//     envelope and sends it to the channel selected by the FNV hash of its
//     source ID modulo len(h.envelopeChannels);
//   - a KeepAlive loop that pings the peer.
//
// It returns (CloseNormalClosure, "") when the reader stops because
// ReadMessage failed, and (ClosePolicyViolation, <explanation>) when the
// keep-alive loop returns first.
//
// h.envelopeChannels must be non-empty: the modulo by its length panics
// otherwise. The send into the selected channel blocks the reader until the
// channel accepts the envelope.
func (h *wsMessageHandler) runWebsocketUntilClosed(ws *websocket.Conn) (closeCode int, closeMessage string) {
	keepAliveExpired := make(chan struct{})
	clientWentAway := make(chan struct{})

	go func() {
		for {
			_, bytes, err := ws.ReadMessage()
			if err != nil {
				h.logger.Error("run-websocket-read-message", err)
				close(clientWentAway)
				return
			}

			// A fresh envelope per message: its address is handed to the
			// consumer, so it must not be reused across iterations.
			var envelop loggregator_v2.Envelope
			err = proto.Unmarshal(bytes, &envelop)
			if err != nil {
				h.logger.Error("run-websocket-unmarshal", err)
				// Drop the undecodable message instead of forwarding an
				// empty or partially populated envelope downstream.
				continue
			}

			h.envelopeChannels[helpers.FNVHash(envelop.GetSourceId())%uint32(len(h.envelopeChannels))] <- &envelop
		}
	}()

	go func() {
		NewKeepAlive(h.lock, ws, h.keepAlive).Run()
		close(keepAliveExpired)
	}()

	// Whichever goroutine finishes first decides the close reason.
	select {
	case <-clientWentAway:
		return websocket.CloseNormalClosure, ""
	case <-keepAliveExpired:
		return websocket.ClosePolicyViolation, "Client did not respond to ping before keep-alive timeout expired."
	}
}
// KeepAlive pings a websocket connection and watches for the matching pongs
// so that an unresponsive peer can be detected.
type KeepAlive struct {
	// lock is held while the connection's pong handler is installed or
	// removed in Run.
	lock *sync.Mutex
	// conn is the websocket connection being pinged.
	conn *websocket.Conn
	// pongChan is signalled by pongHandler whenever a pong is received.
	pongChan chan struct{}
	// keepAliveInterval is how long Run waits for a pong after each ping;
	// after a pong, Run pauses for half of it before pinging again.
	keepAliveInterval time.Duration
}
// NewKeepAlive prepares a keep-alive loop for conn. lock is taken whenever the
// pong handler on conn is changed, and keepAliveInterval bounds how long Run
// waits for a pong. The pong channel has room for one pending notification.
func NewKeepAlive(lock *sync.Mutex, conn *websocket.Conn, keepAliveInterval time.Duration) *KeepAlive {
	pongs := make(chan struct{}, 1)

	ka := new(KeepAlive)
	ka.lock = lock
	ka.conn = conn
	ka.pongChan = pongs
	ka.keepAliveInterval = keepAliveInterval
	return ka
}
// Run pings the peer in a loop and returns when a ping cannot be written or
// when no pong is observed within keepAliveInterval of a ping. After each
// pong it pauses for half the interval before sending the next ping.
//
// The connection's pong handler is set to k.pongHandler for the duration of
// the call and reset with SetPongHandler(nil) on return; both changes are
// made while holding k.lock.
//
// NOTE(review): the pong handler is installed from this goroutine while
// another goroutine may already be reading from the connection — confirm this
// is safe for the websocket library in use.
func (k *KeepAlive) Run() {
	k.lock.Lock()
	k.conn.SetPongHandler(k.pongHandler)
	k.lock.Unlock()
	defer func() {
		k.lock.Lock()
		k.conn.SetPongHandler(nil)
		k.lock.Unlock()
	}()

	timeout := time.NewTimer(k.keepAliveInterval)
	// Release the timer on every exit path instead of leaving it armed.
	defer timeout.Stop()

	for {
		err := k.conn.WriteControl(websocket.PingMessage, nil, time.Time{})
		if err != nil {
			return
		}

		// The previous arming may have fired while we were sleeping after a
		// late pong. Stop the timer and discard any pending tick before
		// re-arming, otherwise the stale tick would be read below as a
		// timeout for the ping that was just sent.
		if !timeout.Stop() {
			select {
			case <-timeout.C:
			default:
			}
		}
		timeout.Reset(k.keepAliveInterval)

		select {
		case <-k.pongChan:
			time.Sleep(k.keepAliveInterval / 2)
			continue
		case <-timeout.C:
			return
		}
	}
}
// pongHandler is installed on the connection by Run and records that a pong
// was received. The pong payload is ignored and the returned error is always
// nil.
func (k *KeepAlive) pongHandler(string) error {
	select {
	case k.pongChan <- struct{}{}:
	default:
		// A notification is already pending; drop this one rather than block
		// the caller.
	}
	return nil
}