/
skipReader.go
114 lines (93 loc) · 2.49 KB
/
skipReader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package obfuscator
import (
"bytes"
"io"
"net"
"github.com/astaguna/popon-core/psiphon/common/errors"
)
type SkipReader struct {
net.Conn
offset int // buf offset for next Read
end int // buf end index for next Read
buf []byte
}
func WrapConnWithSkipReader(conn net.Conn) net.Conn {
return &SkipReader{
Conn: conn,
offset: 0,
end: 0,
buf: nil,
}
}
func (sr *SkipReader) Read(b []byte) (int, error) {
// read buffered bytes first
if sr.offset < sr.end {
n := copy(b, sr.buf[sr.offset:sr.end])
if n == 0 {
// should never happen if len(b) > 0
return 0, errors.TraceNew("read failed")
}
sr.offset += n
// clear resources if all buffered bytes are read
if sr.offset == sr.end {
sr.offset = 0
sr.end = 0
sr.buf = nil
}
return n, nil
}
return sr.Conn.Read(b)
}
// SkipUpToToken reads from the underlying conn initially len(token) bytes,
// and then readSize bytes at a time up to maxSearchSize until token is found,
// or error. If the token is found, stream is rewound to end of the token.
//
// Note that maxSearchSize is not a strict limit on the total number of bytes read.
func (sr *SkipReader) SkipUpToToken(
token []byte, readSize, maxSearchSize int) error {
if len(token) == 0 {
return nil
}
if readSize < 1 {
return errors.TraceNew("readSize too small")
}
if maxSearchSize < readSize {
return errors.TraceNew("maxSearchSize too small")
}
sr.offset = 0
sr.end = 0
sr.buf = make([]byte, readSize+len(token))
// Reads at least len(token) bytes.
nTotal, err := io.ReadFull(sr.Conn, sr.buf[:len(token)])
if err == io.ErrUnexpectedEOF {
return errors.TraceNew("token not found")
}
if err != nil {
return err
}
if bytes.Equal(sr.buf[:len(token)], token) {
return nil
}
for nTotal < maxSearchSize {
// The underlying conn is read into buf[len(token):].
// buf[:len(token)] stores bytes from the previous read.
n, err := sr.Conn.Read(sr.buf[len(token):])
if err != nil && err != io.EOF {
return err
}
if idx := bytes.Index(sr.buf[:n+len(token)], token); idx != -1 {
// Found match, sets offset and end for next Read to start after the token.
sr.offset = idx + len(token)
sr.end = n + len(token)
return err
}
if err == io.EOF {
// Reached the end of stream, token not found.
return errors.TraceNew("token not found")
}
// Copies last len(token) bytes to the beginning of the buffer.
copy(sr.buf, sr.buf[n:n+len(token)])
nTotal += n
}
return errors.TraceNew("exceeded max search size")
}