-
-
Notifications
You must be signed in to change notification settings - Fork 131
/
websocket.go
131 lines (111 loc) · 2.5 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package transport
import (
"bytes"
"io"
"net"
"net/http"
"nhooyr.io/websocket"
"github.com/gotd/td/internal/mtproxy/obfuscated2"
"github.com/gotd/td/internal/proto/codec"
"github.com/gotd/td/internal/tdsync"
"github.com/gotd/td/internal/wsutil"
)
// wsListener hands connections prepared in ServeHTTP over to Accept callers
// through a channel. WebsocketListener returns the same value as both
// a net.Listener and an http.Handler.
type wsListener struct {
// addr is the address reported by Addr.
addr string
// ch carries connections from ServeHTTP to Accept.
ch chan *wsServerConn
// closed is signaled by Close; ServeHTTP and Accept stop waiting once it fires.
closed *tdsync.Ready
}
// WebsocketListener creates a new MTProto Websocket listener.
//
// Both return values are backed by the same listener value: connections
// prepared by the returned http.Handler are delivered through Accept of the
// returned net.Listener. addr is the address reported by the listener's Addr.
func WebsocketListener(addr string) (net.Listener, http.Handler) {
	listener := wsListener{
		closed: tdsync.NewReady(),
		// One slot of buffering between ServeHTTP and Accept.
		ch:   make(chan *wsServerConn, 1),
		addr: addr,
	}
	return listener, listener
}
// ServeHTTP upgrades the request to a WebSocket connection, performs the
// obfuscated2 handshake on it and hands the resulting connection to Accept.
//
// The handler then blocks until the request context is done, the listener is
// closed, or the handed-over connection is closed. The WebSocket connection is
// closed when the handler returns.
func (l wsListener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	wsConn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		Subprotocols: []string{"binary"},
	})
	if err != nil {
		// websocket.Accept writes the error response to w itself on failure,
		// so sending another status header here would be superfluous.
		return
	}
	defer func() {
		// The close error is deliberately ignored: nothing can be done with it here.
		_ = wsConn.Close(websocket.StatusNormalClosure, "Close")
	}()

	conn := wsutil.NetConn(wsConn, wsutil.Addr("localhost"), wsutil.Addr(r.RemoteAddr))
	rw, md, err := obfuscated2.Accept(conn, nil)
	if err != nil {
		// The connection is already upgraded to WebSocket, so w can no longer
		// carry an HTTP status; the deferred Close ends the session.
		return
	}

	// Abridged sends only one byte for the tag; otherwise the whole tag is used.
	tagBytes := md.Protocol[:]
	if md.Protocol[0] == codec.AbridgedClientStart[0] {
		tagBytes = md.Protocol[:1]
	}

	accepted := &wsServerConn{
		closed: *tdsync.NewReady(),
		// Add the codec tag at the beginning of the stream to emulate TCP fully.
		// MTProto sends the codec tag in plain TCP connections, but not in
		// obfuscated2 (Websocket/MTProxy).
		reader: io.MultiReader(bytes.NewReader(tagBytes), rw),
		writer: rw,
		Conn:   conn,
	}

	reqDone := r.Context().Done()
	listenerClosed := l.closed.Ready()

	// Pass the connection to Accept().
	select {
	case <-reqDone:
		return
	case <-listenerClosed:
		return
	case l.ch <- accepted:
	}

	// Keep the handler running until the connection is closed or the
	// listener shuts down; returning triggers the deferred WebSocket close.
	select {
	case <-reqDone:
	case <-listenerClosed:
	case <-accepted.closed.Ready():
	}
}
// Accept blocks until ServeHTTP hands over a connection or the listener is
// closed. When the listener is closed it returns net.ErrClosed.
func (l wsListener) Accept() (net.Conn, error) {
	// A single select is enough: both branches return.
	select {
	case <-l.closed.Ready():
		return nil, net.ErrClosed
	case accepted := <-l.ch:
		return accepted, nil
	}
}
// Close signals that the listener is closed. Accept calls and ServeHTTP
// handlers waiting on the listener observe the signal and return.
// It always returns nil.
func (l wsListener) Close() error {
	done := l.closed
	done.Signal()
	return nil
}
// Addr returns the address the listener was created with, wrapped as a net.Addr.
func (l wsListener) Addr() net.Addr {
	addr := wsutil.Addr(l.addr)
	return addr
}
// wsServerConn is the connection value that ServeHTTP passes to Accept callers.
// Read, Write and Close are defined on this type; the remaining net.Conn
// methods come from the embedded Conn.
type wsServerConn struct {
// closed is signaled by Close; ServeHTTP waits on it before returning.
closed tdsync.Ready
// reader is the source used by Read.
reader io.Reader
// writer is the destination used by Write.
writer io.Writer
net.Conn
}
// Read reads from the connection's reader rather than from the embedded Conn.
// ServeHTTP builds that reader so the codec tag comes before the stream data.
func (c *wsServerConn) Read(p []byte) (int, error) {
	src := c.reader
	return src.Read(p)
}
// Write writes p to the connection's writer rather than to the embedded Conn.
func (c *wsServerConn) Write(p []byte) (int, error) {
	dst := c.writer
	return dst.Write(p)
}
// Close signals that the connection is finished, which lets the ServeHTTP call
// that created it return and run its deferred WebSocket close. It does not
// call Close on the embedded Conn itself and always returns nil.
func (c *wsServerConn) Close() error {
	done := &c.closed
	done.Signal()
	return nil
}