Skip to content

Commit

Permalink
[azopenai] Switch EventReader from using bufio.Scanner to bufio.Reader (
Browse files Browse the repository at this point in the history
#22703)

bufio.Scanner has an implicit max size for its internal buffer, but SSE has no restriction on how large chunks can be.

We need to allow for arbitrarily large chunks and, luckily, bufio.Reader can already handle that.
  • Loading branch information
richardpark-msft committed Apr 5, 2024
1 parent 8f702b9 commit 18e40d0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
2 changes: 2 additions & 0 deletions sdk/ai/azopenai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- EventReader can now handle chunks of text larger than 64k. Thank you @ChrisTrenkamp for finding the issue and suggesting a fix. (PR#22703)

### Other Changes

## 0.5.1 (2024-04-02)
Expand Down
31 changes: 18 additions & 13 deletions sdk/ai/azopenai/event_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,33 @@ import (

// EventReader streams events dynamically from an OpenAI endpoint.
type EventReader[T any] struct {
reader io.ReadCloser // Required for Closing
scanner *bufio.Scanner
reader io.ReadCloser // Required for Closing
bufioReader *bufio.Reader
zeroT T
}

func newEventReader[T any](r io.ReadCloser) *EventReader[T] {
return &EventReader[T]{reader: r, scanner: bufio.NewScanner(r)}
return &EventReader[T]{
reader: r,
bufioReader: bufio.NewReader(r),
}
}

// Read reads the next event from the stream.
// Returns io.EOF when there are no further events.
func (er *EventReader[T]) Read() (T, error) {
// https://html.spec.whatwg.org/multipage/server-sent-events.html
for er.scanner.Scan() { // Scan while no error
line := er.scanner.Text() // Get the line & interpret the event stream:

for {
line, err := er.bufioReader.ReadString('\n')

if err != nil {
if errors.Is(err, io.EOF) {
return er.zeroT, errors.New("incomplete stream")
}

return er.zeroT, err
}

if line == "" || line[0] == ':' { // If the line is blank or is a comment, skip it
continue
Expand All @@ -52,14 +65,6 @@ func (er *EventReader[T]) Read() (T, error) {
// Unreachable
}
}

scannerErr := er.scanner.Err()

if scannerErr == nil {
return *new(T), errors.New("incomplete stream")
}

return *new(T), scannerErr
}

// Close closes the EventReader and any applicable inner stream state.
Expand Down
29 changes: 28 additions & 1 deletion sdk/ai/azopenai/event_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package azopenai

import (
"fmt"
"io"
"strings"
"testing"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestEventReader_BadReader(t *testing.T) {
}

func TestEventReader_StreamIsClosedBeforeDone(t *testing.T) {
buff := strings.NewReader("data: {}")
buff := strings.NewReader("data: {}\n")

eventReader := newEventReader[ChatCompletions](io.NopCloser(buff))

Expand Down Expand Up @@ -75,3 +76,29 @@ func TestEventReader_SpacesAroundAreas(t *testing.T) {
require.NotEmpty(t, evt)
require.Equal(t, "without-spaces", *evt.Choices[0].Delta.Content)
}

// TestEventReader_CanReadHugeChunk checks that a single event line longer than
// 64k is read back intact, and that the event following it is still readable.
func TestEventReader_CanReadHugeChunk(t *testing.T) {
	// A customer stream contained _huge_ chunks of text:
	// https://github.com/Azure/azure-sdk-for-go/pull/22646
	// bufio.Scanner has a limitation of 64k, so the payload goes one byte past it.
	const chunkFormat = "data: {\"name\":\"chatcmpl-7Z4kUpXX6HN85cWY28IXM4EwemLU3\",\"object\":\"chat.completion.chunk\",\"created\":1688594090,\"model\":\"gpt-4-0613\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"%s\"},\"finish_reason\":null}]}\n"

	hugeContent := strings.Repeat("A", 64*1024+1)

	// Two events back to back: the oversized one first, then a small one.
	stream := strings.NewReader(
		fmt.Sprintf(chunkFormat, hugeContent) + fmt.Sprintf(chunkFormat, "small message"),
	)

	er := newEventReader[ChatCompletions](io.NopCloser(stream))

	first, err := er.Read()
	require.NoError(t, err)
	require.Equal(t, hugeContent, *first.Choices[0].Delta.Content)

	second, err := er.Read()
	require.NoError(t, err)
	require.Equal(t, "small message", *second.Choices[0].Delta.Content)
}

0 comments on commit 18e40d0

Please sign in to comment.