forked from percona/qan-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ws_server.go
125 lines (109 loc) · 2.83 KB
/
ws_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
Copyright (c) 2016, Percona LLC and/or its affiliates. All rights reserved.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package mock
import (
"golang.org/x/net/websocket"
"log"
"net/http"
)
// WebsocketServer is a stateless handle for the mock websocket server.
// It has no fields; its only method in this file is Run.
type WebsocketServer struct{}
// Package-level channels. Both are declared without being made, so they are
// nil until something assigns them; nothing in this file does.
var (
	SendChan chan interface{}
	RecvChan chan interface{}
)
// Run starts the mock websocket server and blocks while it is serving.
//
// It launches the client-registry loop (run) in its own goroutine, registers
// wsHandler for endpoint on the default net/http mux, and then listens on
// addr. If http.ListenAndServe returns an error, the process is terminated
// through log.Fatal.
//
// addr is handed to http.ListenAndServe unchanged, so it must be a TCP
// listen address in "host:port" form (no URL scheme), e.g. 127.0.0.1:8000.
// endpoint is the URL path pattern the handler is mounted on, e.g. /agent.
//
// Because the handler is registered on the shared default mux, calling Run
// twice with the same endpoint makes http.Handle panic.
func (s *WebsocketServer) Run(addr string, endpoint string) {
	go run()
	http.Handle(endpoint, websocket.Handler(wsHandler))
	if err := http.ListenAndServe(addr, nil); err != nil {
		// log.Fatal formats like fmt.Print, which adds no space between a
		// string operand and its neighbour, so the separator is explicit.
		log.Fatal("ListenAndServe: ", err)
	}
}
// client is the per-connection state created by wsHandler for every
// websocket connection the mock server accepts.
type client struct {
	// ws is the underlying websocket connection.
	ws *websocket.Conn
	// origin is the string form of the connection's configured Origin URL.
	origin string

	// SendChan carries data to the client: send() drains it and writes each
	// value to ws as JSON.
	SendChan chan interface{}
	// RecvChan carries data from the client: recv() pushes each JSON value
	// it decodes from ws onto it.
	RecvChan chan interface{}
}
// wsHandler serves one accepted websocket connection.
//
// It wraps the connection in a client with 5-slot send and receive buffers,
// hands the client to the registry loop via internalClientConnectChan, starts
// the writer goroutine and then runs the reader loop on the calling
// goroutine. When the reader loop returns, the client is queued on
// ClientRmChan and then announced on ClientDisconnectChan; the latter is
// unbuffered, so the handler does not return until something receives from it.
func wsHandler(ws *websocket.Conn) {
	origin := ws.Config().Origin.String()
	conn := &client{
		ws:       ws,
		origin:   origin,
		SendChan: make(chan interface{}, 5),
		RecvChan: make(chan interface{}, 5),
	}

	// Register with the registry loop before doing any I/O.
	internalClientConnectChan <- conn

	// Teardown order matters: removal request first, disconnect notice second.
	defer func() {
		ClientRmChan <- conn
		ClientDisconnectChan <- conn
	}()

	go conn.send()
	conn.recv()
}
// recv is the reader loop for a client. It decodes JSON values from the
// websocket one at a time and forwards each onto c.RecvChan. The loop ends on
// the first receive error, which is not reported; the websocket is closed on
// the way out.
func (c *client) recv() {
	defer c.ws.Close()

	for {
		// A fresh destination per message so values never carry over.
		var msg interface{}
		if err := websocket.JSON.Receive(c.ws, &msg); err != nil {
			return
		}
		c.RecvChan <- msg
	}
}
// send is the writer loop for a client. It takes values off c.SendChan and
// writes each to the websocket as JSON. The loop ends when c.SendChan is
// closed or when a write fails; the websocket is closed on the way out.
func (c *client) send() {
	defer c.ws.Close()

	for {
		msg, open := <-c.SendChan
		if !open {
			// Channel closed: nothing more will ever be sent.
			return
		}
		if err := websocket.JSON.Send(c.ws, msg); err != nil {
			return
		}
	}
}
// Connection bookkeeping shared by wsHandler, run and DisconnectClient.
var (
	// internalClientConnectChan hands a newly created client from wsHandler
	// to the run loop. Unbuffered.
	internalClientConnectChan = make(chan *client)

	// ClientConnectChan receives each newly registered client from the run
	// loop. The send is non-blocking, so with its single buffer slot a
	// notification is dropped when the slot is already occupied.
	ClientConnectChan = make(chan *client, 1)

	// ClientDisconnectChan receives a client when its wsHandler is finishing.
	// Unbuffered: the handler waits until the value is received.
	ClientDisconnectChan = make(chan *client)

	// ClientRmChan queues clients for removal from Clients by the run loop.
	ClientRmChan = make(chan *client, 5)

	// Clients is the set of registered clients; the run loop stores each
	// client under its own pointer as key.
	Clients = make(map[*client]*client)
)
// DisconnectClient forces a registered client off the server.
//
// If c is not present in Clients the call does nothing. Otherwise it closes
// the client's SendChan (which ends its send loop), closes the websocket, and
// then blocks until one value has been received from ClientDisconnectChan.
//
// NOTE(review): Clients is read here without any synchronization, while the
// run loop modifies it from its own goroutine.
func DisconnectClient(c *client) {
	registered, found := Clients[c]
	if !found {
		return
	}

	close(registered.SendChan)
	registered.ws.Close()

	// Wait for a disconnect notice from a finishing wsHandler.
	<-ClientDisconnectChan
}
// run is the client-registry loop; it never returns.
//
// On each iteration it handles one of two events:
//   - a new client arriving on internalClientConnectChan is stored in Clients
//     and offered on ClientConnectChan without blocking (the offer is dropped
//     if that channel cannot accept it right now);
//   - a client arriving on ClientRmChan is deleted from Clients.
func run() {
	for {
		select {
		case joined := <-internalClientConnectChan:
			// todo: this is probably prone to deadlocks, not thread-safe
			Clients[joined] = joined

			// Best-effort notification: never stall the registry loop.
			select {
			case ClientConnectChan <- joined:
			default:
			}

		case gone := <-ClientRmChan:
			// delete is a no-op for keys that are not present.
			delete(Clients, gone)
		}
	}
}