/
reader.go
70 lines (57 loc) · 1.04 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package jsonstream
import (
"bufio"
"encoding/json"
"io"
)
type Reader struct {
r io.Reader
s *bufio.Scanner
}
func NewReader(r io.Reader) *Reader {
reader := &Reader{
r: r,
s: bufio.NewScanner(r),
}
return reader
}
func (r *Reader) Next() (Message, error) {
for r.s.Scan() {
if len(r.s.Bytes()) == 0 {
continue
}
var msg partialMsg
err := json.Unmarshal(r.s.Bytes(), &msg)
if err != nil {
return nil, err
}
switch msg.Type {
case "log":
var logMsg LogMsg
err = json.Unmarshal(msg.Data, &logMsg)
if err != nil {
return nil, err
}
if logMsg.Attrs == nil {
logMsg.Attrs = make(map[string]interface{})
}
return &logMsg, nil
case "header":
var headerMsg HeaderMsg
err := json.Unmarshal(msg.Data, &headerMsg)
if err != nil {
return nil, err
}
if headerMsg.Headers == nil {
headerMsg.Headers = make(map[string]interface{})
}
return &headerMsg, nil
default:
return nil, ErrUnknownType
}
}
if r.s.Err() != nil {
return nil, r.s.Err()
}
return nil, io.EOF
}