forked from catsby/busl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sse.go
60 lines (48 loc) · 1.22 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package sse
import (
"bytes"
"fmt"
"io"
)
// fmt layouts for the two kinds of line that format writes for each event.
const (
// id is the layout of the event ID line; format fills %d with the
// offset at the end of the chunk (start offset plus chunk length).
id = "id: %d\n"
// data is the layout of a single payload line; format emits one of
// these for every newline-separated line of the chunk.
data = "data: %s\n"
)
// encoder wraps an io.Reader and re-frames everything read from it as
// Server-Sent Events: each chunk obtained from the underlying reader is
// turned into one event by format.
type encoder struct {
	reader  io.Reader // stores the original reader
	offset  int64     // offset for Seek purposes; also the base of the event IDs
	pending []byte    // formatted output not yet handed to the caller
	err     error     // error returned alongside the chunk in pending, reported once pending drains
}

// Compile-time check that the encoder is usable as both a reader and a seeker.
var _ io.ReadSeeker = (*encoder)(nil)

// NewEncoder returns a reader that yields the contents of r framed as
// Server-Sent Events. The returned value also implements io.Seeker.
func NewEncoder(r io.Reader) io.Reader {
	return &encoder{reader: r}
}

// Seek changes the offset that subsequent event IDs are based on and returns
// the new offset.
//
// If the underlying reader implements io.Seeker the call is forwarded to it
// and the position it reports is adopted. Otherwise no data is skipped and
// only the ID offset is adjusted: io.SeekStart sets it to offset and
// io.SeekCurrent moves it by offset. Any other whence is rejected because
// the end of a non-seekable stream is unknown, as is a negative result.
//
// On error the current offset is returned unchanged. A successful Seek
// discards any output still buffered from an earlier Read.
func (r *encoder) Seek(offset int64, whence int) (n int64, err error) {
	if seeker, ok := r.reader.(io.Seeker); ok {
		n, err = seeker.Seek(offset, whence)
		if err != nil {
			// Keep the previous offset; the value returned next to
			// an error is not meaningful.
			return r.offset, err
		}
	} else {
		switch whence {
		case io.SeekStart:
			n = offset
		case io.SeekCurrent:
			n = r.offset + offset
		default:
			return r.offset, fmt.Errorf("sse: whence %d is not supported on a non-seekable reader", whence)
		}
		if n < 0 {
			return r.offset, fmt.Errorf("sse: seek to negative offset %d", n)
		}
	}

	r.offset = n
	// Anything buffered belongs to the old position.
	r.pending, r.err = nil, nil
	return n, nil
}

// Read fills p with event-stream output.
//
// When nothing is buffered it reads one chunk from the underlying reader,
// formats it as an event and advances the offset by the chunk length. The
// formatted event is always longer than the chunk itself, so whatever does
// not fit in p is kept and returned by the following calls before the
// underlying reader is consulted again. An error that the underlying reader
// returned together with data is reported with the last piece of that
// event, so no output is lost.
func (r *encoder) Read(p []byte) (n int, err error) {
	if len(r.pending) == 0 {
		n, err = r.reader.Read(p)
		if n <= 0 {
			return n, err
		}
		// p only serves as scratch space for the raw chunk: format
		// builds its result in a fresh buffer, so p can be overwritten
		// with the framed output below.
		r.pending = format(r.offset, p[:n])
		r.offset += int64(n)
		r.err = err
	}

	n = copy(p, r.pending)
	r.pending = r.pending[n:]
	if len(r.pending) > 0 {
		return n, nil
	}

	// Event fully delivered: release the buffer and surface the error
	// (if any) that accompanied the chunk.
	r.pending = nil
	err, r.err = r.err, nil
	return n, err
}
// format renders msg as one event-stream message and returns the encoded
// bytes in a newly allocated buffer.
//
// The message starts with an ID line carrying pos+len(msg), i.e. the offset
// just past this chunk. It is followed by one data line per
// newline-separated line of msg (an empty msg still yields a single empty
// data line) and is closed by a blank line.
func format(pos int64, msg []byte) []byte {
	var out bytes.Buffer
	fmt.Fprintf(&out, id, pos+int64(len(msg)))

	// Walk msg one '\n'-delimited segment at a time; the text after the
	// last newline (possibly empty) is a segment too.
	rest := msg
	for {
		nl := bytes.IndexByte(rest, '\n')
		if nl < 0 {
			fmt.Fprintf(&out, data, rest)
			break
		}
		fmt.Fprintf(&out, data, rest[:nl])
		rest = rest[nl+1:]
	}

	out.WriteByte('\n')
	return out.Bytes()
}