/
reader.go
123 lines (100 loc) · 2.5 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package model
import (
"bytes"
"encoding/binary"
"io"
"unsafe"
"github.com/sirupsen/logrus"
)
// NewMultiReader streams the frames of every reader in r, in slice order,
// over the returned channel.
//
// A background goroutine calls ReadFrame on the current reader until it
// returns an error or yields a frame whose header fails CheckVersion; it then
// moves on to the next reader. Once the last reader has been abandoned the
// channel is closed. The channel is unbuffered, so the goroutine blocks on
// each send until the frame is received.
func NewMultiReader(r []io.Reader) <-chan Frame {
	out := make(chan Frame)
	go func() {
		defer close(out)
		for _, src := range r {
			// Drain this reader; the first failure or version mismatch
			// abandons it for good.
			for {
				next, err := ReadFrame(src)
				if err != nil || !CheckVersion(next.Header) {
					break
				}
				out <- *next
			}
		}
		logrus.Infof("Frames ended")
	}()
	return out
}
// ReadAll reads frames from r with ReadFrame until the first error and
// returns them wrapped in a Collection.
//
// The error that ends the loop (end of input or otherwise) is not reported
// to the caller. The returned Collection's Data is never nil, even when no
// frame could be read.
// NOTE: the streaming NewMultiReader implementation should be preferred.
func ReadAll(r io.Reader) *Collection {
	collected := make([]*Frame, 0)
	for next, err := ReadFrame(r); err == nil; next, err = ReadFrame(r) {
		collected = append(collected, next)
	}
	return &Collection{Data: collected}
}
// readNextBytes reads exactly number bytes from reader.
//
// It returns io.EOF when no bytes at all could be read and
// io.ErrUnexpectedEOF when the input ends before number bytes were read.
// Any other reader error is returned unchanged. On error the returned slice
// is nil.
func readNextBytes(reader io.Reader, number int64) ([]byte, error) {
	buf := make([]byte, number)
	// A single Read call may legally return fewer bytes than requested
	// without an error; io.ReadFull keeps reading until buf is full.
	if _, err := io.ReadFull(reader, buf); err != nil {
		return nil, err
	}
	return buf, nil
}
// ReadFrameHeader reads the next FrameHeader from r.
//
// It fetches unsafe.Sizeof(FrameHeader) bytes through readNextBytes and
// decodes them as big-endian data with binary.Read. A read or decode error
// is returned together with a nil header.
//
// NOTE(review): unsafe.Sizeof includes any struct padding, whereas
// encoding/binary decodes fields without padding. If FrameHeader contains
// padding, more bytes are consumed from r than are decoded — confirm the
// struct layout.
func ReadFrameHeader(r io.Reader) (*FrameHeader, error) {
	var hdr FrameHeader

	raw, err := readNextBytes(r, int64(unsafe.Sizeof(hdr)))
	if err != nil {
		return nil, err
	}

	if err := binary.Read(bytes.NewBuffer(raw), binary.BigEndian, &hdr); err != nil {
		return nil, err
	}

	logrus.Debugf("ReadFrame: frame.Header %d", &hdr)
	return &hdr, nil
}
// ReadFrame reads the next frame from r: a header via ReadFrameHeader,
// followed by Header.Size bytes of payload stored in frame.Data.
//
// On success it returns the populated frame and a nil error. On failure it
// returns a nil frame and the error. io.EOF is treated as the normal end of
// input and is not logged; every other error is logged before returning.
//
// A runtime panic raised while reading (for example an invalid Header.Size
// reaching make) is converted into the returned error, so a broken frame is
// never reported with a nil error. Panic values that are not errors are
// re-raised.
func ReadFrame(r io.Reader) (frame *Frame, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			recErr, ok := rec.(error)
			if !ok {
				// Not an error value: there is nothing meaningful to
				// return, so let the panic propagate.
				panic(rec)
			}
			err = recErr
		}
		if err == nil {
			return
		}
		if err != io.EOF {
			logrus.Errorf("Errors occured while reading frame %v, MESSAGE: %v", frame.NameString, err)
		}
		// Never hand a partially populated frame to the caller.
		frame = nil
	}()

	frame = NewEmptyFrame()
	if frame.Header, err = ReadFrameHeader(r); err != nil {
		return frame, err
	}

	// The payload is raw bytes, so it is stored as read; no decoding step
	// is needed.
	frame.Data, err = readNextBytes(r, int64(frame.Header.Size))
	if err != nil {
		return frame, err
	}

	logrus.Debugf("ReadFrame: frame.Data %d", frame.Data)
	return frame, nil
}