-
Notifications
You must be signed in to change notification settings - Fork 0
/
dialer.go
146 lines (126 loc) · 3.47 KB
/
dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package wsutil
import (
"bufio"
"bytes"
"context"
"io"
"io/ioutil"
"net"
"net/http"
"github.com/gobwas/ws"
)
// DebugDialer is a wrapper around ws.Dialer. It tracks the I/O of the
// WebSocket handshake. That is, it makes it possible to receive copies of the
// HTTP request and response bytes that are exchanged inside Dialer.Dial().
//
// Note that it must not be used in production applications that require
// Dial() to be efficient.
type DebugDialer struct {
// Dialer contains WebSocket connection establishment options.
Dialer ws.Dialer
// OnRequest and OnResponse are the callbacks that will be called with the
// captured HTTP request and response bytes respectively. A nil callback is
// skipped.
OnRequest, OnResponse func([]byte)
}
// Dial connects to the url host and upgrades the connection to WebSocket by
// calling Dial() on a copy of d.Dialer.
//
// While the handshake is in progress the raw HTTP request and response bytes
// are captured. After the inner Dial() returns, d.OnRequest (if set) is called
// with the bytes written to the connection, and d.OnResponse (if set) is
// called with the response head plus as many body bytes as were read. Both
// callbacks are invoked even when the handshake fails; in that case they
// receive whatever was captured so far, which may be empty.
//
// The returned conn is the connection handed to WrapConn (after the user's
// own WrapConn, if any, has been applied), not the capturing wrapper. It stays
// nil if the inner dialer never invoked WrapConn. br, hs and err are the
// values returned by the inner Dial().
func (d *DebugDialer) Dial(ctx context.Context, urlstr string) (conn net.Conn, br *bufio.Reader, hs ws.Handshake, err error) {
	// Need to copy Dialer to prevent original object mutation.
	dialer := d.Dialer

	var (
		reqBuf           bytes.Buffer
		resBuf           bytes.Buffer
		resContentLength int64
	)
	userWrap := dialer.WrapConn
	dialer.WrapConn = func(c net.Conn) net.Conn {
		if userWrap != nil {
			c = userWrap(c)
		}

		// Save the pointer to the raw connection.
		conn = c

		var (
			r io.Reader = conn
			w io.Writer = conn
		)
		if d.OnResponse != nil {
			r = &prefetchResponseReader{
				source:        conn,
				buffer:        &resBuf,
				contentLength: &resContentLength,
			}
		}
		if d.OnRequest != nil {
			w = io.MultiWriter(conn, &reqBuf)
		}
		return rwConn{conn, r, w}
	}

	_, br, hs, err = dialer.Dial(ctx, urlstr)

	if onRequest := d.OnRequest; onRequest != nil {
		onRequest(reqBuf.Bytes())
	}
	if onResponse := d.OnResponse; onResponse != nil {
		// We must split response inside buffered bytes from other received
		// bytes from server.
		p := resBuf.Bytes()

		// If the end of the head was never received (connection failure,
		// truncated or malformed response), there is nothing to split: report
		// everything that was captured instead of slicing with a bogus index.
		h := len(p)   // Head end index.
		end := len(p) // Body end index.
		if i := bytes.Index(p, headEnd); i >= 0 {
			h = i + len(headEnd)
			end = h + int(resContentLength)
			if end > len(p) {
				end = len(p)
			}
		}
		onResponse(p[:end])

		if br != nil {
			// If br is non-nil, then it means two things. First is that the
			// handshake is OK and the server has sent additional bytes –
			// probably immediately sent frames (or a weird but possible
			// response body). Second, the bad one, is that br buffer's source
			// is now the rwConn instance from the WrapConn call above. It is
			// incorrect, so we must fix it.
			var r io.Reader = conn
			if len(p) > h {
				// Buffer contains more than just HTTP headers bytes.
				r = io.MultiReader(
					bytes.NewReader(p[h:]),
					conn,
				)
			}
			br.Reset(r)
			// Must make br.Buffered() to be non-zero. The result is
			// deliberately ignored: the call is made only to fill br's
			// buffer, and any read error will surface on the caller's next
			// read from br.
			_, _ = br.Peek(len(p) - h)
		}
	}

	return conn, br, hs, err
}
// rwConn is a net.Conn whose Read and Write calls are redirected to r and w.
// Every other method is served by the embedded net.Conn.
type rwConn struct {
	net.Conn
	r io.Reader // Serves all Read calls.
	w io.Writer // Receives all Write calls.
}

// Read fills p from rwc.r instead of the embedded connection.
func (rwc rwConn) Read(p []byte) (int, error) {
	src := rwc.r
	return src.Read(p)
}

// Write sends p to rwc.w instead of the embedded connection.
func (rwc rwConn) Write(p []byte) (int, error) {
	dst := rwc.w
	return dst.Write(p)
}
// headEnd is the CRLF CRLF byte sequence; Dial searches the captured response
// for it to locate the end of the HTTP head.
var headEnd = []byte("\r\n\r\n")
// prefetchResponseReader is an io.Reader that, on its first Read, parses an
// HTTP response from source while copying every byte it consumes from source
// into buffer. That and all later reads are served from a reader that replays
// the recorded bytes and then continues with source, so the consumer still
// sees the unmodified stream.
type prefetchResponseReader struct {
	source io.Reader // Original connection source.
	reader io.Reader // Replaying reader handed to clients; nil until the first Read.
	buffer *bytes.Buffer
	// contentLength receives the number of response body bytes that were
	// read during the prefetch. It is left untouched if parsing fails.
	contentLength *int64
}

// Read implements io.Reader. The first call performs the prefetch described
// on the type; every call then delegates to the replaying reader.
func (r *prefetchResponseReader) Read(p []byte) (int, error) {
	if r.reader != nil {
		return r.reader.Read(p)
	}

	// Everything the HTTP parser pulls from source is mirrored into buffer.
	tee := io.TeeReader(r.source, r.buffer)
	resp, err := http.ReadResponse(bufio.NewReader(tee), nil)
	if err == nil {
		// Drain the body so that it is recorded as well; a failure while
		// draining is ignored and only the count of copied bytes is kept.
		copied, _ := io.Copy(ioutil.Discard, resp.Body)
		*r.contentLength = copied
		resp.Body.Close()
	}

	// Replay what was consumed so far, then fall through to the live source.
	recorded := bytes.NewReader(r.buffer.Bytes())
	r.reader = io.MultiReader(recorded, r.source)

	return r.reader.Read(p)
}