-
Notifications
You must be signed in to change notification settings - Fork 264
/
buffer.go
170 lines (150 loc) · 3.22 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package client
import (
"bytes"
"fmt"
"io"
"net"
"sync"
"time"
)
// limWriter queues outgoing bytes in front of an underlying io.Writer.
//
// Appended data accumulates in bufs and is written to w once at least limit
// bytes are queued. While pending is non-nil, appended data is diverted into
// pending instead, and flush does nothing until doneWithPending is called.
//
// NOTE(review): mu is taken only by appendBufs and flush; every other method
// reads or writes bufs/pending without it. Confirm that callers serialize
// those calls some other way.
type limWriter struct {
	w       io.Writer     // destination of flushed and direct writes
	bufs    []byte        // bytes queued for the next flush
	limit   int           // queued size at which appendBufs triggers a flush
	pending *bytes.Buffer // when non-nil, receives appended data instead of bufs
	plimit  int           // size that atLimitIfUsingPending compares pending against
	mu      sync.Mutex    // held by appendBufs and flush
}

// appendString queues the bytes of str; see appendBufs for the flush rule.
func (w *limWriter) appendString(str string) error {
	return w.appendBufs([]byte(str))
}

// appendBufs queues every non-empty slice of bufs, in order, into the pending
// buffer when one is active and into the regular queue otherwise. When no
// pending buffer is active and the queue has reached limit, the queue is
// flushed and the flush error (if any) is returned.
func (w *limWriter) appendBufs(bufs ...[]byte) error {
	w.mu.Lock()
	for _, chunk := range bufs {
		switch {
		case len(chunk) == 0:
			// Nothing to queue.
		case w.pending != nil:
			w.pending.Write(chunk)
		default:
			w.bufs = append(w.bufs, chunk...)
		}
	}
	needFlush := w.pending == nil && len(w.bufs) >= w.limit
	// flush takes the lock itself, so release it before calling.
	w.mu.Unlock()

	if !needFlush {
		return nil
	}
	return w.flush()
}

// writeDirect hands data straight to the underlying writer, by-passing both
// the regular queue and the pending buffer.
func (w *limWriter) writeDirect(data []byte) error {
	_, err := w.w.Write(data)
	return err
}

// flush writes the queued bytes to the underlying writer and empties the
// queue, returning the write error if there was one.
func (w *limWriter) flush() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.pending == nil {
		// The write is issued even when the queue is empty: the actual
		// writer (a websocket one, for instance) may have work of its own
		// to do, such as sending control frames.
		_, err := w.w.Write(w.bufs)
		w.bufs = w.bufs[:0]
		return err
	}

	// A pending buffer is set, so nothing is flushed. Code that needs to
	// reach the socket during (re)connect uses writeDirect() instead.
	// NOTE(review): this print looks like leftover debugging output; removing
	// it also requires dropping the file's "fmt" import.
	fmt.Println("w.pending != nil")
	return nil
}

// buffered reports how many bytes are waiting: the size of the pending buffer
// when one is active, the size of the regular queue otherwise.
func (w *limWriter) buffered() int {
	if p := w.pending; p != nil {
		return p.Len()
	}
	return len(w.bufs)
}

// switchToPending installs a fresh, empty pending buffer; subsequent appends
// go there instead of the regular queue.
func (w *limWriter) switchToPending() {
	w.pending = &bytes.Buffer{}
}

// flushPendingBuffer writes the content of the pending buffer to the
// underlying writer. It does nothing when no pending buffer is active or when
// it is empty.
func (w *limWriter) flushPendingBuffer() error {
	p := w.pending
	if p == nil || p.Len() == 0 {
		return nil
	}
	_, err := w.w.Write(p.Bytes())
	// Whatever the outcome, drop the content now: resending it later could
	// produce duplicates or partial data.
	p.Reset()
	return err
}

