/
session.go
345 lines (293 loc) · 8.84 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
package client
import (
"errors"
conn "github.com/inconshreveable/go-tunnel/conn"
log "github.com/inconshreveable/go-tunnel/log"
proto "github.com/inconshreveable/go-tunnel/proto"
muxado "github.com/inconshreveable/muxado"
"net"
"strconv"
"strings"
"sync"
)
type rawSession interface {
Auth(string, interface{}) (*proto.AuthResp, error)
Listen(string, interface{}, interface{}) (*proto.BindResp, error)
Unlisten(string) (*proto.UnbindResp, error)
Accept() (conn.Conn, error)
log.Logger
}
// A RawSession is a client session which handles authorization with the tunnel server, then
// listening and unlistening of tunnels.
//
// When RawSession.Accept() returns an error, that means the session is dead.
// Client sessions run over a muxado session.
type RawSession struct {
mux muxado.Session // the muxado session we're multiplexing streams over
log.Logger // logger for this client
id string // session id, allows for resuming existing sessions
}
// Creates a new client tunnel session with the given id
// running over the given muxado session.
func NewRawSession(mux muxado.Session) *RawSession {
sess := &RawSession{
mux: mux,
Logger: log.NewTaggedLogger("session"),
}
return sess
}
// Auth sends an authentication message to the server and returns the server's response.
// The id string will be empty unless reconnecting an existing session.
// extra is an opaque struct useful for passing application-specific data.
func (s *RawSession) Auth(id string, extra interface{}) (resp *proto.AuthResp, err error) {
req := &proto.Auth{
ClientId: id,
Extra: extra,
Version: []string{proto.Version},
}
resp = new(proto.AuthResp)
if err = s.req("auth", req, resp); err != nil {
return
}
// set client id / log tag only if it changed
if s.id != resp.ClientId {
s.id = resp.ClientId
s.Logger.AddTags(s.id)
}
return
}
// Listen sends a listen message to the server and returns the server's response
// protocol is the requested protocol to listen.
// opts are protocol-specific options for listening.
// extra is an opaque struct useful for passing application-specific data.
func (s *RawSession) Listen(protocol string, opts interface{}, extra interface{}) (resp *proto.BindResp, err error) {
req := &proto.Bind{
Protocol: protocol,
Options: opts,
Extra: extra,
}
resp = new(proto.BindResp)
err = s.req("listen", req, resp)
return
}
// Unlisten sends an unlisten message to the server and returns the server's response.
// url is the url of the open, bound tunnel to unlisten
func (s *RawSession) Unlisten(url string) (resp *proto.UnbindResp, err error) {
req := &proto.Unbind{Url: url}
resp = new(proto.UnbindResp)
err = s.req("unlisten", req, resp)
return
}
// Accept returns the next stream initiated by the server over the underlying muxado session
func (s *RawSession) Accept() (conn.Conn, error) {
raw, err := s.mux.Accept()
if err != nil {
return nil, err
}
return conn.Wrap(raw, "proxy", s.id), nil
}
func (s *RawSession) req(tag string, req interface{}, resp interface{}) (err error) {
stream, err := s.mux.Open()
if err != nil {
return
}
defer stream.Close()
// log what happens on the stream
c := conn.Wrap(stream, tag, s.id)
// send the unlisten request
if err = proto.WriteMsg(c, req); err != nil {
return
}
// read out the unlisten response
if err = proto.ReadMsgInto(c, resp); err != nil {
return
}
return
}
// Session is a higher-level client session interface. You will almost always prefer this over
// RawSession.
//
// Unlike RawSession, when you listen a new tunnel on Session, you are returned a Tunnel
// object which allows you to recieve new connections from that listen.
type Session struct {
raw rawSession
sync.RWMutex
tunnels map[string]*Tunnel
}
func NewSession(mux muxado.Session) *Session {
s := &Session{
raw: NewRawSession(mux),
tunnels: make(map[string]*Tunnel),
}
go s.receive()
return s
}
func (s *Session) Auth(id string, extra interface{}) error {
resp, err := s.raw.Auth(id, extra)
if err != nil {
return err
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
}
// Listen negotiates with the server to create a new remote listen for the given protocol
// and options. It returns a *Tunnel on success from which the caller can accept new
// connections over the listen.
//
// Applications will typically prefer to call the protocol-specific methods like
// ListenHTTP, ListenTCP, etc.
func (s *Session) Listen(protocol string, opts interface{}, extra interface{}) (*Tunnel, error) {
resp, err := s.raw.Listen(protocol, opts, extra)
if err != nil {
return nil, err
}
// process application-level error
if resp.Error != "" {
return nil, errors.New(resp.Error)
}
// if you asked for a random domain or random port, remember the value
// the server assigned you for reconnection cases
switch o := opts.(type) {
case *proto.HTTPOptions:
if o.Subdomain == "" && o.Hostname == "" {
o.Hostname = strings.Split(resp.Url, "://")[1]
}
case *proto.TCPOptions:
if o.RemotePort == 0 {
parts := strings.Split(resp.Url, ":")
portString := parts[len(parts)-1]
port, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return nil, err
}
o.RemotePort = uint16(port)
}
}
// make tunnel
t := &Tunnel{
url: resp.Url,
bindOpts: opts,
bindExtra: extra,
bindResp: resp,
sess: s,
accept: make(chan conn.Conn),
proto: protocol,
}
// add to tunnel registry
s.addTunnel(resp.Url, t)
return t, nil
}
// ListenHTTP listens on a new HTTP endpoint and returns a *Tunnel which accepts connections on the remote listener.
func (s *Session) ListenHTTP(opts *proto.HTTPOptions, extra interface{}) (*Tunnel, error) {
return s.Listen("http", opts, extra)
}
// ListenHTTP listens on a new HTTPS endpoint and returns a *Tunnel which accepts connections on the remote listener.
func (s *Session) ListenHTTPS(opts *proto.HTTPOptions, extra interface{}) (*Tunnel, error) {
return s.Listen("https", opts, extra)
}
// ListenHTTPAndHTTPS listens a new HTTP and HTTPS endpoint on the same hostname. It returns a two *Tunnel objects which accept connections on the remote HTTP and HTTPS listens, respectively.
func (s *Session) ListenHTTPAndHTTPS(opts *proto.HTTPOptions, extra interface{}) (*Tunnel, *Tunnel, error) {
t1, err := s.Listen("http", opts, extra)
if err != nil {
return nil, nil, err
}
// the first Listen call for "http" will transform opts to be deterministic if the caller
// asked for random
t2, err := s.Listen("https", opts, extra)
if err != nil {
t1.Close()
return nil, nil, err
}
return t1, t2, nil
}
// ListenTLS listens on a new TCP endpoint and returns a *Tunnel which accepts connections on the remote listener.
func (s *Session) ListenTCP(opts *proto.TCPOptions, extra interface{}) (*Tunnel, error) {
return s.Listen("tcp", opts, extra)
}
// ListenTLS listens on a new TLS endpoint and returns a *Tunnel which accepts connections on the remote listener.
func (s *Session) ListenTLS(opts *proto.TLSOptions, extra interface{}) (*Tunnel, error) {
return s.Listen("tls", opts, extra)
}
func (s *Session) receive() {
handleProxy := func(proxy conn.Conn) {
// read out the proxy message
var startPxy proto.StartProxy
if err := proto.ReadMsgInto(proxy, &startPxy); err != nil {
proxy.Error("Server failed to write StartProxy: %v", err)
proxy.Close()
return
}
// wrap connection so that it has a proper RemoteAddr()
proxy = &proxyConn{Conn: proxy, remoteAddr: &proxyAddr{startPxy.ClientAddr}}
// find tunnel
tunnel, ok := s.getTunnel(startPxy.Url)
if !ok {
proxy.Error("Couldn't find tunnel for proxy: %s", startPxy.Url)
proxy.Close()
return
}
// deliver proxy connection
tunnel.accept <- proxy
}
for {
// accept the next proxy connection
proxy, err := s.raw.Accept()
if err != nil {
s.raw.Error("Client accept error: %v", err)
s.RLock()
for _, t := range s.tunnels {
go t.Close()
}
s.RUnlock()
return
}
go handleProxy(proxy)
}
}
func (s *Session) unlisten(t *Tunnel) error {
// delete tunnel
s.delTunnel(t.url)
// ask server to unlisten
resp, err := s.raw.Unlisten(t.url)
if err != nil {
return err
}
if resp.Error != "" {
return s.raw.Error("Server failed to unlisten tunnel: %v", resp.Error)
}
return nil
}
func (s *Session) getTunnel(url string) (t *Tunnel, ok bool) {
s.RLock()
defer s.RUnlock()
t, ok = s.tunnels[url]
return
}
func (s *Session) addTunnel(url string, t *Tunnel) {
s.Lock()
defer s.Unlock()
s.tunnels[url] = t
}
func (s *Session) delTunnel(url string) {
s.Lock()
defer s.Unlock()
delete(s.tunnels, url)
}
type proxyConn struct {
conn.Conn
remoteAddr net.Addr
}
func (c *proxyConn) RemoteAddr() net.Addr {
return c.remoteAddr
}
type proxyAddr struct {
addr string
}
func (a *proxyAddr) String() string {
return a.addr
}
func (a *proxyAddr) Network() string {
return "tcp"
}