Skip to content

Commit

Permalink
refactor: improve streaming implementation (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-bot authored and chen-annie committed Jul 13, 2023
1 parent e372a1b commit 6716dd2
Showing 1 changed file with 106 additions and 105 deletions.
211 changes: 106 additions & 105 deletions src/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,61 +10,6 @@ type ServerSentEvent = {
raw: string[];
};

class SSEDecoder {
private data: string[];
private event: string | null;
private chunks: string[];

constructor() {
this.event = null;
this.data = [];
this.chunks = [];
}

decode(line: string) {
if (line.endsWith('\r')) {
line = line.substring(0, line.length - 1);
}

if (!line) {
// empty line and we didn't previously encounter any messages
if (!this.event && !this.data.length) return null;

const sse: ServerSentEvent = {
event: this.event,
data: this.data.join('\n'),
raw: this.chunks,
};

this.event = null;
this.data = [];
this.chunks = [];

return sse;
}

this.chunks.push(line);

if (line.startsWith(':')) {
return null;
}

let [fieldname, _, value] = partition(line, ':');

if (value.startsWith(' ')) {
value = value.substring(1);
}

if (fieldname === 'event') {
this.event = value;
} else if (fieldname === 'data') {
this.data.push(value);
}

return null;
}
}

export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Item>> {
/** @deprecated - please use the async iterator instead. We plan to add additional helper methods shortly. */
response: Response;
Expand All @@ -90,9 +35,7 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite

const iter = readableStreamAsyncIterable<Bytes>(this.response.body);
for await (const chunk of iter) {
const text = decodeText(chunk);

for (const line of lineDecoder.decode(text)) {
for (const line of lineDecoder.decode(chunk)) {
const sse = this.decoder.decode(line);
if (sse) yield sse;
}
Expand Down Expand Up @@ -126,7 +69,60 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite
}
}

const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029';
class SSEDecoder {
private data: string[];
private event: string | null;
private chunks: string[];

constructor() {
this.event = null;
this.data = [];
this.chunks = [];
}

decode(line: string) {
if (line.endsWith('\r')) {
line = line.substring(0, line.length - 1);
}

if (!line) {
// empty line and we didn't previously encounter any messages
if (!this.event && !this.data.length) return null;

const sse: ServerSentEvent = {
event: this.event,
data: this.data.join('\n'),
raw: this.chunks,
};

this.event = null;
this.data = [];
this.chunks = [];

return sse;
}

this.chunks.push(line);

if (line.startsWith(':')) {
return null;
}

let [fieldname, _, value] = partition(line, ':');

if (value.startsWith(' ')) {
value = value.substring(1);
}

if (fieldname === 'event') {
this.event = value;
} else if (fieldname === 'data') {
this.data.push(value);
}

return null;
}
}

/**
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
Expand All @@ -135,15 +131,22 @@ const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029';
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
*/
class LineDecoder {
// prettier-ignore
static NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']);
static NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g;

buffer: string[];
trailingCR: boolean;
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.

constructor() {
this.buffer = [];
this.trailingCR = false;
}

decode(text: string): string[] {
decode(chunk: Bytes): string[] {
let text = this.decodeText(chunk);

if (this.trailingCR) {
text = '\r' + text;
this.trailingCR = false;
Expand All @@ -157,10 +160,10 @@ class LineDecoder {
return [];
}

const trailing_newline = NEWLINE_CHARS.includes(text.slice(-1));
let lines = text.split(/\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g);
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
let lines = text.split(LineDecoder.NEWLINE_REGEXP);

if (lines.length === 1 && !trailing_newline) {
if (lines.length === 1 && !trailingNewline) {
this.buffer.push(lines[0]!);
return [];
}
Expand All @@ -170,13 +173,50 @@ class LineDecoder {
this.buffer = [];
}

if (!trailing_newline) {
if (!trailingNewline) {
this.buffer = [lines.pop() || ''];
}

return lines;
}

decodeText(bytes: Bytes): string {
if (bytes == null) return '';
if (typeof bytes === 'string') return bytes;

// Node:
if (typeof Buffer !== 'undefined') {
if (bytes instanceof Buffer) {
return bytes.toString();
}
if (bytes instanceof Uint8Array) {
return Buffer.from(bytes).toString();
}

throw new Error(
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
);
}

// Browser
if (typeof TextDecoder !== 'undefined') {
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
this.textDecoder ??= new TextDecoder('utf8');
return this.textDecoder.decode(bytes);
}

throw new Error(
`Unexpected: received non-Uint8Array/ArrayBuffer (${
(bytes as any).constructor.name
}) in a web platform. Please report this error.`,
);
}

throw new Error(
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
);
}

flush(): string[] {
if (!this.buffer.length && !this.trailingCR) {
return [];
Expand All @@ -198,61 +238,22 @@ function partition(str: string, delimiter: string): [string, string, string] {
return [str, '', ''];
}

let _textDecoder;
function decodeText(bytes: Bytes): string {
if (bytes == null) return '';
if (typeof bytes === 'string') return bytes;

// Node:
if (typeof Buffer !== 'undefined') {
if (bytes instanceof Buffer) {
return bytes.toString();
}
if (bytes instanceof Uint8Array) {
return Buffer.from(bytes).toString();
}

throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) in Node.`);
}

// Browser
if (typeof TextDecoder !== 'undefined') {
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
_textDecoder ??= new TextDecoder('utf8');
return _textDecoder.decode(bytes);
}

throw new Error(
`Unexpected: received non-Uint8Array/ArrayBuffer (${
(bytes as any).constructor.name
}) in a web platform.`,
);
}

throw new Error(`Unexpected: neither Buffer nor TextDecoder are available as globals.`);
}

/**
* Most browsers don't yet have async iterable support for ReadableStream,
* and Node has a very different way of reading bytes from its "ReadableStream".
*
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1624185965
*
* We make extensive use of "any" here to avoid pulling in either "node" or "dom" types
* to library users' type scopes.
*/
function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) {
return stream[Symbol.asyncIterator];
}
if (stream[Symbol.asyncIterator]) return stream[Symbol.asyncIterator];

const reader = stream.getReader();

return {
next() {
return reader.read();
},
async return() {
reader.cancel();
reader.releaseLock();
return { done: true, value: undefined };
},
Expand Down

0 comments on commit 6716dd2

Please sign in to comment.