forked from HuaweiTech/lattice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read_closer.go
122 lines (99 loc) · 2.19 KB
/
read_closer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package sse
import (
"bufio"
"bytes"
"errors"
"io"
"strconv"
"time"
)
type ReadCloser struct {
lastID string
buf *bufio.Reader
closeSource func() error
closed bool
}
func NewReadCloser(source io.ReadCloser) *ReadCloser {
return &ReadCloser{
closeSource: func() error { return source.Close() },
buf: bufio.NewReader(source),
}
}
var alreadyClosedError = errors.New("ReadCloser already closed")
func (rc *ReadCloser) Close() error {
if rc.closed {
return alreadyClosedError
}
rc.closed = true
return rc.closeSource()
}
func (rc *ReadCloser) Next() (Event, error) {
var event Event
// event ID defaults to last ID per the spec
event.ID = rc.lastID
// if an empty id is explicitly given, it sets the value and resets the last
// id; track its presence with a bool to distinguish between zero-value
idPresent := false
prefix := []byte{}
for {
line, isPrefix, err := rc.buf.ReadLine()
if err != nil {
return Event{}, err
}
line = append(prefix, line...)
if isPrefix {
prefix = line
continue
} else {
prefix = []byte{}
}
// empty line; dispatch event
if len(line) == 0 {
if len(event.Data) == 0 {
// event had no data; skip it per the spec
continue
}
if idPresent {
// record last ID
rc.lastID = event.ID
}
// trim terminating linebreak
event.Data = event.Data[0 : len(event.Data)-1]
// dispatch event
return event, nil
}
if line[0] == ':' {
// comment; skip
continue
}
var field, value string
segments := bytes.SplitN(line, []byte(":"), 2)
if len(segments) == 1 {
// line with no colon is just the field, with empty value
field = string(segments[0])
} else {
field = string(segments[0])
value = string(segments[1])
}
if len(value) > 0 {
// trim only a single leading space
if value[0] == ' ' {
value = value[1:]
}
}
switch field {
case "id":
idPresent = true
event.ID = value
case "event":
event.Name = value
case "data":
event.Data = append(event.Data, []byte(value+"\n")...)
case "retry":
retryInMS, err := strconv.Atoi(value)
if err == nil {
event.Retry = time.Duration(retryInMS) * time.Millisecond
}
}
}
}