-
Notifications
You must be signed in to change notification settings - Fork 1
/
sse.go
77 lines (66 loc) · 1.81 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package openai
import (
"bufio"
"bytes"
"errors"
"io"
)
var (
dataPrefixBytes = []byte("data:")
eventPrefixBytes = []byte("event:")
idPrefixBytes = []byte("id:")
bomBytes = []byte{0xEF, 0xBB, 0xBF}
errCloseEventStream = errors.New("close event stream")
)
// Note: this version doesn't handle LF line terminators because the chance
// of OpenAI using those is nil. Retry fields have also been removed.
func parseEventStream(r io.Reader, maxSize int, f func(id, event string, data []byte) error, retryf func(ms uint64)) error {
scanner := bufio.NewScanner(r)
// Use a stack-allocated buffer if we can fit lines into it
var data [512]byte
scanner.Buffer(data[:], maxSize)
var id, event string
var dataBuf bytes.Buffer
for scanner.Scan() {
line := scanner.Bytes()
line = bytes.TrimPrefix(line, bomBytes)
if len(line) == 0 {
if dataBuf.Len() > 0 {
data := stripTrailingLF(dataBuf.Bytes())
if err := f(id, event, data); err != nil {
if err == errCloseEventStream {
err = nil
}
return err
}
id = ""
event = ""
dataBuf.Reset()
}
} else if data, ok := bytes.CutPrefix(line, dataPrefixBytes); ok {
dataBuf.Write(stripLeadingSpace(data))
dataBuf.WriteByte('\n')
} else if data, ok := bytes.CutPrefix(line, eventPrefixBytes); ok {
event = string(stripLeadingSpace(data))
} else if data, ok := bytes.CutPrefix(line, idPrefixBytes); ok {
id = string(stripLeadingSpace(data))
} // ignore unknown lines
}
// incomplete events are discarded
return scanner.Err()
}
func stripLeadingSpace(data []byte) []byte {
if len(data) > 0 && data[0] == ' ' {
return data[1:]
} else {
return data
}
}
func stripTrailingLF(data []byte) []byte {
n := len(data)
if n > 0 && data[n-1] == '\n' {
return data[:n-1]
} else {
return data
}
}