// atLimitIfUsingPending reports whether a pending buffer is active and holds
// at least plimit bytes.
func (w *limWriter) atLimitIfUsingPending() bool {
	return w.pending != nil && w.pending.Len() >= w.plimit
}

// doneWithPending discards the pending buffer so that appends go back to the
// regular queue.
func (w *limWriter) doneWithPending() {
	w.pending = nil
}
// timeoutWriter writes to conn with a per-write deadline and remembers the
// first write error it sees.
type timeoutWriter struct {
	timeout time.Duration // added to time.Now() to form each write deadline
	conn    net.Conn      // connection written to
	err     error         // result of the last write; once non-nil, later writes are refused
}

// Write implements the io.Writer interface.
//
// If an earlier write failed, that error is returned again and conn is not
// touched. Otherwise the write deadline is set to now+timeout, p is written,
// and the deadline is cleared again.
func (tw *timeoutWriter) Write(p []byte) (int, error) {
	if prev := tw.err; prev != nil {
		return 0, prev
	}
	// Errors from SetWriteDeadline are not checked, here or below.
	deadline := time.Now().Add(tw.timeout)
	tw.conn.SetWriteDeadline(deadline)
	written, err := tw.conn.Write(p)
	tw.err = err
	tw.conn.SetWriteDeadline(time.Time{})
	return written, err
}
// limReader reads from r through a caller-supplied buffer and lets
// delimiter-based reads (ReadString) and raw reads (Read) share leftovers.
type limReader struct {
	r   io.Reader // underlying source
	buf []byte    // buffer handed to r.Read
	// off is the index in buf of the first byte not yet consumed, or -1 when
	// nothing is left over. Note that the zero value (0) counts as "leftover
	// present", so the first Read on a zero-valued limReader returns an empty
	// slice without touching r.
	off int
	n   int // number of valid bytes in buf after the last read from r
}

// doneWithConnect is currently a no-op.
func (r *limReader) doneWithConnect() {
}

// Read returns the next chunk of data.
//
// If bytes are left over in the buffer they are returned (and marked as
// consumed) without reading from r. Otherwise a single read from r is issued
// and whatever it produced is returned together with its error. The returned
// slice aliases the internal buffer and is overwritten by the next read.
func (r *limReader) Read() ([]byte, error) {
	if start := r.off; start >= 0 {
		r.off = -1
		return r.buf[start:r.n], nil
	}
	n, err := r.r.Read(r.buf)
	r.n = n
	return r.buf[:n], err
}

// ReadString reads until the first occurrence of delim and returns the data
// up to and including the delimiter. Bytes following the delimiter stay
// buffered for the next Read or ReadString.
//
// If the underlying reader fails before delim is found, the data accumulated
// so far is returned along with the error. Bytes that the reader returned
// together with an error are processed before the error is considered, as
// the io.Reader contract requires: if they complete the line, the line is
// returned with a nil error; otherwise they are included in the partial
// result returned with the error. In the first case the error is not stored
// here, so it surfaces again only if the reader reports it on the next read.
func (r *limReader) ReadString(delim byte) (string, error) {
	var (
		s       string
		readErr error
	)
	for {
		// First look at what is already in the buffer.
		if r.off >= 0 {
			avail := r.buf[r.off:r.n]
			if i := bytes.IndexByte(avail, delim); i >= 0 {
				s += string(avail[:i+1])
				r.off += i + 1
				if r.off >= r.n {
					r.off = -1
				}
				return s, nil
			}
			// No delimiter yet: keep what we have and go read more.
			s += string(avail)
			r.off = -1
		}
		// The bytes that came with the failed read have been scanned above
		// and did not contain the delimiter; report the failure now.
		if readErr != nil {
			return s, readErr
		}
		var chunk []byte
		chunk, readErr = r.Read()
		if readErr != nil && len(chunk) == 0 {
			return s, readErr
		}
		r.off = 0
	}
}