From b048a6f9525c8920686be8e00e2c626be93938ff Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Sun, 16 Apr 2023 10:36:04 -0700 Subject: [PATCH 1/6] Simplest possible streaming support. --- src/openai-client.ts | 76 ++++++++++++++++++++++++++++++++++++--- src/schemas/completion.ts | 2 +- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/src/openai-client.ts b/src/openai-client.ts index 13dc964..1d77254 100644 --- a/src/openai-client.ts +++ b/src/openai-client.ts @@ -12,10 +12,9 @@ import type { FetchOptions } from './fetch-api'; import type { ChatCompletionParams, ChatCompletionResponse, - ChatResponseMessage} from './schemas/chat-completion'; -import { - ChatCompletionParamsSchema + ChatResponseMessage, } from './schemas/chat-completion'; +import { ChatCompletionParamsSchema } from './schemas/chat-completion'; export type ConfigOpts = { /** @@ -37,6 +36,40 @@ export type ConfigOpts = { fetchOptions?: FetchOptions; }; +export type ProgressHandler = (chunk: string) => void; + +// Returns true if the chunk is the last chunk. +const parseChunk = ( + chunk: Uint8Array, + handlers: ProgressHandler[] +): boolean => { + const decoder = new TextDecoder(); + const s = decoder.decode(chunk); + let lastChunk = false; + s.split('\n') + .map((line) => line.trim()) + .filter((line) => line.length > 0) + .forEach((line) => { + const pos = line.indexOf(':'); + const name = line.substring(0, pos); + if (name !== 'data') return; + const content = line.substring(pos + 1).trim(); + if (content.length == 0) return; + if (content === '[DONE]') { + lastChunk = true; + return; + } + try { + const parsed = JSON.parse(content); + handlers.forEach((h) => h(parsed.choices[0].text)); + } catch (e) { + console.log('error parsing json', content); + console.log('error', e); + } + }); + return lastChunk; +}; + export class OpenAIClient { api: ReturnType; @@ -77,13 +110,48 @@ export class OpenAIClient { /** * Create a completion for a single prompt string. */ - async createCompletion(params: CompletionParams): Promise<{ + async createCompletion( + params: CompletionParams, + onprogress?: ProgressHandler + ): Promise<{ /** The completion string. */ completion: string; /** The raw response from the API. */ response: CompletionResponse; }> { const reqBody = CompletionParamsSchema.parse(params); + if (reqBody.stream) { + const fullResponse: string[] = []; + const completionResponseParts = { + id: '', + object: '', + created: 0, + model: '', + }; + const handlers: ProgressHandler[] = [ + (c) => { + fullResponse.push(c); + }, + ]; + if (onprogress) handlers.push(onprogress); + return new Promise((resolve) => { + this.api.post('completions', { + json: reqBody, + onDownloadProgress: (_, chunk) => { + if (parseChunk(chunk, handlers)) { + const completion = fullResponse.join(''); + resolve({ + completion, + response: { + ...completionResponseParts, + choices: [{ text: fullResponse.join('') }], + }, + }); + } + }, + }); + }); + } const response: CompletionResponse = await this.api .post('completions', { json: reqBody }) .json(); diff --git a/src/schemas/completion.ts b/src/schemas/completion.ts index f190f04..efddea7 100644 --- a/src/schemas/completion.ts +++ b/src/schemas/completion.ts @@ -80,7 +80,7 @@ export const CompletionParamsSchema = z.object({ /** * Whether to stream back partial progress. If set, tokens will be sent as data-only [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format) as they become available, with the stream terminated by a `data: [DONE]` message. */ - // 'stream'?: boolean | null; + stream: z.boolean().nullish(), }); export type CompletionParams = z.input; From 26c070df370dca25ef6d66575765f84e11fbcf36 Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Sun, 16 Apr 2023 11:37:42 -0700 Subject: [PATCH 2/6] Implement `streamCompletion`. --- src/fetch-api.ts | 2 +- src/openai-client.ts | 126 +++++++++++++++++++++---------------------- tsconfig.json | 2 +- 3 files changed, 62 insertions(+), 68 deletions(-) diff --git a/src/fetch-api.ts b/src/fetch-api.ts index 086df4c..c1344a4 100644 --- a/src/fetch-api.ts +++ b/src/fetch-api.ts @@ -4,7 +4,7 @@ import { OpenAIApiError } from './errors'; const DEFAULT_BASE_URL = 'https://api.openai.com/v1'; -export interface FetchOptions extends Options { +export interface FetchOptions extends Omit { credentials?: string; } diff --git a/src/openai-client.ts b/src/openai-client.ts index 1d77254..420a188 100644 --- a/src/openai-client.ts +++ b/src/openai-client.ts @@ -36,39 +36,58 @@ export type ConfigOpts = { fetchOptions?: FetchOptions; }; -export type ProgressHandler = (chunk: string) => void; +type StreamedCompletionResponse = { + completion: string; + response: CompletionResponse; +}; + +class OpenAIStreamParser { + onchunk?: (chunk: StreamedCompletionResponse) => void; + onend?: () => void; -// Returns true if the chunk is the last chunk. -const parseChunk = ( - chunk: Uint8Array, - handlers: ProgressHandler[] -): boolean => { - const decoder = new TextDecoder(); - const s = decoder.decode(chunk); - let lastChunk = false; - s.split('\n') - .map((line) => line.trim()) - .filter((line) => line.length > 0) - .forEach((line) => { - const pos = line.indexOf(':'); - const name = line.substring(0, pos); - if (name !== 'data') return; - const content = line.substring(pos + 1).trim(); - if (content.length == 0) return; - if (content === '[DONE]') { - lastChunk = true; - return; - } - try { + write(chunk: Uint8Array): void { + const decoder = new TextDecoder(); + const s = decoder.decode(chunk); + s.split('\n') + .map((line) => line.trim()) + .filter((line) => line.length > 0) + .forEach((line) => { + const pos = line.indexOf(':'); + const name = line.substring(0, pos); + if (name !== 'data') return; + const content = line.substring(pos + 1).trim(); + if (content.length == 0) return; + if (content === '[DONE]') { + this.onend?.(); + return; + } const parsed = JSON.parse(content); - handlers.forEach((h) => h(parsed.choices[0].text)); - } catch (e) { - console.log('error parsing json', content); - console.log('error', e); - } + this.onchunk?.({ + completion: parsed.choices[0].text || '', + response: parsed, + }); + }); + } +} + +class CompletionChunker + implements TransformStream +{ + writable: WritableStream; + readable: ReadableStream; + + constructor() { + const parser = new OpenAIStreamParser(); + this.writable = new WritableStream(parser); + this.readable = new ReadableStream({ + start(controller) { + parser.onchunk = (chunk: StreamedCompletionResponse) => + controller.enqueue(chunk); + parser.onend = () => controller.close(); + }, }); - return lastChunk; -}; + } +} export class OpenAIClient { api: ReturnType; @@ -110,48 +129,13 @@ export class OpenAIClient { /** * Create a completion for a single prompt string. */ - async createCompletion( - params: CompletionParams, - onprogress?: ProgressHandler - ): Promise<{ + async createCompletion(params: CompletionParams): Promise<{ /** The completion string. */ completion: string; /** The raw response from the API. */ response: CompletionResponse; }> { const reqBody = CompletionParamsSchema.parse(params); - if (reqBody.stream) { - const fullResponse: string[] = []; - const completionResponseParts = { - id: '', - object: '', - created: 0, - model: '', - }; - const handlers: ProgressHandler[] = [ - (c) => { - fullResponse.push(c); - }, - ]; - if (onprogress) handlers.push(onprogress); - return new Promise((resolve) => { - this.api.post('completions', { - json: reqBody, - onDownloadProgress: (_, chunk) => { - if (parseChunk(chunk, handlers)) { - const completion = fullResponse.join(''); - resolve({ - completion, - response: { - ...completionResponseParts, - choices: [{ text: fullResponse.join('') }], - }, - }); - } - }, - }); - }); - } const response: CompletionResponse = await this.api .post('completions', { json: reqBody }) .json(); @@ -159,6 +143,16 @@ export class OpenAIClient { return { completion, response }; } + async streamCompletion(params: CompletionParams) { + const reqBody = CompletionParamsSchema.parse(params); + const response = await this.api.post('completions', { + json: { ...reqBody, stream: true }, + onDownloadProgress: () => {}, // trick ky to return ReadableStream. + }); + const stream = response.body as ReadableStream; + return stream.pipeThrough(new CompletionChunker()); + } + /** * Create a completion for a chat message. */ diff --git a/tsconfig.json b/tsconfig.json index 37d6397..040a9bc 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,7 +10,7 @@ "esModuleInterop": true, "forceConsistentCasingInFileNames": true, "isolatedModules": true, - "lib": ["es2021"], + "lib": ["es2021", "DOM"], "module": "commonjs", "moduleResolution": "node", "outDir": "dist", From 2b4c6c942c3e3d743272d4dc37f3f43834177173 Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Sun, 16 Apr 2023 11:51:36 -0700 Subject: [PATCH 3/6] Add documentation. --- readme.md | 13 +++++++ src/openai-client.ts | 86 ++++++++++++++++---------------------------- src/streaming.ts | 71 ++++++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 55 deletions(-) create mode 100644 src/streaming.ts diff --git a/readme.md b/readme.md index b923f8a..aa3f7c1 100644 --- a/readme.md +++ b/readme.md @@ -49,6 +49,19 @@ client.createCompletion(params: CompletionParams): Promise<{ }> ``` +To get a streaming response, use the `streamCompletion` method. + +```ts +client.streamCompletion(params: CompletionParams): Promise< + ReadableStream<{ + /** The completion string. */ + completion: string; + /** The raw response from the API. */ + response: CompletionResponse; + }> + > +``` + ### Create Chat Completion See: [OpenAI docs](https://beta.openai.com/docs/api-reference/chat) | [Type definitions](/src/schemas/chat-completion.ts) diff --git a/src/openai-client.ts b/src/openai-client.ts index 420a188..150e89e 100644 --- a/src/openai-client.ts +++ b/src/openai-client.ts @@ -15,6 +15,7 @@ import type { ChatResponseMessage, } from './schemas/chat-completion'; import { ChatCompletionParamsSchema } from './schemas/chat-completion'; +import { StreamCompletionChunker } from './streaming'; export type ConfigOpts = { /** @@ -36,59 +37,6 @@ export type ConfigOpts = { fetchOptions?: FetchOptions; }; -type StreamedCompletionResponse = { - completion: string; - response: CompletionResponse; -}; - -class OpenAIStreamParser { - onchunk?: (chunk: StreamedCompletionResponse) => void; - onend?: () => void; - - write(chunk: Uint8Array): void { - const decoder = new TextDecoder(); - const s = decoder.decode(chunk); - s.split('\n') - .map((line) => line.trim()) - .filter((line) => line.length > 0) - .forEach((line) => { - const pos = line.indexOf(':'); - const name = line.substring(0, pos); - if (name !== 'data') return; - const content = line.substring(pos + 1).trim(); - if (content.length == 0) return; - if (content === '[DONE]') { - this.onend?.(); - return; - } - const parsed = JSON.parse(content); - this.onchunk?.({ - completion: parsed.choices[0].text || '', - response: parsed, - }); - }); - } -} - -class CompletionChunker - implements TransformStream -{ - writable: WritableStream; - readable: ReadableStream; - - constructor() { - const parser = new OpenAIStreamParser(); - this.writable = new WritableStream(parser); - this.readable = new ReadableStream({ - start(controller) { - parser.onchunk = (chunk: StreamedCompletionResponse) => - controller.enqueue(chunk); - parser.onend = () => controller.close(); - }, - }); - } -} - export class OpenAIClient { api: ReturnType; @@ -143,14 +91,42 @@ export class OpenAIClient { return { completion, response }; } - async streamCompletion(params: CompletionParams) { + /** + * Create a completion for a single prompt string and stream back partial progress. + * @param params typipcal standard OpenAI completion parameters + * @returns A stream of completion chunks. + * + * @example + * + * ```ts + * const client = new OpenAIClient(process.env.OPENAI_API_KEY); + * const stream = await client.streamCompletion({ + * model: "text-davinci-003", + * prompt: "Give me some lyrics, make it up.", + * max_tokens: 256, + * temperature: 0, + * }); + * + * for await (const chunk of stream) { + * process.stdout.write(chunk.completion); + * } + * ``` + */ + async streamCompletion(params: CompletionParams): Promise< + ReadableStream<{ + /** The completion string. */ + completion: string; + /** The raw response from the API. */ + response: CompletionResponse; + }> + > { const reqBody = CompletionParamsSchema.parse(params); const response = await this.api.post('completions', { json: { ...reqBody, stream: true }, onDownloadProgress: () => {}, // trick ky to return ReadableStream. }); const stream = response.body as ReadableStream; - return stream.pipeThrough(new CompletionChunker()); + return stream.pipeThrough(new StreamCompletionChunker()); } /** diff --git a/src/streaming.ts b/src/streaming.ts new file mode 100644 index 0000000..686dd2a --- /dev/null +++ b/src/streaming.ts @@ -0,0 +1,71 @@ +import type { CompletionResponse } from './schemas/completion'; + +/** + * Convenience type for brevity in declarations. + */ +type AugmentedCompletionResponse = { + completion: string; + response: CompletionResponse; +}; + +/** + * A parser for the streaming responses from the OpenAI API. + * + * Conveniently shaped like an argument for WritableStream constructor. + */ +class OpenAIStreamParser { + onchunk?: (chunk: AugmentedCompletionResponse) => void; + onend?: () => void; + + /** + * Takes the ReadableStream chunks, produced by `fetch` and turns them into + * `CompletionResponse` objects. + * @param chunk The chunk of data from the stream. + */ + write(chunk: Uint8Array): void { + const decoder = new TextDecoder(); + const s = decoder.decode(chunk); + s.split('\n') + .map((line) => line.trim()) + .filter((line) => line.length > 0) + .forEach((line) => { + const pos = line.indexOf(':'); + const name = line.substring(0, pos); + if (name !== 'data') return; + const content = line.substring(pos + 1).trim(); + if (content.length == 0) return; + if (content === '[DONE]') { + this.onend?.(); + return; + } + const parsed = JSON.parse(content); + this.onchunk?.({ + completion: parsed.choices[0].text || '', + response: parsed, + }); + }); + } +} + +/** + * A transform stream that takes the streaming responses from the OpenAI API + * and turns them into `AugmentedCompletionResponse` objects. + */ +export class StreamCompletionChunker + implements TransformStream +{ + writable: WritableStream; + readable: ReadableStream; + + constructor() { + const parser = new OpenAIStreamParser(); + this.writable = new WritableStream(parser); + this.readable = new ReadableStream({ + start(controller) { + parser.onchunk = (chunk: AugmentedCompletionResponse) => + controller.enqueue(chunk); + parser.onend = () => controller.close(); + }, + }); + } +} From a114da129f875750b6e7c01775fce38b52bcd9e2 Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Sun, 16 Apr 2023 11:57:08 -0700 Subject: [PATCH 4/6] Revert the schema change. --- src/schemas/completion.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/schemas/completion.ts b/src/schemas/completion.ts index efddea7..f190f04 100644 --- a/src/schemas/completion.ts +++ b/src/schemas/completion.ts @@ -80,7 +80,7 @@ export const CompletionParamsSchema = z.object({ /** * Whether to stream back partial progress. If set, tokens will be sent as data-only [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format) as they become available, with the stream terminated by a `data: [DONE]` message. */ - stream: z.boolean().nullish(), + // 'stream'?: boolean | null; }); export type CompletionParams = z.input; From f40c754bfaaf9cf09a8c96ae465fec9d4756c5d0 Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Mon, 17 Apr 2023 19:19:08 -0700 Subject: [PATCH 5/6] Address comments. --- src/streaming.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/streaming.ts b/src/streaming.ts index 686dd2a..22cf5ac 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -38,11 +38,15 @@ class OpenAIStreamParser { this.onend?.(); return; } - const parsed = JSON.parse(content); - this.onchunk?.({ - completion: parsed.choices[0].text || '', - response: parsed, - }); + try { + const parsed = JSON.parse(content); + this.onchunk?.({ + completion: parsed.choices[0].text || '', + response: parsed, + }); + } catch (e) { + console.error('Failed parsing streamed JSON chunk', e); + } }); } } From beb13ae97c1897b2be09cc95ba2e4fcc660cf788 Mon Sep 17 00:00:00 2001 From: Dimitri Glazkov Date: Fri, 21 Apr 2023 21:08:28 -0700 Subject: [PATCH 6/6] Add streaming chat completion. --- src/openai-client.ts | 32 ++++++++++++++++++++++++++- src/schemas/chat-completion.ts | 2 ++ src/streaming.ts | 40 +++++++++++++++------------------- 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/src/openai-client.ts b/src/openai-client.ts index 150e89e..4f45e69 100644 --- a/src/openai-client.ts +++ b/src/openai-client.ts @@ -126,7 +126,12 @@ export class OpenAIClient { onDownloadProgress: () => {}, // trick ky to return ReadableStream. }); const stream = response.body as ReadableStream; - return stream.pipeThrough(new StreamCompletionChunker()); + return stream.pipeThrough( + new StreamCompletionChunker((response: CompletionResponse) => { + const completion = response.choices[0].text || ''; + return { completion, response }; + }) + ); } /** @@ -149,6 +154,31 @@ export class OpenAIClient { return { message, response }; } + async streamChatCompletion(params: ChatCompletionParams): Promise< + ReadableStream<{ + /** The completion message. */ + message: ChatResponseMessage; + /** The raw response from the API. */ + response: ChatCompletionResponse; + }> + > { + const reqBody = ChatCompletionParamsSchema.parse(params); + const response = await this.api.post('chat/completions', { + json: { ...reqBody, stream: true }, + onDownloadProgress: () => {}, // trick ky to return ReadableStream. + }); + const stream = response.body as ReadableStream; + return stream.pipeThrough( + new StreamCompletionChunker((response: ChatCompletionResponse) => { + const message = response.choices[0].delta || { + role: 'assistant', + content: '', + }; + return { message, response }; + }) + ); + } + /** * Create an edit for a single input string. */ diff --git a/src/schemas/chat-completion.ts b/src/schemas/chat-completion.ts index 43517c1..dbae332 100644 --- a/src/schemas/chat-completion.ts +++ b/src/schemas/chat-completion.ts @@ -76,4 +76,6 @@ export type ChatCompletionResponseChoices = { index?: number; finish_reason?: string; message?: ChatResponseMessage; + /** Used instead of `message` when streaming */ + delta?: ChatResponseMessage; }[]; diff --git a/src/streaming.ts b/src/streaming.ts index 22cf5ac..7eaf023 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -1,22 +1,22 @@ -import type { CompletionResponse } from './schemas/completion'; - -/** - * Convenience type for brevity in declarations. +/** A function that converts from raw Completion response from OpenAI + * into a nicer object which includes the first choice in response from OpenAI. */ -type AugmentedCompletionResponse = { - completion: string; - response: CompletionResponse; -}; +type ResponseFactory = (response: Raw) => Nice; /** * A parser for the streaming responses from the OpenAI API. * * Conveniently shaped like an argument for WritableStream constructor. */ -class OpenAIStreamParser { - onchunk?: (chunk: AugmentedCompletionResponse) => void; +class OpenAIStreamParser { + private responseFactory: ResponseFactory; + onchunk?: (chunk: Nice) => void; onend?: () => void; + constructor(responseFactory: ResponseFactory) { + this.responseFactory = responseFactory; + } + /** * Takes the ReadableStream chunks, produced by `fetch` and turns them into * `CompletionResponse` objects. @@ -40,10 +40,7 @@ class OpenAIStreamParser { } try { const parsed = JSON.parse(content); - this.onchunk?.({ - completion: parsed.choices[0].text || '', - response: parsed, - }); + this.onchunk?.(this.responseFactory(parsed)); } catch (e) { console.error('Failed parsing streamed JSON chunk', e); } @@ -53,21 +50,20 @@ class OpenAIStreamParser { /** * A transform stream that takes the streaming responses from the OpenAI API - * and turns them into `AugmentedCompletionResponse` objects. + * and turns them into useful response objects. */ -export class StreamCompletionChunker - implements TransformStream +export class StreamCompletionChunker + implements TransformStream { writable: WritableStream; - readable: ReadableStream; + readable: ReadableStream; - constructor() { - const parser = new OpenAIStreamParser(); + constructor(responseFactory: ResponseFactory) { + const parser = new OpenAIStreamParser(responseFactory); this.writable = new WritableStream(parser); this.readable = new ReadableStream({ start(controller) { - parser.onchunk = (chunk: AugmentedCompletionResponse) => - controller.enqueue(chunk); + parser.onchunk = (chunk: Nice) => controller.enqueue(chunk); parser.onend = () => controller.close(); }, });