forked from nulpunt/nulpunt
/
reader.go
137 lines (116 loc) · 3.19 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package snappystream
import (
"bytes"
"code.google.com/p/snappy-go/snappy"
"errors"
"fmt"
"hash/crc32"
"io"
)
// reader decodes a snappy framed stream; see NewReader.
type reader struct {
	// reader is the wrapped source of framed chunks.
	reader io.Reader
	// buf holds decoded bytes that Read has not yet handed out.
	buf bytes.Buffer
	// hdr is scratch space for a 4-byte chunk header; src and dst are
	// reusable scratch space for raw chunk bodies and decompressed data.
	hdr, src, dst []byte
	// verifyChecksum enables checksum validation of data chunks.
	verifyChecksum bool
}
// NewReader returns an io.Reader that decodes the snappy framed stream format
// read from r.
//
// Stream identifier chunks are validated and consumed without being passed on
// to the caller. Data chunks are decompressed when necessary and, when
// verifyChecksum is true, checked against their embedded checksums. Pass the
// constants VerifyChecksum or SkipVerifyChecksum for verifyChecksum.
//
// Internally, three buffers are kept:
//   - a scratch buffer for raw chunk bodies read from r, grown on demand and
//     reused (bodies larger than MaxBlockSize+4 bytes are rejected);
//   - a scratch buffer for decompressed chunk data, grown on demand;
//   - a buffer of decoded bytes not yet returned by Read.
//
// Each Read returns at most len(b) bytes. It decodes further chunks from r only
// when fewer than len(b) decoded bytes are already buffered.
func NewReader(r io.Reader, verifyChecksum bool) io.Reader {
	return &reader{
		reader:         r,
		verifyChecksum: verifyChecksum,
		hdr:            make([]byte, 4),
		src:            make([]byte, 4096),
		dst:            make([]byte, 4096),
	}
}
// Read implements io.Reader.
//
// Buffered decoded bytes are served first. When fewer than len(b) bytes are
// buffered, one more data chunk is decoded. Chunks that decode to zero bytes
// are skipped while the buffer is still empty, so they are not mistaken for
// the end of the stream.
//
// If decoding fails (including io.EOF at the end of the stream), any bytes
// that were already buffered are returned together with the error, as
// io.Reader permits, instead of being dropped.
func (r *reader) Read(b []byte) (int, error) {
	if r.buf.Len() >= len(b) {
		return r.buf.Read(b)
	}
	err := r.nextFrame()
	// An empty data chunk adds nothing. Keep decoding so that
	// bytes.Buffer.Read below does not report io.EOF on an empty buffer.
	for err == nil && r.buf.Len() == 0 {
		err = r.nextFrame()
	}
	// On failure nextFrame has appended nothing, so every buffered byte
	// (fewer than len(b)) fits in b. Hand those bytes over with the error.
	// On success the buffer is non-empty, so this Read cannot fail.
	n, _ := r.buf.Read(b)
	return n, err
}
// nextFrame reads chunks from the wrapped reader until it reaches a data
// chunk, then appends that chunk's uncompressed bytes to r.buf.
//
// Chunk types:
//   - 0x00 (compressed data) and 0x01 (uncompressed data): the body begins
//     with a 4-byte little-endian masked crc32 checksum of the uncompressed
//     bytes. The checksum is verified when r.verifyChecksum is set.
//   - 0xff (stream identifier): the body must be "sNaPpY"; it is consumed.
//   - 0x80-0xfe (reserved skippable, including padding): discarded, as the
//     framing format requires decoders to skip them.
//   - 0x02-0x7f (reserved unskippable): rejected with an error.
//
// Errors from reading the header or from readBlock are returned as-is, so an
// io.EOF while reading a header means the stream ended on a chunk boundary.
func (r *reader) nextFrame() error {
	for {
		if _, err := io.ReadFull(r.reader, r.hdr); err != nil {
			return err
		}
		buf, err := r.readBlock()
		if err != nil {
			return err
		}
		switch chunkType := r.hdr[0]; {
		case chunkType == 0x00 || chunkType == 0x01:
			// Guard the checksum read: a malformed chunk must produce an
			// error, not an index-out-of-range panic.
			if len(buf) < 4 {
				return fmt.Errorf("frame too short for checksum: %d bytes", len(buf))
			}
			checksum := unmaskChecksum(uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24)
			data := buf[4:]
			if chunkType == 0x00 {
				// Pass the whole capacity. The previous Decode left r.dst
				// trimmed to that frame's length, and a shorter dst would
				// force a fresh allocation for any larger frame.
				r.dst, err = snappy.Decode(r.dst[:cap(r.dst)], data)
				if err != nil {
					return err
				}
				data = r.dst
			}
			if r.verifyChecksum {
				if actual := crc32.Checksum(data, crcTable); actual != checksum {
					return fmt.Errorf("invalid checksum %x != %x", checksum, actual)
				}
			}
			_, err = r.buf.Write(data)
			return err
		case chunkType == 0xff:
			// stream identifier
			if !bytes.Equal(buf, []byte{0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
				return errors.New("invalid stream ID")
			}
		case chunkType >= 0x80:
			// Reserved skippable chunk (0x80-0xfe, 0xfe being padding):
			// ignore its body and continue with the next chunk.
		default:
			// 0x02-0x7f are reserved unskippable chunk types.
			return errors.New("invalid frame identifier")
		}
	}
}
// readBlock reads the body of the chunk whose 4-byte header is in r.hdr.
//
// Header bytes 1-3 hold the body length, little endian. Bodies longer than
// MaxBlockSize+4 (a block plus its 4-byte checksum) are rejected. The returned
// slice aliases r.src and is only valid until the next call.
//
// The header has already been consumed when this runs. If the stream ends
// before any body bytes arrive, io.ErrUnexpectedEOF is returned rather than
// io.EOF, so a truncated chunk is not mistaken for a clean end of stream.
func (r *reader) readBlock() ([]byte, error) {
	length := uint32(r.hdr[1]) | uint32(r.hdr[2])<<8 | uint32(r.hdr[3])<<16
	// +4 for checksum
	if length > (MaxBlockSize + 4) {
		return nil, fmt.Errorf("block too large %d > %d", length, (MaxBlockSize + 4))
	}
	if int(length) > len(r.src) {
		r.src = make([]byte, length)
	}
	buf := r.src[:length]
	if _, err := io.ReadFull(r.reader, buf); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return buf, nil
}
// unmaskChecksum reverses the checksum masking used by the framing format. It
// subtracts the constant 0xa282ead8 and then rotates the result left by 15 bits.
func unmaskChecksum(c uint32) uint32 {
	const delta uint32 = 0xa282ead8
	v := c - delta
	return v<<15 | v>>17
}